Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Sync skip deletes <do not merge> #2131

Open
wants to merge 13 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -127,12 +127,13 @@ class AtCommitLog extends BaseAtCommitLog {
/// Returns the Iterator of [_commitLogCacheMap] from the commitId specified.
@server
Iterator<MapEntry<String, CommitEntry>> getEntries(int commitId,
{String? regex, int limit = 25}) {
{String? regex, int limit = 25, bool skipDeletes = false}) {
Copy link
Member

@VJag VJag Oct 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can make this a strategy.. where FetchAllKeysStrategy is to get all and SkipDeletesStrategy is to skip deletes, instead of if-else code might look more readable.

// If regex is null or isEmpty set regex to match all keys
if (regex == null || regex.isEmpty) {
regex = '.*';
}
return _commitLogKeyStore.getEntries(commitId, regex: regex, limit: limit);
return _commitLogKeyStore.getEntries(commitId,
regex: regex, limit: limit, skipDeletes: skipDeletes);
}

Future<void> _publishChangeEvent(CommitEntry commitEntry) async {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import 'package:at_commons/at_commons.dart';
import 'package:at_persistence_secondary_server/at_persistence_secondary_server.dart';
import 'package:at_persistence_secondary_server/src/keystore/hive_base.dart';
import 'package:at_persistence_secondary_server/src/log/commitlog/sync/fetch_all_keys_strategy.dart';
import 'package:at_persistence_secondary_server/src/log/commitlog/sync/skip_deletes_strategy.dart';
import 'package:at_persistence_secondary_server/src/log/commitlog/sync/sync_keys_fetch_strategy.dart';
import 'package:at_utils/at_utils.dart';
import 'package:hive/hive.dart';
import 'package:meta/meta.dart';
Expand All @@ -10,10 +13,13 @@ class CommitLogKeyStore extends BaseCommitLogKeyStore {
final _logger = AtSignLogger('CommitLogKeyStore');
late CommitLogCache commitLogCache;

late SyncKeysFetchStrategy _syncKeysFetchStrategy;

int get latestCommitId => commitLogCache.latestCommitId;

CommitLogKeyStore(String currentAtSign) : super(currentAtSign) {
commitLogCache = CommitLogCache(this);
_syncKeysFetchStrategy = FetchAllKeysStrategy();
}

@override
Expand Down Expand Up @@ -184,51 +190,8 @@ class CommitLogKeyStore extends BaseCommitLogKeyStore {
/// if regex is passed, key has to match the regex or it has to be a special key.
bool _shouldIncludeKeyInSyncResponse(String atKey, String regex,
{List<String>? enrolledNamespace}) {
return _isNamespaceAuthorised(atKey, enrolledNamespace) &&
(_keyMatchesRegex(atKey, regex) || _alwaysIncludeInSync(atKey));
}

bool _isNamespaceAuthorised(
String atKeyAsString, List<String>? enrolledNamespace) {
// This is work-around for : https://github.com/atsign-foundation/at_server/issues/1570
if (atKeyAsString.toLowerCase() == 'configkey') {
return true;
}
late AtKey atKey;
try {
atKey = AtKey.fromString(atKeyAsString);
} on InvalidSyntaxException catch (_) {
_logger.warning(
'_isNamespaceAuthorized found an invalid key "$atKeyAsString" in the commit log. Returning false');
return false;
}
String? keyNamespace = atKey.namespace;
// If enrolledNamespace is null or keyNamespace is null, fallback to
// existing behaviour - the key is authorized for the client to receive. So return true.
if (enrolledNamespace == null ||
enrolledNamespace.isEmpty ||
(keyNamespace == null || keyNamespace.isEmpty)) {
return true;
}
if (enrolledNamespace.contains('*') ||
enrolledNamespace.contains(keyNamespace)) {
return true;
}
return false;
}

bool _keyMatchesRegex(String atKey, String regex) {
return RegExp(regex).hasMatch(atKey);
}

/// match keys which have to included in sync irrespective of whether regex matches
/// e.g @bob:shared_key@alice, shared_key.bob@alice, public:publickey@alice,
/// public:phone@alice (public key without namespace)
bool _alwaysIncludeInSync(String atKey) {
return (atKey.contains(AtConstants.atEncryptionSharedKey) &&
RegexUtil.keyType(atKey, false) == KeyType.reservedKey) ||
atKey.startsWith(AtConstants.atEncryptionPublicKey) ||
(atKey.startsWith('public:') && !atKey.contains('.'));
return _syncKeysFetchStrategy.shouldIncludeKeyInSyncResponse(atKey, regex,
enrolledNamespace: enrolledNamespace);
}

/// Returns the latest commitEntry of the key.
Expand All @@ -238,13 +201,15 @@ class CommitLogKeyStore extends BaseCommitLogKeyStore {

/// Returns the Iterator of entries as Key value pairs after the given the [commitId] for the keys that matches the [regex]
Iterator<MapEntry<String, CommitEntry>> getEntries(int commitId,
{String regex = '.*', int limit = 25}) {
{String regex = '.*', int limit = 25, bool skipDeletes = false}) {
SyncKeysFetchStrategy syncKeysFetchStrategy =
skipDeletes ? SkipDeleteStrategy() : _syncKeysFetchStrategy;
Iterable<MapEntry<String, CommitEntry>> commitEntriesIterable =
commitLogCache
.entriesList()
.where((element) =>
element.value.commitId! >= commitId &&
_shouldIncludeKeyInSyncResponse(element.value.atKey!, regex))
syncKeysFetchStrategy.shouldIncludeEntryInSyncResponse(
element.value, commitId, regex))
.take(limit);
return commitEntriesIterable.iterator;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import 'package:at_persistence_secondary_server/src/log/commitlog/commit_entry.dart';
import 'package:at_persistence_secondary_server/src/log/commitlog/sync/sync_keys_fetch_strategy.dart';

class FetchAllKeysStrategy extends SyncKeysFetchStrategy {
@override
bool shouldIncludeEntryInSyncResponse(
CommitEntry commitEntry, int commitId, String regex,
{List<String>? enrolledNamespace}) {
return commitEntry.commitId! >= commitId &&
super.shouldIncludeKeyInSyncResponse(commitEntry.atKey!, regex,
enrolledNamespace: enrolledNamespace);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import 'package:at_persistence_secondary_server/src/log/commitlog/commit_entry.dart';
import 'package:at_persistence_secondary_server/src/log/commitlog/sync/sync_keys_fetch_strategy.dart';

class SkipDeleteStrategy extends SyncKeysFetchStrategy {
@override
bool shouldIncludeEntryInSyncResponse(
CommitEntry commitEntry, int commitId, String regex,
{List<String>? enrolledNamespace}) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dont we need call to super.shouldIncludeEntryInSyncResponse()?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i kept the method in base class abstract

return commitEntry.commitId! >= commitId &&
super.shouldIncludeKeyInSyncResponse(commitEntry.atKey!, regex,
enrolledNamespace: enrolledNamespace) &&
commitEntry.operation != CommitOp.DELETE;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import 'package:at_commons/at_commons.dart';
import 'package:at_persistence_secondary_server/at_persistence_secondary_server.dart';
import 'package:at_utils/at_utils.dart';

abstract class SyncKeysFetchStrategy {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we call it NamespaceHonoringKeyFetchStrategy?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Honoring namespace is one of the checks done. regex match and skipping delete commits are the other two checks. In the future if we have any other checks not related to namespace we may have to rename this class

final _logger = AtSignLogger('SyncKeysFetchStrategy');
bool shouldIncludeEntryInSyncResponse(
CommitEntry commitEntry, int commitId, String regex,
{List<String>? enrolledNamespace});

bool shouldIncludeKeyInSyncResponse(String atKey, String regex,
{List<String>? enrolledNamespace}) {
return isNamespaceAuthorised(atKey, enrolledNamespace) &&
(keyMatchesRegex(atKey, regex) || alwaysIncludeInSync(atKey));
}

bool isNamespaceAuthorised(
String atKeyAsString, List<String>? enrolledNamespace) {
// This is work-around for : https://github.com/atsign-foundation/at_server/issues/1570
if (atKeyAsString.toLowerCase() == 'configkey') {
return true;
}
late AtKey atKey;
try {
atKey = AtKey.fromString(atKeyAsString);
} on InvalidSyntaxException catch (_) {
_logger.warning(
'_isNamespaceAuthorized found an invalid key "$atKeyAsString" in the commit log. Returning false');
return false;
}
String? keyNamespace = atKey.namespace;
// If enrolledNamespace is null or keyNamespace is null, fallback to
// existing behaviour - the key is authorized for the client to receive. So return true.
if (enrolledNamespace == null ||
enrolledNamespace.isEmpty ||
(keyNamespace == null || keyNamespace.isEmpty)) {
return true;
}
if (enrolledNamespace.contains('*') ||
enrolledNamespace.contains(keyNamespace)) {
return true;
}
return false;
}

bool keyMatchesRegex(String atKey, String regex) {
return RegExp(regex).hasMatch(atKey);
}

/// match keys which have to included in sync irrespective of whether regex matches
/// e.g @bob:shared_key@alice, shared_key.bob@alice, public:publickey@alice,
/// public:phone@alice (public key without namespace)
bool alwaysIncludeInSync(String atKey) {
return (atKey.contains(AtConstants.atEncryptionSharedKey) &&
RegexUtil.keyType(atKey, false) == KeyType.reservedKey) ||
atKey.startsWith(AtConstants.atEncryptionPublicKey) ||
(atKey.startsWith('public:') && !atKey.contains('.'));
}
}
27 changes: 27 additions & 0 deletions packages/at_persistence_secondary_server/test/commit_log_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -840,6 +840,33 @@ void main() async {
expect(commitEntriesMap.containsKey('public:phone.wavi@alice'), false);
expect(commitEntriesMap.containsKey('public:location@alice'), true);
});
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add another test which both uses a regex and sets skipDeletes: true

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

test(
'A test to verify delete commit entries are NOT returned when skipDeletes is true',
() async {
var commitLogInstance =
await (AtCommitLogManagerImpl.getInstance().getCommitLog('@alice'));
var commitLogKeystore = commitLogInstance!.commitLogKeyStore;
await commitLogKeystore.add(CommitEntry(
'test_key_true_1@alice', CommitOp.UPDATE, DateTime.now()));
await commitLogKeystore.add(CommitEntry(
'test_key_true_2@alice', CommitOp.DELETE, DateTime.now()));
await commitLogKeystore.add(CommitEntry(
'test_key_true_3@alice', CommitOp.DELETE, DateTime.now()));
await commitLogKeystore.add(CommitEntry(
'test_key_true_4@alice', CommitOp.UPDATE, DateTime.now()));
Iterator<MapEntry<String, CommitEntry>>? changes = commitLogInstance
.commitLogKeyStore
.getEntries(-1, skipDeletes: true);
Map<String?, CommitEntry> commitEntriesMap = {};
while (changes.moveNext()) {
var commitEntry = changes.current.value;
commitEntriesMap[commitEntry.atKey] = commitEntry;
}
expect(commitEntriesMap.containsKey('test_key_true_1@alice'), true);
expect(commitEntriesMap.containsKey('test_key_true_2@alice'), false);
expect(commitEntriesMap.containsKey('test_key_true_3@alice'), false);
expect(commitEntriesMap.containsKey('test_key_true_4@alice'), true);
});
});
tearDown(() async => await tearDownFunc());
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ class SyncProgressiveVerbHandler extends AbstractVerbHandler {
// Get entries to sync
var commitEntryIterator = atCommitLog!.getEntries(
int.parse(verbParams[AtConstants.fromCommitSequence]!) + 1,
regex: verbParams['regex']);
regex: verbParams['regex'],
skipDeletes: verbParams['skipDeletes'] == 'true');
Copy link
Member

@VJag VJag Oct 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

VerbSyntax also needs this parameter, otherwise regex validation would fail.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


List<KeyStoreEntry> syncResponse = [];
await prepareResponse(capacity, syncResponse, commitEntryIterator,
Expand Down
9 changes: 9 additions & 0 deletions packages/at_secondary_server/pubspec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,15 @@ dependencies:
yaml: 3.1.2
logging: 1.2.0

dependency_overrides:
at_commons:
git:
url: https://github.com/atsign-foundation/at_libraries.git
path: packages/at_commons
ref: sync_skip_delete
at_persistence_secondary_server:
path: ../at_persistence_secondary_server

dev_dependencies:
build_runner: ^2.3.3
test: ^1.24.4
Expand Down
33 changes: 33 additions & 0 deletions packages/at_secondary_server/test/sync_unit_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,39 @@ void main() {
expect(syncResponse[3]['operation'], '*');
});

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add another test which both uses a regex and sets skipDeletes: true

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

test(
'test to verify delete commit entries are not sent when skipDeletes is true',
() async {
await secondaryPersistenceStore!
.getSecondaryKeyStore()
?.put('test_key_1@alice', AtData()..data = 'alice');
await secondaryPersistenceStore!
.getSecondaryKeyStore()
?.remove('test_key_1@alice');
await secondaryPersistenceStore!
.getSecondaryKeyStore()
?.put('test_key_2@alice', AtData()..data = 'alice');
await secondaryPersistenceStore!
.getSecondaryKeyStore()
?.remove('test_key_2@alice');
var syncProgressiveVerbHandler = SyncProgressiveVerbHandler(
secondaryPersistenceStore!.getSecondaryKeyStore()!);
var response = Response();
var inBoundSessionId = '_6665436c-29ff-481b-8dc6-129e89199718';
var atConnection = InboundConnectionImpl(mockSocket, inBoundSessionId);
atConnection.metaData.isAuthenticated = true;
var syncVerbParams = HashMap<String, String>();
syncVerbParams.putIfAbsent(AtConstants.fromCommitSequence, () => '-1');
syncVerbParams.putIfAbsent('skipDeletes', () => 'true');
await syncProgressiveVerbHandler.processVerb(
response, syncVerbParams, atConnection);
List syncResponse = jsonDecode(response.data!);
for (var entry in syncResponse) {
expect(entry['atKey'] != 'test_key_1@alice', true);
expect(entry['atKey'] != 'test_key_2@alice', true);
}
});

test(
'test to verify only entries matching the regex are added to sync response',
() async {
Expand Down