Skip to content

Commit

Permalink
fix: connection should be idle after failed commit (#1837)
Browse files Browse the repository at this point in the history
A connection would be put in the ABORTED state if a COMMIT
statement failed with an error. This is not consistent with
real PostgreSQL, which puts the connection in the IDLE state
after a COMMIT fails. The same applies to ROLLBACK.

Fixes #1836
  • Loading branch information
olavloite authored May 22, 2024
1 parent fec2e7e commit 65e4dd4
Show file tree
Hide file tree
Showing 3 changed files with 170 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1168,15 +1168,18 @@ private void flush(boolean isSync) {
sessionState.rollback();
}
closeAllPortals.run();
transactionMode = TransactionMode.IMPLICIT;
connectionState = ConnectionState.IDLE;
currentTransactionId = null;
clearCurrentTransaction();
}
index++;
}
}
} catch (Exception exception) {
connectionState = ConnectionState.ABORTED;
// The connection should not transition to the ABORTED state if a COMMIT or ROLLBACK fails.
if (isCommit(index) || isRollback(index)) {
clearCurrentTransaction();
} else {
connectionState = ConnectionState.ABORTED;
}
closeAllPortals.run();
sessionState.rollback();
if (spannerConnection.isInTransaction()) {
Expand All @@ -1193,6 +1196,12 @@ private void flush(boolean isSync) {
}
}

private void clearCurrentTransaction() {
transactionMode = TransactionMode.IMPLICIT;
connectionState = ConnectionState.IDLE;
currentTransactionId = null;
}

/** Starts an implicit transaction if that is necessary. */
private void maybeBeginImplicitTransaction(int index, boolean isSync) {
if (connectionState != ConnectionState.IDLE) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import com.google.cloud.spanner.SpannerOptions;
import com.google.cloud.spanner.Statement;
import com.google.cloud.spanner.connection.RandomResultSetGenerator;
import com.google.cloud.spanner.pgadapter.error.SQLState;
import com.google.common.collect.ImmutableList;
import com.google.spanner.v1.ExecuteSqlRequest;
import com.google.spanner.v1.ResultSetStats;
Expand Down Expand Up @@ -68,6 +69,7 @@
import org.postgresql.PGStatement;
import org.postgresql.core.Oid;
import org.postgresql.jdbc.PgStatement;
import org.postgresql.util.PSQLException;

@RunWith(JUnit4.class)
public class AbortedMockServerTest extends AbstractMockServerTest {
Expand Down Expand Up @@ -937,6 +939,32 @@ public void testRandomResults() throws SQLException {
}
}

@Test
public void testCommitFailsDueToConcurrentModification() throws SQLException {
String sql = "insert into foo (id, value) values (1, 'One') on conflict do nothing";
mockSpanner.putStatementResult(StatementResult.update(Statement.of(sql), 1L));

try (Connection connection = createConnection()) {
connection.setAutoCommit(false);
assertEquals(1, connection.createStatement().executeUpdate(sql));

// Change the update count that is returned and abort the transaction. This will cause the
// commit() to fail due to a concurrent modification.
mockSpanner.putStatementResult(StatementResult.update(Statement.of(sql), 0));
mockSpanner.abortNextStatement();

PSQLException psqlException = assertThrows(PSQLException.class, connection::commit);
assertNotNull(psqlException.getServerErrorMessage());
assertEquals(
SQLState.SerializationFailure.toString(),
psqlException.getServerErrorMessage().getSQLState());

// Verify that the connection is usable for a new transaction after a failed commit.
assertEquals(0, connection.createStatement().executeUpdate(sql));
connection.commit();
}
}

private void assertEqual(com.google.cloud.spanner.ResultSet spannerResult, ResultSet pgResult)
throws SQLException {
assertEquals(spannerResult.getColumnCount(), pgResult.getMetaData().getColumnCount());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import com.google.cloud.spanner.connection.AbstractStatementParser.StatementType;
import com.google.cloud.spanner.connection.Connection;
import com.google.cloud.spanner.connection.StatementResult;
import com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType;
import com.google.cloud.spanner.connection.StatementResult.ResultType;
import com.google.cloud.spanner.connection.TransactionMode;
import com.google.cloud.spanner.pgadapter.AbstractMockServerTest;
Expand All @@ -53,6 +54,7 @@
import com.google.cloud.spanner.pgadapter.error.SQLState;
import com.google.cloud.spanner.pgadapter.metadata.ConnectionMetadata;
import com.google.cloud.spanner.pgadapter.metadata.OptionsMetadata;
import com.google.cloud.spanner.pgadapter.statements.BackendConnection.ConnectionState;
import com.google.cloud.spanner.pgadapter.statements.BackendConnection.NoResult;
import com.google.cloud.spanner.pgadapter.statements.BackendConnection.QueryResult;
import com.google.cloud.spanner.pgadapter.statements.local.ListDatabasesStatement;
Expand Down Expand Up @@ -494,6 +496,133 @@ public void testGeneralException() {
assertEquals(executionException.getCause(), PGExceptionFactory.toPGException(error));
}

@Test
public void testUpdateException_leavesConnectionInAbortedState() {
Connection connection = mock(Connection.class);
when(connection.isInTransaction()).thenReturn(true);

Statement statement = Statement.of("insert into foo (id) values (1)");
ParsedStatement parsedStatement = mock(ParsedStatement.class);
when(parsedStatement.getSqlWithoutComments()).thenReturn(statement.getSql());
RuntimeException error = new RuntimeException("test error");
when(connection.execute(statement)).thenThrow(error);

BackendConnection backendConnection =
new BackendConnection(
NOOP_OTEL,
NOOP_OTEL_METER,
METRIC_ATTRIBUTES,
UUID.randomUUID().toString(),
DO_NOTHING,
DATABASE_ID,
connection,
() -> WellKnownClient.UNSPECIFIED,
mock(OptionsMetadata.class),
() -> EMPTY_LOCAL_STATEMENTS);
Future<StatementResult> resultFuture =
backendConnection.execute("INSERT", parsedStatement, statement, Function.identity());
backendConnection.flush();

ExecutionException executionException =
assertThrows(ExecutionException.class, resultFuture::get);
assertEquals(PGException.class, executionException.getCause().getClass());
PGException pgException = (PGException) executionException.getCause();
assertTrue(pgException.getMessage().contains("test error"));

// Verify that the connection is now in the Aborted state.
assertEquals(ConnectionState.ABORTED, backendConnection.getConnectionState());
}

@Test
public void testCommitException_leavesConnectionInIdleState() {
Connection connection = mock(Connection.class);
when(connection.isInTransaction()).thenReturn(true);

Statement commitStatement = Statement.of("commit");
ParsedStatement parsedCommitStatement = mock(ParsedStatement.class);
when(parsedCommitStatement.getSqlWithoutComments()).thenReturn(commitStatement.getSql());
when(parsedCommitStatement.getType()).thenReturn(StatementType.CLIENT_SIDE);
when(parsedCommitStatement.getClientSideStatementType())
.thenReturn(ClientSideStatementType.COMMIT);
StatementResult commitResult = mock(StatementResult.class);
when(commitResult.getResultType()).thenReturn(ResultType.NO_RESULT);
when(connection.execute(commitStatement))
.thenThrow(
SpannerExceptionFactory.newSpannerException(
ErrorCode.FAILED_PRECONDITION, "internal error"));

BackendConnection backendConnection =
new BackendConnection(
NOOP_OTEL,
NOOP_OTEL_METER,
METRIC_ATTRIBUTES,
UUID.randomUUID().toString(),
DO_NOTHING,
DATABASE_ID,
connection,
() -> WellKnownClient.UNSPECIFIED,
mock(OptionsMetadata.class),
() -> EMPTY_LOCAL_STATEMENTS);
Future<StatementResult> commitFuture =
backendConnection.execute(
"COMMIT", parsedCommitStatement, commitStatement, Function.identity());
backendConnection.flush();

ExecutionException executionException =
assertThrows(ExecutionException.class, commitFuture::get);
assertEquals(PGException.class, executionException.getCause().getClass());
PGException pgException = (PGException) executionException.getCause();
assertTrue(pgException.getMessage().contains("internal error"));

// Verify that the connection is now in the idle state.
assertEquals(ConnectionState.IDLE, backendConnection.getConnectionState());
}

@Test
public void testRollbackException_leavesConnectionInIdleState() {
Connection connection = mock(Connection.class);
when(connection.isInTransaction()).thenReturn(true);

Statement rollbackStatement = Statement.of("rollback");
ParsedStatement parsedRollbackStatement = mock(ParsedStatement.class);
when(parsedRollbackStatement.getSqlWithoutComments()).thenReturn(rollbackStatement.getSql());
when(parsedRollbackStatement.getType()).thenReturn(StatementType.CLIENT_SIDE);
when(parsedRollbackStatement.getClientSideStatementType())
.thenReturn(ClientSideStatementType.ROLLBACK);
StatementResult rollbackResult = mock(StatementResult.class);
when(rollbackResult.getResultType()).thenReturn(ResultType.NO_RESULT);
when(connection.execute(rollbackStatement))
.thenThrow(
SpannerExceptionFactory.newSpannerException(
ErrorCode.FAILED_PRECONDITION, "internal error"));

BackendConnection backendConnection =
new BackendConnection(
NOOP_OTEL,
NOOP_OTEL_METER,
METRIC_ATTRIBUTES,
UUID.randomUUID().toString(),
DO_NOTHING,
DATABASE_ID,
connection,
() -> WellKnownClient.UNSPECIFIED,
mock(OptionsMetadata.class),
() -> EMPTY_LOCAL_STATEMENTS);
Future<StatementResult> rollbackFuture =
backendConnection.execute(
"ROLLBACK", parsedRollbackStatement, rollbackStatement, Function.identity());
backendConnection.flush();

ExecutionException executionException =
assertThrows(ExecutionException.class, rollbackFuture::get);
assertEquals(PGException.class, executionException.getCause().getClass());
PGException pgException = (PGException) executionException.getCause();
assertTrue(pgException.getMessage().contains("internal error"));

// Verify that the connection is now in the idle state.
assertEquals(ConnectionState.IDLE, backendConnection.getConnectionState());
}

@Test
public void testCancelledException() {
Connection connection = mock(Connection.class);
Expand Down

0 comments on commit 65e4dd4

Please sign in to comment.