Implement generic resource types (#13)
* Implement generic event publisher

* debug log rabbitmq messages

* fix: read harvest graphs from harvester

* fix: HARVESTER_API_URL

* fix: don't stop on first error in report

* Add routing_key to debug log

* Fix import

Co-authored-by: Nils Ove Tendenes <50194012+NilsOveTen@users.noreply.github.com>

tenstad and NilsOveTen authored Dec 13, 2022
1 parent 2eb038f commit ffcb6db
Showing 11 changed files with 552 additions and 393 deletions.
7 changes: 4 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default.

3 changes: 2 additions & 1 deletion Cargo.toml
@@ -1,10 +1,11 @@
 [package]
-name = "fdk-dataset-event-publisher"
+name = "fdk-event-publisher"
 version = "0.1.0"
 edition = "2021"
 
 [dependencies]
 actix-web = "4.1.0"
+async-trait = "0.1.58"
 avro-rs = "0.13.0"
 chrono = "0.4.19"
 lapin = "2.1.1"
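
With the rename, the crate itself becomes the generic fdk-event-publisher library, and the dataset-specific code moves to a binary target under src/bin/, which Cargo discovers automatically. The implied layout looks roughly like this (src/lib.rs and its exact module list are assumptions, inferred from the fdk_event_publisher::{...} imports in the new binary):

fdk-event-publisher/
├── Cargo.toml                              # package renamed to fdk-event-publisher
└── src/
    ├── lib.rs                              # assumed: error, utils, run_event_publisher, ResourceConfig, ...
    ├── http.rs                             # added in this commit
    ├── kafka.rs                            # made generic in this commit
    └── bin/
        └── fdk-dataset-event-publisher.rs  # new dataset-specific binary
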
141 changes: 141 additions & 0 deletions src/bin/fdk-dataset-event-publisher.rs
@@ -0,0 +1,141 @@
use std::env;

use async_trait::async_trait;
use lazy_static::lazy_static;
use serde_derive::Serialize;

use fdk_event_publisher::{
    error::Error, kafka, run_event_publisher, utils::http_get, ChangeType, EventConfig, Resource,
    ResourceConfig,
};

lazy_static! {
    static ref HARVESTER_API_URL: String =
        env::var("HARVESTER_API_URL").unwrap_or("http://localhost:8081".to_string());
    static ref REASONING_API_URL: String =
        env::var("REASONING_API_URL").unwrap_or("http://localhost:8082".to_string());
    static ref CONSUMER_NAME: String =
        env::var("CONSUMER_NAME").unwrap_or("fdk-dataset-event-publisher".to_string());
    static ref OUTPUT_TOPIC: String =
        env::var("OUTPUT_TOPIC").unwrap_or("dataset-events".to_string());
}

#[tokio::main]
async fn main() {
    let resource_config = ResourceConfig {
        consumer_name: CONSUMER_NAME.clone(),
        routing_keys: vec![
            "datasets.harvested".to_string(),
            "datasets.reasoned".to_string(),
        ],
    };

    let event_config = EventConfig {
        name: "no.fdk.dataset.DatasetEvent".to_string(),
        topic: OUTPUT_TOPIC.clone(),
        schema: r#"{
            "name": "DatasetEvent",
            "namespace": "no.fdk.dataset",
            "type": "record",
            "fields": [
                {
                    "name": "type",
                    "type": {
                        "type": "enum",
                        "name": "DatasetEventType",
                        "symbols": ["DATASET_HARVESTED", "DATASET_REASONED", "DATASET_REMOVED"]
                    }
                },
                {"name": "fdkId", "type": "string"},
                {"name": "graph", "type": "string"},
                {"name": "timestamp", "type": "long", "logicalType": "timestamp-millis"}
            ]
        }"#
        .to_string(),
    };

    tracing_subscriber::fmt()
        .json()
        .with_max_level(tracing::Level::DEBUG)
        .with_target(false)
        .with_current_span(false)
        .init();

    run_event_publisher::<Dataset>(resource_config, event_config).await
}

pub struct Dataset {}

#[async_trait]
impl Resource for Dataset {
    type Event = DatasetEvent;

    async fn event(
        routing_key: &str,
        id: String,
        timestamp: i64,
        report_change: ChangeType,
    ) -> Result<Option<Self::Event>, Error> {
        let event_type = match report_change {
            ChangeType::CreateOrUpdate => DatasetEventType::from_routing_key(routing_key),
            ChangeType::Remove => Ok(DatasetEventType::DatasetRemoved),
        }?;

        let graph = match event_type {
            DatasetEventType::DatasetHarvested => {
                http_get(format!("{}/datasets/{}", HARVESTER_API_URL.as_str(), id)).await
            }
            DatasetEventType::DatasetReasoned => {
                http_get(format!("{}/datasets/{}", REASONING_API_URL.as_str(), id)).await
            }
            // Do not bother fetching graph for remove events
            DatasetEventType::DatasetRemoved => Ok("".to_string()),
        }?;

        Ok(Some(Self::Event {
            event_type,
            fdk_id: id,
            graph,
            timestamp,
        }))
    }
}

#[derive(Debug, Serialize)]
pub struct DatasetEvent {
    #[serde(rename = "type")]
    pub event_type: DatasetEventType,
    #[serde(rename = "fdkId")]
    pub fdk_id: String,
    pub graph: String,
    pub timestamp: i64,
}

impl kafka::Event for DatasetEvent {
    fn key(&self) -> String {
        self.fdk_id.clone()
    }
}

#[derive(Clone, Copy, Debug, Serialize)]
pub enum DatasetEventType {
    #[serde(rename = "DATASET_HARVESTED")]
    DatasetHarvested,
    #[serde(rename = "DATASET_REASONED")]
    DatasetReasoned,
    #[serde(rename = "DATASET_REMOVED")]
    DatasetRemoved,
}

impl DatasetEventType {
    fn from_routing_key(routing_key: &str) -> Result<Self, Error> {
        match routing_key {
            "datasets.harvested" => Ok(Self::DatasetHarvested),
            "datasets.reasoned" => Ok(Self::DatasetReasoned),
            _ => Err(Error::String(format!(
                "unknown routing key: '{}'",
                routing_key
            ))),
        }
    }
}
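
Everything dataset-specific now lives in this one binary; a second resource type would only need its own event struct and Resource impl. A minimal hypothetical sketch of such a binary follows; Concept, its routing keys, topic, and schema are all invented for illustration and are not part of this commit:

// Hypothetical src/bin/fdk-concept-event-publisher.rs; every Concept* name is invented.
use async_trait::async_trait;
use serde_derive::Serialize;

use fdk_event_publisher::{
    error::Error, kafka, run_event_publisher, ChangeType, EventConfig, Resource, ResourceConfig,
};

#[derive(Debug, Serialize)]
pub struct ConceptEvent {
    #[serde(rename = "fdkId")]
    pub fdk_id: String,
    pub graph: String,
    pub timestamp: i64,
}

// Kafka messages are keyed by resource id, exactly as for DatasetEvent above.
impl kafka::Event for ConceptEvent {
    fn key(&self) -> String {
        self.fdk_id.clone()
    }
}

pub struct Concept {}

#[async_trait]
impl Resource for Concept {
    type Event = ConceptEvent;

    async fn event(
        _routing_key: &str,
        id: String,
        timestamp: i64,
        _change: ChangeType,
    ) -> Result<Option<Self::Event>, Error> {
        // A real implementation would fetch the graph from the harvester or
        // reasoner, like the dataset binary does above.
        Ok(Some(ConceptEvent {
            fdk_id: id,
            graph: "".to_string(),
            timestamp,
        }))
    }
}

#[tokio::main]
async fn main() {
    let resource_config = ResourceConfig {
        consumer_name: "fdk-concept-event-publisher".to_string(),
        routing_keys: vec!["concepts.harvested".to_string()],
    };
    let event_config = EventConfig {
        name: "no.fdk.concept.ConceptEvent".to_string(),
        topic: "concept-events".to_string(),
        schema: r#"{"name": "ConceptEvent", "namespace": "no.fdk.concept", "type": "record",
            "fields": [{"name": "fdkId", "type": "string"}, {"name": "graph", "type": "string"},
            {"name": "timestamp", "type": "long", "logicalType": "timestamp-millis"}]}"#
            .to_string(),
    };
    run_event_publisher::<Concept>(resource_config, event_config).await
}
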
41 changes: 41 additions & 0 deletions src/http.rs
@@ -0,0 +1,41 @@
use actix_web::{get, App, HttpServer, Responder};

use crate::metrics::get_metrics;

#[get("/ping")]
async fn ping() -> impl Responder {
    "pong"
}

#[get("/ready")]
async fn ready() -> impl Responder {
    "ok"
}

#[get("/metrics")]
async fn metrics_service() -> impl Responder {
    match get_metrics() {
        Ok(metrics) => metrics,
        Err(e) => {
            tracing::error!(error = e.to_string(), "unable to gather metrics");
            "".to_string()
        }
    }
}

// TODO: should maybe return Server struct instead of a future?
pub async fn run_http_server() -> Result<(), std::io::Error> {
    HttpServer::new(|| {
        App::new()
            .service(ping)
            .service(ready)
            .service(metrics_service)
    })
    .bind(("0.0.0.0", 8080))
    .unwrap_or_else(|e| {
        tracing::error!(error = e.to_string(), "http server error");
        std::process::exit(1);
    })
    .run()
    .await
}
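
These three endpoints are the usual probe and scrape targets (liveness, readiness, Prometheus metrics) on port 8080. A rough smoke test could look like the sketch below; it assumes the module ends up exported as fdk_event_publisher::http and that reqwest is available as a dev-dependency, neither of which this diff shows:

#[tokio::test]
async fn ping_responds_with_pong() {
    // Assumed path: run_http_server exposed via a `pub mod http` in lib.rs.
    tokio::spawn(fdk_event_publisher::http::run_http_server());
    // Crude wait for the server to bind; a real test would poll or pick a free port.
    tokio::time::sleep(std::time::Duration::from_millis(200)).await;

    let body = reqwest::get("http://localhost:8080/ping")
        .await
        .unwrap()
        .text()
        .await
        .unwrap();
    assert_eq!(body, "pong");
}
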
58 changes: 32 additions & 26 deletions src/kafka.rs
@@ -9,15 +9,14 @@ use schema_registry_converter::{
     async_impl::{avro::AvroEncoder, schema_registry::SrSettings},
     schema_registry_common::SubjectNameStrategy,
 };
+use serde::Serialize;
 
-use crate::schemas::DatasetEvent;
+use crate::EventConfig;
 
 lazy_static! {
     pub static ref BROKERS: String = env::var("BROKERS").unwrap_or("localhost:9092".to_string());
     pub static ref SCHEMA_REGISTRY: String =
         env::var("SCHEMA_REGISTRY").unwrap_or("http://localhost:8081".to_string());
-    pub static ref OUTPUT_TOPIC: String =
-        env::var("OUTPUT_TOPIC").unwrap_or("dataset-events".to_string());
 }
 
 #[derive(Debug, thiserror::Error)]
@@ -28,6 +27,36 @@ pub enum KafkaError {
     RdkafkaError(#[from] rdkafka::error::KafkaError),
 }
 
+pub trait Event: Serialize {
+    fn key(&self) -> String;
+}
+
+pub async fn send_event<E: Event>(
+    encoder: &mut AvroEncoder<'_>,
+    producer: &FutureProducer,
+    event_config: &EventConfig,
+    event: E,
+) -> Result<(), KafkaError> {
+    let key = event.key();
+
+    let encoded = encoder
+        .encode_struct(
+            event,
+            &SubjectNameStrategy::RecordNameStrategy(event_config.name.to_string()),
+        )
+        .await?;
+
+    let record = FutureRecord::to(&event_config.topic)
+        .key(&key)
+        .payload(&encoded);
+    producer
+        .send(record, Duration::from_secs(0))
+        .await
+        .map_err(|e| e.0)?;
+
+    Ok(())
+}
+
 pub fn create_sr_settings() -> Result<SrSettings, KafkaError> {
     let mut schema_registry_urls = SCHEMA_REGISTRY.split(",");
 
@@ -50,26 +79,3 @@ pub fn create_producer() -> Result<FutureProducer, KafkaError> {
         .create()?;
     Ok(producer)
 }
-
-pub async fn send_event(
-    encoder: &mut AvroEncoder<'_>,
-    producer: &FutureProducer,
-    event: DatasetEvent,
-) -> Result<(), KafkaError> {
-    let key = event.fdk_id.clone();
-    let encoded = encoder
-        .encode_struct(
-            event,
-            &SubjectNameStrategy::RecordNameStrategy("no.fdk.dataset.DatasetEvent".to_string()),
-        )
-        .await?;
-
-    let record: FutureRecord<String, Vec<u8>> =
-        FutureRecord::to(&OUTPUT_TOPIC).key(&key).payload(&encoded);
-    producer
-        .send(record, Duration::from_secs(0))
-        .await
-        .map_err(|e| e.0)?;
-
-    Ok(())
-}
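
Taken together, the call-site shape implied by this file is roughly the following; publish_one is a hypothetical helper for illustration, since run_event_publisher's actual wiring is not part of this diff:

// Hypothetical helper showing how this file's pieces compose.
async fn publish_one<E: Event>(event_config: &EventConfig, event: E) -> Result<(), KafkaError> {
    let sr_settings = create_sr_settings()?;
    let mut encoder = AvroEncoder::new(sr_settings);
    let producer = create_producer()?;
    // Avro-encode under the RecordNameStrategy subject (e.g. no.fdk.dataset.DatasetEvent),
    // then produce to the configured topic, keyed by Event::key().
    // A real loop would create the encoder and producer once and reuse them.
    send_event(&mut encoder, &producer, event_config, event).await
}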