From ad9c2ccbccefad72555011e169c65df70e5443fb Mon Sep 17 00:00:00 2001 From: Stefan Garlonta Date: Tue, 18 Jul 2023 16:57:38 +0200 Subject: [PATCH] Fix deepsource smells --- pystreamapi/_itertools/tools.py | 4 +--- pystreamapi/_parallel/parallelizer.py | 1 + pystreamapi/_streams/__base_stream.py | 2 ++ pystreamapi/_streams/__parallel_stream.py | 10 +++++----- pystreamapi/_streams/error/__error.py | 5 ++++- pystreamapi/_streams/error/__levels.py | 1 + pystreamapi/_streams/error/__sentinel.py | 3 +++ 7 files changed, 17 insertions(+), 9 deletions(-) diff --git a/pystreamapi/_itertools/tools.py b/pystreamapi/_itertools/tools.py index 16a81ff..5ebdd40 100644 --- a/pystreamapi/_itertools/tools.py +++ b/pystreamapi/_itertools/tools.py @@ -21,10 +21,9 @@ def dropwhile(predicate, iterable, handler: ErrorHandler=None): _initial_missing = object() + def reduce(function, sequence, initial=_initial_missing, handler: ErrorHandler=None): """ - reduce(function, iterable[, initial]) -> value - Apply a function of two arguments cumulatively to the items of a sequence or iterable, from left to right, to reduce the iterable to a single value. For example, reduce(lambda x, y: x+y, [1, 2, 3, 4, 5]) calculates @@ -32,7 +31,6 @@ def reduce(function, sequence, initial=_initial_missing, handler: ErrorHandler=N of the iterable in the calculation, and serves as a default when the iterable is empty. """ - it = iter(sequence) if initial is _initial_missing: diff --git a/pystreamapi/_parallel/parallelizer.py b/pystreamapi/_parallel/parallelizer.py index 2e444ad..01dad67 100644 --- a/pystreamapi/_parallel/parallelizer.py +++ b/pystreamapi/_parallel/parallelizer.py @@ -3,6 +3,7 @@ from pystreamapi._streams.error.__error import ErrorHandler from pystreamapi._streams.error.__levels import ErrorLevel + class Parallel: """Wrapper for joblib.Parallel supporting error handling""" diff --git a/pystreamapi/_streams/__base_stream.py b/pystreamapi/_streams/__base_stream.py index 6d82f58..b15b828 100644 --- a/pystreamapi/_streams/__base_stream.py +++ b/pystreamapi/_streams/__base_stream.py @@ -17,6 +17,8 @@ def terminal(func): + """Decorator to execute all the processes in the queue + before executing the decorated function. To be applied to terminal operations.""" @functools.wraps(func) def wrapper(*args, **kwargs): self: BaseStream = args[0] diff --git a/pystreamapi/_streams/__parallel_stream.py b/pystreamapi/_streams/__parallel_stream.py index e122671..e245c62 100644 --- a/pystreamapi/_streams/__parallel_stream.py +++ b/pystreamapi/_streams/__parallel_stream.py @@ -38,7 +38,7 @@ def find_any(self): def _flat_map(self, predicate: Callable[[Any], stream.BaseStream]): new_src = [] for element in Parallel(n_jobs=-1, prefer="threads", handler=self)\ - (delayed(self.__mapper(predicate))(element) for element in self._source): + (delayed(self.__mapper(predicate))(element) for element in self._source): new_src.extend(element.to_list()) self._source = new_src @@ -55,16 +55,16 @@ def process_element(element): @terminal def for_each(self, action: Callable): - Parallel(n_jobs=-1, prefer="threads", handler=self)(delayed(self.__mapper(action))(element) - for element in self._source) + Parallel(n_jobs=-1, prefer="threads", handler=self)\ + (delayed(self.__mapper(action))(element) for element in self._source) def _map(self, mapper: Callable[[Any], Any]): self._source = Parallel(n_jobs=-1, prefer="threads", handler=self)\ (delayed(self.__mapper(mapper))(element) for element in self._source) def _peek(self, action: Callable): - Parallel(n_jobs=-1, prefer="threads", handler=self)(delayed(self.__mapper(action))(element) - for element in self._source) + Parallel(n_jobs=-1, prefer="threads", handler=self)\ + (delayed(self.__mapper(action))(element) for element in self._source) @terminal def reduce(self, predicate: Callable[[Any, Any], Any], identity=_identity_missing, diff --git a/pystreamapi/_streams/error/__error.py b/pystreamapi/_streams/error/__error.py index fc88a3c..b1ff8a9 100644 --- a/pystreamapi/_streams/error/__error.py +++ b/pystreamapi/_streams/error/__error.py @@ -11,14 +11,17 @@ _sentinel = Sentinel() + def nothing(sth): """Do not modify the input""" return sth + def true_condition(_): """Always return True""" return True + class ErrorHandler: """Handle errors in stream operations""" @@ -40,7 +43,7 @@ def _get_error_level(self): """Get the error level""" return self.__error_level - def _itr(self, src, mapper = nothing, condition = true_condition) -> list: + def _itr(self, src, mapper=nothing, condition=true_condition) -> list: """Iterate over the source and apply the mapper and condition""" new_src = [] for i in src: diff --git a/pystreamapi/_streams/error/__levels.py b/pystreamapi/_streams/error/__levels.py index 59cb671..2e2a61f 100644 --- a/pystreamapi/_streams/error/__levels.py +++ b/pystreamapi/_streams/error/__levels.py @@ -5,6 +5,7 @@ class ErrorLevel: IGNORE: ignore the error, skip the item WARN: print a warning and ignore the error """ + RAISE = 0 IGNORE = 1 WARN = 2 diff --git a/pystreamapi/_streams/error/__sentinel.py b/pystreamapi/_streams/error/__sentinel.py index 6a8c963..c38a464 100644 --- a/pystreamapi/_streams/error/__sentinel.py +++ b/pystreamapi/_streams/error/__sentinel.py @@ -6,3 +6,6 @@ def __eq__(self, other): def __ne__(self, other): return not isinstance(other, Sentinel) + + def __hash__(self): + return hash(Sentinel) \ No newline at end of file