Skip to content

Commit

Permalink
improve web locks and transactions according to test spec
Browse files Browse the repository at this point in the history
  • Loading branch information
stevensJourney committed Feb 1, 2024
1 parent 6ffb129 commit 02c2d9f
Show file tree
Hide file tree
Showing 8 changed files with 201 additions and 117 deletions.
50 changes: 43 additions & 7 deletions lib/src/web/database/web_db_context.dart
Original file line number Diff line number Diff line change
@@ -1,13 +1,21 @@
import 'dart:async';

import 'package:meta/meta.dart';
import 'package:sqlite_async/sqlite3_common.dart';
import 'package:sqlite_async/src/sqlite_connection.dart';
import 'executor/sqlite_executor.dart';

/// Custom function which exposes CommonDatabase.autocommit
const sqliteAsyncAutoCommitCommand = 'sqlite_async_autocommit';

class WebReadContext implements SqliteReadContext {
SQLExecutor db;
bool _closed = false;

@protected
bool isTransaction;

WebReadContext(this.db);
WebReadContext(this.db, {this.isTransaction = false});

@override
Future<T> computeWithDatabase<T>(
Expand All @@ -17,40 +25,68 @@ class WebReadContext implements SqliteReadContext {

@override
Future<Row> get(String sql, [List<Object?> parameters = const []]) async {
return (await db.select(sql, parameters)).first;
return (await getAll(sql, parameters)).first;
}

@override
Future<ResultSet> getAll(String sql,
[List<Object?> parameters = const []]) async {
if (_closed) {
throw SqliteException(0, 'Transaction closed', null, sql);
}
return db.select(sql, parameters);
}

@override
Future<Row?> getOptional(String sql,
[List<Object?> parameters = const []]) async {
try {
return (await db.select(sql, parameters)).first;
return (await getAll(sql, parameters)).first;
} catch (ex) {
return null;
}
}

@override
bool get closed => throw UnimplementedError();
bool get closed => _closed;

close() {
_closed = true;
}

@override
Future<bool> getAutoCommit() {
throw UnimplementedError();
Future<bool> getAutoCommit() async {
final response = await db.select('select $sqliteAsyncAutoCommitCommand()');
if (response.isEmpty) {
return false;
}

return response.first.values.first != 0;
}
}

class WebWriteContext extends WebReadContext implements SqliteWriteContext {
WebWriteContext(super.db);
WebWriteContext(super.db, {super.isTransaction});

@override
Future<ResultSet> execute(String sql,
[List<Object?> parameters = const []]) async {
return getAll(sql, parameters);
}

@override
Future<ResultSet> getAll(String sql,
[List<Object?> parameters = const []]) async {
if (_closed) {
throw SqliteException(0, 'Transaction closed', null, sql);
}
final isAutoCommit = await getAutoCommit();

/// Statements in read/writeTransactions should not execute after ROLLBACK
if (isTransaction && !sql.toLowerCase().contains('begin') && isAutoCommit) {
throw SqliteException(0,
'Transaction rolled back by earlier statement. Cannot execute: $sql');
}
return db.select(sql, parameters);
}

Expand Down
64 changes: 56 additions & 8 deletions lib/src/web/database/web_sqlite_connection_impl.dart
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import 'package:sqlite_async/src/common/abstract_open_factory.dart';
import 'package:sqlite_async/src/sqlite_connection.dart';
import 'package:sqlite_async/src/sqlite_queries.dart';
import 'package:sqlite_async/src/update_notification.dart';
import 'package:sqlite_async/src/utils/shared_utils.dart';
import 'package:sqlite_async/src/web/web_mutex.dart';
import 'package:sqlite_async/src/web/web_sqlite_open_factory.dart';

Expand Down Expand Up @@ -50,24 +51,70 @@ class WebSqliteConnectionImpl with SqliteQueries implements SqliteConnection {

@override
Future<T> readLock<T>(Future<T> Function(SqliteReadContext tx) callback,
{Duration? lockTimeout, String? debugContext}) async {
{Duration? lockTimeout,
String? debugContext,
bool isTransaction = false}) async {
await isInitialized;
return _runZoned(
() => mutex.lock(() => callback(WebReadContext(executor!)),
timeout: lockTimeout),
() => mutex.lock(() async {
final context =
WebReadContext(executor!, isTransaction: isTransaction);
try {
final result = await callback(context);
return result;
} finally {
context.close();
}
}, timeout: lockTimeout),
debugContext: debugContext ?? 'execute()');
}

@override
Future<T> writeLock<T>(Future<T> Function(SqliteWriteContext tx) callback,
{Duration? lockTimeout, String? debugContext}) async {
{Duration? lockTimeout,
String? debugContext,
bool isTransaction = false}) async {
await isInitialized;
return _runZoned(
() => mutex.lock(() => callback(WebWriteContext(executor!)),
timeout: lockTimeout),
() => mutex.lock(() async {
final context =
WebWriteContext(executor!, isTransaction: isTransaction);
try {
final result = await callback(context);
return result;
} finally {
context.close();
}
}, timeout: lockTimeout),
debugContext: debugContext ?? 'execute()');
}

@override
Future<T> readTransaction<T>(
Future<T> Function(SqliteReadContext tx) callback,
{Duration? lockTimeout}) async {
return readLock((ctx) async {
return await internalReadTransaction(ctx, callback);
},
lockTimeout: lockTimeout,
debugContext: 'readTransaction()',
isTransaction: true);
}

@override
Future<T> writeTransaction<T>(
Future<T> Function(SqliteWriteContext tx) callback,
{Duration? lockTimeout}) async {
return writeLock((
ctx,
) async {
return await internalWriteTransaction(ctx, callback);
},
lockTimeout: lockTimeout,
debugContext: 'writeTransaction()',
isTransaction: true);
}

/// The [Mutex] on individual connections do already error in recursive locks.
///
/// We duplicate the same check here, to:
Expand All @@ -90,7 +137,8 @@ class WebSqliteConnectionImpl with SqliteQueries implements SqliteConnection {
}

@override
Future<bool> getAutoCommit() {
throw UnimplementedError();
Future<bool> getAutoCommit() async {
await isInitialized;
return WebWriteContext(executor!).getAutoCommit();
}
}
16 changes: 16 additions & 0 deletions lib/src/web/database/web_sqlite_database.dart
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,22 @@ class SqliteDatabase extends AbstractSqliteDatabase {
lockTimeout: lockTimeout, debugContext: debugContext);
}

@override
Future<T> readTransaction<T>(
Future<T> Function(SqliteReadContext tx) callback,
{Duration? lockTimeout,
String? debugContext}) async {
return _connection.readTransaction(callback, lockTimeout: lockTimeout);
}

@override
Future<T> writeTransaction<T>(
Future<T> Function(SqliteWriteContext tx) callback,
{Duration? lockTimeout,
String? debugContext}) async {
return _connection.writeTransaction(callback, lockTimeout: lockTimeout);
}

@override
Future<void> close() async {
return _connection.close();
Expand Down
4 changes: 3 additions & 1 deletion lib/src/web/worker/drift_worker.dart
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ import 'package:sqlite_async/sqlite3_common.dart';

/// Use this function to register any custom DB functionality
/// which requires direct access to the connection
void setupDatabase(CommonDatabase database) {}
void setupDatabase(CommonDatabase database) {
setupCommonWorkerDB(database);
}

void main() {
WasmDatabase.workerMainForOpen(
Expand Down
11 changes: 10 additions & 1 deletion lib/src/web/worker/worker_utils.dart
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
import 'package:sqlite_async/sqlite3_common.dart';
import 'package:sqlite_async/src/web/database/web_db_context.dart';

void setupCommonWorkerDB(CommonDatabase database) {}
void setupCommonWorkerDB(CommonDatabase database) {
/// Exposes autocommit via a query function
database.createFunction(
functionName: sqliteAsyncAutoCommitCommand,
argumentCount: const AllowedArgumentCount(0),
function: (args) {
return database.autocommit;
});
}
99 changes: 0 additions & 99 deletions test/basic_native_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -205,14 +205,6 @@ void main() {
// });
});

test('should allow PRAMGAs', () async {
final db = await testUtils.setupDatabase(path: path);
await createTables(db);
// Not allowed in transactions, but does work as a direct statement.
await db.execute('PRAGMA wal_checkpoint(TRUNCATE)');
await db.execute('VACUUM');
});

test('should allow ignoring errors', () async {
final db = await testUtils.setupDatabase(path: path);
await createTables(db);
Expand All @@ -221,50 +213,6 @@ void main() {
'INSERT INTO test_data(description) VALUES(json(?))', ['test3']));
});

test('should properly report errors in transactions', () async {
final db = await testUtils.setupDatabase(path: path);
await createTables(db);

var tp = db.writeTransaction((tx) async {
await tx.execute(
'INSERT OR ROLLBACK INTO test_data(id, description) VALUES(?, ?)',
[1, 'test1']);
await tx.execute(
'INSERT OR ROLLBACK INTO test_data(id, description) VALUES(?, ?)',
[2, 'test2']);
expect(await tx.getAutoCommit(), equals(false));
try {
await tx.execute(
'INSERT OR ROLLBACK INTO test_data(id, description) VALUES(?, ?)',
[2, 'test3']);
} catch (e) {
// Ignore
}
expect(await tx.getAutoCommit(), equals(true));
expect(tx.closed, equals(false));

// Will not be executed because of the above rollback
ignore(tx.execute(
'INSERT OR ROLLBACK INTO test_data(id, description) VALUES(?, ?)',
[4, 'test4']));
});

// The error propagates up to the transaction
await expectLater(
tp,
throwsA((e) =>
e is sqlite.SqliteException &&
e.message
.contains('Transaction rolled back by earlier statement') &&
e.message.contains('UNIQUE constraint failed')));

expect(await db.get('SELECT count() count FROM test_data'),
equals({'count': 0}));

// Check that we can open another transaction afterwards
await db.writeTransaction((tx) async {});
});

test('should error on dangling transactions', () async {
final db = await testUtils.setupDatabase(path: path);
await createTables(db);
Expand All @@ -273,25 +221,6 @@ void main() {
}, throwsA((e) => e is sqlite.SqliteException));
});

test('should handle normal errors', () async {
final db = await testUtils.setupDatabase(path: path);
await createTables(db);
Error? caughtError;
final syntheticError = ArgumentError('foobar');
await db.computeWithDatabase<void>((db) async {
throw syntheticError;
}).catchError((error) {
caughtError = error;
});
expect(caughtError.toString(), equals(syntheticError.toString()));

// Check that we can still continue afterwards
final computed = await db.computeWithDatabase((db) async {
return 5;
});
expect(computed, equals(5));
});

test('should handle uncaught errors', () async {
final db = await testUtils.setupDatabase(path: path);
await createTables(db);
Expand Down Expand Up @@ -344,34 +273,6 @@ void main() {
});
expect(computed, equals(5));
});

test('should allow resuming transaction after errors', () async {
final db = await testUtils.setupDatabase(path: path);
await createTables(db);
SqliteWriteContext? savedTx;
await db.writeTransaction((tx) async {
savedTx = tx;
var caught = false;
try {
// This error does not rollback the transaction
await tx.execute('NOT A VALID STATEMENT');
} catch (e) {
// Ignore
caught = true;
}
expect(caught, equals(true));

expect(await tx.getAutoCommit(), equals(false));
expect(tx.closed, equals(false));

final rs = await tx.execute(
'INSERT INTO test_data(description) VALUES(?) RETURNING description',
['Test Data']);
expect(rs.rows[0], equals(['Test Data']));
});
expect(await savedTx!.getAutoCommit(), equals(true));
expect(savedTx!.closed, equals(true));
});
});
}

Expand Down
Loading

0 comments on commit 02c2d9f

Please sign in to comment.