Skip to content

Commit

Permalink
Fix Whitespace before opening parenthesis (FLK-E211)
Browse files Browse the repository at this point in the history
  • Loading branch information
garlontas committed Jul 18, 2023
1 parent a89c2ac commit 7aa3775
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 12 deletions.
5 changes: 3 additions & 2 deletions pystreamapi/_parallel/fork_and_join.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,5 +95,6 @@ def __calculate_number_of_parts(self, min_nr_items=1):

def __run_job_in_parallel(self, src, operation, op_function):
"""Run the operation in parallel"""
return Parallel(n_jobs=-1, prefer="processes", handler=self.__handler)\
(delayed(operation)(op_function, part) for part in src)
return Parallel(n_jobs=-1, prefer="processes", handler=self.__handler)(
delayed(operation)(op_function, part) for part in src
)
6 changes: 4 additions & 2 deletions pystreamapi/_streams/__base_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@


def terminal(func):
"""Decorator to execute all the processes in the queue
before executing the decorated function. To be applied to terminal operations."""
"""
Decorator to execute all the processes in the queue before executing the decorated function.
To be applied to terminal operations.
"""
@functools.wraps(func)
def wrapper(*args, **kwargs):
self: BaseStream = args[0]
Expand Down
19 changes: 11 additions & 8 deletions pystreamapi/_streams/__parallel_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ def find_any(self):

def _flat_map(self, predicate: Callable[[Any], stream.BaseStream]):
new_src = []
for element in Parallel(n_jobs=-1, prefer="threads", handler=self)\
(delayed(self.__mapper(predicate))(element) for element in self._source):
for element in Parallel(n_jobs=-1, prefer="threads", handler=self)(
delayed(self.__mapper(predicate))(element) for element in self._source):
new_src.extend(element.to_list())
self._source = new_src

Expand All @@ -49,8 +49,9 @@ def process_element(element):
key = key_mapper(element)
groups[key].append(element)

Parallel(n_jobs=-1, prefer="threads", handler=self)\
(delayed(self.__mapper(process_element))(element) for element in self._source)
Parallel(n_jobs=-1, prefer="threads", handler=self)(
delayed(self.__mapper(process_element))(element) for element in self._source
)
return groups

@terminal
Expand All @@ -59,12 +60,14 @@ def for_each(self, action: Callable):
(delayed(self.__mapper(action))(element) for element in self._source)

def _map(self, mapper: Callable[[Any], Any]):
self._source = Parallel(n_jobs=-1, prefer="threads", handler=self)\
(delayed(self.__mapper(mapper))(element) for element in self._source)
self._source = Parallel(n_jobs=-1, prefer="threads", handler=self)(
delayed(self.__mapper(mapper))(element) for element in self._source
)

def _peek(self, action: Callable):
Parallel(n_jobs=-1, prefer="threads", handler=self)\
(delayed(self.__mapper(action))(element) for element in self._source)
Parallel(n_jobs=-1, prefer="threads", handler=self)(
delayed(self.__mapper(action))(element) for element in self._source
)

@terminal
def reduce(self, predicate: Callable[[Any, Any], Any], identity=_identity_missing,
Expand Down

0 comments on commit 7aa3775

Please sign in to comment.