From 5e65992eaaf0a5d4ec6d0df15d10ae4d56abdce2 Mon Sep 17 00:00:00 2001 From: Doru Date: Mon, 10 Jun 2024 14:20:32 +0300 Subject: [PATCH] Add skip_list --- src/stateful_data_processor/processor.py | 10 ++++++++-- tests/test_processor.py | 24 +++++++++++++++++++++--- 2 files changed, 29 insertions(+), 5 deletions(-) diff --git a/src/stateful_data_processor/processor.py b/src/stateful_data_processor/processor.py index 3daace5..eac276f 100644 --- a/src/stateful_data_processor/processor.py +++ b/src/stateful_data_processor/processor.py @@ -29,9 +29,11 @@ def __init__( logger: Optional[Logger] = None, should_read: Optional[bool] = True, print_interval: Optional[int] = 1, + skip_list: Optional[Collection[Any]] = None, ): self.file_rw = file_rw self.print_interval = print_interval + self.skip_list = skip_list if logger is None: self.logger = getLogger("StatefulDataProcessor") else: @@ -78,10 +80,14 @@ def _iterate_items(self, items: Collection[Any], *args, **kwargs): self.logger.info(f"Item {item} already processed, skipping...") continue + if self.skip_list and item in self.skip_list: + self.logger.info(f"Item {item} in skip list, skipping...") + continue + self.process_item(item, iteration_index, *args, **kwargs) if (iteration_index) % self.print_interval == 0: - self.logger.info(f"Processed item {item} {len(self.data)} / {items_len}") - self.logger.info("Finished processing all items.") + self.logger.info(f"Processed item {item} {iteration_index + 1} / {items_len}") + self.logger.info(f"Finished processing all items. {len(self.data)} / {items_len} items processed.") @abstractmethod def process_item( diff --git a/tests/test_processor.py b/tests/test_processor.py index 98f7698..c193f79 100644 --- a/tests/test_processor.py +++ b/tests/test_processor.py @@ -88,7 +88,7 @@ def test_process_data(self): call("Processed item a 1 / 3"), call("Processed item b 2 / 3"), call("Processed item c 3 / 3"), - call("Finished processing all items."), + call("Finished processing all items. 3 / 3 items processed."), ] self.mock_logger.info.assert_has_calls(calls, any_order=True) wait_for_file(TEST_FILE_JSON_PATH) @@ -168,7 +168,7 @@ def test_resumes_after_termination_with_saved_state(self): call("Item a already processed, skipping..."), call("Processed item b 2 / 3"), call("Processed item c 3 / 3"), - call("Finished processing all items."), + call("Finished processing all items. 3 / 3 items processed."), ] self.mock_logger.info.assert_has_calls(calls, any_order=True) @@ -182,6 +182,24 @@ def test_number_processor_print_interval(self): calls = [ call("Processed item 1 1 / 4"), call("Processed item 3 3 / 4"), - call("Finished processing all items."), + call("Finished processing all items. 4 / 4 items processed."), + ] + self.mock_logger.info.assert_has_calls(calls, any_order=True) + + def test_number_processor_skip_list(self): + skip_list = [2, 4, 7] + processor = NumberProcessor( + self.file_rw, should_read=False, logger=self.mock_logger, skip_list=skip_list + ) + processor.run(items=[1, 2, 3, 4, 7], delay=0) + self.assertEqual(processor.data, {1: "a1", 3: "c9"}) + + calls = [ + call("Processed item 1 1 / 5"), + call("Item 2 in skip list, skipping..."), + call("Processed item 3 3 / 5"), + call("Item 4 in skip list, skipping..."), + call("Item 7 in skip list, skipping..."), + call("Finished processing all items. 2 / 5 items processed."), ] self.mock_logger.info.assert_has_calls(calls, any_order=True)