Skip to content

Commit

Permalink
mypy: fix part of quixstreams.dataframe
Browse files Browse the repository at this point in the history
  • Loading branch information
quentin-quix committed Dec 17, 2024
1 parent fe1c0e9 commit e242b3b
Show file tree
Hide file tree
Showing 9 changed files with 165 additions and 81 deletions.
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,8 @@ ignore_errors = true

[[tool.mypy.overrides]]
module = [
"quixstreams.dataframe.*",
"quixstreams.dataframe.series.*",
"quixstreams.dataframe.windows.*",
"quixstreams.rowproducer.*"
]
ignore_errors = true
9 changes: 7 additions & 2 deletions quixstreams/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def alter_context(value):
_current_message_context.set(context)


def message_context() -> Optional[MessageContext]:
def message_context() -> MessageContext:
"""
Get a MessageContext for the current message, which houses most of the message
metadata, like:
Expand All @@ -75,6 +75,11 @@ def message_context() -> Optional[MessageContext]:
:return: instance of `MessageContext`
"""
try:
return _current_message_context.get()
ctx = _current_message_context.get()
except LookupError:
raise MessageContextNotSetError("Message context is not set")

if ctx is None:
raise MessageContextNotSetError("Message context is not set")

return ctx
4 changes: 2 additions & 2 deletions quixstreams/core/stream/functions/apply.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ class ApplyWithMetadataFunction(StreamFunction):

@overload
def __init__(
self, func: ApplyWithMetadataCallback, expand: Literal[False]
self, func: ApplyWithMetadataCallback, expand: Literal[False] = False
) -> None: ...

@overload
Expand All @@ -90,7 +90,7 @@ def __init__(

def __init__(
self,
func: ApplyWithMetadataCallback,
func: Union[ApplyWithMetadataCallback, ApplyWithMetadataExpandedCallback],
expand: bool = False,
):
super().__init__(func)
Expand Down
8 changes: 4 additions & 4 deletions quixstreams/core/stream/functions/transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,20 +45,20 @@ def get_executor(self, *child_executors: VoidExecutor) -> VoidExecutor:
child_executor = self._resolve_branching(*child_executors)

if self.expand:
expanded_callback = cast(TransformExpandedCallback, self.func)
expanded_func = cast(TransformExpandedCallback, self.func)

def wrapper(
value: Any,
key: Any,
timestamp: int,
headers: Any,
):
result = expanded_callback(value, key, timestamp, headers)
result = expanded_func(value, key, timestamp, headers)
for new_value, new_key, new_timestamp, new_headers in result:
child_executor(new_value, new_key, new_timestamp, new_headers)

else:
callback = cast(TransformCallback, self.func)
func = cast(TransformCallback, self.func)

def wrapper(
value: Any,
Expand All @@ -67,7 +67,7 @@ def wrapper(
headers: Any,
):
# Execute a function on a single value and return its result
new_value, new_key, new_timestamp, new_headers = callback(
new_value, new_key, new_timestamp, new_headers = func(
value, key, timestamp, headers
)
child_executor(new_value, new_key, new_timestamp, new_headers)
Expand Down
15 changes: 7 additions & 8 deletions quixstreams/core/stream/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ def __repr__(self) -> str:
)
return f"<{self.__class__.__name__} [{len(tree_funcs)}]: {funcs_repr}>"

def diff(self, other: Self) -> Optional[Self]:
def diff(self, other: Self) -> Self:
"""
Takes the difference between Streams `self` and `other` based on their last
common parent, and returns a new, independent `Stream` that includes only
Expand All @@ -102,7 +102,7 @@ def diff(self, other: Self) -> Optional[Self]:
the `other` Stream, and the resulting diff is empty.
:param other: a `Stream` to take a diff from.
:raises ValueError: if Streams don't have a common parent,
:raises InvalidOperation: if Streams don't have a common parent,
if the diff is empty, or pruning failed.
:return: a new independent `Stream` instance whose root begins at the diff
"""
Expand Down Expand Up @@ -149,6 +149,10 @@ def diff(self, other: Self) -> Optional[Self]:
node.parent = parent
parent = node
self._prune(diff[0])

if parent is None:
raise InvalidOperation("No common parent found")

return parent

def root_path(self, allow_splits=True) -> List[Self]:
Expand Down Expand Up @@ -302,7 +306,7 @@ def _compose(

def _validate_func(
self,
func,
func: StreamFunction,
allow_filters: bool,
allow_updates: bool,
allow_expands: bool,
Expand Down Expand Up @@ -345,11 +349,6 @@ def add(self, func: StreamFunction) -> Self:
self.children.add(new_node)
return new_node

def _add(self, func: StreamFunction) -> Self:
new_node = self.__class__(func=func, parent=self)
self.children.add(new_node)
return new_node

def _default_sink(self, value: Any, key: Any, timestamp: int, headers: Any): ...

def _prune(self, other: Self):
Expand Down
9 changes: 7 additions & 2 deletions quixstreams/dataframe/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,14 @@ class BaseStreaming:
def stream(self) -> Stream: ...

@abc.abstractmethod
def compose(self, *args, **kwargs) -> VoidExecutor: ...
def compose(self, *args, **kwargs) -> dict[str, VoidExecutor]: ...

@abc.abstractmethod
def test(
self, value: Any, key: Any, timestamp: int, ctx: Optional[MessageContext] = None
self,
value: Any,
key: Any,
timestamp: int,
headers: Optional[Any] = None,
ctx: Optional[MessageContext] = None,
) -> Any: ...
Loading

0 comments on commit e242b3b

Please sign in to comment.