diff --git a/README.md b/README.md
index 0ed74ef..080f286 100644
--- a/README.md
+++ b/README.md
@@ -25,7 +25,7 @@ Now you might be wondering why another library when there are already a few impl
 * It boasts high speed and efficiency.
 * The implementation achieves 100% test coverage.
 * It follows Pythonic principles, resulting in clean and readable code.
-* It adds some cool innovative features like conditions and an even more declarative look
+* It adds some cool innovative features such as conditions, error handling, and an even more declarative look.
 
 Let's take a look at a small example:
 
@@ -102,6 +102,31 @@ Considering the above characteristics, a stream can be defined as follows:
 Conditions provide a convenient means for performing logical operations within your Stream, such as using `filter()`, `take_while()`, `drop_while()`, and more. With PyStreamAPI, you have access to a staggering 111 diverse conditions that enable you to process various data types including strings, types, numbers, and dates. Additionally, PyStreamAPI offers a powerful combiner that allows you to effortlessly combine multiple conditions, facilitating the implementation of highly intricate pipelines.
 
+## Error handling: Work with data that you don't know
+PyStreamAPI offers a powerful error handling mechanism that allows you to handle errors in a declarative manner. This is especially useful when working with data that you don't know.
+
+PyStreamAPI offers three different error levels:
+- `ErrorLevel.RAISE`: This is the default error level. It will raise an exception if an error occurs.
+- `ErrorLevel.IGNORE`: This error level will ignore any errors that occur and won't inform you.
+- `ErrorLevel.WARN`: This error level will warn you about any errors that occur and log them as warnings with the default logger.
+
+This is how you can use them:
+
+```python
+from pystreamapi import Stream, ErrorLevel
+
+Stream.of([" ", '3', None, "2", 1, ""]) \
+    .error_level(ErrorLevel.IGNORE) \
+    .map_to_int() \
+    .sorted() \
+    .for_each(print)  # Output: 1 2 3
+```
+
+The code above will ignore all errors that occur while mapping to int and will simply skip the affected elements.
+
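+You can also restrict handling to specific exception types by passing them to `error_level()`. The following is a small sketch based on the `error_level(level, *exceptions)` signature; any exception type that is not listed is still raised:
+
+```python
+from pystreamapi import Stream, ErrorLevel
+
+Stream.of(["1", "2", "a", "3"]) \
+    .error_level(ErrorLevel.IGNORE, ValueError) \
+    .map(int) \
+    .for_each(print)  # Output: 1 2 3
+```
+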
+ """ + it = iter(iterable) + for x in it: + if handler is not None: + res = handler._one(mapper=predicate, item=x) + else: + res = predicate(x) + if not res and res is not _sentinel: + yield x + break + yield from it + + +_initial_missing = object() + +def reduce(function, sequence, initial=_initial_missing, handler: ErrorHandler=None): + """ + reduce(function, iterable[, initial]) -> value + + Apply a function of two arguments cumulatively to the items of a sequence + or iterable, from left to right, to reduce the iterable to a single + value. For example, reduce(lambda x, y: x+y, [1, 2, 3, 4, 5]) calculates + ((((1+2)+3)+4)+5). If initial is present, it is placed before the items + of the iterable in the calculation, and serves as a default when the + iterable is empty. + """ + + it = iter(sequence) + + if initial is _initial_missing: + try: + value = next(it) + except StopIteration: + raise TypeError( + "reduce() of empty iterable with no initial value") from None + else: + value = initial + + for element in it: + if handler is not None: + new_value = handler._one(mapper=lambda x: function(value, x), item=element) + if new_value is not _sentinel: + value = new_value + else: + value = function(value, element) + + return value diff --git a/pystreamapi/_parallel/fork_and_join.py b/pystreamapi/_parallel/fork_and_join.py index 02a7a97..d405ead 100644 --- a/pystreamapi/_parallel/fork_and_join.py +++ b/pystreamapi/_parallel/fork_and_join.py @@ -1,9 +1,12 @@ +# pylint: disable=protected-access import os -from functools import reduce from typing import Callable, Any -from joblib import Parallel, delayed +from pystreamapi._parallel.parallelizer import Parallel, delayed +from pystreamapi._streams.error.__error import ErrorHandler +from pystreamapi._streams.error.__levels import ErrorLevel +from pystreamapi._itertools.tools import reduce class Parallelizer: @@ -21,31 +24,44 @@ class Parallelizer: def __init__(self): self.__src = None + self.__handler: ErrorHandler | None = None - def set_source(self, src: list): - """Set the source list + def set_source(self, src: list, handler: ErrorHandler=None): + """ + Set the source list + :param handler: The error handler to be used :param src: The source list """ self.__src = src + self.__handler = handler def filter(self, function): """Parallel filter function""" parts = self.fork() - result = self.__run_job_in_parallel(parts, self.__filter, function) + if self.__handler is not None and self.__handler._get_error_level() != ErrorLevel.RAISE: + result = self.__run_job_in_parallel(parts, self._filter_ignore_errors, function) + else: + result = self.__run_job_in_parallel(parts, self.__filter, function) return [item for sublist in result for item in sublist] + @staticmethod + def __filter(function, src): + """Filter function used in the fork-and-join technology""" + return [element for element in src if function(element)] + + def _filter_ignore_errors(self, function, src): + """Filter function used in the fork-and-join technology using an error handler""" + return [self.__handler._one(condition=function, item=element) for element in src] + def reduce(self, function: Callable[[Any, Any], Any]): """Parallel reduce function using functools.reduce behind""" if len(self.__src) < 2: return self.__src parts = self.fork(min_nr_items=2) - result = self.__run_job_in_parallel(parts, reduce, function) - return reduce(function, result) - - @staticmethod - def __filter(function, src): - """Filter function used in the fork-and-join technology""" - return [element for element 
+    """
+    it = iter(iterable)
+    for x in it:
+        if handler is not None:
+            res = handler._one(mapper=predicate, item=x)
+        else:
+            res = predicate(x)
+        if not res and res is not _sentinel:
+            yield x
+            break
+    yield from it
+
+
+_initial_missing = object()
+
+
+def reduce(function, sequence, initial=_initial_missing, handler: ErrorHandler = None):
+    """
+    reduce(function, iterable[, initial]) -> value
+
+    Apply a function of two arguments cumulatively to the items of a sequence
+    or iterable, from left to right, to reduce the iterable to a single
+    value. For example, reduce(lambda x, y: x+y, [1, 2, 3, 4, 5]) calculates
+    ((((1+2)+3)+4)+5). If initial is present, it is placed before the items
+    of the iterable in the calculation, and serves as a default when the
+    iterable is empty.
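+
+    If a handler is given, a step that raises an exception is handled
+    according to the handler's error level: re-raised, skipped, or logged
+    and skipped.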
+    """
+
+    it = iter(sequence)
+
+    if initial is _initial_missing:
+        try:
+            value = next(it)
+        except StopIteration:
+            raise TypeError(
+                "reduce() of empty iterable with no initial value") from None
+    else:
+        value = initial
+
+    for element in it:
+        if handler is not None:
+            new_value = handler._one(mapper=lambda x: function(value, x), item=element)
+            if new_value is not _sentinel:
+                value = new_value
+        else:
+            value = function(value, element)
+
+    return value
diff --git a/pystreamapi/_parallel/fork_and_join.py b/pystreamapi/_parallel/fork_and_join.py
index 02a7a97..d405ead 100644
--- a/pystreamapi/_parallel/fork_and_join.py
+++ b/pystreamapi/_parallel/fork_and_join.py
@@ -1,9 +1,12 @@
+# pylint: disable=protected-access
 import os
-from functools import reduce
 from typing import Callable, Any
 
-from joblib import Parallel, delayed
+from pystreamapi._parallel.parallelizer import Parallel, delayed
+from pystreamapi._streams.error.__error import ErrorHandler
+from pystreamapi._streams.error.__levels import ErrorLevel
+from pystreamapi._itertools.tools import reduce
 
 
 class Parallelizer:
@@ -21,31 +24,44 @@ class Parallelizer:
 
     def __init__(self):
         self.__src = None
+        self.__handler: ErrorHandler | None = None
 
-    def set_source(self, src: list):
-        """Set the source list
+    def set_source(self, src: list, handler: ErrorHandler = None):
+        """
+        Set the source list
 
+        :param handler: The error handler to be used
         :param src: The source list
         """
         self.__src = src
+        self.__handler = handler
 
     def filter(self, function):
         """Parallel filter function"""
         parts = self.fork()
-        result = self.__run_job_in_parallel(parts, self.__filter, function)
+        if self.__handler is not None and self.__handler._get_error_level() != ErrorLevel.RAISE:
+            result = self.__run_job_in_parallel(parts, self._filter_ignore_errors, function)
+        else:
+            result = self.__run_job_in_parallel(parts, self.__filter, function)
         return [item for sublist in result for item in sublist]
 
+    @staticmethod
+    def __filter(function, src):
+        """Filter function used in the fork-and-join technique"""
+        return [element for element in src if function(element)]
+
+    def _filter_ignore_errors(self, function, src):
+        """Filter function used in the fork-and-join technique, backed by an error handler"""
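+        # _one returns the element itself when the condition holds; failed or
+        # rejected elements come back as the sentinel, which the Parallel
+        # wrapper strips from each sublist before the results are flattened.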
+        return [self.__handler._one(condition=function, item=element) for element in src]
+
     def reduce(self, function: Callable[[Any, Any], Any]):
-        """Parallel reduce function using functools.reduce behind"""
+        """Parallel reduce function built on the error-aware reduce"""
         if len(self.__src) < 2:
             return self.__src
         parts = self.fork(min_nr_items=2)
-        result = self.__run_job_in_parallel(parts, reduce, function)
-        return reduce(function, result)
-
-    @staticmethod
-    def __filter(function, src):
-        """Filter function used in the fork-and-join technology"""
-        return [element for element in src if function(element)]
+        result = self.__run_job_in_parallel(
+            parts, lambda x, y: reduce(function=x, sequence=y, handler=self.__handler), function
+        )
+        return reduce(function, result, handler=self.__handler)
 
     def fork(self, min_nr_items=1):
         """
@@ -77,8 +93,7 @@ def __calculate_number_of_parts(self, min_nr_items=1):
             return round(len(self.__src) / min_nr_items)
         return os.cpu_count() - 2 if os.cpu_count() > 2 else os.cpu_count()
 
-    @staticmethod
-    def __run_job_in_parallel(src, operation, op_function):
+    def __run_job_in_parallel(self, src, operation, op_function):
         """Run the operation in parallel"""
-        return Parallel(n_jobs=-1, prefer="processes")(delayed(operation)(op_function, part)
-                                                       for part in src)
+        return Parallel(n_jobs=-1, prefer="processes", handler=self.__handler)\
+            (delayed(operation)(op_function, part) for part in src)
diff --git a/pystreamapi/_parallel/parallelizer.py b/pystreamapi/_parallel/parallelizer.py
new file mode 100644
index 0000000..2e444ad
--- /dev/null
+++ b/pystreamapi/_parallel/parallelizer.py
@@ -0,0 +1,19 @@
+from joblib import Parallel as _JoblibParallel, delayed  # pylint: disable=unused-import
+
+from pystreamapi._streams.error.__error import ErrorHandler
+from pystreamapi._streams.error.__levels import ErrorLevel
+
+
+class Parallel:
+    """Wrapper for joblib.Parallel supporting error handling"""
+
+    def __init__(self, n_jobs=-1, prefer="processes", handler: ErrorHandler = None):
+        self.n_jobs = n_jobs
+        self.prefer = prefer
+        self.handler = handler
+
+    def __call__(self, iterable):
+        """Call joblib.Parallel with error handling"""
+        res = _JoblibParallel(n_jobs=self.n_jobs, prefer=self.prefer)(iterable)
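+        # joblib returns a plain list of results; under IGNORE or WARN any
+        # failed item is represented by a sentinel, so strip those here,
+        # including sentinels nested inside sublists.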
+        if self.handler and self.handler._get_error_level() != ErrorLevel.RAISE:
+            return ErrorHandler._remove_sentinel(res)
+        return res
diff --git a/pystreamapi/_streams/__base_stream.py b/pystreamapi/_streams/__base_stream.py
index 9a7df55..6d82f58 100644
--- a/pystreamapi/_streams/__base_stream.py
+++ b/pystreamapi/_streams/__base_stream.py
@@ -1,24 +1,38 @@
+import functools
 import itertools
 from abc import abstractmethod
 from builtins import reversed
 from functools import cmp_to_key
 from typing import Iterable, Callable, Any, TypeVar, Iterator
 
+from pystreamapi.__optional import Optional
 from pystreamapi._lazy.process import Process
 from pystreamapi._lazy.queue import ProcessQueue
-from pystreamapi.__optional import Optional
+from pystreamapi._streams.error.__error import ErrorHandler
+from pystreamapi._itertools.tools import dropwhile
 
 K = TypeVar('K')
 _V = TypeVar('_V')
 
 _identity_missing = object()
 
 
-class BaseStream(Iterable[K]):
+def terminal(func):
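+    """Mark a stream operation as terminal: every queued lazy operation is
+    executed before the decorated method itself runs."""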
+    @functools.wraps(func)
+    def wrapper(*args, **kwargs):
+        self: BaseStream = args[0]
+        # pylint: disable=protected-access
+        self._queue.execute_all()
+        return func(*args, **kwargs)
+
+    return wrapper
+
+
+class BaseStream(Iterable[K], ErrorHandler):
     """
     A sequence of elements supporting sequential and parallel aggregate operations.
 
     To perform a computation, stream operations are composed into a stream pipeline. A stream
-    pipeline consists of a source (which might be an array, a collection, a generator function,
+    pipeline consists of a source (which might be an iterable, a collection, a generator function,
     an I/O channel, etc.), zero or more intermediate operations (which transform a stream into
     another stream, such as filter(Predicate)), and a terminal operation (which produces a result
     or side effect, such as count() or forEach(Consumer)).
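+    For example, ``Stream.of([3, 1, 2]).filter(lambda x: x > 1).sorted().to_list()``
+    combines a source, two intermediate operations and a terminal operation.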
     Streams are lazy; computation on the
@@ -30,8 +44,8 @@ def __init__(self, source: Iterable[K]):
         self._source = source
         self._queue = ProcessQueue()
 
+    @terminal
     def __iter__(self) -> Iterator[K]:
-        self._trigger_exec()
         return iter(self._source)
 
     @classmethod
@@ -66,7 +80,7 @@ def drop_while(self, predicate: Callable[[K], bool]) -> 'BaseStream[_V]':
 
     def __drop_while(self, predicate: Callable[[Any], bool]):
         """Drops elements from the stream while the predicate is true."""
-        self._source = list(itertools.dropwhile(predicate, self._source))
+        self._source = list(dropwhile(predicate, self._source, self))
 
     def filter(self, predicate: Callable[[K], bool]) -> 'BaseStream[K]':
         """
@@ -242,6 +256,7 @@ def __take_while(self, predicate: Callable[[Any], bool]):
     # Terminal Operations:
 
     @abstractmethod
+    @terminal
     def all_match(self, predicate: Callable[[K], bool]):
         """
         Returns whether all elements of this stream match the provided predicate.
@@ -249,73 +264,76 @@ def all_match(self, predicate: Callable[[K], bool]):
 
         :param predicate: The callable predicate
         """
 
+    @terminal
     def any_match(self, predicate: Callable[[K], bool]):
         """
         Returns whether any elements of this stream match the provided predicate.
 
         :param predicate: The callable predicate
         """
-        self._trigger_exec()
-        return any(predicate(element) for element in self._source)
+        return any(self._itr(self._source, predicate))
 
+    @terminal
     def count(self):
         """
         Returns the count of elements in this stream.
 
         :return: Number of elements in the stream
         """
-        self._trigger_exec()
         return len(self._source)
 
     @abstractmethod
+    @terminal
     def find_any(self) -> Optional:
         """
         Returns an Optional describing some element of the stream, or an empty Optional if the
         stream is empty.
         """
 
+    @terminal
     def find_first(self):
         """
         Returns an Optional describing the first element of this stream, or an empty Optional if
         the stream is empty.
 
        :return: An Optional with the first element, or an empty Optional
         """
-        self._trigger_exec()
         if len(self._source) > 0:
             return Optional.of(self._source[0])
         return Optional.empty()
 
     @abstractmethod
-    def for_each(self, predicate: Callable):
+    @terminal
+    def for_each(self, action: Callable):
         """
         Performs an action for each element of this stream.
 
-        :param predicate:
+        :param action: The action to perform for each element
         """
 
+    @terminal
     def none_match(self, predicate: Callable[[K], bool]):
         """
         Returns whether no elements of this stream match the provided predicate.
 
         :param predicate: The callable predicate
         """
-        self._trigger_exec()
-        return not any(predicate(element) for element in self._source)
+        return not any(self._itr(self._source, predicate))
 
+    @terminal
     def min(self):
         """Returns the minimum element of this stream."""
-        self._trigger_exec()
         if len(self._source) > 0:
             return Optional.of(min(self._source))
         return Optional.empty()
 
+    @terminal
     def max(self):
         """Returns the maximum element of this stream."""
-        self._trigger_exec()
         if len(self._source) > 0:
             return Optional.of(max(self._source))
         return Optional.empty()
 
     @abstractmethod
+    @terminal
     def reduce(self, predicate: Callable[[K, K], K], identity=_identity_missing,
                depends_on_state=False) -> Optional:
         """
@@ -327,22 +345,23 @@ def reduce(self, predicate: Callable[[K, K], K], identity=_identity_missing,
 
         :param identity: Default value
         """
 
+    @terminal
     def to_list(self):
         """Accumulates the elements of this stream into a List."""
-        self._trigger_exec()
         return list(self._source)
 
+    @terminal
     def to_tuple(self):
         """Accumulates the elements of this stream into a Tuple."""
-        self._trigger_exec()
         return tuple(self._source)
 
+    @terminal
     def to_set(self):
         """Accumulates the elements of this stream into a Set."""
-        self._trigger_exec()
         return set(self._source)
 
     @abstractmethod
+    @terminal
     def to_dict(self, key_mapper: Callable[[K], Any]) -> dict:
         """
         Returns a dictionary consisting of the results of grouping the elements of this stream
@@ -350,7 +369,3 @@ def to_dict(self, key_mapper: Callable[[K], Any]) -> dict:
 
         :param key_mapper: The mapper that provides the group key for each element
         """
-
-    def _trigger_exec(self):
-        """Triggers execution of the stream."""
-        self._queue.execute_all()
diff --git a/pystreamapi/_streams/__parallel_stream.py b/pystreamapi/_streams/__parallel_stream.py
index 91099ef..e122671 100644
--- a/pystreamapi/_streams/__parallel_stream.py
+++ b/pystreamapi/_streams/__parallel_stream.py
@@ -1,7 +1,10 @@
+from collections import defaultdict
 from functools import reduce as seq_reduce
 from typing import Callable, Any, Iterable
 
-from joblib import Parallel, delayed
+from pystreamapi._streams.__base_stream import terminal
+from pystreamapi._parallel.parallelizer import Parallel, delayed
+
 from pystreamapi.__optional import Optional
 
 import pystreamapi._streams.__base_stream as stream
@@ -17,57 +20,55 @@ def __init__(self, source: Iterable[stream.K]):
         super().__init__(source)
         self._parallelizer = Parallelizer()
 
+    @terminal
     def all_match(self, predicate: Callable[[Any], bool]):
-        self._trigger_exec()
-        return all(Parallel(n_jobs=-1, prefer="threads")(delayed(predicate)(element)
-                                                         for element in self._source))
+        return all(Parallel(n_jobs=-1, prefer="threads", handler=self)
+                   (delayed(self.__mapper(predicate))(element) for element in self._source))
 
     def _filter(self, predicate: Callable[[Any], bool]):
         self._set_parallelizer_src()
         self._source = self._parallelizer.filter(predicate)
 
+    @terminal
     def find_any(self):
-        self._trigger_exec()
         if len(self._source) > 0:
             return Optional.of(self._source[0])
         return Optional.empty()
 
     def _flat_map(self, predicate: Callable[[Any], stream.BaseStream]):
         new_src = []
-        for element in Parallel(n_jobs=-1, prefer="threads")(delayed(predicate)(element)
-                                                             for element in self._source):
+        for element in Parallel(n_jobs=-1, prefer="threads", handler=self)\
+                (delayed(self.__mapper(predicate))(element) for element in self._source):
             new_src.extend(element.to_list())
         self._source = new_src
 
     def _group_to_dict(self, key_mapper: Callable[[Any], Any]):
-        groups = {}
+        groups = defaultdict(list)
 
         def process_element(element):
             key = key_mapper(element)
-            if key not in groups:
-                groups[key] = []
             groups[key].append(element)
 
-        Parallel(n_jobs=-1, prefer="threads")(delayed(process_element)(element)
-                                              for element in self._source)
+        Parallel(n_jobs=-1, prefer="threads", handler=self)\
+            (delayed(self.__mapper(process_element))(element) for element in self._source)
         return groups
 
-    def for_each(self, predicate: Callable):
-        self._trigger_exec()
-        Parallel(n_jobs=-1, prefer="threads")(delayed(predicate)(element)
+    @terminal
+    def for_each(self, action: Callable):
+        Parallel(n_jobs=-1, prefer="threads", handler=self)(delayed(self.__mapper(action))(element)
                                               for element in self._source)
 
     def _map(self, mapper: Callable[[Any], Any]):
-        self._source = Parallel(n_jobs=-1, prefer="threads")(delayed(mapper)(element)
-                                                             for element in self._source)
+        self._source = Parallel(n_jobs=-1, prefer="threads", handler=self)\
+            (delayed(self.__mapper(mapper))(element) for element in self._source)
 
     def _peek(self, action: Callable):
-        Parallel(n_jobs=-1, prefer="threads")(delayed(action)(element)
+        Parallel(n_jobs=-1, prefer="threads", handler=self)(delayed(self.__mapper(action))(element)
                                               for element in self._source)
 
+    @terminal
     def reduce(self, predicate: Callable[[Any, Any], Any], identity=_identity_missing,
                depends_on_state=False):
-        self._trigger_exec()
         self._set_parallelizer_src()
         reduce_func = seq_reduce if depends_on_state else self.__reduce
         if len(self._source) > 0:
@@ -79,9 +80,12 @@ def reduce(self, predicate: Callable[[Any, Any], Any], identity=_identity_missin
     def __reduce(self, pred, _):
         return self._parallelizer.reduce(pred)
 
+    @terminal
     def to_dict(self, key_mapper: Callable[[Any], Any]) -> dict:
-        self._trigger_exec()
         return dict(self._group_to_dict(key_mapper))
 
     def _set_parallelizer_src(self):
-        self._parallelizer.set_source(self._source)
+        self._parallelizer.set_source(self._source, self)
+
+    def __mapper(self, mapper):
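+        """Wrap a mapper so each element is routed through the error handler."""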
""" + @terminal def interquartile_range(self) -> Union[float, int, None]: """ Calculates the iterquartile range of a numerical Stream :return: The iterquartile range, can be int or float """ - self._trigger_exec() return self.third_quartile() - self.first_quartile() if len(self._source) > 0 else None + @terminal def first_quartile(self) -> Union[float, int, None]: """ Calculates the first quartile of a numerical Stream :return: The first quartile, can be int or float """ - self._trigger_exec() self._source = sorted(self._source) return self.__median(self._source[:(len(self._source)) // 2]) @abstractmethod + @terminal def mean(self) -> Union[float, int]: """ Calculates the mean of a numerical Stream :return: The mean, can be int or float """ + @terminal def median(self) -> Union[float, int, None]: """ Calculates the median of a numerical Stream :return: The median, can be int or float """ - self._trigger_exec() return self.__median(self._source) @staticmethod @@ -55,38 +56,39 @@ def __median(source) -> Union[float, int, None]: return (source[midpoint] + source[midpoint - 1]) / 2 return source[midpoint] + @terminal def mode(self) -> Union[list[Union[int, float]], None]: """ Calculates the mode(s) (most frequently occurring element) of a numerical Stream :return: The mode, can be int or float """ - self._trigger_exec() frequency = Counter(self._source) if not frequency: return None max_frequency = max(frequency.values()) return [number for number, count in frequency.items() if count == max_frequency] + @terminal def range(self) -> Union[float, int, None]: """ Calculates the range of a numerical Stream :return: The range, can be int or float """ - self._trigger_exec() return max(self._source) - min(self._source) if len(self._source) > 0 else None @abstractmethod + @terminal def sum(self) -> Union[float, int, None]: """ Calculates the sum of all items of a numerical stream :return: The sum, can be int or float """ + @terminal def third_quartile(self) -> Union[float, int, None]: """ Calculates the third quartile of a numerical Stream :return: The third quartile, can be int or float """ - self._trigger_exec() self._source = sorted(self._source) return self.__median(self._source[(len(self._source) + 1) // 2:]) diff --git a/pystreamapi/_streams/numeric/__parallel_numeric_stream.py b/pystreamapi/_streams/numeric/__parallel_numeric_stream.py index b54ecdd..fd4aea9 100644 --- a/pystreamapi/_streams/numeric/__parallel_numeric_stream.py +++ b/pystreamapi/_streams/numeric/__parallel_numeric_stream.py @@ -1,5 +1,6 @@ from typing import Union +from pystreamapi._streams.__base_stream import terminal from pystreamapi._streams.__parallel_stream import ParallelStream from pystreamapi._streams.numeric.__numeric_base_stream import NumericBaseStream @@ -7,17 +8,18 @@ class ParallelNumericStream(NumericBaseStream, ParallelStream): """Numeric Stream with parallel implementation""" + @terminal def mean(self) -> Union[float, int, None]: """Calculates mean of values""" - self._trigger_exec() return self.__sum() / len(self._source) if len(self._source) > 0 else None + @terminal def sum(self) -> Union[float, int, None]: """Calculates the sum of values""" - self._trigger_exec() _sum = self.__sum() return 0 if _sum == [] else _sum + @terminal def __sum(self): """Parallel sum method""" self._set_parallelizer_src() diff --git a/pystreamapi/_streams/numeric/__sequential_numeric_stream.py b/pystreamapi/_streams/numeric/__sequential_numeric_stream.py index d6873a3..82af239 100644 --- 
+            if key == _sentinel:
+                continue
             groups[key].append(element)
         return groups
 
-    def for_each(self, predicate: Callable):
-        self._trigger_exec()
-        for element in self._source:
-            predicate(element)
+    @stream.terminal
+    def for_each(self, action: Callable):
+        self._itr(self._source, mapper=action)
 
     def _map(self, mapper: Callable[[Any], Any]):
-        self._source = [mapper(element) for element in self._source]
+        self._source = self._itr(self._source, mapper=mapper)
 
     def _peek(self, action: Callable):
-        for element in self._source:
-            action(element)
+        self._itr(self._source, mapper=action)
 
+    @stream.terminal
     def reduce(self, predicate: Callable, identity=_identity_missing, depends_on_state=False):
-        self._trigger_exec()
         if len(self._source) > 0:
             if identity is not _identity_missing:
-                return reduce(predicate, self._source)
-            return Optional.of(reduce(predicate, self._source))
+                return reduce(predicate, self._source, handler=self)
+            return Optional.of(reduce(predicate, self._source, handler=self))
         return identity if identity is not _identity_missing else Optional.empty()
 
+    @stream.terminal
     def to_dict(self, key_mapper: Callable[[Any], Any]) -> dict:
-        self._trigger_exec()
         return self._group_to_dict(key_mapper)
diff --git a/pystreamapi/_streams/error/__error.py b/pystreamapi/_streams/error/__error.py
new file mode 100644
index 0000000..fc88a3c
--- /dev/null
+++ b/pystreamapi/_streams/error/__error.py
@@ -0,0 +1,97 @@
+from __future__ import annotations
+
+import logging
+from typing import TYPE_CHECKING
+
+from pystreamapi._streams.error.__levels import ErrorLevel
+from pystreamapi._streams.error.__sentinel import Sentinel
+
+if TYPE_CHECKING:
+    # Avoid circular import
+    from pystreamapi._streams.__base_stream import BaseStream
+
+_sentinel = Sentinel()
+
+
+def nothing(sth):
+    """Do not modify the input"""
+    return sth
+
+
+def true_condition(_):
+    """Always return True"""
+    return True
+
+
+class ErrorHandler:
+    """Handle errors in stream operations"""
+
+    __error_level = ErrorLevel.RAISE
+    __exceptions_to_ignore = (Exception,)
+
+    def error_level(self, level: ErrorLevel, *exceptions) -> BaseStream:
+        """
+        Set the error level
+
+        :param level: Error level from ErrorLevel
+        :param exceptions: Exceptions to catch. If not provided, all exceptions will be caught
+        :return: The stream itself
+        """
+        self.__exceptions_to_ignore = exceptions or (Exception,)
+        self.__error_level = level
+        return self
+
+    def _get_error_level(self):
+        """Get the error level"""
+        return self.__error_level
+
+    def _itr(self, src, mapper=nothing, condition=true_condition) -> list:
+        """Iterate over the source and apply the mapper and condition"""
+        new_src = []
+        for i in src:
+            try:
+                if condition(i):
+                    new_src.append(mapper(i))
+            except self.__exceptions_to_ignore as e:
+                if self.__error_level == ErrorLevel.RAISE:
+                    raise e
+                if self.__error_level == ErrorLevel.IGNORE:
+                    continue
+                if self.__error_level == ErrorLevel.WARN:
+                    self.__log(e)
+        return new_src
+
+    def _one(self, mapper=nothing, condition=true_condition, item=None):
+        """
+        Apply the mapper and condition to the item.
+        If any exception occurs, handle it according to the error level
+        (IGNORE, WARN: return _sentinel; RAISE: raise the exception).
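+
+        Example: with level IGNORE, ``handler._one(mapper=int, item="a")``
+        evaluates to the sentinel, while ``handler._one(mapper=int, item="1")``
+        returns 1.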
+
+        :param mapper: Method to apply to the item
+        :param condition: Condition to check before applying the mapper
+        :param item: Item to apply the mapper and condition to
+        :return: The result of the mapper if the condition is True, otherwise the sentinel
+        """
+        try:
+            if condition(item):
+                return mapper(item)
+        except self.__exceptions_to_ignore as e:
+            if self.__error_level == ErrorLevel.RAISE:
+                raise e
+            if self.__error_level == ErrorLevel.IGNORE:
+                return _sentinel
+            if self.__error_level == ErrorLevel.WARN:
+                self.__log(e)
+        return _sentinel
+
+    @staticmethod
+    def _remove_sentinel(src: list):
+        """Remove the sentinels from the list and its sublists"""
+        result = []
+        for item in src:
+            if isinstance(item, list):
+                # Recursively remove sentinels from the sublist
+                sublist = ErrorHandler._remove_sentinel(item)
+                result.append(sublist)
+            elif not isinstance(item, Sentinel):
+                result.append(item)
+        return result
+
+    @staticmethod
+    def __log(exception: Exception):
+        """Log the exception"""
+        logging.warning("An exception has been ignored: %s", exception)
diff --git a/pystreamapi/_streams/error/__levels.py b/pystreamapi/_streams/error/__levels.py
new file mode 100644
index 0000000..59cb671
--- /dev/null
+++ b/pystreamapi/_streams/error/__levels.py
@@ -0,0 +1,10 @@
+class ErrorLevel:
+    """
+    PyStreamAPI error levels.
+
+    RAISE: raise an exception
+    IGNORE: ignore the error, skip the item
+    WARN: log a warning and ignore the error
+    """
+    RAISE = 0
+    IGNORE = 1
+    WARN = 2
diff --git a/pystreamapi/_streams/error/__sentinel.py b/pystreamapi/_streams/error/__sentinel.py
new file mode 100644
index 0000000..6a8c963
--- /dev/null
+++ b/pystreamapi/_streams/error/__sentinel.py
@@ -0,0 +1,8 @@
+class Sentinel:
+    """A class used to represent a sentinel value."""
+
+    def __eq__(self, other):
+        return isinstance(other, Sentinel)
+
+    def __ne__(self, other):
+        return not isinstance(other, Sentinel)
diff --git a/pystreamapi/_streams/numeric/__numeric_base_stream.py b/pystreamapi/_streams/numeric/__numeric_base_stream.py
index a42684f..f25985f 100644
--- a/pystreamapi/_streams/numeric/__numeric_base_stream.py
+++ b/pystreamapi/_streams/numeric/__numeric_base_stream.py
@@ -2,7 +2,7 @@
 from collections import Counter
 from typing import Union
 
-from pystreamapi._streams.__base_stream import BaseStream
+from pystreamapi._streams.__base_stream import BaseStream, terminal
 
 
 class NumericBaseStream(BaseStream, ABC):
@@ -12,36 +12,37 @@ class NumericBaseStream(BaseStream, ABC):
     to such data.
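+    For example, for the sorted values [1, 2, 3, 4] the median is 2.5, the
+    first quartile is 1.5 and the third quartile is 3.5.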
     """
 
+    @terminal
     def interquartile_range(self) -> Union[float, int, None]:
         """
         Calculates the interquartile range of a numerical Stream
 
         :return: The interquartile range, can be int or float
         """
-        self._trigger_exec()
         return self.third_quartile() - self.first_quartile() if len(self._source) > 0 else None
 
+    @terminal
     def first_quartile(self) -> Union[float, int, None]:
         """
         Calculates the first quartile of a numerical Stream
 
         :return: The first quartile, can be int or float
         """
-        self._trigger_exec()
         self._source = sorted(self._source)
         return self.__median(self._source[:(len(self._source)) // 2])
 
     @abstractmethod
+    @terminal
     def mean(self) -> Union[float, int]:
         """
         Calculates the mean of a numerical Stream
 
         :return: The mean, can be int or float
         """
 
+    @terminal
     def median(self) -> Union[float, int, None]:
         """
         Calculates the median of a numerical Stream
 
         :return: The median, can be int or float
         """
-        self._trigger_exec()
         return self.__median(self._source)
 
     @staticmethod
@@ -55,38 +56,39 @@ def __median(source) -> Union[float, int, None]:
             return (source[midpoint] + source[midpoint - 1]) / 2
         return source[midpoint]
 
+    @terminal
     def mode(self) -> Union[list[Union[int, float]], None]:
         """
         Calculates the mode(s) (most frequently occurring element) of a numerical Stream
 
         :return: The mode(s), can be int or float
         """
-        self._trigger_exec()
         frequency = Counter(self._source)
         if not frequency:
             return None
         max_frequency = max(frequency.values())
         return [number for number, count in frequency.items() if count == max_frequency]
 
+    @terminal
     def range(self) -> Union[float, int, None]:
         """
         Calculates the range of a numerical Stream
 
         :return: The range, can be int or float
         """
-        self._trigger_exec()
         return max(self._source) - min(self._source) if len(self._source) > 0 else None
 
     @abstractmethod
+    @terminal
     def sum(self) -> Union[float, int, None]:
         """
         Calculates the sum of all items of a numerical stream
 
         :return: The sum, can be int or float
         """
 
+    @terminal
     def third_quartile(self) -> Union[float, int, None]:
         """
         Calculates the third quartile of a numerical Stream
 
         :return: The third quartile, can be int or float
         """
-        self._trigger_exec()
         self._source = sorted(self._source)
         return self.__median(self._source[(len(self._source) + 1) // 2:])
diff --git a/pystreamapi/_streams/numeric/__parallel_numeric_stream.py b/pystreamapi/_streams/numeric/__parallel_numeric_stream.py
index b54ecdd..fd4aea9 100644
--- a/pystreamapi/_streams/numeric/__parallel_numeric_stream.py
+++ b/pystreamapi/_streams/numeric/__parallel_numeric_stream.py
@@ -1,5 +1,6 @@
 from typing import Union
 
+from pystreamapi._streams.__base_stream import terminal
 from pystreamapi._streams.__parallel_stream import ParallelStream
 from pystreamapi._streams.numeric.__numeric_base_stream import NumericBaseStream
 
@@ -7,17 +8,18 @@
 class ParallelNumericStream(NumericBaseStream, ParallelStream):
     """Numeric Stream with parallel implementation"""
 
+    @terminal
     def mean(self) -> Union[float, int, None]:
         """Calculates the mean of the values"""
-        self._trigger_exec()
         return self.__sum() / len(self._source) if len(self._source) > 0 else None
 
+    @terminal
     def sum(self) -> Union[float, int, None]:
         """Calculates the sum of the values"""
-        self._trigger_exec()
         _sum = self.__sum()
         return 0 if _sum == [] else _sum
 
+    @terminal
     def __sum(self):
         """Parallel sum method"""
         self._set_parallelizer_src()
diff --git a/pystreamapi/_streams/numeric/__sequential_numeric_stream.py b/pystreamapi/_streams/numeric/__sequential_numeric_stream.py
index d6873a3..82af239 100644
--- a/pystreamapi/_streams/numeric/__sequential_numeric_stream.py
+++ b/pystreamapi/_streams/numeric/__sequential_numeric_stream.py
@@ -1,5 +1,6 @@
 from typing import Union
 
+from pystreamapi._streams.__base_stream import terminal
 from pystreamapi._streams.__sequential_stream import SequentialStream
 from pystreamapi._streams.numeric.__numeric_base_stream import NumericBaseStream
 
@@ -7,12 +8,12 @@
 class SequentialNumericStream(NumericBaseStream, SequentialStream):
     """Numeric Stream with sequential implementation"""
 
+    @terminal
     def mean(self) -> Union[float, int, None]:
         """Calculates the mean of the values"""
-        self._trigger_exec()
         return sum(self._source) / len(self._source) if len(self._source) > 0 else None
 
+    @terminal
     def sum(self) -> Union[float, int, None]:
         """Calculates the sum of the values"""
-        self._trigger_exec()
         return sum(self._source)
diff --git a/tests/test_error_handler.py b/tests/test_error_handler.py
new file mode 100644
index 0000000..7bca211
--- /dev/null
+++ b/tests/test_error_handler.py
@@ -0,0 +1,122 @@
+# pylint: disable=protected-access
+from unittest import TestCase
+
+from pystreamapi._streams.error.__error import ErrorHandler, _sentinel
+from pystreamapi._streams.error.__sentinel import Sentinel
+from pystreamapi._streams.error.__levels import ErrorLevel
+
+
+class ErrorHandlerImpl(ErrorHandler):
+    pass
+
+
+class TestErrorHandler(TestCase):
+
+    def setUp(self) -> None:
+        self.handler = ErrorHandlerImpl()
+
+    def test_iterate_raise(self):
+        self.handler.error_level(ErrorLevel.RAISE)
+        self.assertRaises(ValueError, lambda: self.handler._itr([1, 2, 3, 4, 5, "a"], int))
+
+    def test_iterate_raise_with_condition(self):
+        self.handler.error_level(ErrorLevel.RAISE)
+        self.assertRaises(ValueError, lambda: self.handler._itr(
+            [1, 2, 3, 4, 5, "a"], int, lambda x: x != ""))
+
+    def test_iterate_ignore(self):
+        self.handler.error_level(ErrorLevel.IGNORE)
+        self.assertEqual(self.handler._itr([1, 2, 3, 4, 5, "a"], int), [1, 2, 3, 4, 5])
+
+    def test_iterate_ignore_with_condition(self):
+        self.handler.error_level(ErrorLevel.IGNORE)
+        self.assertEqual(self.handler._itr(
+            [1, 2, 3, 4, 5, "a"], int, lambda x: x != ""), [1, 2, 3, 4, 5])
+
+    def test_iterate_ignore_specific_exceptions(self):
+        self.handler.error_level(ErrorLevel.IGNORE, ValueError, AttributeError)
+        self.assertEqual(self.handler._itr(
+            ["b", 2, 3, 4, 5, "a"], mapper=lambda x: x.split()), [["b"], ["a"]])
+
+    def test_iterate_ignore_specific_exception_raise_another(self):
+        self.handler.error_level(ErrorLevel.IGNORE, ValueError)
+        self.assertRaises(AttributeError, lambda: self.handler._itr(
+            ["b", 2, 3, 4, 5, "a"], mapper=lambda x: x.split()))
+
+    def test_iterate_warn(self):
+        self.handler.error_level(ErrorLevel.WARN)
+        self.assertEqual(self.handler._itr([1, 2, 3, 4, 5, "a"], int), [1, 2, 3, 4, 5])
+
+    def test_iterate_warn_with_condition(self):
+        self.handler.error_level(ErrorLevel.WARN)
+        self.assertEqual(self.handler._itr(
+            [1, 2, 3, 4, 5, "a"], int, lambda x: x != ""), [1, 2, 3, 4, 5])
+
+    def test_one_raise(self):
+        self.handler.error_level(ErrorLevel.RAISE)
+        self.assertRaises(ValueError, lambda: self.handler._one(mapper=int, item="a"))
+
+    def test_one_raise_with_condition(self):
+        self.handler.error_level(ErrorLevel.RAISE)
+        self.assertRaises(ValueError, lambda: self.handler._one(int, lambda x: x != "",
+                                                                "a"))
+
+    def test_one_ignore(self):
+        self.handler.error_level(ErrorLevel.IGNORE)
+        self.assertEqual(self.handler._one(mapper=int, item="a"), _sentinel)
+
+    def test_one_ignore_with_condition(self):
+        self.handler.error_level(ErrorLevel.IGNORE)
+        self.assertEqual(self.handler._one(int, lambda x: x != "", "a"), _sentinel)
+
+    def test_one_ignore_specific_exceptions(self):
+        self.handler.error_level(ErrorLevel.IGNORE, ValueError, AttributeError)
+        self.assertEqual(self.handler._one(
+            mapper=lambda x: x.split(), item=1), _sentinel)
+
+    def test_one_ignore_specific_exception_raise_another(self):
+        self.handler.error_level(ErrorLevel.IGNORE, ValueError)
+        self.assertRaises(AttributeError, lambda: self.handler._one(
+            mapper=lambda x: x.split(), item=1))
+
+    def test_one_warn(self):
+        self.handler.error_level(ErrorLevel.WARN)
+        self.assertEqual(self.handler._one(mapper=int, item="a"), _sentinel)
+
+    def test_one_warn_with_condition(self):
+        self.handler.error_level(ErrorLevel.WARN)
+        self.assertEqual(self.handler._one(int, lambda x: x != "", "a"), _sentinel)
+
+    def test_remove_sentinels(self):
+        self.handler.error_level(ErrorLevel.IGNORE)
+        src = ["1", 2, "3", "a"]
+        self.assertEqual(
+            self.handler._remove_sentinel(
+                self.handler._one(mapper=int, item=item) for item in src),
+            [1, 2, 3]
+        )
+
+    def test_remove_sentinels_no_sentinels(self):
+        self.handler.error_level(ErrorLevel.IGNORE)
+        src = ["1", 2, "3", "a"]
+        self.assertEqual(self.handler._remove_sentinel(src), src)
+
+    def test_sentinel_eq(self):
+        s1 = Sentinel()
+        s2 = Sentinel()
+        self.assertTrue(s1 == s2)
+
+    def test_sentinel_eq_false(self):
+        s1 = Sentinel()
+        s2 = object()
+        self.assertFalse(s1 == s2)
+
+    def test_sentinel_ne(self):
+        s1 = Sentinel()
+        s2 = object()
+        self.assertTrue(s1 != s2)
+
+    def test_sentinel_ne_false(self):
+        s1 = Sentinel()
+        s2 = Sentinel()
+        self.assertFalse(s1 != s2)
diff --git a/tests/test_error_handler_streams.py b/tests/test_error_handler_streams.py
new file mode 100644
index 0000000..e2d3f1b
--- /dev/null
+++ b/tests/test_error_handler_streams.py
@@ -0,0 +1,158 @@
+import unittest
+
+from parameterized import parameterized_class
+
+from pystreamapi._streams.error.__levels import ErrorLevel
+from pystreamapi._streams.__parallel_stream import ParallelStream
+from pystreamapi._streams.__sequential_stream import SequentialStream
+from pystreamapi._streams.numeric.__parallel_numeric_stream import ParallelNumericStream
+from pystreamapi._streams.numeric.__sequential_numeric_stream import SequentialNumericStream
+
+
+class NoToString:
+    def __str__(self):
+        raise ValueError("No to string")
+
+
+@parameterized_class("stream", [
+    [SequentialStream],
+    [ParallelStream],
+    [SequentialNumericStream],
+    [ParallelNumericStream]])
+class TestStreamImplementation(unittest.TestCase):
+
+    def test_drop_while_raise(self):
+        with self.assertRaises(ValueError):
+            self.stream([1, 2, 3, 4, 5, "a", 6])\
+                .drop_while(lambda x: int(x) < 6).to_list()
+
+    def test_drop_while_ignore(self):
+        result = self.stream([1, 2, 3, 4, "a", 5, 6, 7, 8])\
+            .error_level(ErrorLevel.IGNORE) \
+            .drop_while(lambda x: int(x) < 5).to_list()
+        self.assertListEqual(result, [5, 6, 7, 8])
+
+    def test_filter_raise(self):
+        with self.assertRaises(ValueError):
+            self.stream([1, 2, 3, 4, 5, "a", 6])\
+                .filter(lambda x: int(x) < 6).to_list()
+
+    def test_filter_ignore(self):
+        result = self.stream([1, "a", "3"]).error_level(ErrorLevel.IGNORE)\
+            .filter(lambda x: int(x) < 6).to_list()
+        self.assertListEqual(result, [1, "3"])
+
+    def test_flat_map_raise(self):
+        with self.assertRaises(ValueError):
+            self.stream([1, 2, 3, "a"]).error_level(ErrorLevel.RAISE) \
+                .flat_map(lambda x: self.stream([int(x), int(x)])).to_list()
+
+    def test_flat_map_ignore(self):
+        result = self.stream([1, 2, 3, "a"]).error_level(ErrorLevel.IGNORE)\
+            .flat_map(lambda x: self.stream([int(x), int(x)])).to_list()
+        self.assertListEqual(result, [1, 1, 2, 2, 3, 3])
+
+    def test_group_by_raise(self):
+        with self.assertRaises(AttributeError):
+            self.stream([1, "b", "a"])\
+                .error_level(ErrorLevel.RAISE)\
+                .group_by(lambda x: x.isalnum())\
+                .to_list()
+
+    def test_group_by_ignore(self):
+        result = self.stream([1, "b", "a"])\
+            .error_level(ErrorLevel.IGNORE)\
+            .group_by(lambda x: x.isalnum())\
+            .to_list()
+        self.assertListEqual(result, [(True, ["b", "a"])])
+
+    def test_map_str_to_int_raise(self):
+        with self.assertRaises(ValueError):
+            self.stream(["1", "2", "3", "a"]).error_level(ErrorLevel.RAISE) \
+                .map(int).to_list()
+
+    def test_map_str_to_int_ignore(self):
+        result = self.stream(["1", "2", "3", "a"])\
+            .error_level(ErrorLevel.IGNORE).map(int).to_list()
+        self.assertListEqual(result, [1, 2, 3])
+
+    def test_map_to_int_raise(self):
+        with self.assertRaises(ValueError):
+            self.stream([1, 2, 3, "a"])\
+                .error_level(ErrorLevel.RAISE).map_to_int().to_list()
+
+    def test_map_to_int_ignore(self):
+        result = self.stream([1, 2, 3, "a"])\
+            .error_level(ErrorLevel.IGNORE).map_to_int().to_list()
+        self.assertListEqual(result, [1, 2, 3])
+
+    def test_map_to_str_raise(self):
+        with self.assertRaises(ValueError):
+            self.stream([1, 2, NoToString(), "a"])\
+                .error_level(ErrorLevel.RAISE).map_to_str().to_list()
+
+    def test_map_to_str_ignore(self):
+        result = self.stream([1, 2, NoToString(), "a"])\
+            .error_level(ErrorLevel.IGNORE).map_to_str().to_list()
+        self.assertListEqual(result, ["1", "2", "a"])
+
+    def test_peek_raise(self):
+        with self.assertRaises(ValueError):
+            self.stream([1, 2, 3, "a"])\
+                .error_level(ErrorLevel.RAISE).peek(int).to_list()
+
+    def test_peek_ignore(self):
+        result = self.stream([1, 2, 3, "a"])\
+            .error_level(ErrorLevel.IGNORE).peek(int).to_list()
+        self.assertListEqual(result, [1, 2, 3, "a"])
+
+    def test_take_while_raise(self):
+        with self.assertRaises(ValueError):
+            self.stream([1, 2, 3, "a", 4])\
+                .error_level(ErrorLevel.RAISE).take_while(lambda x: int(x) < 4).to_list()
+
+    def test_take_while_ignore(self):
+        result = self.stream([1, 2, 3, "a"])\
+            .error_level(ErrorLevel.IGNORE).take_while(lambda x: int(x) < 3).to_list()
+        self.assertListEqual(result, [1, 2])
+
+    def test_all_match_raise(self):
+        with self.assertRaises(ValueError):
+            self.stream([1, 2, 3, "a"]).all_match(lambda x: int(x) > 0)
+
+    def test_all_match_ignore(self):
+        self.assertFalse(self.stream([1, 2, 3, "a", "-1"])
+                         .error_level(ErrorLevel.IGNORE)
+                         .all_match(lambda x: int(x) > 0))
+
+    def test_any_match_raise(self):
+        with self.assertRaises(ValueError):
+            self.stream([1, 2, "a", 3]).any_match(lambda x: int(x) > 2)
+
+    def test_any_match_ignore(self):
+        self.assertTrue(self.stream([1, 2, 3, "a", "-1"])
+                        .error_level(ErrorLevel.IGNORE)
+                        .any_match(lambda x: int(x) < 0))
+
+    def test_for_each_raise(self):
+        with self.assertRaises(ValueError):
+            self.stream([1, 2, 3, "a"]).for_each(int)
+
+    def test_for_each_ignore(self):
+        self.stream([1, 2, 3, "a"]).error_level(ErrorLevel.IGNORE).for_each(int)
+
+    def test_none_match_raise(self):
+        with self.assertRaises(ValueError):
+            self.stream([1, 2, 3, "a"]).none_match(lambda x: int(x) < 0)
+
+    def test_none_match_ignore(self):
+        self.assertFalse(self.stream([1, 2, 3, "a", "-1"])
+                         .error_level(ErrorLevel.IGNORE)
+                         .none_match(lambda x: int(x) < 0))
+
+    def test_reduce_raise(self):
+        with self.assertRaises(TypeError):
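+            # 1 + 2 + 3 succeeds, then 6 + "a" raises TypeError; RAISE is the
+            # default error level, so the exception surfaces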
self.stream([1, 2, 3, "a"]).reduce(lambda x, y: x + y) + + def test_reduce_ignore(self): + self.assertEqual(self.stream([1, 2, 3, "a"]).error_level(ErrorLevel.IGNORE) + .reduce(lambda x, y: x + y).get(), 6) diff --git a/tests/test_itertools.py b/tests/test_itertools.py new file mode 100644 index 0000000..20bb6d5 --- /dev/null +++ b/tests/test_itertools.py @@ -0,0 +1,39 @@ +import unittest + +from pystreamapi._itertools.tools import reduce, dropwhile + + +class TestReduce(unittest.TestCase): + def test_reduce_with_empty_sequence_and_no_initial_value(self): + with self.assertRaises(TypeError) as cm: + reduce(lambda x, y: x + y, [], handler=None) + self.assertEqual( + str(cm.exception), + "reduce() of empty iterable with no initial value" + ) + + def test_reduce_with_empty_sequence_and_initial_value(self): + result = reduce(lambda x, y: x + y, [], initial=10, handler=None) + self.assertEqual(result, 10) + + def test_reduce_with_sequence_and_no_initial_value(self): + sequence = [1, 2, 3, 4, 5] + result = reduce(lambda x, y: x + y, sequence, handler=None) + self.assertEqual(result, 15) + + def test_reduce_with_sequence_and_initial_value(self): + sequence = [1, 2, 3, 4, 5] + result = reduce(lambda x, y: x + y, sequence, initial=10, handler=None) + self.assertEqual(result, 25) + + +class TestDropWhile(unittest.TestCase): + def test_dropwhile_with_empty_iterable(self): + iterable = [] + result = list(dropwhile(lambda x: x < 5, iterable, handler=None)) + self.assertEqual(result, []) + + def test_dropwhile_with_non_empty_iterable(self): + iterable = [1, 2, 3, 4, 5, 6, 7] + result = list(dropwhile(lambda x: x < 5, iterable, handler=None)) + self.assertEqual(result, [5, 6, 7]) diff --git a/tests/test_stream_implementation.py b/tests/test_stream_implementation.py index 9326954..ac1df60 100644 --- a/tests/test_stream_implementation.py +++ b/tests/test_stream_implementation.py @@ -17,8 +17,8 @@ class TestStreamImplementation(unittest.TestCase): def test_for_each(self): out = [] - self.stream([1, 2, 3, 9]).for_each(out.append) - self.assertListEqual(out, [1, 2, 3, 9]) + self.stream([1, 2, 3, 9]).map_to_str().for_each(out.append) + self.assertListEqual(out, ["1", "2", "3", "9"]) def test_map_str_to_int(self): result = self.stream(["1", "2", "3", "9"]).map(int).to_list() @@ -79,6 +79,8 @@ def test_all_match(self): self.assertTrue(result) result = self.stream([1, 2, 3, 9]).all_match(lambda x: x > 1) self.assertFalse(result) + result = self.stream([1, 2, 3, 9]).map_to_str().all_match(lambda x: isinstance(x, str)) + self.assertTrue(result) def test_all_match_empty(self): result = self.stream([]).all_match(lambda x: x > 0)