diff --git a/pystreamapi/_streams/__base_stream.py b/pystreamapi/_streams/__base_stream.py index 5338085..78a6595 100644 --- a/pystreamapi/_streams/__base_stream.py +++ b/pystreamapi/_streams/__base_stream.py @@ -71,15 +71,19 @@ def __drop_while(self, predicate: Callable[[Any], bool]): """Drops elements from the stream while the predicate is true.""" self._source = list(itertools.dropwhile(predicate, self._source)) - @abstractmethod - def filter(self, predicate: Callable[[K], bool]) -> 'BaseStream[_V]': + def filter(self, predicate: Callable[[K], bool]) -> 'BaseStream[K]': """ Returns a stream consisting of the elements of this stream that match the given predicate. :param predicate: """ + self._queue.append(Process(self._filter, predicate)) + return self @abstractmethod + def _filter(self, predicate: Callable[[K], bool]): + """Implementation of filter. Should be implemented by subclasses.""" + def flat_map(self, predicate: Callable[[K], Iterable[_V]]) -> 'BaseStream[_V]': """ Returns a stream consisting of the results of replacing each element of this stream with @@ -88,6 +92,31 @@ def flat_map(self, predicate: Callable[[K], Iterable[_V]]) -> 'BaseStream[_V]': :param predicate: """ + self._queue.append(Process(self._flat_map, predicate)) + return self + + @abstractmethod + def _flat_map(self, predicate: Callable[[K], Iterable[_V]]): + """Implementation of flat_map. Should be implemented by subclasses.""" + + def group_by(self, key_mapper: Callable[[K], Any]) -> 'BaseStream[K]': + """ + Returns a Stream consisting of the results of grouping the elements of this stream + by the given classifier and extracting the key/value pairs. + + :param key_mapper: + """ + self._queue.append(Process(self.__group_by, key_mapper)) + return self + + def __group_by(self, key_mapper: Callable[[Any], Any]): + """Groups the stream by the given key mapper. 
Uses the implementation of _group_to_dict.""" + groups = self._group_to_dict(key_mapper) + self._source = list(groups.items()) + + @abstractmethod + def _group_to_dict(self, key_mapper: Callable[[K], Any]) -> dict[K, list]: + """Groups the stream into a dictionary. Should be implemented by subclasses.""" def limit(self, max_size: int) -> 'BaseStream[_V]': """ @@ -103,7 +132,6 @@ def __limit(self, max_size: int): """Limits the stream to the first n elements.""" self._source = itertools.islice(self._source, max_size) - @abstractmethod def map(self, mapper: Callable[[K], _V]) -> 'BaseStream[_V]': """ Returns a stream consisting of the results of applying the given function to the elements :param mapper: """ + self._queue.append(Process(self._map, mapper)) + return self @abstractmethod + def _map(self, mapper: Callable[[K], _V]): + """Implementation of map. Should be implemented by subclasses.""" + def map_to_int(self) -> 'BaseStream[_V]': """ Returns a stream consisting of the results of converting the elements of this stream to integers. """ + self._queue.append(Process(self.__map_to_int)) + return self + + def __map_to_int(self): + """Converts the stream to integers.""" + self._map(int) - @abstractmethod def map_to_str(self) -> 'BaseStream[_V]': """ Returns a stream consisting of the results of converting the elements of this stream to strings. 
""" + self._queue.append(Process(self.__map_to_str)) + return self + + def __map_to_str(self): + """Converts the stream to strings.""" + self._map(str) - @abstractmethod def peek(self, action: Callable) -> 'BaseStream[_V]': """ Returns a stream consisting of the elements of this stream, additionally performing the @@ -134,6 +177,12 @@ def peek(self, action: Callable) -> 'BaseStream[_V]': :param action: """ + self._queue.append(Process(self._peek, action)) + return self + + @abstractmethod + def _peek(self, action: Callable): + """Implementation of peek. Should be implemented by subclasses.""" def reversed(self) -> 'BaseStream[_V]': """ @@ -296,6 +345,15 @@ def to_set(self): self._trigger_exec() return set(self._source) + @abstractmethod + def to_dict(self, key_mapper: Callable[[K], Any]) -> dict: + """ + Returns a dictionary consisting of the results of grouping the elements of this stream + by the given classifier. + + :param key_mapper: + """ + def _trigger_exec(self): """Triggers execution of the stream.""" self._queue.execute_all() diff --git a/pystreamapi/_streams/__parallel_stream.py b/pystreamapi/_streams/__parallel_stream.py index afb38c4..1b4d0aa 100644 --- a/pystreamapi/_streams/__parallel_stream.py +++ b/pystreamapi/_streams/__parallel_stream.py @@ -5,7 +5,6 @@ from optional import Optional import pystreamapi._streams.__base_stream as stream -from pystreamapi._lazy.process import Process from pystreamapi._parallel.fork_and_join import Parallelizer _identity_missing = object() @@ -23,11 +22,7 @@ def all_match(self, predicate: Callable[[Any], bool]): return all(Parallel(n_jobs=-1, prefer="threads")(delayed(predicate)(element) for element in self._source)) - def filter(self, predicate: Callable[[Any], bool]): - self._queue.append(Process(self.__filter, predicate)) - return self - - def __filter(self, predicate: Callable[[Any], bool]): + def _filter(self, predicate: Callable[[Any], bool]): self._set_parallelizer_src() self._source = 
self.parallelizer.filter(predicate) @@ -37,50 +32,37 @@ def find_any(self): return Optional.of(self._source[0]) return Optional.empty() - def flat_map(self, predicate: Callable[[Any], stream.BaseStream]): - self._queue.append(Process(self.__flat_map, predicate)) - return self - - def __flat_map(self, predicate: Callable[[Any], stream.BaseStream]): + def _flat_map(self, predicate: Callable[[Any], stream.BaseStream]): new_src = [] for element in Parallel(n_jobs=-1, prefer="threads")(delayed(predicate)(element) for element in self._source): new_src.extend(element.to_list()) self._source = new_src + def _group_to_dict(self, key_mapper: Callable[[Any], Any]): + groups = {} + + def process_element(element): + key = key_mapper(element) + # setdefault and append are each atomic under the GIL, avoiding a + # check-then-insert race between the worker threads + groups.setdefault(key, []).append(element) + + Parallel(n_jobs=-1, prefer="threads")(delayed(process_element)(element) + for element in self._source) + return groups + def for_each(self, predicate: Callable): self._trigger_exec() Parallel(n_jobs=-1, prefer="threads")(delayed(predicate)(element) for element in self._source) - def map(self, mapper: Callable[[Any], Any]): - self._queue.append(Process(self.__map, mapper)) - return self - - def __map(self, predicate: Callable[[Any], Any]): - self._source = Parallel(n_jobs=-1, prefer="threads")(delayed(predicate)(element) + def _map(self, mapper: Callable[[Any], Any]): + self._source = Parallel(n_jobs=-1, prefer="threads")(delayed(mapper)(element) for element in self._source) - def map_to_int(self): - self._queue.append(Process(self.__map_to_int)) - return self - - def __map_to_int(self): - self.__map(int) - - def map_to_str(self): - self._queue.append(Process(self.__map_to_str)) - return self - - def __map_to_str(self): - self.__map(str) - - def peek(self, action: Callable): - self._queue.append(Process(self.__peek, action)) - return self - - def __peek(self, predicate: Callable): - Parallel(n_jobs=-1, prefer="threads")(delayed(predicate)(element) + def _peek(self, 
action: Callable): + Parallel(n_jobs=-1, prefer="threads")(delayed(action)(element) for element in self._source) def reduce(self, predicate: Callable[[Any, Any], Any], identity=_identity_missing, @@ -97,5 +79,9 @@ def reduce(self, predicate: Callable[[Any, Any], Any], identity=_identity_missin def __reduce(self, pred, _): return self.parallelizer.reduce(pred) + def to_dict(self, key_mapper: Callable[[Any], Any]) -> dict: + self._trigger_exec() + return dict(self._group_to_dict(key_mapper)) + def _set_parallelizer_src(self): self.parallelizer.set_source(self._source) diff --git a/pystreamapi/_streams/__sequential_stream.py b/pystreamapi/_streams/__sequential_stream.py index cda8672..f49026b 100644 --- a/pystreamapi/_streams/__sequential_stream.py +++ b/pystreamapi/_streams/__sequential_stream.py @@ -4,7 +4,6 @@ from optional import Optional import pystreamapi._streams.__base_stream as stream -from pystreamapi._lazy.process import Process _identity_missing = object() @@ -16,11 +15,7 @@ def all_match(self, predicate: Callable[[Any], bool]): self._trigger_exec() return all(predicate(element) for element in self._source) - def filter(self, predicate: Callable[[Any], bool]): - self._queue.append(Process(self.__filter, predicate)) - return self - - def __filter(self, predicate: Callable[[Any], bool]): + def _filter(self, predicate: Callable[[Any], bool]): self._source = [element for element in self._source if predicate(element)] def find_any(self): @@ -29,49 +24,32 @@ def find_any(self): return Optional.of(self._source[0]) return Optional.empty() - def flat_map(self, predicate: Callable[[Any], stream.BaseStream]): - self._queue.append(Process(self.__flat_map, predicate)) - return self - - def __flat_map(self, predicate: Callable[[Any], stream.BaseStream]): + def _flat_map(self, predicate: Callable[[Any], stream.BaseStream]): new_src = [] for element in [predicate(element) for element in self._source]: new_src.extend(element.to_list()) self._source = new_src + def 
_group_to_dict(self, key_mapper: Callable[[Any], Any]): + groups = {} + for element in self._source: + key = key_mapper(element) + if key not in groups: + groups[key] = [] + groups[key].append(element) + return groups + def for_each(self, predicate: Callable): self._trigger_exec() for element in self._source: predicate(element) - def map(self, mapper: Callable[[Any], Any]): - self._queue.append(Process(self.__map, mapper)) - return self - - def __map(self, predicate: Callable[[Any], Any]): - self._source = [predicate(element) for element in self._source] - - def map_to_int(self): - self._queue.append(Process(self.__map_to_int)) - return self - - def __map_to_int(self): - self.__map(int) + def _map(self, mapper: Callable[[Any], Any]): + self._source = [mapper(element) for element in self._source] - def map_to_str(self): - self._queue.append(Process(self.__map_to_str)) - return self - - def __map_to_str(self): - self.__map(str) - - def peek(self, action: Callable): - self._queue.append(Process(self.__peek, action)) - return self - - def __peek(self, predicate: Callable): + def _peek(self, action: Callable): for element in self._source: - predicate(element) + action(element) def reduce(self, predicate: Callable, identity=_identity_missing, depends_on_state=False): self._trigger_exec() @@ -80,3 +58,7 @@ def reduce(self, predicate: Callable, identity=_identity_missing, depends_on_sta return reduce(predicate, self._source) return Optional.of(reduce(predicate, self._source)) return identity if identity is not _identity_missing else Optional.empty() + + def to_dict(self, key_mapper: Callable[[Any], Any]) -> dict: + self._trigger_exec() + return self._group_to_dict(key_mapper) diff --git a/tests/date_test.py b/tests/date_test.py index 3b0bcab..63bc5ce 100644 --- a/tests/date_test.py +++ b/tests/date_test.py @@ -3,7 +3,7 @@ from datetime import datetime, timedelta, timezone from unittest import TestCase - +# skipcq: PTC-W0046 class DateTest(TestCase): def setUp(self): diff 
--git a/tests/test_stream_implementation.py b/tests/test_stream_implementation.py index f1e0b41..006a124 100644 --- a/tests/test_stream_implementation.py +++ b/tests/test_stream_implementation.py @@ -3,9 +3,10 @@ from optional import Optional from optional.something import Something from parameterized import parameterized_class + +from pystreamapi._streams.__base_stream import BaseStream from pystreamapi._streams.__parallel_stream import ParallelStream from pystreamapi._streams.__sequential_stream import SequentialStream -from pystreamapi._streams.__base_stream import BaseStream from pystreamapi._streams.numeric.__sequential_numeric_stream import SequentialNumericStream @@ -126,6 +127,37 @@ def test_reduce_empty_stream_with_identity(self): result = self.stream([]).reduce(lambda x, y: x + y, identity=0) self.assertEqual(result, 0) + def test_group_by(self): + class Point: + def __init__(self, x, y): + self.x = x + self.y = y + + pt1, pt2, pt3, pt4 = Point(1, 2), Point(1, 3), Point(2, 3), Point(2, 4) + result = self.stream([pt1, pt2, pt3, pt4]) \ + .group_by(lambda p: p.x) \ + .to_list() + self.assertListEqual(result, [(1, [pt1, pt2]), (2, [pt3, pt4])]) + + def test_group_by_empty(self): + result = self.stream([]).group_by(lambda x: x).to_list() + self.assertListEqual(result, []) + + def test_to_dict(self): + class Point: + def __init__(self, x, y): + self.x = x + self.y = y + + pt1, pt2, pt3, pt4 = Point(1, 2), Point(1, 3), Point(2, 3), Point(2, 4) + result = self.stream([pt1, pt2, pt3, pt4]) \ + .to_dict(lambda p: p.x) + self.assertDictEqual(result, {1: [pt1, pt2], 2: [pt3, pt4]}) + + def test_to_dict_empty(self): + result = self.stream([]).to_dict(lambda x: x) + self.assertDictEqual(result, {}) + if __name__ == '__main__': unittest.main()