Skip to content

Commit

Permalink
Add stream converter
Browse files Browse the repository at this point in the history
Convert between numeric and non numeric as well  as sequential and parallel streams
  • Loading branch information
garlontas committed Jul 24, 2023
1 parent 3c43321 commit 8262859
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 14 deletions.
37 changes: 37 additions & 0 deletions pystreamapi/__stream_converter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
from pystreamapi._streams.__base_stream import BaseStream
from pystreamapi._streams.__parallel_stream import ParallelStream
from pystreamapi._streams.__sequential_stream import SequentialStream
from pystreamapi._streams.numeric.__numeric_base_stream import NumericBaseStream
from pystreamapi._streams.numeric.__parallel_numeric_stream import ParallelNumericStream
from pystreamapi._streams.numeric.__sequential_numeric_stream import SequentialNumericStream


class StreamConverter:
"""Class for converting streams to other types of streams."""

@staticmethod
def to_numeric_stream(stream: BaseStream) -> NumericBaseStream:
"""Converts a stream to a numeric stream."""
if isinstance(stream, SequentialStream):
stream.__class__ = SequentialNumericStream
if isinstance(stream, ParallelStream):
stream.__class__ = ParallelNumericStream
return stream

@staticmethod
def to_parallel_stream(stream: BaseStream) -> ParallelStream:
"""Converts a stream to a parallel stream."""
if isinstance(stream, SequentialNumericStream):
stream.__class__ = ParallelNumericStream
elif isinstance(stream, SequentialStream):
stream.__class__ = ParallelStream
return stream

@staticmethod
def to_sequential_stream(stream: BaseStream) -> SequentialStream:
"""Converts a stream to a sequential stream."""
if isinstance(stream, ParallelNumericStream):
stream.__class__ = SequentialNumericStream
elif isinstance(stream, ParallelStream):
stream.__class__ = SequentialStream
return stream
26 changes: 24 additions & 2 deletions pystreamapi/_streams/__base_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

if TYPE_CHECKING:
from pystreamapi._streams.numeric.__numeric_base_stream import NumericBaseStream
from pystreamapi._streams.__parallel_stream import ParallelStream
from pystreamapi._streams.__sequential_stream import SequentialStream

K = TypeVar('K')
_V = TypeVar('_V')
Expand Down Expand Up @@ -237,6 +239,15 @@ def __map_to_str(self):
"""Converts the stream to strings."""
self._map(str)

@_operation
def parallel(self) -> 'ParallelStream[K]':
"""
Returns a parallel stream. If the stream is already parallel, it is returned.
"""
# pylint: disable=import-outside-toplevel
from pystreamapi.__stream_converter import StreamConverter
return StreamConverter.to_parallel_stream(self)

@_operation
def peek(self, action: Callable) -> 'BaseStream[K]':
"""
Expand Down Expand Up @@ -269,6 +280,15 @@ def __reversed(self):
except TypeError:
self._source = reversed(list(self._source))

@_operation
def sequential(self) -> SequentialStream[K]:
"""
Returns a sequential stream. If the stream is already sequential, it is returned.
"""
# pylint: disable=import-outside-toplevel
from pystreamapi.__stream_converter import StreamConverter
return StreamConverter.to_sequential_stream(self)

@_operation
def skip(self, n: int) -> 'BaseStream[K]':
"""
Expand Down Expand Up @@ -430,6 +450,8 @@ def to_dict(self, key_mapper: Callable[[K], Any]) -> dict:
:param key_mapper:
"""

@abstractmethod
def _to_numeric_stream(self) -> NumericBaseStream:
"""Converts a stream to a numeric stream. To be implemented by subclasses."""
"""Converts a stream to a numeric stream using the stream converter"""
# pylint: disable=import-outside-toplevel
from pystreamapi.__stream_converter import StreamConverter
return StreamConverter.to_numeric_stream(self)
6 changes: 0 additions & 6 deletions pystreamapi/_streams/__parallel_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,3 @@ def _set_parallelizer_src(self):

def __mapper(self, mapper):
return lambda x: self._one(mapper=mapper, item=x)

def _to_numeric_stream(self):
# pylint: disable=import-outside-toplevel
from pystreamapi._streams.numeric.__parallel_numeric_stream import ParallelNumericStream
self.__class__ = ParallelNumericStream
return self
6 changes: 0 additions & 6 deletions pystreamapi/_streams/__sequential_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,3 @@ def reduce(self, predicate: Callable, identity=_identity_missing, depends_on_sta
@stream.terminal
def to_dict(self, key_mapper: Callable[[Any], Any]) -> dict:
return self._group_to_dict(key_mapper)

def _to_numeric_stream(self):
# pylint: disable=import-outside-toplevel
from pystreamapi._streams.numeric.__sequential_numeric_stream import SequentialNumericStream
self.__class__ = SequentialNumericStream
return self
54 changes: 54 additions & 0 deletions tests/test_stream_converter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
from unittest import TestCase

from pystreamapi._streams.__base_stream import BaseStream
from pystreamapi._streams.__parallel_stream import ParallelStream
from pystreamapi._streams.__sequential_stream import SequentialStream
from pystreamapi._streams.numeric.__parallel_numeric_stream import ParallelNumericStream
from pystreamapi._streams.numeric.__sequential_numeric_stream import SequentialNumericStream


class TestStreamConverter(TestCase):

def test_convert_to_numeric_stream_sequential(self):
stream = SequentialStream(["1", "2", "3"]).map_to_int()
self.assertIsInstance(stream, SequentialNumericStream)

def test_convert_to_numeric_stream_parallel(self):
stream = ParallelStream(["1", "2", "3"]).map_to_int()
self.assertIsInstance(stream, ParallelNumericStream)

def test_convert_to_numeric_stream_numeric_parallel(self):
stream = ParallelNumericStream(["1", "2", "3"]).map_to_int()
self.assertIsInstance(stream, ParallelNumericStream)

def test_convert_to_parallel_stream_sequential(self):
stream = SequentialStream(["1", "2", "3"]).parallel()
self.assertIsInstance(stream, ParallelStream)

def test_convert_to_parallel_stream_sequential_numeric(self):
stream = SequentialNumericStream(["1", "2", "3"]).parallel()
self.assertIsInstance(stream, ParallelNumericStream)

def test_convert_to_parallel_stream_parallel(self):
stream = ParallelStream(["1", "2", "3"]).parallel()
self.assertIsInstance(stream, ParallelStream)

def test_convert_to_parallel_stream_parallel_numeric(self):
stream = ParallelNumericStream(["1", "2", "3"]).parallel()
self.assertIsInstance(stream, ParallelNumericStream)

def test_convert_to_sequential_stream_sequential(self):
stream = SequentialStream(["1", "2", "3"]).sequential()
self.assertIsInstance(stream, SequentialStream)

def test_convert_to_sequential_stream_sequential_numeric(self):
stream = SequentialNumericStream(["1", "2", "3"]).sequential()
self.assertIsInstance(stream, SequentialNumericStream)

def test_convert_to_sequential_stream_parallel(self):
stream = ParallelStream(["1", "2", "3"]).sequential()
self.assertIsInstance(stream, SequentialStream)

def test_convert_to_sequential_stream_parallel_numeric(self):
stream = ParallelNumericStream(["1", "2", "3"]).sequential()
self.assertIsInstance(stream, SequentialNumericStream)

0 comments on commit 8262859

Please sign in to comment.