diff --git a/pystreamapi/_parallel/fork_and_join.py b/pystreamapi/_parallel/fork_and_join.py index ee74c9e..1c3b985 100644 --- a/pystreamapi/_parallel/fork_and_join.py +++ b/pystreamapi/_parallel/fork_and_join.py @@ -1,12 +1,13 @@ # pylint: disable=protected-access import os - from typing import Callable, Any, Optional -from pystreamapi._parallel.parallelizer import Parallel, delayed +from joblib import delayed + +from pystreamapi._itertools.tools import reduce +from pystreamapi._parallel.parallelizer import Parallel from pystreamapi._streams.error.__error import ErrorHandler from pystreamapi._streams.error.__levels import ErrorLevel -from pystreamapi._itertools.tools import reduce class Parallelizer: diff --git a/pystreamapi/_parallel/parallelizer.py b/pystreamapi/_parallel/parallelizer.py index 01dad67..995d1db 100644 --- a/pystreamapi/_parallel/parallelizer.py +++ b/pystreamapi/_parallel/parallelizer.py @@ -1,4 +1,4 @@ -from joblib import Parallel as _JoblibParallel, delayed # pylint: disable=unused-import +from joblib import Parallel as _JoblibParallel # pylint: disable=unused-import from pystreamapi._streams.error.__error import ErrorHandler from pystreamapi._streams.error.__levels import ErrorLevel @@ -7,7 +7,7 @@ class Parallel: """Wrapper for joblib.Parallel supporting error handling""" - def __init__(self, n_jobs=-1, prefer="processes", handler: ErrorHandler=None): + def __init__(self, n_jobs=-1, prefer="processes", handler: ErrorHandler = None): self.n_jobs = n_jobs self.prefer = prefer self.handler = handler diff --git a/pystreamapi/_streams/__parallel_stream.py b/pystreamapi/_streams/__parallel_stream.py index 29a99dc..7583b20 100644 --- a/pystreamapi/_streams/__parallel_stream.py +++ b/pystreamapi/_streams/__parallel_stream.py @@ -2,13 +2,13 @@ from functools import reduce as seq_reduce from typing import Callable, Any, Iterable -from pystreamapi._streams.__base_stream import terminal -from pystreamapi._parallel.parallelizer import Parallel, delayed - -from pystreamapi.__optional import Optional +from joblib import delayed import pystreamapi._streams.__base_stream as stream +from pystreamapi.__optional import Optional from pystreamapi._parallel.fork_and_join import Parallelizer +from pystreamapi._parallel.parallelizer import Parallel +from pystreamapi._streams.__base_stream import terminal _identity_missing = object() diff --git a/tests/test_error_handler_streams.py b/tests/test_error_handler_streams.py index a939ffc..2be86c7 100644 --- a/tests/test_error_handler_streams.py +++ b/tests/test_error_handler_streams.py @@ -2,9 +2,9 @@ from parameterized import parameterized_class -from pystreamapi._streams.error.__levels import ErrorLevel from pystreamapi._streams.__parallel_stream import ParallelStream from pystreamapi._streams.__sequential_stream import SequentialStream +from pystreamapi._streams.error.__levels import ErrorLevel from pystreamapi._streams.numeric.__parallel_numeric_stream import ParallelNumericStream from pystreamapi._streams.numeric.__sequential_numeric_stream import SequentialNumericStream @@ -95,12 +95,12 @@ def test_map_to_str_ignore(self): .error_level(ErrorLevel.IGNORE).map_to_str().to_list() self.assertListEqual(result, ["1", "2", "a"]) - def peek_raise(self): + def test_peek_raise(self): with self.assertRaises(ValueError): - self.stream([1, 2, 3, "a"])\ - .error_level(ErrorLevel.RAISE).peek(int) + self.stream([1, 2, 3, "a"]) \ + .error_level(ErrorLevel.RAISE).peek(int).to_list() - def peek_ignore(self): + def test_peek_ignore(self): result = self.stream([1, 2, 3, "a"])\ .error_level(ErrorLevel.IGNORE).peek(int).to_list() self.assertListEqual(result, [1, 2, 3, "a"]) diff --git a/tests/test_json_loader.py b/tests/test_json_loader.py index e4d61de..7532af4 100644 --- a/tests/test_json_loader.py +++ b/tests/test_json_loader.py @@ -3,6 +3,7 @@ from unittest import TestCase from unittest.mock import patch, mock_open +from file_test import OPEN, PATH_EXISTS, PATH_ISFILE from pystreamapi.loaders import json file_content = """ @@ -22,9 +23,9 @@ class TestJsonLoader(TestCase): def test_json_loader_from_file(self): - with (patch('builtins.open', mock_open(read_data=file_content)), - patch('os.path.exists', return_value=True), - patch('os.path.isfile', return_value=True)): + with (patch(OPEN, mock_open(read_data=file_content)), + patch(PATH_EXISTS, return_value=True), + patch(PATH_ISFILE, return_value=True)): data = json('path/to/data.json') self.assertEqual(len(data), 2) self.assertEqual(data[0].attr1, 1) @@ -35,16 +36,16 @@ def test_json_loader_from_file(self): self.assertIsInstance(data[1].attr1, str) def test_json_loader_is_iterable(self): - with (patch('builtins.open', mock_open(read_data=file_content)), - patch('os.path.exists', return_value=True), - patch('os.path.isfile', return_value=True)): + with (patch(OPEN, mock_open(read_data=file_content)), + patch(PATH_EXISTS, return_value=True), + patch(PATH_ISFILE, return_value=True)): data = json('path/to/data.json') self.assertEqual(len(list(iter(data))), 2) def test_json_loader_with_empty_file(self): - with (patch('builtins.open', mock_open(read_data="")), - patch('os.path.exists', return_value=True), - patch('os.path.isfile', return_value=True)): + with (patch(OPEN, mock_open(read_data="")), + patch(PATH_EXISTS, return_value=True), + patch(PATH_ISFILE, return_value=True)): data = json('path/to/data.json') self.assertEqual(len(data), 0) diff --git a/tests/test_numeric_base_stream.py b/tests/test_numeric_base_stream.py index 38dc3cf..337a27d 100644 --- a/tests/test_numeric_base_stream.py +++ b/tests/test_numeric_base_stream.py @@ -11,7 +11,7 @@ def test_range(self): def test_range_empty(self): result = Stream([]).range() - self.assertEqual(result, None) + self.assertIsNone(result) def test_range_negative(self): result = Stream([-1, -2, -3, -4, -5]).range() @@ -23,7 +23,7 @@ def test_interquartile_range(self): def test_interquartile_range_empty(self): result = Stream([]).interquartile_range() - self.assertEqual(result, None) + self.assertIsNone(result) def test_interquartile_range_odd(self): result = Stream([1, 2, 3, 4, 5, 6, 7, 8, 9]).interquartile_range() @@ -39,7 +39,7 @@ def test_median_even(self): def test_median_empty(self): result = Stream([]).median() - self.assertEqual(result, None) + self.assertIsNone(result) def test_first_quartile(self): result = Stream([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]).first_quartile() @@ -47,7 +47,7 @@ def test_first_quartile(self): def test_first_quartile_empty(self): result = Stream([]).first_quartile() - self.assertEqual(result, None) + self.assertIsNone(result) def test_first_quartile_odd(self): result = Stream([1, 2, 3, 4, 5, 6, 7, 8, 9]).first_quartile() @@ -59,7 +59,7 @@ def test_third_quartile(self): def test_third_quartile_empty(self): result = Stream([]).third_quartile() - self.assertEqual(result, None) + self.assertIsNone(result) def test_third_quartile_odd(self): result = Stream([1, 2, 3, 4, 5, 6, 7, 8, 9]).third_quartile() @@ -75,7 +75,7 @@ def test_mode_multiple(self): def test_mode_empty(self): result = Stream([]).mode() - self.assertEqual(result, None) + self.assertIsNone(result) def test_mode_negative(self): result = Stream([-1, -2, -3, -3]).mode() diff --git a/tests/test_numeric_stream_implementation.py b/tests/test_numeric_stream_implementation.py index 232fda5..d643664 100644 --- a/tests/test_numeric_stream_implementation.py +++ b/tests/test_numeric_stream_implementation.py @@ -18,7 +18,7 @@ def test_mean(self): def test_mean_empty(self): result = self.stream([]).mean() - self.assertEqual(result, None) + self.assertIsNone(result) def test_mean_negative(self): result = self.stream([-1, -2, -3, -4, -5]).mean() diff --git a/tests/test_stream_implementation.py b/tests/test_stream_implementation.py index 71cafc9..17c1e5f 100644 --- a/tests/test_stream_implementation.py +++ b/tests/test_stream_implementation.py @@ -2,13 +2,13 @@ from parameterized import parameterized_class +from pystreamapi.__optional import Optional from pystreamapi._streams.__base_stream import BaseStream from pystreamapi._streams.__parallel_stream import ParallelStream from pystreamapi._streams.__sequential_stream import SequentialStream from pystreamapi._streams.numeric.__numeric_base_stream import NumericBaseStream from pystreamapi._streams.numeric.__parallel_numeric_stream import ParallelNumericStream from pystreamapi._streams.numeric.__sequential_numeric_stream import SequentialNumericStream -from pystreamapi.__optional import Optional @parameterized_class("stream", [ @@ -41,7 +41,7 @@ def test_map_to_int_empty(self): def test_map_to_int_returns_numeric_stream(self): result = self.stream(["1", "2", "3", "9"]).map_to_int() - self.assertTrue(isinstance(result, NumericBaseStream)) + self.assertIsInstance(result, NumericBaseStream) def test_map_to_float(self): result = self.stream(["1", "2", "3", "9"]).map_to_float().to_list() @@ -53,7 +53,7 @@ def test_map_to_float_empty(self): def test_map_to_float_returns_numeric_stream(self): result = self.stream(["1", "2", "3", "9"]).map_to_float() - self.assertTrue(isinstance(result, NumericBaseStream)) + self.assertIsInstance(result, NumericBaseStream) def test_map_to_str(self): result = self.stream([1, 2, 3, 9]).map_to_str().to_list() @@ -61,11 +61,11 @@ def test_map_to_str(self): def test_convert_to_numeric_stream(self): result = self.stream([1, 2, 3, 9]).numeric() - self.assertTrue(isinstance(result, NumericBaseStream)) + self.assertIsInstance(result, NumericBaseStream) def test_convert_to_numeric_stream_is_already_numeric(self): result = self.stream([1.0, 2.0, 3.0, 9.0]).numeric() - self.assertTrue(isinstance(result, NumericBaseStream)) + self.assertIsInstance(result, NumericBaseStream) def test_flat_map(self): result = self.stream([1, 2, 3, 9]).flat_map(lambda x: self.stream([x, x])).to_list() @@ -93,7 +93,7 @@ def test_filter_complex(self): def test_filter_lazy(self): result = self.stream([1, 2, 3]).filter(lambda x: x > 1) self.assertListEqual(result.to_list(), [2, 3]) - self.assertTrue(isinstance(result, BaseStream)) + self.assertIsInstance(result, BaseStream) def test_peek(self): src = []