Skip to content

Commit

Permalink
feat: automatic fallback to Partitioned DML (#2593)
Browse files Browse the repository at this point in the history
Adds a third option for the `spanner.autocommit_dml_mode`. This third option
first tries to execute a DML statement using a standard, atomic transaction.
If that fails because the transaction mutation limit was exceeded, the
statement is retried using a Partitioned DML transaction.

Usage:

```
set spanner.autocommit_dml_mode='TRANSACTIONAL_WITH_FALLBACK_TO_PARTITIONED_NON_ATOMIC';

-- The following statement is executed as an atomic DML statement first.
-- If that fails due to exceeding the Spanner mutation limit, then this statement
-- will automatically be retried using Partitioned DML.
update large_table set active=true where active is null;
```
  • Loading branch information
olavloite authored Dec 6, 2024
1 parent 03757d6 commit 3d0c471
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
import com.google.protobuf.ListValue;
import com.google.protobuf.NullValue;
import com.google.protobuf.Value;
import com.google.rpc.Help;
import com.google.rpc.Help.Link;
import com.google.spanner.admin.database.v1.UpdateDatabaseDdlMetadata;
import com.google.spanner.v1.Partition;
import com.google.spanner.v1.PartitionQueryRequest;
Expand All @@ -61,6 +63,7 @@
import io.grpc.Context;
import io.grpc.Contexts;
import io.grpc.Metadata;
import io.grpc.Metadata.Key;
import io.grpc.Server;
import io.grpc.ServerCall;
import io.grpc.ServerCall.Listener;
Expand Down Expand Up @@ -1375,6 +1378,25 @@ protected void closeSpannerPool(boolean ignoreException) {
}
}

static StatusRuntimeException createTransactionMutationLimitExceededException() {
Metadata.Key<byte[]> key = Key.of("grpc-status-details-bin", Metadata.BINARY_BYTE_MARSHALLER);
Help help =
Help.newBuilder()
.addLinks(
Link.newBuilder()
.setDescription("Cloud Spanner limits documentation.")
.setUrl("https://cloud.google.com/spanner/docs/limits")
.build())
.build();
com.google.rpc.Status status =
com.google.rpc.Status.newBuilder().addDetails(Any.pack(help)).build();
Metadata trailers = new Metadata();
trailers.put(key, status.toByteArray());
return io.grpc.Status.INVALID_ARGUMENT
.withDescription("The transaction contains too many mutations.")
.asRuntimeException(trailers);
}

@Before
public void clearRequests() {
mockSpanner.clearRequests();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5599,6 +5599,33 @@ public void testGSSAPI() throws SQLException {
exception.getMessage().contains("The server does not support GSS Encryption"));
}

@Test
public void testRetryDmlAsPdml() throws SQLException {
String sql = "update foo set bar=0 where bar is null";
mockSpanner.setExecuteSqlExecutionTime(
SimulatedExecutionTime.ofException(createTransactionMutationLimitExceededException()));
mockSpanner.putStatementResult(StatementResult.update(Statement.of(sql), 100_000L));

try (Connection connection = DriverManager.getConnection(createUrl());
java.sql.Statement statement = connection.createStatement()) {
assertTrue(connection.getAutoCommit());
statement.execute(
"set spanner.autocommit_dml_mode='TRANSACTIONAL_WITH_FALLBACK_TO_PARTITIONED_NON_ATOMIC'");
assertEquals(100_000, statement.executeUpdate(sql));
}
assertEquals(1, mockSpanner.countRequestsOfType(BeginTransactionRequest.class));
BeginTransactionRequest beginTransactionRequest =
mockSpanner.getRequestsOfType(BeginTransactionRequest.class).get(0);
assertTrue(beginTransactionRequest.getOptions().hasPartitionedDml());
assertEquals(2, mockSpanner.countRequestsOfType(ExecuteSqlRequest.class));
ExecuteSqlRequest atomicRequest = mockSpanner.getRequestsOfType(ExecuteSqlRequest.class).get(0);
assertTrue(atomicRequest.getTransaction().hasBegin());
assertTrue(atomicRequest.getTransaction().getBegin().hasReadWrite());
ExecuteSqlRequest pdmlRequest = mockSpanner.getRequestsOfType(ExecuteSqlRequest.class).get(1);
assertTrue(pdmlRequest.getTransaction().hasId());
assertEquals(0, mockSpanner.countRequestsOfType(CommitRequest.class));
}

@Test
public void testShutdown_failsByDefault() throws SQLException {
try (Connection connection = DriverManager.getConnection(createUrl());
Expand Down

0 comments on commit 3d0c471

Please sign in to comment.