Skip to content

Commit

Permalink
Add skip_list
Browse files Browse the repository at this point in the history
  • Loading branch information
doruirimescu committed Jun 10, 2024
1 parent 7d29d01 commit 5e65992
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 5 deletions.
10 changes: 8 additions & 2 deletions src/stateful_data_processor/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,11 @@ def __init__(
logger: Optional[Logger] = None,
should_read: Optional[bool] = True,
print_interval: Optional[int] = 1,
skip_list: Optional[Collection[Any]] = None,
):
self.file_rw = file_rw
self.print_interval = print_interval
self.skip_list = skip_list
if logger is None:
self.logger = getLogger("StatefulDataProcessor")
else:
Expand Down Expand Up @@ -78,10 +80,14 @@ def _iterate_items(self, items: Collection[Any], *args, **kwargs):
self.logger.info(f"Item {item} already processed, skipping...")
continue

if self.skip_list and item in self.skip_list:
self.logger.info(f"Item {item} in skip list, skipping...")
continue

self.process_item(item, iteration_index, *args, **kwargs)
if (iteration_index) % self.print_interval == 0:
self.logger.info(f"Processed item {item} {len(self.data)} / {items_len}")
self.logger.info("Finished processing all items.")
self.logger.info(f"Processed item {item} {iteration_index + 1} / {items_len}")
self.logger.info(f"Finished processing all items. {len(self.data)} / {items_len} items processed.")

@abstractmethod
def process_item(
Expand Down
24 changes: 21 additions & 3 deletions tests/test_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def test_process_data(self):
call("Processed item a 1 / 3"),
call("Processed item b 2 / 3"),
call("Processed item c 3 / 3"),
call("Finished processing all items."),
call("Finished processing all items. 3 / 3 items processed."),
]
self.mock_logger.info.assert_has_calls(calls, any_order=True)
wait_for_file(TEST_FILE_JSON_PATH)
Expand Down Expand Up @@ -168,7 +168,7 @@ def test_resumes_after_termination_with_saved_state(self):
call("Item a already processed, skipping..."),
call("Processed item b 2 / 3"),
call("Processed item c 3 / 3"),
call("Finished processing all items."),
call("Finished processing all items. 3 / 3 items processed."),
]
self.mock_logger.info.assert_has_calls(calls, any_order=True)

Expand All @@ -182,6 +182,24 @@ def test_number_processor_print_interval(self):
calls = [
call("Processed item 1 1 / 4"),
call("Processed item 3 3 / 4"),
call("Finished processing all items."),
call("Finished processing all items. 4 / 4 items processed."),
]
self.mock_logger.info.assert_has_calls(calls, any_order=True)

def test_number_processor_skip_list(self):
skip_list = [2, 4, 7]
processor = NumberProcessor(
self.file_rw, should_read=False, logger=self.mock_logger, skip_list=skip_list
)
processor.run(items=[1, 2, 3, 4, 7], delay=0)
self.assertEqual(processor.data, {1: "a1", 3: "c9"})

calls = [
call("Processed item 1 1 / 5"),
call("Item 2 in skip list, skipping..."),
call("Processed item 3 3 / 5"),
call("Item 4 in skip list, skipping..."),
call("Item 7 in skip list, skipping..."),
call("Finished processing all items. 2 / 5 items processed."),
]
self.mock_logger.info.assert_has_calls(calls, any_order=True)

0 comments on commit 5e65992

Please sign in to comment.