daliclass/rxpy-backpressure

rxpy-backpressure

Backpressure strategies for use with RxPy

Goals

This project's goal is to provide a simple API for adding backpressure strategies to RxPy.

Getting Started

To get started, install the package from PyPI with pip install rxpy_backpressure. The project can be found here.

API Examples

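Example usage with RxPy 1.6:

from rxpy_backpressure import BackPressure

# image_observable, scheduler and observer are assumed to be defined elsewhere.
# The parentheses let the method chain span several lines.
(
    image_observable
    .observe_on(scheduler)
    .subscribe(BackPressure.LATEST(observer))
)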

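Example usage with RxPy ~3:

from rxpy_backpressure import BackPressure

# image_observable, scheduler and observer are assumed to be defined elsewhere.
# In RxPy 3, subscribe takes the scheduler as a keyword argument.
image_observable.subscribe(BackPressure.LATEST(observer), scheduler=scheduler)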

To get started with the library quickly, a runnable example can be found here: rxpy-backpressure-example

Strategies

Latest

The Latest strategy remembers only the most recent message received while the observer is busy, and calls the observer with it once the observer has finished processing its current message. Any older messages that were still waiting are discarded.
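The behaviour can be illustrated without RxPy. The following is a minimal sketch under stated assumptions, not the library's implementation: the LatestSlot class and its offer and take methods are hypothetical names, and the busy observer is simulated by offering several messages before taking one.

```python
from typing import Optional


class LatestSlot:
    """Hypothetical single-slot holder illustrating the Latest strategy."""

    def __init__(self) -> None:
        self._slot: Optional[str] = None

    def offer(self, message: str) -> None:
        # A newer message overwrites any message that is still waiting.
        self._slot = message

    def take(self) -> Optional[str]:
        # Called when the observer is ready for its next message.
        message, self._slot = self._slot, None
        return message


slot = LatestSlot()
for frame in ["frame-1", "frame-2", "frame-3"]:
    slot.offer(frame)  # the observer is busy while these arrive

next_message = slot.take()  # only the newest frame survived
```

A single slot keeps memory use constant, which suits streams such as camera frames where only the freshest value matters.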

Drop

The Drop strategy accepts a cache_size. It remembers the most recent messages, up to cache_size, and removes older messages from the cache to make room for new ones. The strategy guarantees that the oldest messages in the cache are passed to the observer first.
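As an illustration of this eviction order, here is a minimal sketch that uses a bounded deque from the standard library. It is an assumption about how the behaviour could be modelled, not the library's code; only the cache_size name comes from the description above.

```python
from collections import deque

cache_size = 3
cache = deque(maxlen=cache_size)  # a full deque evicts its oldest item on append

for message in range(1, 6):  # messages 1..5 arrive while the observer is busy
    cache.append(message)

# Messages 1 and 2 were evicted; the remaining ones are delivered oldest first.
delivered = [cache.popleft() for _ in range(len(cache))]
```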

Buffer

The Buffer strategy has an unbounded cache and passes all messages to its consumer in the order it received them. Beware of memory leaks caused by a build-up of messages when the consumer is slower than the producer.
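The build-up can be made concrete with a small simulation. This is a sketch with made-up rates (three messages arriving per tick, one consumed per tick) and does not use the library itself.

```python
from collections import deque

buffer = deque()  # no maxlen: nothing is ever dropped
backlog_sizes = []

next_message = 0
for tick in range(5):
    # Hypothetical rates: three messages arrive per tick, one is consumed.
    for _ in range(3):
        buffer.append(next_message)
        next_message += 1
    buffer.popleft()  # the consumer handles the oldest message first
    backlog_sizes.append(len(buffer))
```

Under these rates the backlog grows by two messages on every tick and never shrinks, which is the memory growth the warning refers to.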

Sized Buffer

The Sized Buffer strategy has a fixed-size cache. It behaves as the opposite of Drop: while the buffer is full it refuses new messages, and it accepts them again only once the buffer has available space.
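The contrast with Drop can be sketched as follows. SizedBufferSketch and its offer and take methods are hypothetical names chosen for illustration; the library's own classes and signatures may differ.

```python
from collections import deque
from typing import Deque


class SizedBufferSketch:
    """Hypothetical bounded buffer that refuses messages while it is full."""

    def __init__(self, cache_size: int) -> None:
        self._cache_size = cache_size
        self._cache: Deque[int] = deque()

    def offer(self, message: int) -> bool:
        # Refuse the new message instead of evicting an old one.
        if len(self._cache) >= self._cache_size:
            return False
        self._cache.append(message)
        return True

    def take(self) -> int:
        # Consuming a message frees a slot for the next arrival.
        return self._cache.popleft()


buffer = SizedBufferSketch(cache_size=2)
accepted = [buffer.offer(m) for m in (1, 2, 3)]  # the third offer finds the buffer full
first = buffer.take()                            # frees one slot
accepted_after_take = buffer.offer(4)            # space is available again
```

Where Drop keeps the newest messages and discards the oldest, this sketch keeps the oldest and turns away the newest.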

Considerations

This project supports Python 3.6+ because RxPy 3.0.0+ does not support earlier versions. If there is a case for supporting earlier versions, this can be discussed; however, the typing introduced in Python 3.6 is very nice.