Skip to content

Commit

Permalink
fix possible fail to dispose and re-lock
Browse files Browse the repository at this point in the history
  • Loading branch information
geoffmacd authored and freak4pc committed Oct 4, 2024
1 parent 2ba3c6a commit 168f887
Showing 1 changed file with 25 additions and 15 deletions.
40 changes: 25 additions & 15 deletions RxSwift/Observables/Merge.swift
Original file line number Diff line number Diff line change
Expand Up @@ -169,20 +169,7 @@ private final class MergeLimitedSinkIter<SourceElement, SourceSequence: Observab
self.parent.dispose()
case .completed:
self.parent.group.remove(for: self.disposeKey)
if let next = self.parent.queue.dequeue() {
_ = CurrentThreadScheduler.instance.schedule(()) { _ in
self.parent.subscribe(next, group: self.parent.group)
return self.parent.group
}
}
else {
self.parent.activeCount -= 1

if self.parent.stopped && self.parent.activeCount == 0 {
self.parent.forwardOn(.completed)
self.parent.dispose()
}
}
self.parent.dequeueNextAndSubscribe()
}
}
}
Expand Down Expand Up @@ -239,7 +226,8 @@ private class MergeLimitedSink<SourceElement, SourceSequence: ObservableConverti
return self.group
}

func subscribe(_ innerSource: SourceSequence, group: CompositeDisposable) {
@discardableResult
func subscribe(_ innerSource: SourceSequence, group: CompositeDisposable) -> Disposable {
let subscription = SingleAssignmentDisposable()

let key = group.insert(subscription)
Expand All @@ -250,6 +238,28 @@ private class MergeLimitedSink<SourceElement, SourceSequence: ObservableConverti
let disposable = innerSource.asObservable().subscribe(observer)
subscription.setDisposable(disposable)
}
return subscription
}

func dequeueNextAndSubscribe() {
if let next = queue.dequeue() {
// subscribing immediately can produce values immediately which can re-enter and cause stack overflows
let disposable = CurrentThreadScheduler.instance.schedule(()) { _ in
// lock again
self.lock.performLocked {
self.subscribe(next, group: self.group)
}
}
_ = group.insert(disposable)
}
else {
activeCount -= 1

if stopped && activeCount == 0 {
forwardOn(.completed)
dispose()
}
}
}

func performMap(_ element: SourceElement) throws -> SourceSequence {
Expand Down

0 comments on commit 168f887

Please sign in to comment.