From be63b3509a38b08e841768c1fb26c6a5b95045dc Mon Sep 17 00:00:00 2001 From: Siamak Date: Sat, 21 Sep 2024 11:48:53 +0330 Subject: [PATCH] feat: implemented stream function --- Sources/Client/APIClient.swift | 171 +++++++++++++++++++++++++++++++++ 1 file changed, 171 insertions(+) diff --git a/Sources/Client/APIClient.swift b/Sources/Client/APIClient.swift index ff50b1a..275f31c 100644 --- a/Sources/Client/APIClient.swift +++ b/Sources/Client/APIClient.swift @@ -382,6 +382,94 @@ extension APIClient { } +// MARK: - APIClient+CombineStreamRequest + +extension APIClient { + // MARK: - Combine Stream Request + + /// Performs a streaming network request using Combine. + /// - Parameter endpoint: The NetworkRouter defining the request. + /// - Returns: A publisher that emits decoded responses as they arrive or an error. + public func streamRequest(_ endpoint: any NetworkRouter) -> AnyPublisher> { + guard let urlRequest = try? endpoint.asURLRequest() else { + return Fail(error: .unknown).eraseToAnyPublisher() + } + + return makeStreamRequest(urlRequest: urlRequest) + } + + /// Internal method to make the actual streaming network request. + private func makeStreamRequest(urlRequest: URLRequest) -> AnyPublisher> { + let sessionDelegate = StreamingSessionDelegate() + sessionDelegate.logLevel = logLevel + let session = configuredSession(delegate: sessionDelegate, configuration: configuration) + + sessionDelegate.startRequest(session: session, urlRequest: urlRequest) + + return sessionDelegate.subject + .mapError { [weak self] error in + self?.mapErrorToNetworkError(error) ?? .unknown + } + .eraseToAnyPublisher() + } +} + +// MARK: - StreamingSessionDelegate + +private class StreamingSessionDelegate: NSObject, URLSessionDataDelegate,@unchecked Sendable { + let subject = PassthroughSubject() + var dataBuffer = Data() + var logLevel: LogLevel = .none + + func startRequest(session: URLSession, urlRequest: URLRequest) { + URLSessionLogger.shared.logRequest(urlRequest, logLevel: logLevel) + let task = session.dataTask(with: urlRequest) + task.resume() + } + + // Handle data received + func urlSession(_ session: URLSession, dataTask: URLSessionDataTask, didReceive data: Data) { + dataBuffer.append(data) + + while let range = dataBuffer.range(of: Data("\n".utf8)) { + let lineData = dataBuffer.subdata(in: dataBuffer.startIndex..(_ endpoint: any NetworkRouter) -> AsyncThrowingStream { + guard let urlRequest = try? endpoint.asURLRequest() else { + return AsyncThrowingStream { continuation in + continuation.finish(throwing: NetworkError.unknown) + } + } + + return makeAsyncStreamRequest(urlRequest: urlRequest) + } + + /// Internal method to make the actual async streaming network request. + @available(iOS 15.0, *) + private func makeAsyncStreamRequest(urlRequest: URLRequest) -> AsyncThrowingStream { + return AsyncThrowingStream { [weak self] continuation in + guard let self = self else { + continuation.finish(throwing: NetworkError.unknown) + return + } + + let session = self.configuredSession(configuration: self.configuration) + + let task = Task { + do { + let (bytes, response) = try await session.bytes(for: urlRequest) + URLSessionLogger.shared.logResponse(response, data: nil, error: nil, logLevel: self.logLevel) + + var iterator = bytes.makeAsyncIterator() + var dataBuffer = Data() + + while let chunk = try await iterator.next() { + dataBuffer.append(chunk) + + while let range = dataBuffer.range(of: Data("\n".utf8)) { + let lineData = dataBuffer.subdata(in: dataBuffer.startIndex..