diff --git a/pystreamapi/_parallel/fork_and_join.py b/pystreamapi/_parallel/fork_and_join.py
index 619cb3b..ee74c9e 100644
--- a/pystreamapi/_parallel/fork_and_join.py
+++ b/pystreamapi/_parallel/fork_and_join.py
@@ -95,5 +95,6 @@ def __calculate_number_of_parts(self, min_nr_items=1):
 
     def __run_job_in_parallel(self, src, operation, op_function):
         """Run the operation in parallel"""
-        return Parallel(n_jobs=-1, prefer="processes", handler=self.__handler)\
-            (delayed(operation)(op_function, part) for part in src)
+        return Parallel(n_jobs=-1, prefer="processes", handler=self.__handler)(
+            delayed(operation)(op_function, part) for part in src
+        )
diff --git a/pystreamapi/_streams/__base_stream.py b/pystreamapi/_streams/__base_stream.py
index b15b828..768597d 100644
--- a/pystreamapi/_streams/__base_stream.py
+++ b/pystreamapi/_streams/__base_stream.py
@@ -17,8 +17,10 @@
 
 
 def terminal(func):
-    """Decorator to execute all the processes in the queue
-    before executing the decorated function. To be applied to terminal operations."""
+    """
+    Decorator to execute all the processes in the queue before executing the decorated function.
+    To be applied to terminal operations.
+    """
     @functools.wraps(func)
     def wrapper(*args, **kwargs):
         self: BaseStream = args[0]
diff --git a/pystreamapi/_streams/__parallel_stream.py b/pystreamapi/_streams/__parallel_stream.py
index e245c62..c763698 100644
--- a/pystreamapi/_streams/__parallel_stream.py
+++ b/pystreamapi/_streams/__parallel_stream.py
@@ -37,8 +37,8 @@ def find_any(self):
 
     def _flat_map(self, predicate: Callable[[Any], stream.BaseStream]):
         new_src = []
-        for element in Parallel(n_jobs=-1, prefer="threads", handler=self)\
-                (delayed(self.__mapper(predicate))(element) for element in self._source):
+        for element in Parallel(n_jobs=-1, prefer="threads", handler=self)(
+                delayed(self.__mapper(predicate))(element) for element in self._source):
             new_src.extend(element.to_list())
         self._source = new_src
 
@@ -49,8 +49,9 @@ def process_element(element):
             key = key_mapper(element)
             groups[key].append(element)
 
-        Parallel(n_jobs=-1, prefer="threads", handler=self)\
-            (delayed(self.__mapper(process_element))(element) for element in self._source)
+        Parallel(n_jobs=-1, prefer="threads", handler=self)(
+            delayed(self.__mapper(process_element))(element) for element in self._source
+        )
         return groups
 
     @terminal
@@ -59,12 +60,14 @@ def for_each(self, action: Callable):
             (delayed(self.__mapper(action))(element) for element in self._source)
 
     def _map(self, mapper: Callable[[Any], Any]):
-        self._source = Parallel(n_jobs=-1, prefer="threads", handler=self)\
-            (delayed(self.__mapper(mapper))(element) for element in self._source)
+        self._source = Parallel(n_jobs=-1, prefer="threads", handler=self)(
+            delayed(self.__mapper(mapper))(element) for element in self._source
+        )
 
     def _peek(self, action: Callable):
-        Parallel(n_jobs=-1, prefer="threads", handler=self)\
-            (delayed(self.__mapper(action))(element) for element in self._source)
+        Parallel(n_jobs=-1, prefer="threads", handler=self)(
+            delayed(self.__mapper(action))(element) for element in self._source
+        )
 
     @terminal
     def reduce(self, predicate: Callable[[Any, Any], Any], identity=_identity_missing,
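
For context, the diff is purely cosmetic: each backslash-continued `Parallel(...)\ (generator)` call becomes a single parenthesized `Parallel(...)(generator)` call with identical behavior. Below is a minimal sketch of the pattern using plain joblib; note that pystreamapi's `Parallel` appears to be a project-level wrapper that additionally accepts a `handler` argument, which standard `joblib.Parallel` does not.

from joblib import Parallel, delayed

def square(x):
    return x * x

# Backslash-continuation style (before this diff):
old_style = Parallel(n_jobs=-1, prefer="threads")\
    (delayed(square)(n) for n in range(5))

# Parenthesized call style (after this diff): the same single call,
# only the line break moved inside the argument parentheses.
new_style = Parallel(n_jobs=-1, prefer="threads")(
    delayed(square)(n) for n in range(5)
)

assert old_style == new_style == [0, 1, 4, 9, 16]

Parenthesized continuation is the PEP 8-preferred form: it survives automated reformatting and avoids the trailing-backslash pitfall, where invisible whitespace after the backslash silently breaks the line join.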