Skip to content

Commit

Permalink
Remove duplications across stream implementations
Browse files Browse the repository at this point in the history
  • Loading branch information
garlontas committed Jun 19, 2023
1 parent 50f2267 commit d446023
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 97 deletions.
49 changes: 43 additions & 6 deletions pystreamapi/_streams/__base_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,15 +71,19 @@ def __drop_while(self, predicate: Callable[[Any], bool]):
"""Drops elements from the stream while the predicate is true."""
self._source = list(itertools.dropwhile(predicate, self._source))

def filter(self, predicate: Callable[[K], bool]) -> 'BaseStream[K]':
    """
    Returns a stream consisting of the elements of this stream that match
    the given predicate.

    Lazy operation: it is queued as a Process and only executed via the
    subclass hook _filter when a terminal operation triggers the pipeline.
    :param predicate: condition an element must satisfy to be kept
    """
    self._queue.append(Process(self._filter, predicate))
    return self

@abstractmethod
def _filter(self, predicate: Callable[[K], bool]):
    """
    Implementation of filter. Should be implemented by subclasses.
    :param predicate: condition an element must satisfy to be kept
    """

def flat_map(self, predicate: Callable[[K], Iterable[_V]]) -> 'BaseStream[_V]':
    """
    Returns a stream consisting of the results of replacing each element of
    this stream with the contents of the iterable produced by applying the
    given mapping function to it.

    Lazy operation: queued as a Process and executed via the subclass hook
    _flat_map when a terminal operation triggers the pipeline.
    :param predicate: function mapping one element to an iterable of results
    """
    self._queue.append(Process(self._flat_map, predicate))
    return self

@abstractmethod
def _flat_map(self, predicate: Callable[[K], Iterable[_V]]):
    """
    Implementation of flat_map. Should be implemented by subclasses.
    :param predicate: function mapping one element to an iterable of results
    """

def group_by(self, key_mapper: Callable[[K], Any]) -> 'BaseStream[K]':
    """
    Returns a Stream consisting of the results of grouping the elements of
    this stream by the given classifier and extracting the key/value pairs.

    Lazy operation: queued as a Process; the grouping itself is delegated
    to the subclass hook _group_to_dict.
    :param key_mapper: function computing the group key for an element
    """
    self._queue.append(Process(self.__group_by, key_mapper))
    return self

def __group_by(self, key_mapper: Callable[[Any], Any]):
    """
    Group the source via the subclass hook and replace the source with the
    resulting (key, group) pairs.
    :param key_mapper: function computing the group key for an element
    """
    groups = self._group_to_dict(key_mapper)
    # Materialize the items view: dict.items() returns a view object that is
    # not subscriptable, which would break later pipeline steps that index
    # the source (e.g. find_any accessing self._source[0]).
    self._source = list(groups.items())

@abstractmethod
def _group_to_dict(self, key_mapper: Callable[[K], Any]) -> dict[K, list]:
    """
    Groups the stream into a dictionary mapping each key to the list of
    elements that produced it. Should be implemented by subclasses.
    :param key_mapper: function computing the group key for an element
    """

def limit(self, max_size: int) -> 'BaseStream[_V]':
"""
Expand All @@ -112,37 +131,55 @@ def __limit(self, max_size: int):
"""Limits the stream to the first n elements."""
self._source = itertools.islice(self._source, max_size)

def map(self, mapper: Callable[[K], _V]) -> 'BaseStream[_V]':
    """
    Returns a stream consisting of the results of applying the given
    function to the elements of this stream.

    Lazy operation: queued as a Process and executed via the subclass hook
    _map when a terminal operation triggers the pipeline.
    :param mapper: function applied to every element
    """
    self._queue.append(Process(self._map, mapper))
    return self

@abstractmethod
def _map(self, mapper: Callable[[K], _V]):
    """
    Implementation of map. Should be implemented by subclasses.
    :param mapper: function applied to every element
    """

def map_to_int(self) -> 'BaseStream[_V]':
    """
    Returns a stream consisting of the results of converting the elements
    of this stream to integers.

    Lazy operation: queued as a Process.
    """
    self._queue.append(Process(self.__map_to_int))
    return self

def __map_to_int(self):
    """Delegate to the subclass _map hook with the int constructor."""
    self._map(int)

def map_to_str(self) -> 'BaseStream[_V]':
    """
    Returns a stream consisting of the results of converting the elements
    of this stream to strings.

    Lazy operation: queued as a Process.
    """
    self._queue.append(Process(self.__map_to_str))
    return self

def __map_to_str(self):
    """Delegate to the subclass _map hook with the str constructor."""
    self._map(str)

def peek(self, action: Callable) -> 'BaseStream[_V]':
    """
    Returns a stream consisting of the elements of this stream, additionally
    performing the provided action on each element as elements are consumed
    from the resulting stream.

    Lazy operation: queued as a Process and executed via the subclass hook
    _peek when a terminal operation triggers the pipeline.
    :param action: side-effecting callable applied to each element
    """
    self._queue.append(Process(self._peek, action))
    return self

@abstractmethod
def _peek(self, action: Callable):
    """
    Implementation of peek. Should be implemented by subclasses.
    :param action: side-effecting callable applied to each element
    """

def reversed(self) -> 'BaseStream[_V]':
"""
Expand Down
55 changes: 8 additions & 47 deletions pystreamapi/_streams/__parallel_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
from optional import Optional

import pystreamapi._streams.__base_stream as stream
from pystreamapi._lazy.process import Process
from pystreamapi._parallel.fork_and_join import Parallelizer

_identity_missing = object()
Expand All @@ -23,11 +22,7 @@ def all_match(self, predicate: Callable[[Any], bool]):
return all(Parallel(n_jobs=-1, prefer="threads")(delayed(predicate)(element)
for element in self._source))

def _filter(self, predicate: Callable[[Any], bool]):
    """
    Parallel filter implementation: hand the current source to the
    fork-and-join parallelizer and keep only the matching elements.
    :param predicate: condition an element must satisfy to be kept
    """
    self._set_parallelizer_src()
    self._source = self.parallelizer.filter(predicate)

Expand All @@ -37,26 +32,14 @@ def find_any(self):
return Optional.of(self._source[0])
return Optional.empty()

def _flat_map(self, predicate: Callable[[Any], stream.BaseStream]):
    """
    Parallel flat_map implementation: map every element to a stream using
    thread-based joblib workers, then concatenate the resulting streams.
    :param predicate: function mapping one element to a BaseStream
    """
    mapped = Parallel(n_jobs=-1, prefer="threads")(delayed(predicate)(element)
                                                   for element in self._source)
    new_src = []
    for sub_stream in mapped:
        new_src.extend(sub_stream.to_list())
    self._source = new_src

def group_by(self, key_mapper: Callable[[Any], Any]):
self._queue.append(Process(self.__group_by, key_mapper))
return self

def __group_by(self, key_mapper: Callable[[Any], Any]):
groups = self.__group_to_dict(key_mapper)
self._source = groups.items()

def __group_to_dict(self, key_mapper: Callable[[Any], Any]):
def _group_to_dict(self, key_mapper: Callable[[Any], Any]):
groups = {}

def process_element(element):
Expand All @@ -65,42 +48,20 @@ def process_element(element):
groups[key] = []
groups[key].append(element)

Parallel(n_jobs=-1, prefer="threads") \
(delayed(process_element)(element) for element in self._source)
Parallel(n_jobs=-1, prefer="threads")(delayed(process_element)(element)
for element in self._source)
return groups

def for_each(self, predicate: Callable):
    """
    Terminal operation: execute the queued pipeline, then apply the given
    action to every element using thread-based joblib workers.
    :param predicate: side-effecting callable applied to each element
    """
    self._trigger_exec()
    Parallel(n_jobs=-1, prefer="threads")(delayed(predicate)(element)
                                          for element in self._source)

def _map(self, predicate: Callable[[Any], Any]):
    """
    Parallel map implementation: apply the mapper to every element using
    thread-based joblib workers and replace the source with the results.
    :param predicate: function applied to every element
    """
    self._source = Parallel(n_jobs=-1, prefer="threads")(
        delayed(predicate)(element) for element in self._source)

def _peek(self, predicate: Callable):
    """
    Parallel peek implementation: apply the action to every element using
    thread-based joblib workers; the source itself is left unchanged.
    :param predicate: side-effecting callable applied to each element
    """
    Parallel(n_jobs=-1, prefer="threads")(delayed(predicate)(element)
                                          for element in self._source)

Expand All @@ -120,7 +81,7 @@ def __reduce(self, pred, _):

def to_dict(self, key_mapper: Callable[[Any], Any]) -> dict:
    """
    Terminal operation: execute the queued pipeline, then group the
    elements into a plain dict keyed by key_mapper.
    :param key_mapper: function computing the group key for an element
    """
    self._trigger_exec()
    # dict(...) copies the grouping so callers get an independent mapping.
    return dict(self._group_to_dict(key_mapper))

def _set_parallelizer_src(self):
    """Point the fork-and-join parallelizer at the current source."""
    self.parallelizer.set_source(self._source)
50 changes: 6 additions & 44 deletions pystreamapi/_streams/__sequential_stream.py
Original file line number Diff line number Diff line change
def all_match(self, predicate: Callable[[Any], bool]):
    """
    Terminal operation: execute the queued pipeline, then return True iff
    every element satisfies the predicate (vacuously True when empty).
    :param predicate: condition tested against each element
    """
    self._trigger_exec()
    return all(predicate(element) for element in self._source)

def _filter(self, predicate: Callable[[Any], bool]):
    """
    Sequential filter implementation: keep only the elements for which the
    predicate is true.
    :param predicate: condition an element must satisfy to be kept
    """
    self._source = [element for element in self._source if predicate(element)]

def find_any(self):
Expand All @@ -29,25 +25,13 @@ def find_any(self):
return Optional.of(self._source[0])
return Optional.empty()

def _flat_map(self, predicate: Callable[[Any], stream.BaseStream]):
    """
    Sequential flat_map implementation: map every element to a stream and
    concatenate the contents of the resulting streams.
    :param predicate: function mapping one element to a BaseStream
    """
    new_src = []
    for element in [predicate(element) for element in self._source]:
        new_src.extend(element.to_list())
    self._source = new_src

def group_by(self, key_mapper: Callable[[Any], Any]):
self._queue.append(Process(self.__group_by, key_mapper))
return self

def __group_by(self, key_mapper: Callable[[Any], Any]):
groups = self.__group_to_dict(key_mapper)
self._source = groups.items()

def __group_to_dict(self, key_mapper: Callable[[Any], Any]):
def _group_to_dict(self, key_mapper: Callable[[Any], Any]):
groups = {}
for element in self._source:
key = key_mapper(element)
Expand All @@ -61,32 +45,10 @@ def for_each(self, predicate: Callable):
for element in self._source:
predicate(element)

def _map(self, predicate: Callable[[Any], Any]):
    """
    Sequential map implementation: apply the mapper to every element and
    replace the source with the results.
    :param predicate: function applied to every element
    """
    self._source = [predicate(element) for element in self._source]

def map_to_int(self):
self._queue.append(Process(self.__map_to_int))
return self

def __map_to_int(self):
self.__map(int)

def map_to_str(self):
self._queue.append(Process(self.__map_to_str))
return self

def __map_to_str(self):
self.__map(str)

def _peek(self, predicate: Callable):
    """
    Sequential peek implementation: apply the action to each element; the
    source itself is left unchanged.
    :param predicate: side-effecting callable applied to each element
    """
    for element in self._source:
        predicate(element)

Expand All @@ -100,4 +62,4 @@ def reduce(self, predicate: Callable, identity=_identity_missing, depends_on_sta

def to_dict(self, key_mapper: Callable[[Any], Any]) -> dict:
    """
    Terminal operation: execute the queued pipeline, then group the
    elements into a dict keyed by key_mapper.
    :param key_mapper: function computing the group key for an element
    """
    self._trigger_exec()
    return self._group_to_dict(key_mapper)
return self._group_to_dict(key_mapper)

0 comments on commit d446023

Please sign in to comment.