Skip to content

Commit

Permalink
Merge pull request #35 from PickwickSoft/feature/add-grouping-operations
Browse files Browse the repository at this point in the history
Feature/add grouping operations
  • Loading branch information
garlontas authored Jun 19, 2023
2 parents 6e910ed + 79a486e commit cdb941d
Show file tree
Hide file tree
Showing 5 changed files with 139 additions and 81 deletions.
68 changes: 63 additions & 5 deletions pystreamapi/_streams/__base_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,15 +71,19 @@ def __drop_while(self, predicate: Callable[[Any], bool]):
"""Drops elements from the stream while the predicate is true."""
self._source = list(itertools.dropwhile(predicate, self._source))

@abstractmethod
def filter(self, predicate: Callable[[K], bool]) -> 'BaseStream[_V]':
def filter(self, predicate: Callable[[K], bool]) -> 'BaseStream[K]':
    """
    Returns a stream consisting of the elements of this stream that match
    the given predicate.

    Lazy: the operation is only recorded here; it runs when a terminal
    operation executes the process queue.
    :param predicate: element test; elements for which it returns False are dropped
    """
    process = Process(self._filter, predicate)
    self._queue.append(process)
    return self

@abstractmethod
def _filter(self, predicate: Callable[[K], bool]):
    """Implementation of filter. Should be implemented by subclasses.

    Called lazily via the process queue; must rewrite self._source in place.
    """

def flat_map(self, predicate: Callable[[K], Iterable[_V]]) -> 'BaseStream[_V]':
    """
    Returns a stream consisting of the results of replacing each element of
    this stream with the contents of the stream produced by applying the
    predicate to that element.

    Lazy: queued and executed on the next terminal operation.
    :param predicate: maps one element to a stream of replacement elements
    """
    process = Process(self._flat_map, predicate)
    self._queue.append(process)
    return self

@abstractmethod
def _flat_map(self, predicate: Callable[[K], Iterable[_V]]):
    """Implementation of flat_map. Should be implemented by subclasses.

    Must replace every element of self._source by the elements of
    predicate(element) (flattened via to_list in the concrete streams).
    """

def group_by(self, key_mapper: Callable[[K], Any]) -> 'BaseStream[K]':
    """
    Returns a stream of (key, group) pairs obtained by grouping the elements
    of this stream by the given classifier.

    Lazy: queued and executed on the next terminal operation.
    :param key_mapper: derives the grouping key from an element
    """
    process = Process(self.__group_by, key_mapper)
    self._queue.append(process)
    return self

def __group_by(self, key_mapper: Callable[[Any], Any]):
    """Groups the stream by the given key mapper. Uses the implementation of _group_to_dict.

    The source becomes a list of (key, group) tuples.
    :param key_mapper: derives the grouping key from an element
    """
    groups = self._group_to_dict(key_mapper)
    # dict.items() is a lazy view and is not subscriptable; materialize it so
    # downstream operations that index the source (e.g. find_any) keep working.
    self._source = list(groups.items())

@abstractmethod
def _group_to_dict(self, key_mapper: Callable[[K], Any]) -> dict[K, list]:
    """Groups the stream into a dictionary. Should be implemented by subclasses.

    Must return {key: [elements with that key]}; shared by group_by and to_dict.
    """

def limit(self, max_size: int) -> 'BaseStream[_V]':
"""
Expand All @@ -103,37 +132,57 @@ def __limit(self, max_size: int):
"""Limits the stream to the first n elements."""
self._source = itertools.islice(self._source, max_size)

@abstractmethod
def map(self, mapper: Callable[[K], _V]) -> 'BaseStream[_V]':
    """
    Returns a stream consisting of the results of applying the given function
    to the elements of this stream.

    Lazy: queued and executed on the next terminal operation.
    :param mapper: function applied to every element
    """
    process = Process(self._map, mapper)
    self._queue.append(process)
    return self

@abstractmethod
def _map(self, mapper: Callable[[K], _V]):
    """Implementation of map. Should be implemented by subclasses.

    Must rewrite self._source with mapper applied to every element.
    """

def map_to_int(self) -> 'BaseStream[_V]':
    """
    Returns a stream consisting of the results of converting the elements of
    this stream to integers.

    Lazy: queued and executed on the next terminal operation.
    """
    process = Process(self.__map_to_int)
    self._queue.append(process)
    return self

def __map_to_int(self):
    """Converts the stream to integers by delegating to _map with the int constructor."""
    self._map(int)

@abstractmethod
def map_to_str(self) -> 'BaseStream[_V]':
    """
    Returns a stream consisting of the results of converting the elements of
    this stream to strings.

    Lazy: queued and executed on the next terminal operation.
    """
    process = Process(self.__map_to_str)
    self._queue.append(process)
    return self

def __map_to_str(self):
    """Converts the stream to strings by delegating to _map with the str constructor."""
    self._map(str)

@abstractmethod
def peek(self, action: Callable) -> 'BaseStream[_V]':
    """
    Returns a stream consisting of the elements of this stream, additionally
    performing the provided action on each element as elements are consumed
    from the resulting stream.

    Lazy: queued and executed on the next terminal operation.
    :param action: side-effecting callable invoked once per element
    """
    process = Process(self._peek, action)
    self._queue.append(process)
    return self

@abstractmethod
def _peek(self, action: Callable):
    """Implementation of peek. Should be implemented by subclasses.

    Must invoke action on every element without altering the source.
    """

def reversed(self) -> 'BaseStream[_V]':
"""
Expand Down Expand Up @@ -296,6 +345,15 @@ def to_set(self):
self._trigger_exec()
return set(self._source)

@abstractmethod
def to_dict(self, key_mapper: Callable[[K], Any]) -> dict:
    """
    Terminal operation: returns a dictionary consisting of the results of
    grouping the elements of this stream by the given classifier.
    Implementations map each key to the list of elements sharing it.
    :param key_mapper:
    """

def _trigger_exec(self):
    """Triggers execution of the stream: runs every queued lazy operation in order."""
    self._queue.execute_all()
60 changes: 23 additions & 37 deletions pystreamapi/_streams/__parallel_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
from optional import Optional

import pystreamapi._streams.__base_stream as stream
from pystreamapi._lazy.process import Process
from pystreamapi._parallel.fork_and_join import Parallelizer

_identity_missing = object()
Expand All @@ -23,11 +22,7 @@ def all_match(self, predicate: Callable[[Any], bool]):
return all(Parallel(n_jobs=-1, prefer="threads")(delayed(predicate)(element)
for element in self._source))

def filter(self, predicate: Callable[[Any], bool]):
self._queue.append(Process(self.__filter, predicate))
return self

def __filter(self, predicate: Callable[[Any], bool]):
def _filter(self, predicate: Callable[[Any], bool]):
    """Implementation of filter: evaluates the predicate via the fork-and-join parallelizer."""
    # The parallelizer must be pointed at the current source before each use.
    self._set_parallelizer_src()
    self._source = self.parallelizer.filter(predicate)

Expand All @@ -37,50 +32,37 @@ def find_any(self):
return Optional.of(self._source[0])
return Optional.empty()

def flat_map(self, predicate: Callable[[Any], stream.BaseStream]):
self._queue.append(Process(self.__flat_map, predicate))
return self

def __flat_map(self, predicate: Callable[[Any], stream.BaseStream]):
def _flat_map(self, predicate: Callable[[Any], stream.BaseStream]):
    """Implementation of flat_map: maps elements to streams in a thread pool, then flattens sequentially."""
    new_src = []
    # predicate calls run in parallel; each resulting sub-stream is then
    # materialized in order via to_list, preserving element order.
    for element in Parallel(n_jobs=-1, prefer="threads")(delayed(predicate)(element)
                                                         for element in self._source):
        new_src.extend(element.to_list())
    self._source = new_src

def _group_to_dict(self, key_mapper: Callable[[Any], Any]):
    """Implementation of the grouping primitive: returns {key: [elements]}.

    Only the (potentially expensive) key computation is parallelized; the
    dictionary is built sequentially afterwards. The previous version
    appended to a shared dict from worker threads with a check-then-create
    ("if key not in groups") step, which races and can drop elements when
    two threads see the same missing key.
    :param key_mapper: derives the grouping key from an element
    """
    keys = Parallel(n_jobs=-1, prefer="threads")(delayed(key_mapper)(element)
                                                 for element in self._source)
    groups = {}
    # Sequential grouping keeps encounter order deterministic and race-free.
    for key, element in zip(keys, self._source):
        groups.setdefault(key, []).append(element)
    return groups

def for_each(self, predicate: Callable):
    """Terminal operation: executes all queued steps, then applies predicate to every element in a thread pool."""
    self._trigger_exec()
    Parallel(n_jobs=-1, prefer="threads")(delayed(predicate)(element)
                                          for element in self._source)

def map(self, mapper: Callable[[Any], Any]):
self._queue.append(Process(self.__map, mapper))
return self

def __map(self, predicate: Callable[[Any], Any]):
self._source = Parallel(n_jobs=-1, prefer="threads")(delayed(predicate)(element)
def _map(self, mapper: Callable[[Any], Any]):
    """Implementation of map: applies mapper to every element in a thread pool; order is preserved."""
    self._source = Parallel(n_jobs=-1, prefer="threads")(delayed(mapper)(element)
                                                         for element in self._source)

def map_to_int(self):
self._queue.append(Process(self.__map_to_int))
return self

def __map_to_int(self):
self.__map(int)

def map_to_str(self):
self._queue.append(Process(self.__map_to_str))
return self

def __map_to_str(self):
self.__map(str)

def peek(self, action: Callable):
self._queue.append(Process(self.__peek, action))
return self

def __peek(self, predicate: Callable):
Parallel(n_jobs=-1, prefer="threads")(delayed(predicate)(element)
def _peek(self, action: Callable):
    """Implementation of peek: invokes action on every element in a thread pool; the source is unchanged."""
    Parallel(n_jobs=-1, prefer="threads")(delayed(action)(element)
                                          for element in self._source)

def reduce(self, predicate: Callable[[Any, Any], Any], identity=_identity_missing,
Expand All @@ -97,5 +79,9 @@ def reduce(self, predicate: Callable[[Any, Any], Any], identity=_identity_missin
def __reduce(self, pred, _):
    """Parallel reduction helper; delegates to the fork-and-join parallelizer.

    The second argument is accepted for signature compatibility and ignored
    # NOTE(review): presumably the identity value — confirm against reduce().
    """
    return self.parallelizer.reduce(pred)

def to_dict(self, key_mapper: Callable[[Any], Any]) -> dict:
    """
    Terminal operation: runs all queued steps, then groups the elements into
    a dictionary keyed by key_mapper(element).
    :param key_mapper:
    """
    self._trigger_exec()
    groups = self._group_to_dict(key_mapper)
    return dict(groups)

def _set_parallelizer_src(self):
    """Points the fork-and-join parallelizer at the current source; call before each parallelizer use."""
    self.parallelizer.set_source(self._source)
56 changes: 19 additions & 37 deletions pystreamapi/_streams/__sequential_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
from optional import Optional

import pystreamapi._streams.__base_stream as stream
from pystreamapi._lazy.process import Process

_identity_missing = object()

Expand All @@ -16,11 +15,7 @@ def all_match(self, predicate: Callable[[Any], bool]):
self._trigger_exec()
return all(predicate(element) for element in self._source)

def filter(self, predicate: Callable[[Any], bool]):
self._queue.append(Process(self.__filter, predicate))
return self

def __filter(self, predicate: Callable[[Any], bool]):
def _filter(self, predicate: Callable[[Any], bool]):
    """Implementation of filter: keeps only the elements the predicate accepts."""
    self._source = list(filter(predicate, self._source))

def find_any(self):
Expand All @@ -29,49 +24,32 @@ def find_any(self):
return Optional.of(self._source[0])
return Optional.empty()

def flat_map(self, predicate: Callable[[Any], stream.BaseStream]):
self._queue.append(Process(self.__flat_map, predicate))
return self

def __flat_map(self, predicate: Callable[[Any], stream.BaseStream]):
def _flat_map(self, predicate: Callable[[Any], stream.BaseStream]):
    """Implementation of flat_map: maps every element to a stream, then concatenates the results."""
    # Map first (matching the original two-phase order), then flatten.
    substreams = [predicate(item) for item in self._source]
    flattened = []
    for substream in substreams:
        flattened.extend(substream.to_list())
    self._source = flattened

def _group_to_dict(self, key_mapper: Callable[[Any], Any]):
    """Implementation of the grouping primitive: returns {key: [elements]}.

    Encounter order is preserved within each group and across keys
    (dicts keep insertion order).
    :param key_mapper: derives the grouping key from an element
    """
    groups = {}
    for element in self._source:
        # setdefault replaces the manual "if key not in groups" check-then-create
        groups.setdefault(key_mapper(element), []).append(element)
    return groups

def for_each(self, predicate: Callable):
    """
    Terminal operation: executes all queued steps, then applies predicate to
    every element for its side effects.
    :param predicate:
    """
    self._trigger_exec()
    for item in self._source:
        predicate(item)

def map(self, mapper: Callable[[Any], Any]):
self._queue.append(Process(self.__map, mapper))
return self

def __map(self, predicate: Callable[[Any], Any]):
self._source = [predicate(element) for element in self._source]

def map_to_int(self):
self._queue.append(Process(self.__map_to_int))
return self

def __map_to_int(self):
self.__map(int)
def _map(self, mapper: Callable[[Any], Any]):
    """Implementation of map: applies mapper to every element of the source."""
    self._source = list(map(mapper, self._source))

def map_to_str(self):
self._queue.append(Process(self.__map_to_str))
return self

def __map_to_str(self):
self.__map(str)

def peek(self, action: Callable):
self._queue.append(Process(self.__peek, action))
return self

def __peek(self, predicate: Callable):
def _peek(self, action: Callable):
for element in self._source:
predicate(element)
action(element)

def reduce(self, predicate: Callable, identity=_identity_missing, depends_on_state=False):
self._trigger_exec()
Expand All @@ -80,3 +58,7 @@ def reduce(self, predicate: Callable, identity=_identity_missing, depends_on_sta
return reduce(predicate, self._source)
return Optional.of(reduce(predicate, self._source))
return identity if identity is not _identity_missing else Optional.empty()

def to_dict(self, key_mapper: Callable[[Any], Any]) -> dict:
    """
    Terminal operation: runs all queued steps, then groups the elements into
    a dictionary keyed by key_mapper(element).
    :param key_mapper:
    """
    self._trigger_exec()
    grouped = self._group_to_dict(key_mapper)
    return grouped
2 changes: 1 addition & 1 deletion tests/date_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from datetime import datetime, timedelta, timezone
from unittest import TestCase


# skipcq: PTC-W0046
class DateTest(TestCase):

def setUp(self):
Expand Down
34 changes: 33 additions & 1 deletion tests/test_stream_implementation.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@
from optional import Optional
from optional.something import Something
from parameterized import parameterized_class

from pystreamapi._streams.__base_stream import BaseStream
from pystreamapi._streams.__parallel_stream import ParallelStream
from pystreamapi._streams.__sequential_stream import SequentialStream
from pystreamapi._streams.__base_stream import BaseStream
from pystreamapi._streams.numeric.__sequential_numeric_stream import SequentialNumericStream


Expand Down Expand Up @@ -126,6 +127,37 @@ def test_reduce_empty_stream_with_identity(self):
result = self.stream([]).reduce(lambda x, y: x + y, identity=0)
self.assertEqual(result, 0)

def test_group_by(self):
    """group_by should yield (key, [elements]) pairs grouped by the classifier."""
    class Point:
        def __init__(self, x, y):
            self.x = x
            self.y = y

    # Two points share x == 1, the other two share x == 2
    pt1, pt2, pt3, pt4 = Point(1, 2), Point(1, 3), Point(2, 3), Point(2, 4)
    result = self.stream([pt1, pt2, pt3, pt4]) \
        .group_by(lambda p: p.x) \
        .to_list()
    self.assertListEqual(result, [(1, [pt1, pt2]), (2, [pt3, pt4])])

def test_group_by_empty(self):
    """Grouping an empty stream should produce an empty result."""
    result = self.stream([]).group_by(lambda x: x).to_list()
    self.assertListEqual(result, [])

def test_to_dict(self):
    """to_dict should return {key: [elements]} grouped by the classifier."""
    class Point:
        def __init__(self, x, y):
            self.x = x
            self.y = y

    # Two points share x == 1, the other two share x == 2
    pt1, pt2, pt3, pt4 = Point(1, 2), Point(1, 3), Point(2, 3), Point(2, 4)
    result = self.stream([pt1, pt2, pt3, pt4]) \
        .to_dict(lambda p: p.x)
    self.assertDictEqual(result, {1: [pt1, pt2], 2: [pt3, pt4]})

def test_to_dict_empty(self):
    """to_dict on an empty stream should produce an empty dictionary."""
    result = self.stream([]).to_dict(lambda x: x)
    self.assertDictEqual(result, {})


# Allow running this test module directly with `python <file>`.
if __name__ == '__main__':
    unittest.main()

0 comments on commit cdb941d

Please sign in to comment.