Skip to content

Commit

Permalink
feat: implemented stream function
Browse files Browse the repository at this point in the history
  • Loading branch information
siamakrostami committed Sep 21, 2024
1 parent a1135fa commit be63b35
Showing 1 changed file with 171 additions and 0 deletions.
171 changes: 171 additions & 0 deletions Sources/Client/APIClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,94 @@ extension APIClient {

}

// MARK: - APIClient+CombineStreamRequest

extension APIClient {
// MARK: - Combine Stream Request

/// Performs a streaming network request using Combine.
/// - Parameter endpoint: The NetworkRouter defining the request.
/// - Returns: A publisher that emits decoded responses as they arrive or an error.
public func streamRequest<T: Codable>(_ endpoint: any NetworkRouter) -> AnyPublisher<T, NetworkError<ErrorType>> {
guard let urlRequest = try? endpoint.asURLRequest() else {
return Fail(error: .unknown).eraseToAnyPublisher()
}

return makeStreamRequest(urlRequest: urlRequest)
}

/// Internal method to make the actual streaming network request.
private func makeStreamRequest<T: Codable>(urlRequest: URLRequest) -> AnyPublisher<T, NetworkError<ErrorType>> {
let sessionDelegate = StreamingSessionDelegate<T>()
sessionDelegate.logLevel = logLevel
let session = configuredSession(delegate: sessionDelegate, configuration: configuration)

sessionDelegate.startRequest(session: session, urlRequest: urlRequest)

return sessionDelegate.subject
.mapError { [weak self] error in
self?.mapErrorToNetworkError(error) ?? .unknown
}
.eraseToAnyPublisher()
}
}

// MARK: - StreamingSessionDelegate

private class StreamingSessionDelegate<T: Codable>: NSObject, URLSessionDataDelegate,@unchecked Sendable {
let subject = PassthroughSubject<T, Error>()
var dataBuffer = Data()
var logLevel: LogLevel = .none

func startRequest(session: URLSession, urlRequest: URLRequest) {
URLSessionLogger.shared.logRequest(urlRequest, logLevel: logLevel)
let task = session.dataTask(with: urlRequest)
task.resume()
}

// Handle data received
func urlSession(_ session: URLSession, dataTask: URLSessionDataTask, didReceive data: Data) {
dataBuffer.append(data)

while let range = dataBuffer.range(of: Data("\n".utf8)) {
let lineData = dataBuffer.subdata(in: dataBuffer.startIndex..<range.lowerBound)
dataBuffer.removeSubrange(dataBuffer.startIndex..<range.upperBound)

do {
let decoder = JSONDecoder()
let decodedObject = try decoder.decode(T.self, from: lineData)
subject.send(decodedObject)
} catch {
// Handle decoding error for this line
subject.send(completion: .failure(error))
return
}
}
}

// Handle errors and completion
func urlSession(_ session: URLSession, task: URLSessionTask, didCompleteWithError error: Error?) {
if let error = error {
URLSessionLogger.shared.logResponse(nil, data: nil, error: error, logLevel: logLevel)
subject.send(completion: .failure(error))
} else {
// If there's any data left in buffer after the stream ends
if !dataBuffer.isEmpty {
do {
let decoder = JSONDecoder()
let decodedObject = try decoder.decode(T.self, from: dataBuffer)
subject.send(decodedObject)
} catch {
subject.send(completion: .failure(error))
return
}
}
subject.send(completion: .finished)
}
}
}


// MARK: - APIClient+ErrorHandling

extension APIClient {
Expand Down Expand Up @@ -422,6 +510,89 @@ extension APIClient {

}

// MARK: - APIClient+AsyncStreamRequest

extension APIClient {
// MARK: - Async/Await Stream Request

/// Performs a streaming network request using async/await.
/// - Parameter endpoint: The NetworkRouter defining the request.
/// - Returns: An AsyncThrowingStream that yields decoded responses as they arrive.
@available(iOS 15.0, *)
public func asyncStreamRequest<T: Codable>(_ endpoint: any NetworkRouter) -> AsyncThrowingStream<T, Error> {
guard let urlRequest = try? endpoint.asURLRequest() else {
return AsyncThrowingStream { continuation in
continuation.finish(throwing: NetworkError<ErrorType>.unknown)
}
}

return makeAsyncStreamRequest(urlRequest: urlRequest)
}

/// Internal method to make the actual async streaming network request.
@available(iOS 15.0, *)
private func makeAsyncStreamRequest<T: Codable>(urlRequest: URLRequest) -> AsyncThrowingStream<T, Error> {
return AsyncThrowingStream { [weak self] continuation in
guard let self = self else {
continuation.finish(throwing: NetworkError<ErrorType>.unknown)
return
}

let session = self.configuredSession(configuration: self.configuration)

let task = Task {
do {
let (bytes, response) = try await session.bytes(for: urlRequest)
URLSessionLogger.shared.logResponse(response, data: nil, error: nil, logLevel: self.logLevel)

var iterator = bytes.makeAsyncIterator()
var dataBuffer = Data()

while let chunk = try await iterator.next() {
dataBuffer.append(chunk)

while let range = dataBuffer.range(of: Data("\n".utf8)) {
let lineData = dataBuffer.subdata(in: dataBuffer.startIndex..<range.lowerBound)
dataBuffer.removeSubrange(dataBuffer.startIndex..<range.upperBound)

do {
let decoder = JSONDecoder()
let decodedObject = try decoder.decode(T.self, from: lineData)
continuation.yield(decodedObject)
} catch {
// Handle decoding error for this chunk
continuation.finish(throwing: error)
return
}
}
}

// If there's any data left in buffer after the stream ends
if !dataBuffer.isEmpty {
do {
let decoder = JSONDecoder()
let decodedObject = try decoder.decode(T.self, from: dataBuffer)
continuation.yield(decodedObject)
} catch {
continuation.finish(throwing: error)
return
}
}

continuation.finish()
} catch {
URLSessionLogger.shared.logResponse(nil, data: nil, error: error, logLevel: self.logLevel)
continuation.finish(throwing: error)
}
}

continuation.onTermination = { @Sendable _ in
task.cancel()
}
}
}
}

extension APIClient {
// MARK: - Helper Methods

Expand Down

0 comments on commit be63b35

Please sign in to comment.