Skip to content

Commit

Permalink
Fix deepsource smells
Browse files Browse the repository at this point in the history
  • Loading branch information
garlontas committed Jul 18, 2023
1 parent 6937dd3 commit ad9c2cc
Show file tree
Hide file tree
Showing 7 changed files with 17 additions and 9 deletions.
4 changes: 1 addition & 3 deletions pystreamapi/_itertools/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,16 @@ def dropwhile(predicate, iterable, handler: ErrorHandler=None):

_initial_missing = object()


def reduce(function, sequence, initial=_initial_missing, handler: ErrorHandler=None):
"""
reduce(function, iterable[, initial]) -> value
Apply a function of two arguments cumulatively to the items of a sequence
or iterable, from left to right, to reduce the iterable to a single
value. For example, reduce(lambda x, y: x+y, [1, 2, 3, 4, 5]) calculates
((((1+2)+3)+4)+5). If initial is present, it is placed before the items
of the iterable in the calculation, and serves as a default when the
iterable is empty.
"""

it = iter(sequence)

if initial is _initial_missing:
Expand Down
1 change: 1 addition & 0 deletions pystreamapi/_parallel/parallelizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from pystreamapi._streams.error.__error import ErrorHandler
from pystreamapi._streams.error.__levels import ErrorLevel


class Parallel:
"""Wrapper for joblib.Parallel supporting error handling"""

Expand Down
2 changes: 2 additions & 0 deletions pystreamapi/_streams/__base_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@


def terminal(func):
"""Decorator to execute all the processes in the queue
before executing the decorated function. To be applied to terminal operations."""
@functools.wraps(func)
def wrapper(*args, **kwargs):
self: BaseStream = args[0]
Expand Down
10 changes: 5 additions & 5 deletions pystreamapi/_streams/__parallel_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def find_any(self):
def _flat_map(self, predicate: Callable[[Any], stream.BaseStream]):
new_src = []
for element in Parallel(n_jobs=-1, prefer="threads", handler=self)\
(delayed(self.__mapper(predicate))(element) for element in self._source):
(delayed(self.__mapper(predicate))(element) for element in self._source):
new_src.extend(element.to_list())
self._source = new_src

Expand All @@ -55,16 +55,16 @@ def process_element(element):

@terminal
def for_each(self, action: Callable):
Parallel(n_jobs=-1, prefer="threads", handler=self)(delayed(self.__mapper(action))(element)
for element in self._source)
Parallel(n_jobs=-1, prefer="threads", handler=self)\
(delayed(self.__mapper(action))(element) for element in self._source)

def _map(self, mapper: Callable[[Any], Any]):
self._source = Parallel(n_jobs=-1, prefer="threads", handler=self)\
(delayed(self.__mapper(mapper))(element) for element in self._source)

def _peek(self, action: Callable):
Parallel(n_jobs=-1, prefer="threads", handler=self)(delayed(self.__mapper(action))(element)
for element in self._source)
Parallel(n_jobs=-1, prefer="threads", handler=self)\
(delayed(self.__mapper(action))(element) for element in self._source)

@terminal
def reduce(self, predicate: Callable[[Any, Any], Any], identity=_identity_missing,
Expand Down
5 changes: 4 additions & 1 deletion pystreamapi/_streams/error/__error.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,17 @@

_sentinel = Sentinel()


def nothing(sth):
"""Do not modify the input"""
return sth


def true_condition(_):
"""Always return True"""
return True


class ErrorHandler:
"""Handle errors in stream operations"""

Expand All @@ -40,7 +43,7 @@ def _get_error_level(self):
"""Get the error level"""
return self.__error_level

def _itr(self, src, mapper = nothing, condition = true_condition) -> list:
def _itr(self, src, mapper=nothing, condition=true_condition) -> list:
"""Iterate over the source and apply the mapper and condition"""
new_src = []
for i in src:
Expand Down
1 change: 1 addition & 0 deletions pystreamapi/_streams/error/__levels.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ class ErrorLevel:
IGNORE: ignore the error, skip the item
WARN: print a warning and ignore the error
"""

RAISE = 0
IGNORE = 1
WARN = 2
3 changes: 3 additions & 0 deletions pystreamapi/_streams/error/__sentinel.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,6 @@ def __eq__(self, other):

def __ne__(self, other):
return not isinstance(other, Sentinel)

def __hash__(self):
return hash(Sentinel)

0 comments on commit ad9c2cc

Please sign in to comment.