Skip to content

Commit

Permalink
Merge pull request #38 from PickwickSoft/feature/add-internal-optiona…
Browse files Browse the repository at this point in the history
…l-implementation

Feature/add internal optional implementation
  • Loading branch information
garlontas authored Jun 26, 2023
2 parents b27adff + 0c21643 commit 9d8de82
Show file tree
Hide file tree
Showing 10 changed files with 300 additions and 45 deletions.
55 changes: 30 additions & 25 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ packages = [
[tool.poetry.dependencies]
python = ">=3.7,<4.0"
joblib = "~1.2.0"
optional-py = "^1.3.2"

[tool.poetry.group.test.dependencies]
parameterized = "*"
Expand Down
114 changes: 114 additions & 0 deletions pystreamapi/__optional.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
class Optional:
"""
A container object which may or may not contain a non-none value.
If a value is present, `is_present()` will return `True` and `get()` will return the value.
If a value is not present, `is_present()` will return `False`
and `get()` will raise a `ValueError`.
Additional methods provide ways to handle the presence or absence of a contained value.
This class is inspired by Java's `Optional` class.
"""

def __init__(self, value=None):
"""
Constructs an Optional with the given value.
If the value is None, the Optional is considered empty.
"""
self._value = value
self._is_present = value is not None

@staticmethod
def of(value):
"""
Returns an Optional with the given non-none value.
Raises a ValueError if the value is None.
"""
if value is None:
raise ValueError("Value cannot be None")
return Optional(value)

@staticmethod
def empty():
"""Returns an empty Optional."""
return Optional()

def is_present(self):
"""Returns `True` if the Optional contains a non-none value, `False` otherwise."""
return self._is_present

def get(self):
"""Returns the value if present, otherwise raises a `ValueError`."""
if not self._is_present:
raise ValueError("Value is not present")
return self._value

def or_else(self, default_value):
"""Returns the value if present, otherwise returns the given default value."""
return self._value if self._is_present else default_value

def or_else_get(self, supplier):
"""
Returns the value if present, otherwise calls the given supplier function to get a
default value.
"""
return self._value if self._is_present else supplier()

def map(self, mapper):
"""
Applies the given mapper function to the value if present, returning a new Optional
with the result.
If the Optional is empty, returns an empty Optional.
"""
if not self._is_present:
return Optional()
mapped_value = mapper(self._value)
return Optional(mapped_value)

def flat_map(self, mapper):
"""
Applies the given mapper function to the value if present, returning the result.
If the Optional is empty, returns an empty Optional.
If the mapper function does not return an Optional, raises a TypeError.
"""
if not self._is_present:
return Optional()
optional_result = mapper(self._value)
if not isinstance(optional_result, Optional):
raise TypeError("Mapper function must return an Optional")
return optional_result

def filter(self, predicate):
"""
Returns an Optional containing the value if present and the predicate is true,
otherwise an empty Optional.
"""
return self if self._is_present and predicate(self._value) else Optional()

def if_present(self, consumer):
"""Calls the given consumer function with the value if present, otherwise does nothing."""
if self._is_present:
consumer(self._value)

def __str__(self):
"""Returns a string representation of the Optional."""
return f"Optional({self._value if self._is_present else ''})"

def __repr__(self):
"""Returns a string representation of the Optional."""
return self.__str__()

def __eq__(self, other):
"""
Returns `True` if the other object is an Optional with the same value,
`False` otherwise.
"""
return self._value == other._value if isinstance(other, Optional) else False

def __hash__(self):
"""Returns the hash of the Optional's value."""
return hash(self._value)
9 changes: 3 additions & 6 deletions pystreamapi/_streams/__base_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,11 @@
from abc import abstractmethod
from builtins import reversed
from functools import cmp_to_key
from typing import Iterable, Callable, Any, TypeVar, Iterator, Union

from optional import Optional
from optional.nothing import Nothing
from optional.something import Something
from typing import Iterable, Callable, Any, TypeVar, Iterator

from pystreamapi._lazy.process import Process
from pystreamapi._lazy.queue import ProcessQueue
from pystreamapi.__optional import Optional

K = TypeVar('K')
_V = TypeVar('_V')
Expand Down Expand Up @@ -320,7 +317,7 @@ def max(self):

@abstractmethod
def reduce(self, predicate: Callable[[K, K], K], identity=_identity_missing,
depends_on_state=False) -> Union[K, Something, Nothing]:
depends_on_state=False) -> Optional:
"""
Performs a reduction on the elements of this stream, using the provided identity value
and an associative accumulation function, and returns the reduced value.
Expand Down
10 changes: 5 additions & 5 deletions pystreamapi/_streams/__parallel_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from typing import Callable, Any, Iterable

from joblib import Parallel, delayed
from optional import Optional
from pystreamapi.__optional import Optional

import pystreamapi._streams.__base_stream as stream
from pystreamapi._parallel.fork_and_join import Parallelizer
Expand All @@ -15,7 +15,7 @@ class ParallelStream(stream.BaseStream):

def __init__(self, source: Iterable[stream.K]):
super().__init__(source)
self.parallelizer = Parallelizer()
self._parallelizer = Parallelizer()

def all_match(self, predicate: Callable[[Any], bool]):
self._trigger_exec()
Expand All @@ -24,7 +24,7 @@ def all_match(self, predicate: Callable[[Any], bool]):

def _filter(self, predicate: Callable[[Any], bool]):
self._set_parallelizer_src()
self._source = self.parallelizer.filter(predicate)
self._source = self._parallelizer.filter(predicate)

def find_any(self):
self._trigger_exec()
Expand Down Expand Up @@ -77,11 +77,11 @@ def reduce(self, predicate: Callable[[Any, Any], Any], identity=_identity_missin
return identity if identity is not _identity_missing else Optional.empty()

def __reduce(self, pred, _):
return self.parallelizer.reduce(pred)
return self._parallelizer.reduce(pred)

def to_dict(self, key_mapper: Callable[[Any], Any]) -> dict:
self._trigger_exec()
return dict(self._group_to_dict(key_mapper))

def _set_parallelizer_src(self):
self.parallelizer.set_source(self._source)
self._parallelizer.set_source(self._source)
2 changes: 1 addition & 1 deletion pystreamapi/_streams/__sequential_stream.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from functools import reduce
from typing import Callable, Any

from optional import Optional
from pystreamapi.__optional import Optional

import pystreamapi._streams.__base_stream as stream

Expand Down
2 changes: 1 addition & 1 deletion pystreamapi/_streams/numeric/__parallel_numeric_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,4 @@ def sum(self) -> Union[float, int, None]:
def __sum(self):
"""Parallel sum method"""
self._set_parallelizer_src()
return self.parallelizer.reduce(lambda x, y: x + y)
return self._parallelizer.reduce(lambda x, y: x + y)
Loading

0 comments on commit 9d8de82

Please sign in to comment.