Skip to content

Commit

Permalink
Merge pull request #50 from PickwickSoft/bugfix/fix-base-stream-knows…
Browse files Browse the repository at this point in the history
…-subclasses

Add stream converter
  • Loading branch information
garlontas authored Jul 24, 2023
2 parents 3c43321 + 7c6b180 commit 60098c8
Show file tree
Hide file tree
Showing 9 changed files with 120 additions and 21 deletions.
8 changes: 4 additions & 4 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion pylintrc
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,8 @@
# Minimum lines number of a similarity.
min-similarity-lines=-1

[tests/*.py]
disable=missing-function-docstring,no-member,missing-class-docstring,too-few-public-methods,too-many-public-methods,cyclic-import,import-error

[MESSAGES CONTROL]
disable=missing-function-docstring,missing-class-docstring,missing-module-docstring,import-error,too-few-public-methods,invalid-name,no-member,too-many-public-methods
disable=invalid-name,missing-module-docstring
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "streams.py"
version = "0.3.1"
version = "0.3.2"
authors = ["Stefan Garlonta <stefan@pickwicksoft.org>"]
description = "A stream library for Python inspired by Java Stream API"
keywords = ["streams", "parallel", "data"]
Expand Down
2 changes: 1 addition & 1 deletion pystreamapi/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from pystreamapi.__stream import Stream
from pystreamapi._streams.error.__levels import ErrorLevel

__version__ = "0.3.1"
__version__ = "0.3.2"
__all__ = ["Stream", "ErrorLevel"]
37 changes: 37 additions & 0 deletions pystreamapi/__stream_converter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
from pystreamapi._streams.__base_stream import BaseStream
from pystreamapi._streams.__parallel_stream import ParallelStream
from pystreamapi._streams.__sequential_stream import SequentialStream
from pystreamapi._streams.numeric.__numeric_base_stream import NumericBaseStream
from pystreamapi._streams.numeric.__parallel_numeric_stream import ParallelNumericStream
from pystreamapi._streams.numeric.__sequential_numeric_stream import SequentialNumericStream


class StreamConverter:
"""Class for converting streams to other types of streams."""

@staticmethod
def to_numeric_stream(stream: BaseStream) -> NumericBaseStream:
"""Converts a stream to a numeric stream."""
if isinstance(stream, SequentialStream):
stream.__class__ = SequentialNumericStream
if isinstance(stream, ParallelStream):
stream.__class__ = ParallelNumericStream
return stream

@staticmethod
def to_parallel_stream(stream: BaseStream) -> ParallelStream:
"""Converts a stream to a parallel stream."""
if isinstance(stream, SequentialNumericStream):
stream.__class__ = ParallelNumericStream
elif isinstance(stream, SequentialStream):
stream.__class__ = ParallelStream
return stream

@staticmethod
def to_sequential_stream(stream: BaseStream) -> SequentialStream:
"""Converts a stream to a sequential stream."""
if isinstance(stream, ParallelNumericStream):
stream.__class__ = SequentialNumericStream
elif isinstance(stream, ParallelStream):
stream.__class__ = SequentialStream
return stream
22 changes: 20 additions & 2 deletions pystreamapi/_streams/__base_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

if TYPE_CHECKING:
from pystreamapi._streams.numeric.__numeric_base_stream import NumericBaseStream
from pystreamapi._streams.__parallel_stream import ParallelStream
from pystreamapi._streams.__sequential_stream import SequentialStream

K = TypeVar('K')
_V = TypeVar('_V')
Expand Down Expand Up @@ -237,6 +239,13 @@ def __map_to_str(self):
"""Converts the stream to strings."""
self._map(str)

@_operation
def parallel(self) -> 'ParallelStream[K]':
"""Returns a parallel stream. If the stream is already parallel, it is returned."""
# pylint: disable=import-outside-toplevel
from pystreamapi.__stream_converter import StreamConverter
return StreamConverter.to_parallel_stream(self)

@_operation
def peek(self, action: Callable) -> 'BaseStream[K]':
"""
Expand Down Expand Up @@ -269,6 +278,13 @@ def __reversed(self):
except TypeError:
self._source = reversed(list(self._source))

@_operation
def sequential(self) -> SequentialStream[K]:
"""Returns a sequential stream. If the stream is already sequential, it is returned."""
# pylint: disable=import-outside-toplevel
from pystreamapi.__stream_converter import StreamConverter
return StreamConverter.to_sequential_stream(self)

@_operation
def skip(self, n: int) -> 'BaseStream[K]':
"""
Expand Down Expand Up @@ -430,6 +446,8 @@ def to_dict(self, key_mapper: Callable[[K], Any]) -> dict:
:param key_mapper:
"""

@abstractmethod
def _to_numeric_stream(self) -> NumericBaseStream:
"""Converts a stream to a numeric stream. To be implemented by subclasses."""
"""Converts a stream to a numeric stream using the stream converter"""
# pylint: disable=import-outside-toplevel
from pystreamapi.__stream_converter import StreamConverter
return StreamConverter.to_numeric_stream(self)
6 changes: 0 additions & 6 deletions pystreamapi/_streams/__parallel_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,3 @@ def _set_parallelizer_src(self):

def __mapper(self, mapper):
return lambda x: self._one(mapper=mapper, item=x)

def _to_numeric_stream(self):
# pylint: disable=import-outside-toplevel
from pystreamapi._streams.numeric.__parallel_numeric_stream import ParallelNumericStream
self.__class__ = ParallelNumericStream
return self
6 changes: 0 additions & 6 deletions pystreamapi/_streams/__sequential_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,3 @@ def reduce(self, predicate: Callable, identity=_identity_missing, depends_on_sta
@stream.terminal
def to_dict(self, key_mapper: Callable[[Any], Any]) -> dict:
return self._group_to_dict(key_mapper)

def _to_numeric_stream(self):
# pylint: disable=import-outside-toplevel
from pystreamapi._streams.numeric.__sequential_numeric_stream import SequentialNumericStream
self.__class__ = SequentialNumericStream
return self
53 changes: 53 additions & 0 deletions tests/test_stream_converter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
from unittest import TestCase

from pystreamapi._streams.__parallel_stream import ParallelStream
from pystreamapi._streams.__sequential_stream import SequentialStream
from pystreamapi._streams.numeric.__parallel_numeric_stream import ParallelNumericStream
from pystreamapi._streams.numeric.__sequential_numeric_stream import SequentialNumericStream


class TestStreamConverter(TestCase):

def test_convert_to_numeric_stream_sequential(self):
stream = SequentialStream(["1", "2", "3"]).map_to_int()
self.assertIsInstance(stream, SequentialNumericStream)

def test_convert_to_numeric_stream_parallel(self):
stream = ParallelStream(["1", "2", "3"]).map_to_int()
self.assertIsInstance(stream, ParallelNumericStream)

def test_convert_to_numeric_stream_numeric_parallel(self):
stream = ParallelNumericStream(["1", "2", "3"]).map_to_int()
self.assertIsInstance(stream, ParallelNumericStream)

def test_convert_to_parallel_stream_sequential(self):
stream = SequentialStream(["1", "2", "3"]).parallel()
self.assertIsInstance(stream, ParallelStream)

def test_convert_to_parallel_stream_sequential_numeric(self):
stream = SequentialNumericStream(["1", "2", "3"]).parallel()
self.assertIsInstance(stream, ParallelNumericStream)

def test_convert_to_parallel_stream_parallel(self):
stream = ParallelStream(["1", "2", "3"]).parallel()
self.assertIsInstance(stream, ParallelStream)

def test_convert_to_parallel_stream_parallel_numeric(self):
stream = ParallelNumericStream(["1", "2", "3"]).parallel()
self.assertIsInstance(stream, ParallelNumericStream)

def test_convert_to_sequential_stream_sequential(self):
stream = SequentialStream(["1", "2", "3"]).sequential()
self.assertIsInstance(stream, SequentialStream)

def test_convert_to_sequential_stream_sequential_numeric(self):
stream = SequentialNumericStream(["1", "2", "3"]).sequential()
self.assertIsInstance(stream, SequentialNumericStream)

def test_convert_to_sequential_stream_parallel(self):
stream = ParallelStream(["1", "2", "3"]).sequential()
self.assertIsInstance(stream, SequentialStream)

def test_convert_to_sequential_stream_parallel_numeric(self):
stream = ParallelNumericStream(["1", "2", "3"]).sequential()
self.assertIsInstance(stream, SequentialNumericStream)

0 comments on commit 60098c8

Please sign in to comment.