I'm considering using a single ThreadPoolScheduler for all pipelines in my program; it seems that ...
In the documentation I see the following:
Since my pipeline contains combining operators such as ...
Briefly, if I add only one ...
BTW, I'm explicitly blocking on execution at the very end, so the ...
# the final consumption
source.pipe(op.subscribe_on(pool_scheduler), op.do(rx.Observer(on_next=process_plan_result))).run()
Thanks in advance.
Replies: 1 comment
It looks like the answer is yes: Observables at the start of the "execution tree" follow the scheduler applied at the end:

from threading import current_thread

import rx
from rx import operators as op
from rx.scheduler import ThreadPoolScheduler


def my_concurrency_test():
    pool_scheduler = ThreadPoolScheduler(2)

    def num_map(num: int):
        print(f'[num] {current_thread().name} : {num}')
        return num

    num = rx.range(1, 5).pipe(op.map(num_map))

    def name_map(name: str):
        print(f'[name] {current_thread().name} : {name}')
        return name

    name = rx.from_iterable(["Alpha", "Beta", "Gamma", "Delta", "Epsilon"]).pipe(op.map(name_map))
    zipped = rx.zip(num, name)
    merged = rx.merge(num, name)
    # subscribe_on is applied only once, at the end of the combined pipeline
    merged.pipe(op.subscribe_on(pool_scheduler)).subscribe(
        on_next=lambda i: print("consumer: {0} {1}".format(current_thread().name, i))
    )
    input("Press Enter key to exit\n")


my_concurrency_test()
# Press Enter key to exit
# [num] ThreadPoolExecutor-0_0 : 1
# consumer: ThreadPoolExecutor-0_0 1
# [name] ThreadPoolExecutor-0_0 : Alpha
# consumer: ThreadPoolExecutor-0_0 Alpha
# [name] ThreadPoolExecutor-0_0 : Beta
# consumer: ThreadPoolExecutor-0_0 Beta
# [name] ThreadPoolExecutor-0_0 : Gamma
# consumer: ThreadPoolExecutor-0_0 Gamma
# [name] ThreadPoolExecutor-0_0 : Delta
# consumer: ThreadPoolExecutor-0_0 Delta
# [name] ThreadPoolExecutor-0_0 : Epsilon
# consumer: ThreadPoolExecutor-0_0 Epsilon
# [num] ThreadPoolExecutor-0_0 : 2
# consumer: ThreadPoolExecutor-0_0 2
# [num] ThreadPoolExecutor-0_0 : 3
# consumer: ThreadPoolExecutor-0_0 3
# [num] ThreadPoolExecutor-0_0 : 4
# consumer: ThreadPoolExecutor-0_0 4