Commit a0d4fd6

Add decorators and add error handling

garlontas committed Jul 18, 2023
1 parent 0c21643 commit a0d4fd6

Showing 19 changed files with 672 additions and 97 deletions.
27 changes: 26 additions & 1 deletion README.md
@@ -25,7 +25,7 @@ Now you might be wondering why another library when there are already a few impl
* It boasts high speed and efficiency.
* The implementation achieves 100% test coverage.
* It follows Pythonic principles, resulting in clean and readable code.
* It adds some cool innovative features like conditions and an even more declarative look
* It adds some cool innovative features such as conditions or error handling and an even more declarative look.

Let's take a look at a small example:

@@ -102,6 +102,31 @@ Considering the above characteristics, a stream can be defined as follows:

Conditions provide a convenient means for performing logical operations within your Stream, such as using `filter()`, `take_while()`, `drop_while()`, and more. With PyStreamAPI, you have access to a staggering 111 diverse conditions that enable you to process various data types including strings, types, numbers, and dates. Additionally, PyStreamAPI offers a powerful combiner that allows you to effortlessly combine multiple conditions, facilitating the implementation of highly intricate pipelines.
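
For illustration, here is a minimal sketch of a condition in action. The `pystreamapi.conditions` import path and the `even` condition are assumptions made for this example; see the documentation for the exact names:

```python
from pystreamapi import Stream
# Assumed import path; check the documentation for the exact module and names
from pystreamapi.conditions import even

Stream.of([1, 2, 3, 4, 5, 6]) \
    .filter(even()) \
    .to_list()  # [2, 4, 6]
```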

## Error handling: Work with data you don't know
PyStreamAPI offers a powerful error-handling mechanism that lets you handle errors declaratively. This is especially useful when working with data whose quality you don't know in advance.

PyStreamAPI offers three different error levels:
- `ErrorLevel.RAISE`: This is the default error level. It will raise an exception if an error occurs.
- `ErrorLevel.IGNORE`: This error level will ignore any errors that occur and won't inform you.
- `ErrorLevel.WARN`: This error level warns you about any errors that occur, logging them as warnings with the default logger.


This is how you can use them:

```python
from pystreamapi import Stream, ErrorLevel

Stream.of([" ", '3', None, "2", 1, ""]) \
    .error_level(ErrorLevel.IGNORE) \
    .map_to_int() \
    .sorted() \
    .for_each(print)  # Output: 1 2 3
```

The code above ignores any errors that occur while mapping to int and simply skips the affected elements.
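
Under the same assumptions, a sketch of `ErrorLevel.WARN` (per the description above, failing elements are skipped and logged as warnings):

```python
from pystreamapi import Stream, ErrorLevel

Stream.of(["1", "a", "2"]) \
    .error_level(ErrorLevel.WARN) \
    .map_to_int() \
    .to_list()  # [1, 2]; a warning is logged for "a"
```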

For more details on how to use error handling, please refer to the documentation.

## Get started: Installation

To start using PyStreamAPI just install the module with this command:
5 changes: 3 additions & 2 deletions pystreamapi/__init__.py
@@ -1,4 +1,5 @@
from pystreamapi.__stream import Stream
from pystreamapi._streams.error.__levels import ErrorLevel

__version__ = "0.2"
__all__ = ["Stream"]
__version__ = "0.3"
__all__ = ["Stream", "ErrorLevel"]
Empty file.
55 changes: 55 additions & 0 deletions pystreamapi/_itertools/tools.py
@@ -0,0 +1,55 @@
# pylint: disable=protected-access
from pystreamapi._streams.error.__error import ErrorHandler, _sentinel


def dropwhile(predicate, iterable, handler: ErrorHandler=None):
"""
Drop items from the iterable while predicate(item) is true.
Afterward, return every element until the iterable is exhausted.
"""
it = iter(iterable)
for x in it:
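# If an error handler is given, evaluate the predicate through it; a failing
# element then yields _sentinel instead of raising, per the configured level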
if handler is not None:
res = handler._one(mapper=predicate, item=x)
else:
res = predicate(x)
if not res and res is not _sentinel:
yield x
break
yield from it
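
# Usage sketch (handler omitted, so it behaves like itertools.dropwhile):
#   list(dropwhile(lambda x: x < 3, [1, 2, 5, 0]))  -> [5, 0]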


_initial_missing = object()

def reduce(function, sequence, initial=_initial_missing, handler: ErrorHandler=None):
"""
reduce(function, iterable[, initial]) -> value
Apply a function of two arguments cumulatively to the items of a sequence
or iterable, from left to right, to reduce the iterable to a single
value. For example, reduce(lambda x, y: x+y, [1, 2, 3, 4, 5]) calculates
((((1+2)+3)+4)+5). If initial is present, it is placed before the items
of the iterable in the calculation, and serves as a default when the
iterable is empty.
"""

it = iter(sequence)

if initial is _initial_missing:
try:
value = next(it)
except StopIteration:
raise TypeError(
"reduce() of empty iterable with no initial value") from None
else:
value = initial

for element in it:
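# Run each step through the handler when present; _sentinel signals a
# suppressed failure, in which case the accumulator is left unchanged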
if handler is not None:
new_value = handler._one(mapper=lambda x: function(value, x), item=element)
if new_value is not _sentinel:
value = new_value
else:
value = function(value, element)

return value
47 changes: 31 additions & 16 deletions pystreamapi/_parallel/fork_and_join.py
@@ -1,9 +1,12 @@
# pylint: disable=protected-access
import os

from functools import reduce
from typing import Callable, Any

from joblib import Parallel, delayed
from pystreamapi._parallel.parallelizer import Parallel, delayed
from pystreamapi._streams.error.__error import ErrorHandler
from pystreamapi._streams.error.__levels import ErrorLevel
from pystreamapi._itertools.tools import reduce


class Parallelizer:
@@ -21,31 +24,44 @@ class Parallelizer:

def __init__(self):
self.__src = None
self.__handler: ErrorHandler | None = None

def set_source(self, src: list):
"""Set the source list
def set_source(self, src: list, handler: ErrorHandler=None):
"""
Set the source list
:param handler: The error handler to be used
:param src: The source list
"""
self.__src = src
self.__handler = handler

def filter(self, function):
"""Parallel filter function"""
parts = self.fork()
result = self.__run_job_in_parallel(parts, self.__filter, function)
if self.__handler is not None and self.__handler._get_error_level() != ErrorLevel.RAISE:
result = self.__run_job_in_parallel(parts, self._filter_ignore_errors, function)
else:
result = self.__run_job_in_parallel(parts, self.__filter, function)
return [item for sublist in result for item in sublist]

@staticmethod
def __filter(function, src):
"""Filter function used in the fork-and-join technology"""
return [element for element in src if function(element)]

def _filter_ignore_errors(self, function, src):
"""Filter function used in the fork-and-join technology using an error handler"""
return [self.__handler._one(condition=function, item=element) for element in src]

def reduce(self, function: Callable[[Any, Any], Any]):
"""Parallel reduce function using functools.reduce behind"""
if len(self.__src) < 2:
return self.__src
parts = self.fork(min_nr_items=2)
result = self.__run_job_in_parallel(parts, reduce, function)
return reduce(function, result)

@staticmethod
def __filter(function, src):
"""Filter function used in the fork-and-join technology"""
return [element for element in src if function(element)]
result = self.__run_job_in_parallel(
parts, lambda x, y: reduce(function=x, sequence=y, handler=self.__handler), function
)
return reduce(function, result, handler=self.__handler)

def fork(self, min_nr_items=1):
"""
@@ -77,8 +93,7 @@ def __calculate_number_of_parts(self, min_nr_items=1):
return round(len(self.__src) / min_nr_items)
return os.cpu_count() - 2 if os.cpu_count() > 2 else os.cpu_count()

@staticmethod
def __run_job_in_parallel(src, operation, op_function):
def __run_job_in_parallel(self, src, operation, op_function):
"""Run the operation in parallel"""
return Parallel(n_jobs=-1, prefer="processes")(delayed(operation)(op_function, part)
for part in src)
return Parallel(n_jobs=-1, prefer="processes", handler=self.__handler)\
(delayed(operation)(op_function, part) for part in src)
19 changes: 19 additions & 0 deletions pystreamapi/_parallel/parallelizer.py
@@ -0,0 +1,19 @@
from joblib import Parallel as _JoblibParallel, delayed # pylint: disable=unused-import

from pystreamapi._streams.error.__error import ErrorHandler
from pystreamapi._streams.error.__levels import ErrorLevel

class Parallel:
"""Wrapper for joblib.Parallel supporting error handling"""

def __init__(self, n_jobs=-1, prefer="processes", handler: ErrorHandler=None):
self.n_jobs = n_jobs
self.prefer = prefer
self.handler = handler

def __call__(self, iterable):
"""Call joblib.Parallel with error handling"""
res = _JoblibParallel(n_jobs=self.n_jobs, prefer=self.prefer)(iterable)
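# Suppressed failures come back as sentinel placeholders; strip them so
# callers only see successfully processed items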
if self.handler and self.handler._get_error_level() != ErrorLevel.RAISE:
return ErrorHandler._remove_sentinel(res)
return res
59 changes: 37 additions & 22 deletions pystreamapi/_streams/__base_stream.py
@@ -1,24 +1,38 @@
import functools
import itertools
from abc import abstractmethod
from builtins import reversed
from functools import cmp_to_key
from typing import Iterable, Callable, Any, TypeVar, Iterator

from pystreamapi.__optional import Optional
from pystreamapi._lazy.process import Process
from pystreamapi._lazy.queue import ProcessQueue
from pystreamapi.__optional import Optional
from pystreamapi._streams.error.__error import ErrorHandler
from pystreamapi._itertools.tools import dropwhile

K = TypeVar('K')
_V = TypeVar('_V')
_identity_missing = object()


class BaseStream(Iterable[K]):
def terminal(func):
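"""Decorator for terminal stream operations: executes all queued lazy
processes before running the wrapped operation."""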
@functools.wraps(func)
def wrapper(*args, **kwargs):
self: BaseStream = args[0]
# pylint: disable=protected-access
self._queue.execute_all()
return func(*args, **kwargs)

return wrapper


class BaseStream(Iterable[K], ErrorHandler):
"""
A sequence of elements supporting sequential and parallel aggregate operations.
To perform a computation, stream operations are composed into a stream pipeline. A stream
pipeline consists of a source (which might be an array, a collection, a generator function,
pipeline consists of a source (which might be an iterable, a collection, a generator function,
an I/O channel, etc.), zero or more intermediate operations (which transform a stream into
another stream, such as filter(Predicate)), and a terminal operation (which produces a result
or side effect, such as count() or forEach(Consumer)). Streams are lazy; computation on the
@@ -30,8 +44,8 @@ def __init__(self, source: Iterable[K]):
self._source = source
self._queue = ProcessQueue()

@terminal
def __iter__(self) -> Iterator[K]:
self._trigger_exec()
return iter(self._source)

@classmethod
@@ -66,7 +80,7 @@ def drop_while(self, predicate: Callable[[K], bool]) -> 'BaseStream[_V]':

def __drop_while(self, predicate: Callable[[Any], bool]):
"""Drops elements from the stream while the predicate is true."""
self._source = list(itertools.dropwhile(predicate, self._source))
self._source = list(dropwhile(predicate, self._source, self))

def filter(self, predicate: Callable[[K], bool]) -> 'BaseStream[K]':
"""
@@ -242,80 +256,84 @@ def __take_while(self, predicate: Callable[[Any], bool]):
# Terminal Operations:

@abstractmethod
@terminal
def all_match(self, predicate: Callable[[K], bool]):
"""
Returns whether all elements of this stream match the provided predicate.
:param predicate: The callable predicate
"""

@terminal
def any_match(self, predicate: Callable[[K], bool]):
"""
Returns whether any elements of this stream match the provided predicate.
:param predicate: The callable predicate
"""
self._trigger_exec()
return any(predicate(element) for element in self._source)
return any(self._itr(self._source, predicate))

@terminal
def count(self):
"""
Returns the count of elements in this stream.
:return: Number of elements in the stream
"""
self._trigger_exec()
return len(self._source)

@abstractmethod
@terminal
def find_any(self) -> Optional:
"""
Returns an Optional describing some element of the stream, or an empty Optional if the
stream is empty.
"""

@terminal
def find_first(self):
"""
Returns an Optional describing the first element of this stream, or an empty Optional if
the stream is empty. :return:
"""
self._trigger_exec()
if len(self._source) > 0:
return Optional.of(self._source[0])
return Optional.empty()

@abstractmethod
def for_each(self, predicate: Callable):
@terminal
def for_each(self, action: Callable):
"""
Performs an action for each element of this stream.
:param predicate:
:param action:
"""

@terminal
def none_match(self, predicate: Callable[[K], bool]):
"""
Returns whether no elements of this stream match the provided predicate.
:param predicate:
"""
self._trigger_exec()
return not any(predicate(element) for element in self._source)
return not any(self._itr(self._source, predicate))

@terminal
def min(self):
"""Returns the minimum element of this stream."""
self._trigger_exec()
if len(self._source) > 0:
return Optional.of(min(self._source))
return Optional.empty()

@terminal
def max(self):
"""Returns the maximum element of this stream."""
self._trigger_exec()
if len(self._source) > 0:
return Optional.of(max(self._source))
return Optional.empty()

@abstractmethod
@terminal
def reduce(self, predicate: Callable[[K, K], K], identity=_identity_missing,
depends_on_state=False) -> Optional:
"""
@@ -327,30 +345,27 @@ def reduce(self, predicate: Callable[[K, K], K], identity=_identity_missing,
:param identity: Default value
"""

@terminal
def to_list(self):
"""Accumulates the elements of this stream into a List."""
self._trigger_exec()
return list(self._source)

@terminal
def to_tuple(self):
"""Accumulates the elements of this stream into a Tuple."""
self._trigger_exec()
return tuple(self._source)

@terminal
def to_set(self):
"""Accumulates the elements of this stream into a Set."""
self._trigger_exec()
return set(self._source)

@abstractmethod
@terminal
def to_dict(self, key_mapper: Callable[[K], Any]) -> dict:
"""
Returns a dictionary consisting of the results of grouping the elements of this stream
by the given classifier.
:param key_mapper:
"""

def _trigger_exec(self):
"""Triggers execution of the stream."""
self._queue.execute_all()