Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Core] Networking error publisher #1070

Merged
merged 5 commits into from
Sep 8, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions Sources/WalletConnectNetworking/NetworkInteracting.swift
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,21 @@ public protocol NetworkInteracting {
on request: ProtocolMethod
) -> AnyPublisher<ResponseSubscriptionErrorPayload<Request>, Never>

func subscribeOnRequest<RequestParams: Codable>(
protocolMethod: ProtocolMethod,
requestOfType: RequestParams.Type,
errorHandler: ErrorHandler?,
subscription: @escaping (RequestSubscriptionPayload<RequestParams>) async throws -> Void
)

func subscribeOnResponse<Request: Codable, Response: Codable>(
protocolMethod: ProtocolMethod,
requestOfType: Request.Type,
responseOfType: Response.Type,
errorHandler: ErrorHandler?,
subscription: @escaping (ResponseSubscriptionPayload<Request, Response>) async throws -> Void
)

func getClientId() throws -> String
}

Expand Down
2 changes: 1 addition & 1 deletion Sources/WalletConnectNetworking/NetworkingClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import Combine

public protocol NetworkingClient {
var socketConnectionStatusPublisher: AnyPublisher<SocketConnectionStatus, Never> { get }
var logsPublisher: AnyPublisher<Log, Never> {get}
var logsPublisher: AnyPublisher<Log, Never> { get }
func setLogging(level: LoggingLevel)
func connect() throws
func disconnect(closeCode: URLSessionWebSocketTask.CloseCode) throws
Expand Down
37 changes: 37 additions & 0 deletions Sources/WalletConnectNetworking/NetworkingInteractor.swift
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,43 @@ public class NetworkingInteractor: NetworkInteracting {
rpcHistory.deleteAll(forTopics: topics)
}

public func subscribeOnRequest<RequestParams: Codable>(
protocolMethod: ProtocolMethod,
requestOfType: RequestParams.Type,
errorHandler: ErrorHandler?,
subscription: @escaping (RequestSubscriptionPayload<RequestParams>) async throws -> Void
) {
requestSubscription(on: protocolMethod)
.sink { (payload: RequestSubscriptionPayload<RequestParams>) in
Task(priority: .high) {
do {
try await subscription(payload)
} catch {
errorHandler?.handle(error: error)
}
}
}.store(in: &publishers)
}

public func subscribeOnResponse<Request: Codable, Response: Codable>(
protocolMethod: ProtocolMethod,
requestOfType: Request.Type,
responseOfType: Response.Type,
errorHandler: ErrorHandler?,
subscription: @escaping (ResponseSubscriptionPayload<Request, Response>) async throws -> Void
) {
responseSubscription(on: protocolMethod)
.sink { (payload: ResponseSubscriptionPayload<Request, Response>) in
Task(priority: .high) {
do {
try await subscription(payload)
} catch {
errorHandler?.handle(error: error)
}
}
}.store(in: &publishers)
}

public func requestSubscription<RequestParams: Codable>(on request: ProtocolMethod) -> AnyPublisher<RequestSubscriptionPayload<RequestParams>, Never> {
return requestPublisher
.filter { rpcRequest in
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ class DeleteNotifySubscriptionSubscriber {
private let networkingInteractor: NetworkInteracting
private let kms: KeyManagementServiceProtocol
private let logger: ConsoleLogging
private var publishers = [AnyCancellable]()
private let notifyStorage: NotifyStorage

init(networkingInteractor: NetworkInteracting,
Expand All @@ -21,14 +20,13 @@ class DeleteNotifySubscriptionSubscriber {
}

private func subscribeForDeleteSubscription() {
let protocolMethod = NotifyDeleteProtocolMethod()
networkingInteractor.requestSubscription(on: protocolMethod)
.sink { [unowned self] (payload: RequestSubscriptionPayload<NotifyDeleteResponsePayload.Wrapper>) in

guard let (_, _) = try? NotifyDeleteResponsePayload.decodeAndVerify(from: payload.request)
else { fatalError() /* TODO: Handle error */ }

logger.debug("Peer deleted subscription")
}.store(in: &publishers)
networkingInteractor.subscribeOnRequest(
protocolMethod: NotifyDeleteProtocolMethod(),
requestOfType: NotifyDeleteResponsePayload.Wrapper.self,
errorHandler: logger
) { [unowned self] payload in
let (_, _) = try NotifyDeleteResponsePayload.decodeAndVerify(from: payload.request)
logger.debug("Peer deleted subscription")
}
}
}
4 changes: 0 additions & 4 deletions Sources/WalletConnectNotify/Client/Wallet/NotifyClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,6 @@ public class NotifyClient {
return notifyStorage.newSubscriptionPublisher
}

public var subscriptionErrorPublisher: AnyPublisher<Error, Never> {
return notifySubscribeResponseSubscriber.subscriptionErrorPublisher
}

public var deleteSubscriptionPublisher: AnyPublisher<String, Never> {
return notifyStorage.deleteSubscriptionPublisher
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ class NotifyMessageSubscriber {
private let notifyStorage: NotifyStorage
private let crypto: CryptoProvider
private let logger: ConsoleLogging
private var publishers = [AnyCancellable]()
private let notifyMessagePublisherSubject = PassthroughSubject<NotifyMessageRecord, Never>()

public var notifyMessagePublisher: AnyPublisher<NotifyMessageRecord, Never> {
Expand All @@ -26,45 +25,42 @@ class NotifyMessageSubscriber {
}

private func subscribeForNotifyMessages() {
let protocolMethod = NotifyMessageProtocolMethod()
networkingInteractor.requestSubscription(on: protocolMethod)
.sink { [unowned self] (payload: RequestSubscriptionPayload<NotifyMessagePayload.Wrapper>) in
networkingInteractor.subscribeOnRequest(
protocolMethod: NotifyMessageProtocolMethod(),
requestOfType: NotifyMessagePayload.Wrapper.self,
errorHandler: logger
) { [unowned self] payload in
logger.debug("Received Notify Message on topic: \(payload.topic)", properties: ["topic": payload.topic])

logger.debug("Received Notify Message on topic: \(payload.topic)", properties: ["topic": payload.topic])
let (messagePayload, claims) = try NotifyMessagePayload.decodeAndVerify(from: payload.request)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if this throws will it be handled in Networking Client?

if so, isn't it strange that lower level client handles higher level client's errors?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Networking client not handling, just aggregating. Handling will be in Networking client publisher's subscribers

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, the idea of having the "catch block" is really good but to me it's still strange that for expample:
in notify engine in networkingInteractor.subscribeOnRequest block let's say function throws a notify specific error.
for example "notify subscription not found" or whatever and that error is passed down to Networking Client.

so let's say later an app want's to listen to networking only errors and it gets "notify subscription not found"

don't you think something smells here?

Copy link
Contributor Author

@flypaper0 flypaper0 Sep 5, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is the only way to have single error publisher. Depends on what we trying to acheive. By that PR I want to reduce error handling code for SDK's. One thing. May be we can delete error publisher for now and only log inside catch block?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because we are mostly just logging errors

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am still not convinced 🤔

logger.debug("Decoded Notify Message: \(payload.topic)", properties: ["topic": payload.topic, "messageBody": messagePayload.message.body, "messageTitle": messagePayload.message.title, "publishedAt": payload.publishedAt.description, "id": payload.id.string])
let dappPubKey = try DIDKey(did: claims.iss)
let messageData = try JSONEncoder().encode(messagePayload.message)

Task(priority: .high) {
let (messagePayload, claims) = try NotifyMessagePayload.decodeAndVerify(from: payload.request)
logger.debug("Decoded Notify Message: \(payload.topic)", properties: ["topic": payload.topic, "messageBody": messagePayload.message.body, "messageTitle": messagePayload.message.title, "publishedAt": payload.publishedAt.description, "id": payload.id.string])
let dappPubKey = try DIDKey(did: claims.iss)
let messageData = try JSONEncoder().encode(messagePayload.message)
let record = NotifyMessageRecord(id: payload.id.string, topic: payload.topic, message: messagePayload.message, publishedAt: payload.publishedAt)
notifyStorage.setMessage(record)
notifyMessagePublisherSubject.send(record)

let record = NotifyMessageRecord(id: payload.id.string, topic: payload.topic, message: messagePayload.message, publishedAt: payload.publishedAt)
notifyStorage.setMessage(record)
notifyMessagePublisherSubject.send(record)
let receiptPayload = NotifyMessageReceiptPayload(
keyserver: keyserver, dappPubKey: dappPubKey,
messageHash: crypto.keccak256(messageData).toHexString(),
app: messagePayload.app
)

let receiptPayload = NotifyMessageReceiptPayload(
keyserver: keyserver, dappPubKey: dappPubKey,
messageHash: crypto.keccak256(messageData).toHexString(),
app: messagePayload.app
)
let wrapper = try identityClient.signAndCreateWrapper(
payload: receiptPayload,
account: messagePayload.account
)

let wrapper = try identityClient.signAndCreateWrapper(
payload: receiptPayload,
account: messagePayload.account
)
let response = RPCResponse(id: payload.id, result: wrapper)

let response = RPCResponse(id: payload.id, result: wrapper)

try await networkingInteractor.respond(
topic: payload.topic,
response: response,
protocolMethod: NotifyMessageProtocolMethod()
)

logger.debug("Sent Notify Message Response on topic: \(payload.topic)", properties: ["topic" : payload.topic, "messageBody": messagePayload.message.body, "messageTitle": messagePayload.message.title, "id": payload.id.string, "result": wrapper.jwtString])
}

}.store(in: &publishers)
try await networkingInteractor.respond(
topic: payload.topic,
response: response,
protocolMethod: NotifyMessageProtocolMethod()
)

logger.debug("Sent Notify Message Response on topic: \(payload.topic)", properties: ["topic" : payload.topic, "messageBody": messagePayload.message.body, "messageTitle": messagePayload.message.title, "id": payload.id.string, "result": wrapper.jwtString])
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,29 +30,30 @@ private extension NotifyUpdateResponseSubscriber {
}

func subscribeForUpdateResponse() {
let protocolMethod = NotifyUpdateProtocolMethod()
networkingInteractor.responseSubscription(on: protocolMethod)
.sink {[unowned self] (payload: ResponseSubscriptionPayload<NotifyUpdatePayload.Wrapper, NotifyUpdateResponsePayload.Wrapper>) in
Task(priority: .high) {
logger.debug("Received Notify Update response")
networkingInteractor.subscribeOnResponse(
protocolMethod: NotifyUpdateProtocolMethod(),
requestOfType: NotifyUpdatePayload.Wrapper.self,
responseOfType: NotifyUpdateResponsePayload.Wrapper.self,
errorHandler: logger
) { [unowned self] payload in
logger.debug("Received Notify Update response")

let subscriptionTopic = payload.topic
let subscriptionTopic = payload.topic

let (requestPayload, requestClaims) = try NotifyUpdatePayload.decodeAndVerify(from: payload.request)
let (_, _) = try NotifyUpdateResponsePayload.decodeAndVerify(from: payload.response)
let (requestPayload, requestClaims) = try NotifyUpdatePayload.decodeAndVerify(from: payload.request)
let (_, _) = try NotifyUpdateResponsePayload.decodeAndVerify(from: payload.response)

let scope = try await buildScope(selected: requestPayload.scope, dappUrl: requestPayload.dappUrl)
let scope = try await buildScope(selected: requestPayload.scope, dappUrl: requestPayload.dappUrl)

guard let oldSubscription = notifyStorage.getSubscription(topic: subscriptionTopic) else {
logger.debug("NotifyUpdateResponseSubscriber Subscription does not exist")
return
}
guard let oldSubscription = notifyStorage.getSubscription(topic: subscriptionTopic) else {
logger.debug("NotifyUpdateResponseSubscriber Subscription does not exist")
return
}

notifyStorage.updateSubscription(oldSubscription, scope: scope, expiry: requestClaims.exp)
notifyStorage.updateSubscription(oldSubscription, scope: scope, expiry: requestClaims.exp)

logger.debug("Updated Subscription")
}
}.store(in: &publishers)
logger.debug("Updated Subscription")
}
}

func buildScope(selected: String, dappUrl: String) async throws -> [String: ScopeValue] {
Expand Down
Loading