diff --git a/Cargo.lock b/Cargo.lock
index 69d128a..e946beb 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -394,9 +394,9 @@ checksum = "7a40729d2133846d9ed0ea60a8b9541bccddab49cd30f0715a1da672fe9a2524"
 
 [[package]]
 name = "async-trait"
-version = "0.1.57"
+version = "0.1.58"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "76464446b8bc32758d7e88ee1a804d9914cd9b1cb264c029899680b0be29826f"
+checksum = "1e805d94e6b5001b651426cf4cd446b1ab5f319d27bab5c644f61de0a804360c"
 dependencies = [
  "proc-macro2",
  "quote",
@@ -777,10 +777,11 @@ dependencies = [
 ]
 
 [[package]]
-name = "fdk-dataset-event-publisher"
+name = "fdk-event-publisher"
 version = "0.1.0"
 dependencies = [
  "actix-web",
+ "async-trait",
  "avro-rs",
  "chrono",
  "lapin",
diff --git a/Cargo.toml b/Cargo.toml
index 8ec8e3a..bceb6b8 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,10 +1,11 @@
 [package]
-name = "fdk-dataset-event-publisher"
+name = "fdk-event-publisher"
 version = "0.1.0"
 edition = "2021"
 
 [dependencies]
 actix-web = "4.1.0"
+async-trait = "0.1.58"
 avro-rs = "0.13.0"
 chrono = "0.4.19"
 lapin = "2.1.1"
diff --git a/src/bin/fdk-dataset-event-publisher.rs b/src/bin/fdk-dataset-event-publisher.rs
new file mode 100644
index 0000000..2903e49
--- /dev/null
+++ b/src/bin/fdk-dataset-event-publisher.rs
@@ -0,0 +1,141 @@
+use std::env;
+
+use async_trait::async_trait;
+use lazy_static::lazy_static;
+use serde_derive::Serialize;
+
+use fdk_event_publisher::{
+    error::Error, kafka, run_event_publisher, utils::http_get, ChangeType, EventConfig, Resource,
+    ResourceConfig,
+};
+
+lazy_static! {
+    static ref HARVESTER_API_URL: String =
+        env::var("HARVESTER_API_URL").unwrap_or("http://localhost:8081".to_string());
+    static ref REASONING_API_URL: String =
+        env::var("REASONING_API_URL").unwrap_or("http://localhost:8082".to_string());
+    static ref CONSUMER_NAME: String =
+        env::var("CONSUMER_NAME").unwrap_or("fdk-dataset-event-publisher".to_string());
+    static ref OUTPUT_TOPIC: String =
+        env::var("OUTPUT_TOPIC").unwrap_or("dataset-events".to_string());
+}
+
+#[tokio::main]
+async fn main() {
+    let resource_config = ResourceConfig {
+        consumer_name: CONSUMER_NAME.clone(),
+        routing_keys: vec![
+            "datasets.harvested".to_string(),
+            "datasets.reasoned".to_string(),
+        ],
+    };
+
+    let event_config = EventConfig {
+        name: "no.fdk.dataset.DatasetEvent".to_string(),
+        topic: OUTPUT_TOPIC.clone(),
+        schema: r#"{
+            "name": "DatasetEvent",
+            "namespace": "no.fdk.dataset",
+            "type": "record",
+            "fields": [
+                {
+                    "name": "type",
+                    "type": {
+                        "type": "enum",
+                        "name": "DatasetEventType",
+                        "symbols": ["DATASET_HARVESTED", "DATASET_REASONED", "DATASET_REMOVED"]
+                    }
+                },
+                {"name": "fdkId", "type": "string"},
+                {"name": "graph", "type": "string"},
+                {"name": "timestamp", "type": "long", "logicalType": "timestamp-millis"}
+            ]
+        }"#
+        .to_string(),
+    };
+
+    tracing_subscriber::fmt()
+        .json()
+        .with_max_level(tracing::Level::DEBUG)
+        .with_target(false)
+        .with_current_span(false)
+        .init();
+
+    run_event_publisher::<Dataset>(resource_config, event_config).await
+}
+
+pub struct Dataset {}
+
+#[async_trait]
+impl Resource for Dataset {
+    type Event = DatasetEvent;
+
+    async fn event(
+        routing_key: &str,
+        id: String,
+        timestamp: i64,
+        report_change: ChangeType,
+    ) -> Result<Option<Self::Event>, Error> {
+        let event_type = match report_change {
+            ChangeType::CreateOrUpdate => DatasetEventType::from_routing_key(routing_key),
+            ChangeType::Remove => Ok(DatasetEventType::DatasetRemoved),
+        }?;
+
+        let graph = match event_type {
+            DatasetEventType::DatasetHarvested => {
+                http_get(format!("{}/datasets/{}", HARVESTER_API_URL.as_str(), id)).await
+            }
+            DatasetEventType::DatasetReasoned => {
+                http_get(format!("{}/datasets/{}", REASONING_API_URL.as_str(), id)).await
+            }
+            // Do not bother fetching graph for remove events
+            DatasetEventType::DatasetRemoved => Ok("".to_string()),
+        }?;
+
+        Ok(Some(Self::Event {
+            event_type,
+            fdk_id: id,
+            graph,
+            timestamp,
+        }))
+    }
+}
+
+#[derive(Debug, Serialize)]
+pub struct DatasetEvent {
+    #[serde(rename = "type")]
+    pub event_type: DatasetEventType,
+    #[serde(rename = "fdkId")]
+    pub fdk_id: String,
+    pub graph: String,
+    pub timestamp: i64,
+}
+
+impl kafka::Event for DatasetEvent {
+    fn key(&self) -> String {
+        self.fdk_id.clone()
+    }
+}
+
+#[derive(Clone, Copy, Debug, Serialize)]
+pub enum DatasetEventType {
+    #[serde(rename = "DATASET_HARVESTED")]
+    DatasetHarvested,
+    #[serde(rename = "DATASET_REASONED")]
+    DatasetReasoned,
+    #[serde(rename = "DATASET_REMOVED")]
+    DatasetRemoved,
+}
+
+impl DatasetEventType {
+    fn from_routing_key(routing_key: &str) -> Result<Self, Error> {
+        match routing_key {
+            "datasets.harvested" => Ok(Self::DatasetHarvested),
+            "datasets.reasoned" => Ok(Self::DatasetReasoned),
+            _ => Err(Error::String(format!(
+                "unknown routing key: '{}'",
+                routing_key
+            ))),
+        }
+    }
+}
diff --git a/src/http.rs b/src/http.rs
new file mode 100644
index 0000000..a3a087d
--- /dev/null
+++ b/src/http.rs
@@ -0,0 +1,41 @@
+use actix_web::{get, App, HttpServer, Responder};
+
+use crate::metrics::get_metrics;
+
+#[get("/ping")]
+async fn ping() -> impl Responder {
+    "pong"
+}
+
+#[get("/ready")]
+async fn ready() -> impl Responder {
+    "ok"
+}
+
+#[get("/metrics")]
+async fn metrics_service() -> impl Responder {
+    match get_metrics() {
+        Ok(metrics) => metrics,
+        Err(e) => {
+            tracing::error!(error = e.to_string(), "unable to gather metrics");
+            "".to_string()
+        }
+    }
+}
+
+// TODO: should maybe return Server struct instead of a future?
+pub async fn run_http_server() -> Result<(), std::io::Error> {
+    HttpServer::new(|| {
+        App::new()
+            .service(ping)
+            .service(ready)
+            .service(metrics_service)
+    })
+    .bind(("0.0.0.0", 8080))
+    .unwrap_or_else(|e| {
+        tracing::error!(error = e.to_string(), "http server error");
+        std::process::exit(1);
+    })
+    .run()
+    .await
+}
diff --git a/src/kafka.rs b/src/kafka.rs
index 6389b24..f0e828a 100644
--- a/src/kafka.rs
+++ b/src/kafka.rs
@@ -9,15 +9,14 @@ use schema_registry_converter::{
     async_impl::{avro::AvroEncoder, schema_registry::SrSettings},
     schema_registry_common::SubjectNameStrategy,
 };
+use serde::Serialize;
 
-use crate::schemas::DatasetEvent;
+use crate::EventConfig;
 
 lazy_static! {
     pub static ref BROKERS: String = env::var("BROKERS").unwrap_or("localhost:9092".to_string());
     pub static ref SCHEMA_REGISTRY: String =
         env::var("SCHEMA_REGISTRY").unwrap_or("http://localhost:8081".to_string());
-    pub static ref OUTPUT_TOPIC: String =
-        env::var("OUTPUT_TOPIC").unwrap_or("dataset-events".to_string());
 }
 
 #[derive(Debug, thiserror::Error)]
@@ -28,6 +27,36 @@ pub enum KafkaError {
     RdkafkaError(#[from] rdkafka::error::KafkaError),
 }
 
+pub trait Event: Serialize {
+    fn key(&self) -> String;
+}
+
+pub async fn send_event<E: Event>(
+    encoder: &mut AvroEncoder<'_>,
+    producer: &FutureProducer,
+    event_config: &EventConfig,
+    event: E,
+) -> Result<(), KafkaError> {
+    let key = event.key();
+
+    let encoded = encoder
+        .encode_struct(
+            event,
+            &SubjectNameStrategy::RecordNameStrategy(event_config.name.to_string()),
+        )
+        .await?;
+
+    let record = FutureRecord::to(&event_config.topic)
+        .key(&key)
+        .payload(&encoded);
+    producer
+        .send(record, Duration::from_secs(0))
+        .await
+        .map_err(|e| e.0)?;
+
+    Ok(())
+}
+
 pub fn create_sr_settings() -> Result<SrSettings, KafkaError> {
     let mut schema_registry_urls = SCHEMA_REGISTRY.split(",");
 
@@ -50,26 +79,3 @@ pub fn create_producer() -> Result<FutureProducer, KafkaError> {
         .create()?;
     Ok(producer)
 }
-
-pub async fn send_event(
-    encoder: &mut AvroEncoder<'_>,
-    producer: &FutureProducer,
-    event: DatasetEvent,
-) -> Result<(), KafkaError> {
-    let key = event.fdk_id.clone();
-    let encoded = encoder
-        .encode_struct(
-            event,
-            &SubjectNameStrategy::RecordNameStrategy("no.fdk.dataset.DatasetEvent".to_string()),
-        )
-        .await?;
-
-    let record: FutureRecord<String, Vec<u8>> =
-        FutureRecord::to(&OUTPUT_TOPIC).key(&key).payload(&encoded);
-    producer
-        .send(record, Duration::from_secs(0))
-        .await
-        .map_err(|e| e.0)?;
-
-    Ok(())
-}
diff --git a/src/lib.rs b/src/lib.rs
new file mode 100644
index 0000000..5c8dbd8
--- /dev/null
+++ b/src/lib.rs
@@ -0,0 +1,272 @@
+use std::time::Instant;
+
+use async_trait::async_trait;
+use chrono::DateTime;
+use error::Error;
+use kafka::create_sr_settings;
+use lapin::{
+    message::{Delivery, DeliveryResult},
+    options::BasicAckOptions,
+};
+use lazy_static::lazy_static;
+use rabbit::HarvestReport;
+use rdkafka::producer::FutureProducer;
+use schema_registry_converter::async_impl::{avro::AvroEncoder, schema_registry::SrSettings};
+
+use crate::{
+    http::run_http_server,
+    kafka::{send_event, BROKERS, SCHEMA_REGISTRY},
+    metrics::{register_metrics, PROCESSED_MESSAGES, PROCESSING_TIME},
+    schema::setup_schema,
+};
+
+pub mod error;
+mod http;
+pub mod kafka;
+mod metrics;
+mod rabbit;
+mod schema;
+pub mod utils;
+
+lazy_static! {
+    pub static ref PRODUCER: FutureProducer = kafka::create_producer().unwrap_or_else(|e| {
+        tracing::error!(error = e.to_string(), "kafka producer creation error");
+        std::process::exit(1);
+    });
+    pub static ref SR_SETTINGS: SrSettings = create_sr_settings().unwrap_or_else(|e| {
+        tracing::error!(error = e.to_string(), "sr settings creation error");
+        std::process::exit(1);
+    });
+}
+
+pub struct ResourceConfig {
+    pub consumer_name: String,
+    pub routing_keys: Vec<String>,
+}
+
+#[derive(Clone)]
+pub struct EventConfig {
+    pub name: String,
+    pub topic: String,
+    pub schema: String,
+}
+
+#[async_trait]
+pub trait Resource {
+    type Event: kafka::Event + Send;
+
+    async fn event(
+        routing_key: &str,
+        id: String,
+        timestamp: i64,
+        change: ChangeType,
+    ) -> Result<Option<Self::Event>, Error>;
+}
+
+#[derive(Debug)]
+pub enum ChangeType {
+    CreateOrUpdate,
+    Remove,
+}
+
+pub async fn run_event_publisher<R: Resource>(
+    resource_config: ResourceConfig,
+    event_config: EventConfig,
+) {
+    tracing::info!(
+        brokers = BROKERS.to_string(),
+        schema_registry = SCHEMA_REGISTRY.to_string(),
+        consumer_name = resource_config.consumer_name,
+        output_topic = event_config.topic,
+        routing_keys = format!("{:?}", resource_config.routing_keys),
+        "starting service"
+    );
+
+    register_metrics();
+
+    setup_schema(&SR_SETTINGS, &event_config)
+        .await
+        .unwrap_or_else(|e| {
+            tracing::error!(error = e.to_string(), "schema registration error");
+            std::process::exit(1);
+        });
+
+    let channel = rabbit::connect().await.unwrap_or_else(|e| {
+        tracing::error!(error = e.to_string(), "rabbit connection error");
+        std::process::exit(1);
+    });
+    rabbit::setup(
+        &channel,
+        &resource_config.consumer_name,
+        &resource_config.routing_keys,
+    )
+    .await
+    .unwrap_or_else(|e| {
+        tracing::error!(error = e.to_string(), "rabbit setup error");
+        std::process::exit(1);
+    });
+    let consumer = rabbit::create_consumer(&channel, &resource_config.consumer_name)
+        .await
+        .unwrap_or_else(|e| {
+            tracing::error!(error = e.to_string(), "rabbit consumer creation error");
+            std::process::exit(1);
+        });
+
+    consumer.set_delegate(move |delivery| receive_message::<R>(event_config.clone(), delivery));
+
+    run_http_server().await.unwrap_or_else(|e| {
+        tracing::error!(error = e.to_string(), "failed to run http server");
+        std::process::exit(1);
+    });
+}
+
+async fn receive_message<R: Resource>(event_config: EventConfig, delivery: DeliveryResult) {
+    let delivery = match delivery {
+        Ok(Some(delivery)) => delivery,
+        Ok(None) => return,
+        Err(error) => {
+            tracing::error!(error = error.to_string(), "failed to consume message");
+            std::process::exit(1);
+        }
+    };
+
+    let start_time = Instant::now();
+    let result =
+        handle_message::<R>(&PRODUCER, SR_SETTINGS.clone(), &event_config, &delivery).await;
+    let elapsed_millis = start_time.elapsed().as_millis();
+
+    let metric_status_label = match result {
+        Ok(_) => {
+            tracing::info!(elapsed_millis, "message handled successfully");
+            "success"
+        }
+        Err(e) => {
+            tracing::error!(
+                elapsed_millis,
+                error = e.to_string(),
+                "failed while handling message"
+            );
+            "error"
+        }
+    };
+    PROCESSED_MESSAGES
+        .with_label_values(&[metric_status_label])
+        .inc();
+    PROCESSING_TIME.observe(elapsed_millis as f64 / 1000.0);
+
+    delivery
+        .ack(BasicAckOptions::default())
+        .await
+        .unwrap_or_else(|e| tracing::error!(error = e.to_string(), "failed to ack message"));
+}
+
+async fn handle_message<R: Resource>(
+    producer: &FutureProducer,
+    sr_settings: SrSettings,
+    event_config: &EventConfig,
+    delivery: &Delivery,
+) -> Result<(), Error> {
+    let reports: Vec<HarvestReport> = serde_json::from_slice(&delivery.data)?;
+
+    let changed_resource_count = reports
+        .iter()
+        .map(|element| element.changed_resources.len())
+        .sum::<usize>();
+    let removed_resource_count = reports
+        .iter()
+        .map(|element| {
+            element
+                .removed_resources
+                .as_ref()
+                .map_or(0, |resources| resources.len())
+        })
+        .sum::<usize>();
+
+    tracing::debug!(
+        routing_key = delivery.routing_key.as_str(),
+        reports = format!("{:?}", reports),
+        "processing event"
+    );
+
+    tracing::info!(
+        routing_key = delivery.routing_key.as_str(),
+        reports = reports.len(),
+        changed_resource_count,
+        removed_resource_count,
+        "processing event"
+    );
+    let mut encoder = AvroEncoder::new(sr_settings);
+
+    for element in reports {
+        let timestamp = DateTime::parse_from_str(&element.start_time, "%Y-%m-%d %H:%M:%S%.f %z")?
+            .timestamp_millis();
+
+        for resource in element.changed_resources {
+            if let Err(e) = handle_event::<R>(
+                &mut encoder,
+                &producer,
+                &event_config,
+                delivery.routing_key.as_str(),
+                resource.fdk_id.clone(),
+                timestamp,
+                ChangeType::CreateOrUpdate,
+            )
+            .await
+            {
+                tracing::error!(
+                    id = resource.fdk_id,
+                    change = format!("{:?}", ChangeType::CreateOrUpdate),
+                    error = e.to_string(),
+                    "failed while handling event"
+                );
+            }
+        }
+
+        if let Some(removed_resources) = element.removed_resources {
+            for resource in removed_resources {
+                if let Err(e) = handle_event::<R>(
+                    &mut encoder,
+                    &producer,
+                    &event_config,
+                    delivery.routing_key.as_str(),
+                    resource.fdk_id.clone(),
+                    timestamp,
+                    ChangeType::Remove,
+                )
+                .await
+                {
+                    tracing::error!(
+                        id = resource.fdk_id,
+                        change = format!("{:?}", ChangeType::Remove),
+                        error = e.to_string(),
+                        "failed while handling event"
+                    );
+                }
+            }
+        }
+    }
+
+    Ok(())
+}
+
+async fn handle_event<R: Resource>(
+    mut encoder: &mut AvroEncoder<'_>,
+    producer: &FutureProducer,
+    event_config: &EventConfig,
+    routing_key: &str,
+    id: String,
+    timestamp: i64,
+    change: ChangeType,
+) -> Result<(), Error> {
+    tracing::debug!(
+        routing_key,
+        id = id.as_str(),
+        change = format!("{:?}", change),
+        "processing event"
+    );
+
+    if let Some(event) = R::event(routing_key, id, timestamp, change).await? {
+        send_event(&mut encoder, &producer, &event_config, event).await?;
+    };
+    Ok(())
+}
diff --git a/src/main.rs b/src/main.rs
deleted file mode 100644
index 641e72f..0000000
--- a/src/main.rs
+++ /dev/null
@@ -1,267 +0,0 @@
-use std::{env, str::FromStr, time::Instant};
-
-use actix_web::{get, App, HttpServer, Responder};
-use chrono::DateTime;
-use error::Error;
-use kafka::create_sr_settings;
-use lapin::{
-    message::{Delivery, DeliveryResult},
-    options::BasicAckOptions,
-};
-use lazy_static::lazy_static;
-use rabbit::HarvestReport;
-use rdkafka::producer::FutureProducer;
-use reqwest::StatusCode;
-use schema_registry_converter::async_impl::{avro::AvroEncoder, schema_registry::SrSettings};
-use schemas::{setup_schemas, DatasetEventType};
-
-use crate::{
-    kafka::{send_event, BROKERS, OUTPUT_TOPIC, SCHEMA_REGISTRY},
-    metrics::{get_metrics, register_metrics, PROCESSED_MESSAGES, PROCESSING_TIME},
-    schemas::DatasetEvent,
-};
-
-mod error;
-mod kafka;
-mod metrics;
-mod rabbit;
-mod schemas;
-
-lazy_static! {
-    pub static ref HARVESTER_API_URL: String =
-        env::var("HARVESTER_API_URL").unwrap_or("http://localhost:8080".to_string());
-    pub static ref REASONING_API_URL: String =
-        env::var("REASONING_API_URL").unwrap_or("http://localhost:8081".to_string());
-    pub static ref CONSUMER_NAME: String =
-        env::var("CONSUMER_NAME").unwrap_or("fdk-dataset-event-publisher".to_string());
-    pub static ref ROUTING_KEYS: Vec<String> = env::var("ROUTING_KEY")
-        .map(|s| s.split(",").map(|s| s.to_string()).collect())
-        .unwrap_or(vec![
-            "datasets.harvested".to_string(),
-            "datasets.reasoned".to_string()
-        ]);
-    pub static ref PRODUCER: FutureProducer = kafka::create_producer().unwrap_or_else(|e| {
-        tracing::error!(error = e.to_string(), "kafka producer creation error");
-        std::process::exit(1);
-    });
-    pub static ref CLIENT: reqwest::Client =
-        reqwest::ClientBuilder::new().build().unwrap_or_else(|e| {
-            tracing::error!(error = e.to_string(), "reqwest client creation error");
-            std::process::exit(1);
-        });
-    pub static ref SR_SETTINGS: SrSettings = create_sr_settings().unwrap_or_else(|e| {
-        tracing::error!(error = e.to_string(), "sr settings creation error");
-        std::process::exit(1);
-    });
-}
-
-#[get("/ping")]
-async fn ping() -> impl Responder {
-    "pong"
-}
-
-#[get("/ready")]
-async fn ready() -> impl Responder {
-    "ok"
-}
-
-#[get("/metrics")]
-async fn metrics_service() -> impl Responder {
-    match get_metrics() {
-        Ok(metrics) => metrics,
-        Err(e) => {
-            tracing::error!(error = e.to_string(), "unable to gather metrics");
-            "".to_string()
-        }
-    }
-}
-
-#[tokio::main]
-async fn main() {
-    tracing_subscriber::fmt()
-        .json()
-        .with_max_level(tracing::Level::INFO)
-        .with_target(false)
-        .with_current_span(false)
-        .init();
-
-    register_metrics();
-
-    tracing::info!(
-        brokers = BROKERS.to_string(),
-        schema_registry = SCHEMA_REGISTRY.to_string(),
-        output_topic = OUTPUT_TOPIC.to_string(),
-        harvester_api_url = HARVESTER_API_URL.to_string(),
-        consumer_name = CONSUMER_NAME.to_string(),
-        routing_keys = format!("{:?}", ROUTING_KEYS.clone()),
-        "starting service"
-    );
-
-    setup_schemas(&SR_SETTINGS).await.unwrap_or_else(|e| {
-        tracing::error!(error = e.to_string(), "schema registration error");
-        std::process::exit(1);
-    });
-
-    let channel = rabbit::connect().await.unwrap_or_else(|e| {
-        tracing::error!(error = e.to_string(), "rabbit connection error");
-        std::process::exit(1);
-    });
-    rabbit::setup(&channel, &CONSUMER_NAME, &ROUTING_KEYS)
-        .await
-        .unwrap_or_else(|e| {
-            tracing::error!(error = e.to_string(), "rabbit setup error");
-            std::process::exit(1);
-        });
-    let consumer = rabbit::create_consumer(&channel, &CONSUMER_NAME)
-        .await
-        .unwrap_or_else(|e| {
-            tracing::error!(error = e.to_string(), "rabbit consumer creation error");
-            std::process::exit(1);
-        });
-
-    consumer.set_delegate(move |delivery: DeliveryResult| async {
-        let delivery = match delivery {
-            Ok(Some(delivery)) => delivery,
-            Ok(None) => return,
-            Err(error) => {
-                tracing::error!(error = error.to_string(), "failed to consume message");
-                std::process::exit(1);
-            }
-        };
-
-        let start_time = Instant::now();
-        let result = handle_message(&PRODUCER, &CLIENT, SR_SETTINGS.clone(), &delivery).await;
-        let elapsed_millis = start_time.elapsed().as_millis();
-
-        let metric_status_label = match result {
-            Ok(_) => {
-                tracing::info!(elapsed_millis, "message handled successfully");
-                "success"
-            }
-            Err(e) => {
-                tracing::error!(
-                    elapsed_millis,
-                    error = e.to_string(),
-                    "failed while handling message"
-                );
-                "error"
-            }
-        };
-        PROCESSED_MESSAGES
-            .with_label_values(&[metric_status_label])
-            .inc();
-        PROCESSING_TIME.observe(elapsed_millis as f64 / 1000.0);
-
-        delivery
-            .ack(BasicAckOptions::default())
-            .await
-            .unwrap_or_else(|e| tracing::error!(error = e.to_string(), "failed to ack message"));
-    });
-
-    HttpServer::new(|| {
-        App::new()
-            .service(ping)
-            .service(ready)
-            .service(metrics_service)
-    })
-    .bind(("0.0.0.0", 8080))
-    .unwrap_or_else(|e| {
-        tracing::error!(error = e.to_string(), "metrics server error");
-        std::process::exit(1);
-    })
-    .run()
-    .await
-    .unwrap_or_else(|e| {
-        tracing::error!(error = e.to_string(), "failed to run metrics server");
-        std::process::exit(1);
-    });
-}
-
-async fn handle_message(
-    producer: &FutureProducer,
-    client: &reqwest::Client,
-    sr_settings: SrSettings,
-    delivery: &Delivery,
-) -> Result<(), Error> {
-    let event_type = DatasetEventType::from_str(delivery.routing_key.as_str())?;
-    let reports: Vec<HarvestReport> = serde_json::from_slice(&delivery.data)?;
-
-    let changed_resource_count = reports
-        .iter()
-        .map(|element| element.changed_resources.len())
-        .sum::<usize>();
-    let removed_resource_count = reports
-        .iter()
-        .map(|element| {
-            element
-                .removed_resources
-                .as_ref()
-                .map_or(0, |resources| resources.len())
-        })
-        .sum::<usize>();
-
-    tracing::info!(
-        routing_key = delivery.routing_key.as_str(),
-        reports = reports.len(),
-        changed_resource_count,
-        removed_resource_count,
-        "processing event"
-    );
-    let mut encoder = AvroEncoder::new(sr_settings);
-
-    for element in reports {
-        let timestamp = DateTime::parse_from_str(&element.start_time, "%Y-%m-%d %H:%M:%S%.f %z")?
-            .timestamp_millis();
-
-        for resource in element.changed_resources {
-            tracing::debug!(id = resource.fdk_id.as_str(), "processing changed dataset");
-            if let Some(graph) = get_graph(&client, &resource.fdk_id).await? {
-                let message = DatasetEvent {
-                    event_type,
-                    fdk_id: resource.fdk_id,
-                    graph,
-                    timestamp,
-                };
-
-                send_event(&mut encoder, &producer, message).await?;
-            } else {
-                tracing::error!(id = resource.fdk_id, "graph not found");
-            }
-        }
-
-        if let Some(removed_resources) = element.removed_resources {
-            for resource in removed_resources {
-                tracing::debug!(id = resource.fdk_id.as_str(), "processing removed dataset");
-                let message = DatasetEvent {
-                    event_type: schemas::DatasetEventType::DatasetRemoved,
-                    fdk_id: resource.fdk_id,
-                    // TODO: this should probably not be empty string
-                    graph: "".to_string(),
-                    timestamp,
-                };
-
-                send_event(&mut encoder, &producer, message).await?;
-            }
-        }
-    }
-
-    Ok(())
-}
-
-async fn get_graph(client: &reqwest::Client, id: &String) -> Result<Option<String>, Error> {
-    let response = client
-        .get(format!("{}/datasets/{}", REASONING_API_URL.as_str(), id))
-        .send()
-        .await?;
-
-    match response.status() {
-        StatusCode::NOT_FOUND => Ok(None),
-        StatusCode::OK => Ok(Some(response.text().await?)),
-        _ => Err(format!(
-            "Invalid response from harvester: {} - {}",
-            response.status(),
-            response.text().await?
-        )
-        .into()),
-    }
-}
diff --git a/src/rabbit.rs b/src/rabbit.rs
index 33d3a86..f552c04 100644
--- a/src/rabbit.rs
+++ b/src/rabbit.rs
@@ -13,7 +13,7 @@ pub enum RabbitError {
     ConfigError(&'static str, String),
 }
 
-#[derive(Deserialize)]
+#[derive(Debug, Deserialize)]
 pub struct HarvestReport {
     #[serde(alias = "startTime")]
     pub start_time: String,
@@ -23,7 +23,7 @@ pub struct HarvestReport {
     pub removed_resources: Option<Vec<HarvestReportChange>>,
 }
 
-#[derive(Deserialize)]
+#[derive(Debug, Deserialize)]
 pub struct HarvestReportChange {
     #[serde(alias = "fdkId")]
     pub fdk_id: String,
diff --git a/src/schema.rs b/src/schema.rs
new file mode 100644
index 0000000..33ceef7
--- /dev/null
+++ b/src/schema.rs
@@ -0,0 +1,32 @@
+use schema_registry_converter::{
+    async_impl::schema_registry::{post_schema, SrSettings},
+    schema_registry_common::{SchemaType, SuppliedSchema},
+};
+
+use crate::{kafka::KafkaError, EventConfig};
+
+pub async fn setup_schema(
+    sr_settings: &SrSettings,
+    event_config: &EventConfig,
+) -> Result<(), KafkaError> {
+    tracing::info!(event_config.name, "registering schema");
+
+    let schema = post_schema(
+        sr_settings,
+        event_config.name.to_string(),
+        SuppliedSchema {
+            name: Some(event_config.name.to_string()),
+            schema_type: SchemaType::Avro,
+            schema: event_config.schema.to_string(),
+            references: vec![],
+        },
+    )
+    .await?;
+
+    tracing::info!(
+        id = schema.id,
+        event_config.name,
+        "schema successfully registered"
+    );
+    Ok(())
+}
diff --git a/src/schemas.rs b/src/schemas.rs
deleted file mode 100644
index 9633b2a..0000000
--- a/src/schemas.rs
+++ /dev/null
@@ -1,94 +0,0 @@
-use std::str::FromStr;
-
-use schema_registry_converter::{
-    async_impl::schema_registry::{post_schema, SrSettings},
-    schema_registry_common::{SchemaType, SuppliedSchema},
-};
-use serde_derive::Serialize;
-
-use crate::{error::Error, kafka::KafkaError};
-
-#[derive(Clone, Copy, Debug, Serialize)]
-pub enum DatasetEventType {
-    #[serde(rename = "DATASET_HARVESTED")]
-    DatasetHarvested,
-    #[serde(rename = "DATASET_REASONED")]
-    DatasetReasoned,
-    #[serde(rename = "DATASET_REMOVED")]
-    DatasetRemoved,
-}
-
-impl FromStr for DatasetEventType {
-    type Err = Error;
-
-    fn from_str(s: &str) -> Result<Self, Self::Err> {
-        match s {
-            "datasets.harvested" => Ok(Self::DatasetHarvested),
-            "datasets.reasoned" => Ok(Self::DatasetReasoned),
-            _ => Err(Self::Err::String(format!(
-                "unknown event (routing key) received: '{}'",
-                s
-            ))),
-        }
-    }
-}
-
-#[derive(Debug, Serialize)]
-pub struct DatasetEvent {
-    #[serde(rename = "type")]
-    pub event_type: DatasetEventType,
-    #[serde(rename = "fdkId")]
-    pub fdk_id: String,
-    pub graph: String,
-    pub timestamp: i64,
-}
-
-pub async fn setup_schemas(sr_settings: &SrSettings) -> Result<(), KafkaError> {
-    register_schema(
-        sr_settings,
-        "no.fdk.dataset.DatasetEvent",
-        r#"{
-            "name": "DatasetEvent",
-            "namespace": "no.fdk.dataset",
-            "type": "record",
-            "fields": [
-                {
-                    "name": "type",
-                    "type": {
-                        "type": "enum",
-                        "name": "DatasetEventType",
-                        "symbols": ["DATASET_HARVESTED", "DATASET_REASONED", "DATASET_REMOVED"]
-                    }
-                },
-                {"name": "fdkId", "type": "string"},
-                {"name": "graph", "type": "string"},
-                {"name": "timestamp", "type": "long", "logicalType": "timestamp-millis"}
-            ]
-        }"#,
-    )
-    .await?;
-    Ok(())
-}
-
-pub async fn register_schema(
-    sr_settings: &SrSettings,
-    name: &str,
-    schema_str: &str,
-) -> Result<(), KafkaError> {
-    tracing::info!(name, "registering schema");
-
-    let schema = post_schema(
-        sr_settings,
-        name.to_string(),
-        SuppliedSchema {
-            name: Some(name.to_string()),
-            schema_type: SchemaType::Avro,
-            schema: schema_str.to_string(),
-            references: vec![],
-        },
-    )
-    .await?;
-
-    tracing::info!(id = schema.id, name, "schema succesfully registered");
-    Ok(())
-}
diff --git a/src/utils.rs b/src/utils.rs
new file mode 100644
index 0000000..0b6c654
--- /dev/null
+++ b/src/utils.rs
@@ -0,0 +1,26 @@
+use lazy_static::lazy_static;
+use reqwest::StatusCode;
+
+use crate::error::Error;
+
+lazy_static! {
+    static ref CLIENT: reqwest::Client =
+        reqwest::ClientBuilder::new().build().unwrap_or_else(|e| {
+            tracing::error!(error = e.to_string(), "reqwest client creation error");
+            std::process::exit(1);
+        });
+}
+
+pub async fn http_get(url: String) -> Result<String, Error> {
+    let response = CLIENT.get(url).send().await?;
+
+    match response.status() {
+        StatusCode::OK => Ok(response.text().await?),
+        _ => Err(format!(
+            "Invalid http response: {} - {}",
+            response.status(),
+            response.text().await?
+        )
+        .into()),
+    }
+}
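Note for reviewers: the point of extracting `src/lib.rs` into the `fdk-event-publisher` library is that a publisher for another resource type should now only need a new binary implementing `Resource` and `kafka::Event`. A minimal sketch of what that could look like — the `Concept` type names, routing key, topic, and URL below are hypothetical, not part of this diff:

```rust
use async_trait::async_trait;
use serde_derive::Serialize;

use fdk_event_publisher::{
    error::Error, kafka, run_event_publisher, utils::http_get, ChangeType, EventConfig, Resource,
    ResourceConfig,
};

#[derive(Debug, Serialize)]
pub struct ConceptEvent {
    #[serde(rename = "fdkId")]
    pub fdk_id: String,
    pub graph: String,
    pub timestamp: i64,
}

impl kafka::Event for ConceptEvent {
    // Kafka messages are keyed on the FDK id, mirroring DatasetEvent.
    fn key(&self) -> String {
        self.fdk_id.clone()
    }
}

pub struct Concept {}

#[async_trait]
impl Resource for Concept {
    type Event = ConceptEvent;

    async fn event(
        _routing_key: &str,
        id: String,
        timestamp: i64,
        change: ChangeType,
    ) -> Result<Option<Self::Event>, Error> {
        let graph = match change {
            // Hypothetical endpoint; the dataset binary fetches from the
            // harvester/reasoning APIs in the same way via utils::http_get.
            ChangeType::CreateOrUpdate => {
                http_get(format!("http://localhost:8081/concepts/{}", id)).await?
            }
            // As in the dataset binary, remove events carry an empty graph.
            ChangeType::Remove => "".to_string(),
        };
        Ok(Some(ConceptEvent {
            fdk_id: id,
            graph,
            timestamp,
        }))
    }
}

#[tokio::main]
async fn main() {
    run_event_publisher::<Concept>(
        ResourceConfig {
            consumer_name: "fdk-concept-event-publisher".to_string(),
            routing_keys: vec!["concepts.reasoned".to_string()],
        },
        EventConfig {
            name: "no.fdk.concept.ConceptEvent".to_string(),
            topic: "concept-events".to_string(),
            // Avro schema omitted for brevity; see the dataset binary above.
            schema: "...".to_string(),
        },
    )
    .await
}
```

Everything in `lib.rs` — RabbitMQ consumption, Avro schema registration, metrics, the HTTP server, and Kafka publishing — would be reused unchanged by such a binary.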