Skip to content

Commit

Permalink
Subscriptions Updater
Browse files Browse the repository at this point in the history
  • Loading branch information
flypaper0 committed Jan 23, 2024
1 parent 880b82a commit f1688e5
Show file tree
Hide file tree
Showing 8 changed files with 108 additions and 144 deletions.
5 changes: 4 additions & 1 deletion Sources/WalletConnectNotify/Client/Wallet/NotifyClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public class NotifyClient {
private let notifyWatchSubscriptionsResponseSubscriber: NotifyWatchSubscriptionsResponseSubscriber
private let notifyWatcherAgreementKeysProvider: NotifyWatcherAgreementKeysProvider
private let notifySubscriptionsChangedRequestSubscriber: NotifySubscriptionsChangedRequestSubscriber
private let notifySubscriptionsUpdater: NotifySubsctiptionsUpdater
private let subscriptionWatcher: SubscriptionWatcher

init(logger: ConsoleLogging,
Expand All @@ -58,6 +59,7 @@ public class NotifyClient {
notifyWatchSubscriptionsResponseSubscriber: NotifyWatchSubscriptionsResponseSubscriber,
notifyWatcherAgreementKeysProvider: NotifyWatcherAgreementKeysProvider,
notifySubscriptionsChangedRequestSubscriber: NotifySubscriptionsChangedRequestSubscriber,
notifySubscriptionsUpdater: NotifySubsctiptionsUpdater,
subscriptionWatcher: SubscriptionWatcher
) {
self.logger = logger
Expand All @@ -78,6 +80,7 @@ public class NotifyClient {
self.notifyWatchSubscriptionsResponseSubscriber = notifyWatchSubscriptionsResponseSubscriber
self.notifyWatcherAgreementKeysProvider = notifyWatcherAgreementKeysProvider
self.notifySubscriptionsChangedRequestSubscriber = notifySubscriptionsChangedRequestSubscriber
self.notifySubscriptionsUpdater = notifySubscriptionsUpdater
self.subscriptionWatcher = subscriptionWatcher
}

Expand Down Expand Up @@ -180,7 +183,7 @@ private extension NotifyClient {
extension NotifyClient {

public var subscriptionChangedPublisher: AnyPublisher<[NotifySubscription], Never> {
return notifySubscriptionsChangedRequestSubscriber.subscriptionChangedPublisher
return notifySubscriptionsUpdater.subscriptionChangedPublisher
}

public func register(deviceToken: String) async throws {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,19 +54,22 @@ public struct NotifyClientFactory {

let notifySubscribeRequester = NotifySubscribeRequester(keyserverURL: keyserverURL, networkingInteractor: networkInteractor, identityClient: identityClient, logger: logger, kms: kms, webDidResolver: webDidResolver, notifyConfigProvider: notifyConfigProvider)

let notifySubscribeResponseSubscriber = NotifySubscribeResponseSubscriber(networkingInteractor: networkInteractor, kms: kms, logger: logger, groupKeychainStorage: groupKeychainStorage, notifyStorage: notifyStorage, notifyConfigProvider: notifyConfigProvider)
let notifySubscriptionsUpdater = NotifySubsctiptionsUpdater(networkingInteractor: networkInteractor, kms: kms, logger: logger, notifyStorage: notifyStorage, groupKeychainStorage: groupKeychainStorage)

let notifySubscriptionsBuilder = NotifySubscriptionsBuilder(notifyConfigProvider: notifyConfigProvider)

let notifySubscribeResponseSubscriber = NotifySubscribeResponseSubscriber(networkingInteractor: networkInteractor, logger: logger, notifySubscriptionsBuilder: notifySubscriptionsBuilder, notifySubscriptionsUpdater: notifySubscriptionsUpdater)

let notifyUpdateRequester = NotifyUpdateRequester(keyserverURL: keyserverURL, identityClient: identityClient, networkingInteractor: networkInteractor, notifyConfigProvider: notifyConfigProvider, logger: logger, notifyStorage: notifyStorage)

let notifyUpdateResponseSubscriber = NotifyUpdateResponseSubscriber(networkingInteractor: networkInteractor, logger: logger, notifyConfigProvider: notifyConfigProvider, notifyStorage: notifyStorage)
let notifyUpdateResponseSubscriber = NotifyUpdateResponseSubscriber(networkingInteractor: networkInteractor, logger: logger)

let subscriptionsAutoUpdater = SubscriptionsAutoUpdater(notifyUpdateRequester: notifyUpdateRequester, logger: logger, notifyStorage: notifyStorage)

let notifyWatcherAgreementKeysProvider = NotifyWatcherAgreementKeysProvider(kms: kms)
let notifyWatchSubscriptionsRequester = NotifyWatchSubscriptionsRequester(keyserverURL: keyserverURL, networkingInteractor: networkInteractor, identityClient: identityClient, logger: logger, webDidResolver: webDidResolver, notifyAccountProvider: notifyAccountProvider, notifyWatcherAgreementKeysProvider: notifyWatcherAgreementKeysProvider, notifyHost: notifyHost)
let notifySubscriptionsBuilder = NotifySubscriptionsBuilder(notifyConfigProvider: notifyConfigProvider)
let notifyWatchSubscriptionsResponseSubscriber = NotifyWatchSubscriptionsResponseSubscriber(networkingInteractor: networkInteractor, kms: kms, logger: logger, notifyStorage: notifyStorage, groupKeychainStorage: groupKeychainStorage, notifySubscriptionsBuilder: notifySubscriptionsBuilder)
let notifySubscriptionsChangedRequestSubscriber = NotifySubscriptionsChangedRequestSubscriber(keyserver: keyserverURL, networkingInteractor: networkInteractor, kms: kms, identityClient: identityClient, logger: logger, groupKeychainStorage: groupKeychainStorage, notifyStorage: notifyStorage, notifySubscriptionsBuilder: notifySubscriptionsBuilder)
let notifyWatchSubscriptionsResponseSubscriber = NotifyWatchSubscriptionsResponseSubscriber(networkingInteractor: networkInteractor, logger: logger, notifySubscriptionsBuilder: notifySubscriptionsBuilder, notifySubscriptionsUpdater: notifySubscriptionsUpdater)
let notifySubscriptionsChangedRequestSubscriber = NotifySubscriptionsChangedRequestSubscriber(keyserver: keyserverURL, networkingInteractor: networkInteractor, identityClient: identityClient, logger: logger, notifySubscriptionsUpdater: notifySubscriptionsUpdater, notifySubscriptionsBuilder: notifySubscriptionsBuilder)
let subscriptionWatcher = SubscriptionWatcher(notifyWatchSubscriptionsRequester: notifyWatchSubscriptionsRequester, logger: logger)
let historyService = HistoryService(keyserver: keyserverURL, networkingClient: networkInteractor, identityClient: identityClient)

Expand All @@ -89,7 +92,8 @@ public struct NotifyClientFactory {
subscriptionsAutoUpdater: subscriptionsAutoUpdater,
notifyWatchSubscriptionsResponseSubscriber: notifyWatchSubscriptionsResponseSubscriber,
notifyWatcherAgreementKeysProvider: notifyWatcherAgreementKeysProvider,
notifySubscriptionsChangedRequestSubscriber: notifySubscriptionsChangedRequestSubscriber,
notifySubscriptionsChangedRequestSubscriber: notifySubscriptionsChangedRequestSubscriber,
notifySubscriptionsUpdater: notifySubscriptionsUpdater,
subscriptionWatcher: subscriptionWatcher
)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import Foundation
import Combine

final class NotifySubsctiptionsUpdater {
private let networkingInteractor: NetworkInteracting
private let kms: KeyManagementServiceProtocol
private let logger: ConsoleLogging
private let notifyStorage: NotifyStorage
private let groupKeychainStorage: KeychainStorageProtocol

private let subscriptionChangedSubject = PassthroughSubject<[NotifySubscription], Never>()

var subscriptionChangedPublisher: AnyPublisher<[NotifySubscription], Never> {
return subscriptionChangedSubject.eraseToAnyPublisher()
}

init(networkingInteractor: NetworkInteracting, kms: KeyManagementServiceProtocol, logger: ConsoleLogging, notifyStorage: NotifyStorage, groupKeychainStorage: KeychainStorageProtocol) {
self.networkingInteractor = networkingInteractor
self.kms = kms
self.logger = logger
self.notifyStorage = notifyStorage
self.groupKeychainStorage = groupKeychainStorage
}

func update(subscriptions newSubscriptions: [NotifySubscription], for account: Account) async throws {
let oldSubscriptions = notifyStorage.getSubscriptions(account: account)

subscriptionChangedSubject.send(newSubscriptions)

try Task.checkCancellation()

let subscriptions = oldSubscriptions.difference(from: newSubscriptions)

logger.debug("Received: \(newSubscriptions.count), changed: \(subscriptions.count)")

if subscriptions.count > 0 {
try notifyStorage.replaceAllSubscriptions(newSubscriptions)

for subscription in newSubscriptions {
let symKey = try SymmetricKey(hex: subscription.symKey)
try groupKeychainStorage.add(symKey, forKey: subscription.topic)
try kms.setSymmetricKey(symKey, for: subscription.topic)
}

let topics = newSubscriptions.map { $0.topic }

try await networkingInteractor.batchSubscribe(topics: topics)

try Task.checkCancellation()

logger.debug("Updated Subscriptions by Subscriptions Changed Request", properties: [
"topics": newSubscriptions.map { $0.topic }.joined(separator: ",")
])
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,36 +5,25 @@ class NotifySubscriptionsChangedRequestSubscriber {
private let keyserver: URL
private let networkingInteractor: NetworkInteracting
private let identityClient: IdentityClient
private let kms: KeyManagementServiceProtocol
private let logger: ConsoleLogging
private let groupKeychainStorage: KeychainStorageProtocol
private let notifyStorage: NotifyStorage
private let notifySubscriptionsUpdater: NotifySubsctiptionsUpdater
private let notifySubscriptionsBuilder: NotifySubscriptionsBuilder

private let subscriptionChangedSubject = PassthroughSubject<[NotifySubscription], Never>()

var subscriptionChangedPublisher: AnyPublisher<[NotifySubscription], Never> {
return subscriptionChangedSubject.eraseToAnyPublisher()
}

init(
keyserver: URL,
networkingInteractor: NetworkInteracting,
kms: KeyManagementServiceProtocol,
identityClient: IdentityClient,
logger: ConsoleLogging,
groupKeychainStorage: KeychainStorageProtocol,
notifyStorage: NotifyStorage,
notifySubscriptionsUpdater: NotifySubsctiptionsUpdater,
notifySubscriptionsBuilder: NotifySubscriptionsBuilder
) {
self.keyserver = keyserver
self.networkingInteractor = networkingInteractor
self.kms = kms
self.logger = logger
self.identityClient = identityClient
self.groupKeychainStorage = groupKeychainStorage
self.notifyStorage = notifyStorage
self.notifySubscriptionsUpdater = notifySubscriptionsUpdater
self.notifySubscriptionsBuilder = notifySubscriptionsBuilder

subscribeForNofifyChangedRequests()
}

Expand All @@ -44,54 +33,20 @@ class NotifySubscriptionsChangedRequestSubscriber {
protocolMethod: NotifySubscriptionsChangedProtocolMethod(),
requestOfType: NotifySubscriptionsChangedRequestPayload.Wrapper.self,
errorHandler: logger) { [unowned self] payload in

logger.debug("Received Subscriptions Changed Request")

let (jwtPayload, _) = try NotifySubscriptionsChangedRequestPayload.decodeAndVerify(from: payload.request)
let account = jwtPayload.account

// TODO: varify signature with notify server diddoc authentication key

let oldSubscriptions = notifyStorage.getSubscriptions(account: account)
let newSubscriptions = try await notifySubscriptionsBuilder.buildSubscriptions(jwtPayload.subscriptions)

subscriptionChangedSubject.send(newSubscriptions)

try Task.checkCancellation()

let subscriptions = oldSubscriptions.difference(from: newSubscriptions)
let subscriptions = try await notifySubscriptionsBuilder.buildSubscriptions(jwtPayload.subscriptions)

logger.debug("Received: \(newSubscriptions.count), changed: \(subscriptions.count)")

if subscriptions.count > 0 {
try notifyStorage.replaceAllSubscriptions(newSubscriptions)

for subscription in newSubscriptions {
let symKey = try SymmetricKey(hex: subscription.symKey)
try groupKeychainStorage.add(symKey, forKey: subscription.topic)
try kms.setSymmetricKey(symKey, for: subscription.topic)
}

let topics = newSubscriptions.map { $0.topic }

try await networkingInteractor.batchSubscribe(topics: topics)

try Task.checkCancellation()

var logProperties = ["rpcId": payload.id.string]
for (index, subscription) in newSubscriptions.enumerated() {
let key = "subscription_\(index + 1)"
logProperties[key] = subscription.topic
}

logger.debug("Updated Subscriptions by Subscriptions Changed Request", properties: logProperties)
}
try await notifySubscriptionsUpdater.update(subscriptions: subscriptions, for: jwtPayload.account)

try await respond(topic: payload.topic, account: jwtPayload.account, rpcId: payload.id, notifyServerAuthenticationKey: jwtPayload.notifyServerAuthenticationKey)
}
}

private func respond(topic: String, account: Account, rpcId: RPCID, notifyServerAuthenticationKey: DIDKey) async throws {

let receiptPayload = NotifySubscriptionsChangedResponsePayload(account: account, keyserver: keyserver, notifyServerAuthenticationKey: notifyServerAuthenticationKey)

let wrapper = try identityClient.signAndCreateWrapper(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,32 +2,21 @@ import Foundation
import Combine

class NotifyUpdateResponseSubscriber {

private let networkingInteractor: NetworkInteracting
private var publishers = [AnyCancellable]()
private let logger: ConsoleLogging
private let notifyStorage: NotifyStorage
private let nofityConfigProvider: NotifyConfigProvider

init(networkingInteractor: NetworkInteracting,
logger: ConsoleLogging,
notifyConfigProvider: NotifyConfigProvider,
notifyStorage: NotifyStorage
) {

init(networkingInteractor: NetworkInteracting, logger: ConsoleLogging) {
self.networkingInteractor = networkingInteractor
self.logger = logger
self.notifyStorage = notifyStorage
self.nofityConfigProvider = notifyConfigProvider

subscribeForUpdateResponse()
}

// TODO: handle error response
}

private extension NotifyUpdateResponseSubscriber {
enum Errors: Error {
case subscriptionDoesNotExist
case selectedScopeNotFound
}

func subscribeForUpdateResponse() {
networkingInteractor.subscribeOnResponse(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,20 @@ import Combine

class NotifyWatchSubscriptionsResponseSubscriber {
private let networkingInteractor: NetworkInteracting
private let kms: KeyManagementServiceProtocol
private let logger: ConsoleLogging
private let notifyStorage: NotifyStorage
private let groupKeychainStorage: KeychainStorageProtocol
private let notifySubscriptionsBuilder: NotifySubscriptionsBuilder
private let notifySubscriptionsUpdater: NotifySubsctiptionsUpdater

init(networkingInteractor: NetworkInteracting,
kms: KeyManagementServiceProtocol,
logger: ConsoleLogging,
notifyStorage: NotifyStorage,
groupKeychainStorage: KeychainStorageProtocol,
notifySubscriptionsBuilder: NotifySubscriptionsBuilder
notifySubscriptionsBuilder: NotifySubscriptionsBuilder,
notifySubscriptionsUpdater: NotifySubsctiptionsUpdater
) {
self.networkingInteractor = networkingInteractor
self.kms = kms
self.logger = logger
self.notifyStorage = notifyStorage
self.groupKeychainStorage = groupKeychainStorage
self.notifySubscriptionsBuilder = notifySubscriptionsBuilder
self.notifySubscriptionsUpdater = notifySubscriptionsUpdater

subscribeForWatchSubscriptionsResponse()
}

Expand All @@ -32,45 +27,15 @@ class NotifyWatchSubscriptionsResponseSubscriber {
requestOfType: NotifyWatchSubscriptionsPayload.Wrapper.self,
responseOfType: NotifyWatchSubscriptionsResponsePayload.Wrapper.self,
errorHandler: logger) { [unowned self] payload in

logger.debug("Received Notify Watch Subscriptions response")

let (requestPayload, _) = try NotifyWatchSubscriptionsPayload.decodeAndVerify(from: payload.request)
let (responsePayload, _) = try NotifyWatchSubscriptionsResponsePayload.decodeAndVerify(from: payload.response)
let (watchSubscriptionPayloadRequest, _) = try NotifyWatchSubscriptionsPayload.decodeAndVerify(from: payload.request)

let account = watchSubscriptionPayloadRequest.subscriptionAccount
// TODO: varify signature with notify server diddoc authentication key

let oldSubscriptions = notifyStorage.getSubscriptions(account: account)
let newSubscriptions = try await notifySubscriptionsBuilder.buildSubscriptions(responsePayload.subscriptions)

try Task.checkCancellation()

let subscriptions = oldSubscriptions.difference(from: newSubscriptions)

logger.debug("Received: \(newSubscriptions.count), changed: \(subscriptions.count)")

if subscriptions.count > 0 {
// TODO: unsubscribe for oldSubscriptions topics that are not included in new subscriptions
try notifyStorage.replaceAllSubscriptions(newSubscriptions)

for subscription in newSubscriptions {
let symKey = try SymmetricKey(hex: subscription.symKey)
try groupKeychainStorage.add(symKey, forKey: subscription.topic)
try kms.setSymmetricKey(symKey, for: subscription.topic)
}

try await networkingInteractor.batchSubscribe(topics: newSubscriptions.map { $0.topic })

try Task.checkCancellation()

var logProperties = [String: String]()
for (index, subscription) in newSubscriptions.enumerated() {
let key = "subscription_\(index + 1)"
logProperties[key] = subscription.topic
}
let subscriptions = try await notifySubscriptionsBuilder.buildSubscriptions(responsePayload.subscriptions)

logger.debug("Updated Subscriptions with Watch Subscriptions Update, number of subscriptions: \(newSubscriptions.count)", properties: logProperties)
}
try await notifySubscriptionsUpdater.update(subscriptions: subscriptions, for: requestPayload.subscriptionAccount)
}
}

Expand Down
Loading

0 comments on commit f1688e5

Please sign in to comment.