-
Notifications
You must be signed in to change notification settings - Fork 0
/
test_data_pipeline_sdk.py
102 lines (77 loc) · 3.79 KB
/
test_data_pipeline_sdk.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
import pytest
from data_pipeline_sdk import DataPipelineSDK
from output import DataOutput
import pytest
from data_pipeline_sdk import DataPipelineSDK
from error_handling import IngestionError, TransformationError, OutputError
@pytest.fixture
def sdk(mocker):
    """Build a ``DataPipelineSDK`` with its configuration loader stubbed out.

    ``data_pipeline_sdk.Config.load_config`` is patched to return the fixed
    dictionary below, and the SDK is then constructed with
    ``config_file='config.json'``.
    """
    stub_config = {
        'kafka_brokers': 'localhost:9092',
        'kafka_topic': 'test-topic',
        'log_file': 'data_pipeline.log',
        'output_file': 'output.json',
        # Placeholder credentials only; not real keys.
        'datadog_api_key': 'YOUR_API_KEY',
        'datadog_app_key': 'YOUR_APP_KEY',
        'metrics_port': 8000,
        'plugins': [
            {'module': 'plugins.sample_plugin', 'class': 'SamplePlugin'},
        ],
    }
    mocker.patch(
        'data_pipeline_sdk.Config.load_config',
        return_value=stub_config,
    )
    return DataPipelineSDK(config_file='config.json')
# @pytest.fixture
# def sdk():
# return DataPipelineSDK(kafka_brokers='localhost:9092', kafka_topic='test-topic')
def test_kafka_produce_consume(sdk):
    """Round-trip one message through the SDK's ingestion component.

    Produces a single dict and checks that the first item yielded by
    ``sdk.ingestion.consume()`` equals it. Only the first yielded item is
    inspected; the loop stops immediately afterwards.
    """
    # NOTE(review): ingestion is not mocked here, so this presumably needs a
    # reachable broker at the configured address - confirm.
    test_message = {"key": "value"}
    sdk.ingestion.produce(test_message)
    for message in sdk.ingestion.consume():
        assert message == test_message
        break
    else:
        # Without this branch an empty consume() would skip the loop body
        # entirely and the test would pass without checking anything.
        pytest.fail("consume() yielded no messages")
def test_transformation_plugin():
    """Check that ``SamplePlugin.transform`` adds ``new_key`` to the data.

    The result of transforming ``{"key": "value"}`` must map ``"new_key"``
    to ``"new_value"``.
    """
    # SamplePlugin is not imported at module level, so the bare name would
    # raise NameError. The module path mirrors the plugin entry used in the
    # stubbed SDK configuration ('plugins.sample_plugin').
    from plugins.sample_plugin import SamplePlugin

    plugin = SamplePlugin()
    data = {"key": "value"}
    transformed_data = plugin.transform(data)
    assert transformed_data["new_key"] == "new_value"
def test_data_output():
    """Check that data written by ``DataOutput.to_file`` reads back equal.

    A dict is written with ``DataOutput.to_file`` and loaded again with
    ``DataOutput.from_file``; the loaded value must equal the original.
    """
    import os
    import tempfile

    data = {"key": "value"}
    # Write inside a throwaway directory so the test neither leaves
    # 'test_output.json' behind in the working directory nor picks up a
    # stale copy from an earlier run.
    with tempfile.TemporaryDirectory() as scratch_dir:
        target = os.path.join(scratch_dir, 'test_output.json')
        DataOutput.to_file(data, target)
        loaded_data = DataOutput.from_file(target)
    assert loaded_data == data
def test_process_data(mocker, sdk):
    """Exercise ``sdk.process_data`` with all three stages replaced by mocks.

    Ingestion yields one record, the plugin manager returns an enriched
    record, and the output writer is stubbed. The test then verifies each
    stage was invoked exactly once with the expected arguments.
    """
    raw_record = {"key": "value"}
    enriched_record = {"key": "value", "new_key": "new_value"}
    # NOTE(review): the whitespace in this payload must match the serializer's
    # indent setting exactly - confirm against process_data.
    expected_payload = '{\n "key": "value",\n "new_key": "new_value"\n}'

    # Stub each pipeline stage on the SDK instance.
    consume_spy = mocker.patch.object(
        sdk.ingestion, 'consume', return_value=[dict(raw_record)]
    )
    plugins_spy = mocker.patch.object(
        sdk.plugin_manager, 'apply_plugins', return_value=enriched_record
    )
    writer_spy = mocker.patch.object(sdk.output.DataOutput, 'to_file')

    sdk.process_data()

    # Ingestion was polled once, the record was transformed once, and the
    # serialized result was written to the configured output file.
    consume_spy.assert_called_once()
    plugins_spy.assert_called_once_with(raw_record)
    writer_spy.assert_called_once_with(expected_payload, 'output.json')
def test_transformation_error(mocker, sdk):
    """Check that a failing plugin stage surfaces as ``TransformationError``.

    Ingestion yields one record, ``apply_plugins`` raises a generic
    ``Exception``, and the output writer is stubbed. ``process_data`` must
    raise ``TransformationError`` and the SDK logger's ``error`` method must
    have been last called with the formatted message.
    """
    # These patches are needed only for their effect, so the returned mocks
    # are not bound to names.
    mocker.patch.object(sdk.ingestion, 'consume', return_value=[{"key": "value"}])
    mocker.patch.object(
        sdk.plugin_manager,
        'apply_plugins',
        side_effect=Exception('Transformation error'),
    )
    mocker.patch.object(sdk.output.DataOutput, 'to_file')
    mock_log_error = mocker.patch.object(sdk.logger, 'error')

    with pytest.raises(TransformationError):
        sdk.process_data()

    # Checked after the `with` block: a statement following the raising call
    # inside it would never run.
    mock_log_error.assert_called_with('Transformation error: Transformation error')
def test_output_error(mocker, sdk):
    """Check that a failing output stage surfaces as ``OutputError``.

    Ingestion yields one record, ``apply_plugins`` returns an enriched
    record, and ``to_file`` raises a generic ``Exception``. ``process_data``
    must raise ``OutputError`` and the SDK logger's ``error`` method must
    have been last called with the formatted message.
    """
    # These patches are needed only for their effect, so the returned mocks
    # are not bound to names.
    mocker.patch.object(sdk.ingestion, 'consume', return_value=[{"key": "value"}])
    mocker.patch.object(
        sdk.plugin_manager,
        'apply_plugins',
        return_value={"key": "value", "new_key": "new_value"},
    )
    mocker.patch.object(
        sdk.output.DataOutput,
        'to_file',
        side_effect=Exception('Output error'),
    )
    mock_log_error = mocker.patch.object(sdk.logger, 'error')

    with pytest.raises(OutputError):
        sdk.process_data()

    # Checked after the `with` block: a statement following the raising call
    # inside it would never run.
    mock_log_error.assert_called_with('Output error: Output error')
def test_ingestion_error(mocker, sdk):
    """Check that a failing ingestion stage surfaces as ``IngestionError``.

    ``consume`` raises a generic ``Exception`` while the plugin manager and
    output writer are stubbed. ``process_data`` must raise ``IngestionError``
    and the SDK logger's ``error`` method must have been last called with the
    formatted message.
    """
    # These patches are needed only for their effect, so the returned mocks
    # are not bound to names.
    mocker.patch.object(
        sdk.ingestion,
        'consume',
        side_effect=Exception('Ingestion error'),
    )
    mocker.patch.object(sdk.plugin_manager, 'apply_plugins')
    mocker.patch.object(sdk.output.DataOutput, 'to_file')
    mock_log_error = mocker.patch.object(sdk.logger, 'error')

    with pytest.raises(IngestionError):
        sdk.process_data()

    # Checked after the `with` block: a statement following the raising call
    # inside it would never run.
    mock_log_error.assert_called_with('Ingestion error: Ingestion error')