Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Sync skip deletes <do not merge> #2131

Open
wants to merge 13 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -127,12 +127,13 @@ class AtCommitLog extends BaseAtCommitLog {
/// Returns the Iterator of [_commitLogCacheMap] from the commitId specified.
@server
Iterator<MapEntry<String, CommitEntry>> getEntries(int commitId,
{String? regex, int limit = 25}) {
{String? regex, int limit = 25, int? skipDeletesUntil}) {
// If regex is null or isEmpty set regex to match all keys
if (regex == null || regex.isEmpty) {
regex = '.*';
}
return _commitLogKeyStore.getEntries(commitId, regex: regex, limit: limit);
return _commitLogKeyStore.getEntries(commitId,
regex: regex, limit: limit, skipDeletesUntil: skipDeletesUntil);
}

Future<void> _publishChangeEvent(CommitEntry commitEntry) async {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import 'package:at_commons/at_commons.dart';
import 'package:at_persistence_secondary_server/at_persistence_secondary_server.dart';
import 'package:at_persistence_secondary_server/src/keystore/hive_base.dart';
import 'package:at_persistence_secondary_server/src/log/commitlog/sync/fetch_all_keys_strategy.dart';
import 'package:at_persistence_secondary_server/src/log/commitlog/sync/skip_deletes_strategy.dart';
import 'package:at_persistence_secondary_server/src/log/commitlog/sync/sync_keys_fetch_strategy.dart';
import 'package:at_utils/at_utils.dart';
import 'package:hive/hive.dart';
import 'package:meta/meta.dart';
Expand All @@ -10,10 +13,13 @@ class CommitLogKeyStore extends BaseCommitLogKeyStore {
final _logger = AtSignLogger('CommitLogKeyStore');
late CommitLogCache commitLogCache;

late SyncKeysFetchStrategy _syncKeysFetchStrategy;

int get latestCommitId => commitLogCache.latestCommitId;

CommitLogKeyStore(String currentAtSign) : super(currentAtSign) {
commitLogCache = CommitLogCache(this);
_syncKeysFetchStrategy = FetchAllKeysStrategy();
}

@override
Expand Down Expand Up @@ -184,51 +190,8 @@ class CommitLogKeyStore extends BaseCommitLogKeyStore {
/// if regex is passed, key has to match the regex or it has to be a special key.
bool _shouldIncludeKeyInSyncResponse(String atKey, String regex,
{List<String>? enrolledNamespace}) {
return _isNamespaceAuthorised(atKey, enrolledNamespace) &&
(_keyMatchesRegex(atKey, regex) || _alwaysIncludeInSync(atKey));
}

bool _isNamespaceAuthorised(
String atKeyAsString, List<String>? enrolledNamespace) {
// This is work-around for : https://github.com/atsign-foundation/at_server/issues/1570
if (atKeyAsString.toLowerCase() == 'configkey') {
return true;
}
late AtKey atKey;
try {
atKey = AtKey.fromString(atKeyAsString);
} on InvalidSyntaxException catch (_) {
_logger.warning(
'_isNamespaceAuthorized found an invalid key "$atKeyAsString" in the commit log. Returning false');
return false;
}
String? keyNamespace = atKey.namespace;
// If enrolledNamespace is null or keyNamespace is null, fallback to
// existing behaviour - the key is authorized for the client to receive. So return true.
if (enrolledNamespace == null ||
enrolledNamespace.isEmpty ||
(keyNamespace == null || keyNamespace.isEmpty)) {
return true;
}
if (enrolledNamespace.contains('*') ||
enrolledNamespace.contains(keyNamespace)) {
return true;
}
return false;
}

bool _keyMatchesRegex(String atKey, String regex) {
return RegExp(regex).hasMatch(atKey);
}

/// match keys which have to included in sync irrespective of whether regex matches
/// e.g @bob:shared_key@alice, shared_key.bob@alice, public:publickey@alice,
/// public:phone@alice (public key without namespace)
bool _alwaysIncludeInSync(String atKey) {
return (atKey.contains(AtConstants.atEncryptionSharedKey) &&
RegexUtil.keyType(atKey, false) == KeyType.reservedKey) ||
atKey.startsWith(AtConstants.atEncryptionPublicKey) ||
(atKey.startsWith('public:') && !atKey.contains('.'));
return _syncKeysFetchStrategy.shouldIncludeKeyInSyncResponse(atKey, regex,
enrolledNamespace: enrolledNamespace);
}

/// Returns the latest commitEntry of the key.
Expand All @@ -238,13 +201,16 @@ class CommitLogKeyStore extends BaseCommitLogKeyStore {

/// Returns the Iterator of entries as Key value pairs after the given the [commitId] for the keys that matches the [regex]
Iterator<MapEntry<String, CommitEntry>> getEntries(int commitId,
{String regex = '.*', int limit = 25}) {
{String regex = '.*', int limit = 25, int? skipDeletesUntil}) {
SyncKeysFetchStrategy syncKeysFetchStrategy = skipDeletesUntil != null
? SkipDeleteStrategy(skipDeletesUntil, commitLogCache.latestCommitId)
: _syncKeysFetchStrategy;
Iterable<MapEntry<String, CommitEntry>> commitEntriesIterable =
commitLogCache
.entriesList()
.where((element) =>
element.value.commitId! >= commitId &&
_shouldIncludeKeyInSyncResponse(element.value.atKey!, regex))
syncKeysFetchStrategy.shouldIncludeEntryInSyncResponse(
element.value, commitId, regex))
.take(limit);
return commitEntriesIterable.iterator;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import 'package:at_persistence_secondary_server/src/log/commitlog/commit_entry.dart';
import 'package:at_persistence_secondary_server/src/log/commitlog/sync/sync_keys_fetch_strategy.dart';

/// Returns the commit entries which have to be synced from server to client
class FetchAllKeysStrategy extends SyncKeysFetchStrategy {
@override
bool shouldIncludeEntryInSyncResponse(
CommitEntry commitEntry, int commitId, String regex,
{List<String>? enrolledNamespace}) {
return commitEntry.commitId! >= commitId &&
super.shouldIncludeKeyInSyncResponse(commitEntry.atKey!, regex,
enrolledNamespace: enrolledNamespace);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import 'package:at_persistence_secondary_server/src/log/commitlog/commit_entry.dart';
import 'package:at_persistence_secondary_server/src/log/commitlog/sync/sync_keys_fetch_strategy.dart';

/// Returns the commit entries to be returned in sync response from server to client except delete commit entries.
class SkipDeleteStrategy extends SyncKeysFetchStrategy {
late int skipDeletesUntil;
late int latestCommitId;
SkipDeleteStrategy(this.skipDeletesUntil, this.latestCommitId);
@override
bool shouldIncludeEntryInSyncResponse(
CommitEntry commitEntry, int commitId, String regex,
{List<String>? enrolledNamespace}) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dont we need call to super.shouldIncludeEntryInSyncResponse()?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i kept the method in base class abstract

// do not include delete commit entries between commitId and skipDeletesUntil, except when delete is the last commit entry
if (commitEntry.operation == CommitOp.DELETE &&
commitEntry.commitId! <= skipDeletesUntil &&
commitEntry.commitId! >= commitId &&
commitEntry.commitId != latestCommitId) {
return false;
}
return commitEntry.commitId! >= commitId &&
super.shouldIncludeKeyInSyncResponse(commitEntry.atKey!, regex,
enrolledNamespace: enrolledNamespace);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import 'package:at_commons/at_commons.dart';
import 'package:at_persistence_secondary_server/at_persistence_secondary_server.dart';
import 'package:at_utils/at_utils.dart';

abstract class SyncKeysFetchStrategy {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we call it NamespaceHonoringKeyFetchStrategy?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Honoring namespace is one of the checks done. regex match and skipping delete commits are the other two checks. In the future if we have any other checks not related to namespace we may have to rename this class

final _logger = AtSignLogger('SyncKeysFetchStrategy');

/// Returns true if the commit entry should be included in sync response, false otherwise
bool shouldIncludeEntryInSyncResponse(
CommitEntry commitEntry, int commitId, String regex,
{List<String>? enrolledNamespace});

/// if enrolledNamespace is passed, key namespace should be in enrolledNamespace list and
/// atKey should match regex or should be a special key that is always included in sync.
bool shouldIncludeKeyInSyncResponse(String atKey, String regex,
{List<String>? enrolledNamespace}) {
return isNamespaceAuthorised(atKey, enrolledNamespace) &&
(keyMatchesRegex(atKey, regex) || alwaysIncludeInSync(atKey));
}

/// Returns true if atKey namespace is empty or null/ enrolledNamespace is empty or null
/// if enrolledNamespace contains atKey namespace, return true. false otherwise
bool isNamespaceAuthorised(
String atKeyAsString, List<String>? enrolledNamespace) {
// This is work-around for : https://github.com/atsign-foundation/at_server/issues/1570
if (atKeyAsString.toLowerCase() == 'configkey') {
return true;
}
late AtKey atKey;
try {
atKey = AtKey.fromString(atKeyAsString);
} on InvalidSyntaxException catch (_) {
_logger.warning(
'_isNamespaceAuthorized found an invalid key "$atKeyAsString" in the commit log. Returning false');
return false;
}
String? keyNamespace = atKey.namespace;
// If enrolledNamespace is null or keyNamespace is null, fallback to
// existing behaviour - the key is authorized for the client to receive. So return true.
if (enrolledNamespace == null ||
enrolledNamespace.isEmpty ||
(keyNamespace == null || keyNamespace.isEmpty)) {
return true;
}
if (enrolledNamespace.contains('*') ||
enrolledNamespace.contains(keyNamespace)) {
return true;
}
return false;
}

/// Returns true if atKey matches regex. false otherwise
bool keyMatchesRegex(String atKey, String regex) {
return RegExp(regex).hasMatch(atKey);
}

/// match keys which have to included in sync irrespective of whether regex matches
/// e.g @bob:shared_key@alice, shared_key.bob@alice, public:publickey@alice,
/// public:phone@alice (public key without namespace)
bool alwaysIncludeInSync(String atKey) {
return (atKey.contains(AtConstants.atEncryptionSharedKey) &&
RegexUtil.keyType(atKey, false) == KeyType.reservedKey) ||
atKey.startsWith(AtConstants.atEncryptionPublicKey) ||
(atKey.startsWith('public:') && !atKey.contains('.'));
}
}
86 changes: 86 additions & 0 deletions packages/at_persistence_secondary_server/test/commit_log_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -840,6 +840,92 @@ void main() async {
expect(commitEntriesMap.containsKey('public:phone.wavi@alice'), false);
expect(commitEntriesMap.containsKey('public:location@alice'), true);
});
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add another test which both uses a regex and sets skipDeletes: true

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

test(
'A test to verify delete commit entries are NOT returned when skipDeletesUntil is set',
() async {
var commitLogInstance =
await (AtCommitLogManagerImpl.getInstance().getCommitLog('@alice'));
var commitLogKeystore = commitLogInstance!.commitLogKeyStore;
await commitLogKeystore.add(CommitEntry(
'test_key_true_1@alice', CommitOp.UPDATE, DateTime.now()));
await commitLogKeystore.add(CommitEntry(
'test_key_true_2@alice', CommitOp.DELETE, DateTime.now()));
await commitLogKeystore.add(CommitEntry(
'test_key_true_3@alice', CommitOp.DELETE, DateTime.now()));
await commitLogKeystore.add(CommitEntry(
'test_key_true_4@alice', CommitOp.UPDATE, DateTime.now()));
Iterator<MapEntry<String, CommitEntry>>? changes = commitLogInstance
.commitLogKeyStore
.getEntries(-1, skipDeletesUntil: 25);
Map<String?, CommitEntry> commitEntriesMap = {};
while (changes.moveNext()) {
var commitEntry = changes.current.value;
commitEntriesMap[commitEntry.atKey] = commitEntry;
}
expect(commitEntriesMap.containsKey('test_key_true_1@alice'), true);
expect(commitEntriesMap.containsKey('test_key_true_2@alice'), false);
expect(commitEntriesMap.containsKey('test_key_true_3@alice'), false);
expect(commitEntriesMap.containsKey('test_key_true_4@alice'), true);
});
test(
'A test to verify correct commit entries are returned when skipDeletesUntil is set and regex is passed',
() async {
var commitLogInstance =
await (AtCommitLogManagerImpl.getInstance().getCommitLog('@alice'));
var commitLogKeystore = commitLogInstance!.commitLogKeyStore;
await commitLogKeystore.add(CommitEntry(
'test_key_true_1.wavi@alice', CommitOp.UPDATE, DateTime.now()));
await commitLogKeystore.add(CommitEntry(
'test_key_true_2.buzz@alice', CommitOp.DELETE, DateTime.now()));
await commitLogKeystore.add(CommitEntry(
'test_key_true_3.wavi@alice', CommitOp.DELETE, DateTime.now()));
await commitLogKeystore.add(CommitEntry(
'test_key_true_4.buzz@alice', CommitOp.UPDATE, DateTime.now()));
Iterator<MapEntry<String, CommitEntry>>? changes = commitLogInstance
.commitLogKeyStore
.getEntries(-1, skipDeletesUntil: 25, regex: '.buzz');
Map<String?, CommitEntry> commitEntriesMap = {};
while (changes.moveNext()) {
var commitEntry = changes.current.value;
commitEntriesMap[commitEntry.atKey] = commitEntry;
}
expect(
commitEntriesMap.containsKey('test_key_true_1.wavi@alice'), false);
expect(
commitEntriesMap.containsKey('test_key_true_2.buzz@alice'), false);
expect(
commitEntriesMap.containsKey('test_key_true_3.wavi@alice'), false);
expect(
commitEntriesMap.containsKey('test_key_true_4.buzz@alice'), true);
});
test(
'A test to verify last delete commit entry is returned when its commitId is equal to latest commitId',
() async {
var commitLogInstance =
await (AtCommitLogManagerImpl.getInstance().getCommitLog('@alice'));
var commitLogKeystore = commitLogInstance!.commitLogKeyStore;
await commitLogKeystore.add(CommitEntry(
'test_key_true_1@alice', CommitOp.UPDATE, DateTime.now()));
await commitLogKeystore.add(CommitEntry(
'test_key_true_2@alice', CommitOp.DELETE, DateTime.now()));
await commitLogKeystore.add(CommitEntry(
'test_key_true_3@alice', CommitOp.DELETE, DateTime.now()));
await commitLogKeystore.add(CommitEntry(
'test_key_true_4@alice', CommitOp.DELETE, DateTime.now()));
int? latestCommitId = commitLogInstance.lastCommittedSequenceNumber();
Iterator<MapEntry<String, CommitEntry>>? changes = commitLogInstance
.commitLogKeyStore
.getEntries(-1, skipDeletesUntil: latestCommitId);
Map<String?, CommitEntry> commitEntriesMap = {};
while (changes.moveNext()) {
var commitEntry = changes.current.value;
commitEntriesMap[commitEntry.atKey] = commitEntry;
}
expect(commitEntriesMap.containsKey('test_key_true_1@alice'), true);
expect(commitEntriesMap.containsKey('test_key_true_2@alice'), false);
expect(commitEntriesMap.containsKey('test_key_true_3@alice'), false);
expect(commitEntriesMap.containsKey('test_key_true_4@alice'), true);
});
});
tearDown(() async => await tearDownFunc());
});
Expand Down
Loading