-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdata_pipeline_sdk.py
51 lines (45 loc) · 2.17 KB
/
data_pipeline_sdk.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
from kafka_ingestion import KafkaIngestion
from transformation import Transformer
from output import DataOutput
from config import Config
from datadog_logger import DatadogLogger
from metrics import Metrics
from plugin_manager import PluginManager
from error_handling import IngestionError, TransformationError, OutputError
class DataPipelineSDK:
    """Consume messages, run them through plugins, and write them out as JSON.

    The constructor wires together the ingestion source, the plugin manager,
    the logger and the metrics server from a single configuration file;
    ``process_data`` then drives the consume -> transform -> serialize ->
    write loop.
    """

    def __init__(self, config_file):
        """Build every pipeline collaborator from ``config_file``.

        The loaded configuration is read with the keys ``kafka_brokers``,
        ``kafka_topic``, ``plugins``, ``datadog_api_key`` and
        ``datadog_app_key`` (all required), plus the optional
        ``metrics_port`` (defaults to 8000). ``output_file`` is read later,
        by ``process_data``.

        Args:
            config_file: Value handed to ``Config.load_config``.
                NOTE(review): presumably a path to a config file and the
                result is dict-like (it is used with ``[]`` and ``.get``)
                -- confirm against ``Config``.
        """
        self.config = Config.load_config(config_file)
        self.ingestion = KafkaIngestion(
            self.config['kafka_brokers'],
            self.config['kafka_topic'],
        )
        self.plugin_manager = PluginManager(self.config['plugins'])
        self.logger = DatadogLogger(
            api_key=self.config['datadog_api_key'],
            app_key=self.config['datadog_app_key'],
        )
        self.metrics = Metrics()
        # The metrics server is started as a side effect of construction.
        self.metrics.start_server(self.config.get('metrics_port', 8000))

    def process_data(self):
        """Process every message yielded by the ingestion source.

        For each message: bump the ingested counter, apply the plugins,
        serialize the result to JSON, and write it to
        ``config['output_file']``. Processing stops at the first failure.

        Raises:
            TransformationError: A failure inside the plugin/transform step.
            OutputError: A failure inside the write-to-file step.
            IngestionError: Any other failure in the loop (consuming from
                the source, metrics/logging around consumption, or JSON
                serialization).
        """
        try:
            for message in self.ingestion.consume():
                self.metrics.increment_ingested()
                self.logger.info(f"Consumed message: {message}")

                # Transform data using the plugin manager
                try:
                    transformed_data = self.plugin_manager.apply_plugins(message)
                    self.metrics.increment_transformed()
                    self.logger.info(f"Transformed data: {transformed_data}")
                except Exception as e:
                    self.logger.error(f"Transformation error: {e}")
                    raise TransformationError(e) from e

                # Serialize to JSON
                json_data = Transformer.dict_to_json(transformed_data)

                # Output to file
                try:
                    DataOutput.to_file(json_data, self.config['output_file'])
                    self.metrics.increment_output()
                    self.logger.info(f"Data written to {self.config['output_file']}")
                except Exception as e:
                    self.logger.error(f"Output error: {e}")
                    raise OutputError(e) from e
        except (TransformationError, OutputError):
            # Already logged and classified by the stage that failed. Without
            # this clause the generic handler below would log the failure a
            # second time and disguise it as an IngestionError.
            raise
        except Exception as e:
            self.logger.error(f"Ingestion error: {e}")
            raise IngestionError(e) from e
# Example Usage
# sdk = DataPipelineSDK(config_file='config.json')
# sdk.process_data()