From 8161c4e5bc9689802740554851421275d8aa0cc2 Mon Sep 17 00:00:00 2001 From: Timofey Myagkikh Date: Fri, 10 Nov 2023 21:32:11 +0300 Subject: [PATCH] kafka consumer example --- Cargo.lock | 136 ++++++++++++++++++++ Cargo.toml | 1 + README.md | 2 + acl/Cargo.toml | 11 ++ acl/Dockerfile | 15 +++ acl/src/lib.rs | 1 + acl/src/main.rs | 19 +++ app/Cargo.toml | 1 + app/src/endpoints.rs | 30 ++++- app/src/models.rs | 8 ++ build_image.sh | 4 - build_images.sh | 3 + docker-compose.yml | 71 ++++++++++- domain/src/events.rs | 14 +++ domain/src/lib.rs | 3 +- host/Cargo.toml | 3 +- Dockerfile => host/Dockerfile | 7 +- host/src/composition.rs | 24 +++- host/src/conf.rs | 2 +- host/src/main.rs | 8 +- infra/Cargo.toml | 3 +- infra/src/lib.rs | 2 + infra/src/repositories.rs | 4 +- infra/src/tracker.rs | 22 ++++ integrtion-tests/Cargo.toml | 1 + integrtion-tests/tests/gen.rs | 8 +- integrtion-tests/tests/sut.rs | 46 ++++++- integrtion-tests/tests/user_tests.rs | 41 +++++- messaging/Cargo.toml | 13 ++ messaging/src/fake_kafka.rs | 88 +++++++++++++ messaging/src/kafka.rs | 181 +++++++++++++++++++++++++++ messaging/src/lib.rs | 5 + 32 files changed, 745 insertions(+), 32 deletions(-) create mode 100644 acl/Cargo.toml create mode 100644 acl/Dockerfile create mode 100644 acl/src/lib.rs create mode 100644 acl/src/main.rs delete mode 100755 build_image.sh create mode 100755 build_images.sh create mode 100644 domain/src/events.rs rename Dockerfile => host/Dockerfile (62%) create mode 100644 infra/src/tracker.rs create mode 100644 messaging/Cargo.toml create mode 100644 messaging/src/fake_kafka.rs create mode 100644 messaging/src/kafka.rs create mode 100644 messaging/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index f8b5888..7da7f79 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,17 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "acl" +version = "0.0.1" +dependencies = [ + "domain", + "env_logger", + "log", + "messaging", + "tokio", +] + [[package]] name = "actix-codec" version = "0.5.1" @@ -255,6 +266,7 @@ version = "0.0.1" dependencies = [ "actix-web", "domain", + "messaging", "serde", "serde_derive", "utoipa", @@ -434,6 +446,15 @@ dependencies = [ "windows-targets", ] +[[package]] +name = "cmake" +version = "0.1.50" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a31c789563b815f77f4250caee12365734369f942439b7defd71e18a48197130" +dependencies = [ + "cc", +] + [[package]] name = "convert_case" version = "0.4.0" @@ -902,6 +923,7 @@ dependencies = [ "domain_impl", "env_logger", "infra", + "messaging", "mongodb", "serde", "serde_derive", @@ -1075,6 +1097,7 @@ dependencies = [ "async-trait", "domain", "domain_impl", + "messaging", "mongodb", ] @@ -1149,6 +1172,18 @@ version = "0.2.149" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a08173bc88b7955d1b3145aa561539096c421ac8debde8cbc3612ec635fee29b" +[[package]] +name = "libz-sys" +version = "1.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d97137b25e321a73eef1418d1d5d2eda4d77e12813f8e6dead84bc52c5870a7b" +dependencies = [ + "cc", + "libc", + "pkg-config", + "vcpkg", +] + [[package]] name = "linked-hash-map" version = "0.5.6" @@ -1231,6 +1266,18 @@ version = "2.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f665ee40bc4a3c5590afb1e9677db74a508659dfd71e126420da8274909a0167" +[[package]] +name = "messaging" +version = "0.0.1" +dependencies = [ + "async-trait", + "log", + "rdkafka", + "serde", + "serde_json", + "tokio", +] + [[package]] name = "mime" version = "0.3.17" @@ -1352,6 +1399,27 @@ dependencies = [ "libc", ] +[[package]] +name = "num_enum" +version = "0.5.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f646caf906c20226733ed5b1374287eb97e3c2a5c227ce668c1f2ce20ae57c9" +dependencies = [ + "num_enum_derive", +] + +[[package]] +name = "num_enum_derive" +version = "0.5.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dcbff9bc912032c62bf65ef1d5aea88983b420f4f839db1e9b0c281a25c9c799" +dependencies = [ + "proc-macro-crate", + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "object" version = "0.32.1" @@ -1485,6 +1553,16 @@ version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" +[[package]] +name = "proc-macro-crate" +version = "1.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f4c021e1093a56626774e81216a4ce732a735e5bad4868a03f3ed65ca0c3919" +dependencies = [ + "once_cell", + "toml_edit", +] + [[package]] name = "proc-macro-error" version = "1.0.4" @@ -1569,6 +1647,37 @@ dependencies = [ "getrandom", ] +[[package]] +name = "rdkafka" +version = "0.35.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f16c17f411935214a5870e40aff9291f8b40a73e97bf8de29e5959c473d5ef33" +dependencies = [ + "futures-channel", + "futures-util", + "libc", + "log", + "rdkafka-sys", + "serde", + "serde_derive", + "serde_json", + "slab", + "tokio", +] + +[[package]] +name = "rdkafka-sys" +version = "4.7.0+2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55e0d2f9ba6253f6ec72385e453294f8618e9e15c2c6aba2a5c01ccf9622d615" +dependencies = [ + "cmake", + "libc", + "libz-sys", + "num_enum", + "pkg-config", +] + [[package]] name = "redox_syscall" version = "0.2.16" @@ -2164,6 +2273,7 @@ dependencies = [ "domain_impl", "host", "infra", + "messaging", "rand", "reqwest", "tokio", @@ -2299,6 +2409,23 @@ dependencies = [ "tracing", ] +[[package]] +name = "toml_datetime" +version = "0.6.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3550f4e9685620ac18a50ed434eb3aec30db8ba93b0287467bca5826ea25baf1" + +[[package]] +name = "toml_edit" +version = "0.19.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b5bb770da30e5cbfde35a2d7b9b8a2c4b8ef89548a7a6aeab5c9a576e3e7421" +dependencies = [ + "indexmap 2.1.0", + "toml_datetime", + "winnow", +] + [[package]] name = "tower-service" version = "0.3.2" @@ -2722,6 +2849,15 @@ version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" +[[package]] +name = "winnow" +version = "0.5.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "829846f3e3db426d4cee4510841b71a8e58aa2a76b1132579487ae430ccd9c7b" +dependencies = [ + "memchr", +] + [[package]] name = "winreg" version = "0.50.0" diff --git a/Cargo.toml b/Cargo.toml index 501da4a..70ebe64 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,6 +7,7 @@ members = [ "host", "infra", "integrtion-tests", + "acl" ] [workspace.dependencies] diff --git a/README.md b/README.md index 78ce3ac..2f3a930 100644 --- a/README.md +++ b/README.md @@ -6,6 +6,7 @@ Web api application prototype written in rust - Clean architecture design with CQRS pattern - Crud api endpoints with actix-web and mongodb driver +- Kafka Consumer implemented with `rdkafka` - Integration tests - Workspaces usage - OpenApi and Swagger-UI @@ -15,6 +16,7 @@ Web api application prototype written in rust ## Structure +- `acl` - anti corruption layer implemented as kafka consumer - `app` - application layer with endpoints - `domain` - business logic contracts (queries/commands interfaces) - `domain_impl` - implementation of buisiness logic contracts diff --git a/acl/Cargo.toml b/acl/Cargo.toml new file mode 100644 index 0000000..39d3506 --- /dev/null +++ b/acl/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name="acl" +version="0.0.1" +edition="2021" + +[dependencies] +log = "0.4.20" +env_logger = "0.10.0" +tokio = { version = "1.33.0", features = ["full"] } +messaging = { path = "../messaging" } +domain = { path = "../domain" } \ No newline at end of file diff --git a/acl/Dockerfile b/acl/Dockerfile new file mode 100644 index 0000000..4df613a --- /dev/null +++ b/acl/Dockerfile @@ -0,0 +1,15 @@ +# build image +FROM rust:1.73.0-slim-buster as build +WORKDIR /web-api +COPY . . +RUN apt-get update && apt-get install -y build-essential \ + openssl libssl-dev \ + zlib1g \ + cmake + +RUN --mount=type=cache,target=/web-api/target cargo build --release && cp target/release/acl /acl + +# acl image +FROM debian:buster-slim as acl +COPY --from=build /acl /acl +CMD ["/acl"] \ No newline at end of file diff --git a/acl/src/lib.rs b/acl/src/lib.rs new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/acl/src/lib.rs @@ -0,0 +1 @@ + diff --git a/acl/src/main.rs b/acl/src/main.rs new file mode 100644 index 0000000..53b01d3 --- /dev/null +++ b/acl/src/main.rs @@ -0,0 +1,19 @@ +use domain::events::ActivityEvent; +use log::info; +use messaging::kafka::{IKafkaFactory, KafkaConfig, KafkaFactory}; +use std::sync::Arc; + +#[tokio::main] +async fn main() { + std::env::set_var("RUST_LOG", "info"); + env_logger::init(); + + let config = KafkaConfig::new(); + let kafka_factory: Arc> = Arc::new(KafkaFactory::new(config)); + kafka_factory + .create_consumer() + .consume(&|message| { + info!("message: '{:?}'", message); + }) + .await; +} diff --git a/app/Cargo.toml b/app/Cargo.toml index 7c6619c..5952eec 100644 --- a/app/Cargo.toml +++ b/app/Cargo.toml @@ -8,4 +8,5 @@ actix-web = "4.4.0" serde = "1.0.188" serde_derive = "1.0.188" domain = { path = "../domain" } +messaging = { path = "../messaging" } utoipa = { version = "4", features = ["actix_extras"] } \ No newline at end of file diff --git a/app/src/endpoints.rs b/app/src/endpoints.rs index d9d901c..d2b3544 100644 --- a/app/src/endpoints.rs +++ b/app/src/endpoints.rs @@ -1,7 +1,11 @@ -use crate::models::{CreateUserRequest, CreatedUserIdResponse, ErrorResponse, UserResponse}; +use crate::models::TrackActivityRequest::{Click, Open}; +use crate::models::{ + CreateUserRequest, CreatedUserIdResponse, ErrorResponse, TrackActivityRequest, UserResponse, +}; use actix_web::web::{Data, Json, Path}; use actix_web::{get, post, HttpResponse}; use domain::commands::{CreateUserCommand, ICommandHandler}; +use domain::events::{ActivityEvent, IActivityTracker}; use domain::queries::{GetUserQuery, IQueryHandler, User}; /// Get user @@ -65,3 +69,27 @@ pub async fn create_user( None => HttpResponse::BadRequest().json(ErrorResponse { code: 102 }), } } + +/// Track user activity +/// +/// Track user activity +#[utoipa::path( + request_body = TrackActivityRequest, + responses( + (status = 200, description = "Activity successfully tracked"), + ) +)] +#[post("/track_activity")] +pub async fn track_activity( + tracker: Data, + json: Json, +) -> HttpResponse { + let request = json.into_inner(); + + match request { + Click { x, y } => tracker.track(&ActivityEvent::Click { x, y }).await, + Open { path } => tracker.track(&ActivityEvent::Open { p: path }).await, + } + + HttpResponse::Ok().json(()) +} diff --git a/app/src/models.rs b/app/src/models.rs index 69ecc33..cb65070 100644 --- a/app/src/models.rs +++ b/app/src/models.rs @@ -23,3 +23,11 @@ pub struct UserResponse { pub struct ErrorResponse { pub code: i32, } + +#[derive(Serialize, Deserialize, ToSchema)] +pub enum TrackActivityRequest { + #[serde(rename = "click")] + Click { x: i32, y: i32 }, + #[serde(rename = "open")] + Open { path: String }, +} diff --git a/build_image.sh b/build_image.sh deleted file mode 100755 index a0b84f6..0000000 --- a/build_image.sh +++ /dev/null @@ -1,4 +0,0 @@ -#!/bin/bash - -export DOCKER_BUILDKIT=1 -docker build -t web-api . \ No newline at end of file diff --git a/build_images.sh b/build_images.sh new file mode 100755 index 0000000..169a51d --- /dev/null +++ b/build_images.sh @@ -0,0 +1,3 @@ +#!/bin/bash + +docker-compose build \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index e3ade03..ad07079 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -7,15 +7,82 @@ services: image: mongo:7.0 ports: - "27017:27017" + + acl: + build: + context: . + dockerfile: acl/Dockerfile + hostname: acl + container_name: acl + image: acl:latest + depends_on: + - kafka + - mongo + - zookeeper + environment: + - RUST_BACKTRACE=1 + - KAFKA_BROKERS=kafka:29092 + - KAFKA_TOPIC=messages + - KAFKA_GROUP_ID=dev + web-api: + build: + context: . + dockerfile: host/Dockerfile hostname: web-api container_name: web-api image: web-api:latest + depends_on: + - kafka + - mongo + - zookeeper ports: - "8080:8080" environment: - API_HOST=0.0.0.0 - API_PORT=8080 - - DB_NAME=docker + - DB_NAME=dev - DB_URI=mongodb://mongo:27017 - - RUST_BACKTRACE=1 \ No newline at end of file + - RUST_BACKTRACE=1 + - KAFKA_BROKERS=kafka:29092 + - KAFKA_TOPIC=messages + - KAFKA_GROUP_ID=dev + + zookeeper: + image: confluentinc/cp-zookeeper:7.0.1 + hostname: zookeeper + container_name: zookeeper + ports: + - "2181:2181" + environment: + ZOOKEEPER_CLIENT_PORT: 2181 + ZOOKEEPER_TICK_TIME: 2000 + + kafka: + image: confluentinc/cp-kafka:7.0.1 + hostname: kafka + container_name: kafka + ports: + - "9092:9092" + depends_on: + - zookeeper + environment: + KAFKA_BROKER_ID: 1 + KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181" + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092 + KAFKA_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://0.0.0.0:9092 + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 + + kafka-ui: + image: provectuslabs/kafka-ui + container_name: kafka-ui + ports: + - "8085:8080" + depends_on: + - kafka + environment: + - KAFKA_CLUSTERS_0_NAME=local + - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:29092 \ No newline at end of file diff --git a/domain/src/events.rs b/domain/src/events.rs new file mode 100644 index 0000000..d3b7670 --- /dev/null +++ b/domain/src/events.rs @@ -0,0 +1,14 @@ +use async_trait::async_trait; +use serde_derive::{Deserialize, Serialize}; + +#[async_trait] +pub trait IActivityTracker: Send + Sync { + async fn track(&self, activity: &ActivityEvent); +} +#[derive(Serialize, Deserialize, Debug)] +pub enum ActivityEvent { + #[serde(rename = "click")] + Click { x: i32, y: i32 }, + #[serde(rename = "open")] + Open { p: String }, +} diff --git a/domain/src/lib.rs b/domain/src/lib.rs index 328b661..aa65b90 100644 --- a/domain/src/lib.rs +++ b/domain/src/lib.rs @@ -1,5 +1,6 @@ #[path = "commands.rs"] pub mod commands; - +#[path = "events.rs"] +pub mod events; #[path = "queries.rs"] pub mod queries; diff --git a/host/Cargo.toml b/host/Cargo.toml index 18a63ae..3e776bb 100644 --- a/host/Cargo.toml +++ b/host/Cargo.toml @@ -16,4 +16,5 @@ utoipa-swagger-ui = { version = "4", features = ["actix-web"] } app = { path = "../app" } domain = { path = "../domain" } domain_impl = { path = "../domain_impl" } -infra = { path = "../infra" } \ No newline at end of file +infra = { path = "../infra" } +messaging = { path = "../messaging" } \ No newline at end of file diff --git a/Dockerfile b/host/Dockerfile similarity index 62% rename from Dockerfile rename to host/Dockerfile index 53e6bfe..c43f934 100644 --- a/Dockerfile +++ b/host/Dockerfile @@ -2,9 +2,14 @@ FROM rust:1.73.0-slim-buster as build WORKDIR /web-api COPY . . +RUN apt-get update && apt-get install -y build-essential \ + openssl libssl-dev \ + zlib1g \ + cmake + RUN --mount=type=cache,target=/web-api/target cargo build --release && cp target/release/host /host # app image -FROM debian:buster-slim +FROM debian:buster-slim as app COPY --from=build /host /host CMD ["/host"] \ No newline at end of file diff --git a/host/src/composition.rs b/host/src/composition.rs index d09570b..4761ff2 100644 --- a/host/src/composition.rs +++ b/host/src/composition.rs @@ -9,19 +9,27 @@ use utoipa::OpenApi; use utoipa_swagger_ui::SwaggerUi; use crate::conf::AppConf; -use app::endpoints::{create_user, get_user}; +use app::endpoints::{create_user, get_user, track_activity}; use app::models::*; use domain::commands::{CreateUserCommand, ICommandHandler}; +use domain::events::{ActivityEvent, IActivityTracker}; use domain::queries::{GetUserQuery, IQueryHandler, User}; use domain_impl::handlers::{CreateUserCommandHandler, GetUserQueryHandler}; use infra::repositories::UserRepository; +use infra::tracker::ActivityTracker; +use messaging::kafka::IKafkaFactory; #[derive(OpenApi)] #[openapi( - paths(app::endpoints::create_user, app::endpoints::get_user), + paths( + app::endpoints::create_user, + app::endpoints::get_user, + app::endpoints::track_activity + ), components( schemas(CreateUserRequest, CreatedUserIdResponse, ErrorResponse), - schemas(UserResponse, ErrorResponse) + schemas(UserResponse, ErrorResponse), + schemas(TrackActivityRequest) ) )] struct ApiDoc; @@ -31,7 +39,13 @@ pub struct Composition { } impl Composition { - pub async fn new(conf: &AppConf) -> Result { + pub async fn new( + conf: &AppConf, + kafka_factory: Arc>, + ) -> Result { + let producer = kafka_factory.create_producer(); + let tracker: Arc = Arc::new(ActivityTracker::new(producer)); + let mut client_options = ClientOptions::parse(&conf.db_uri).await.unwrap(); let server_api = ServerApi::builder().version(ServerApiVersion::V1).build(); client_options.server_api = Some(server_api); @@ -62,8 +76,10 @@ impl Composition { ) .service(create_user) .service(get_user) + .service(track_activity) .app_data(Data::from(command_handler.clone())) .app_data(Data::from(query_handler.clone())) + .app_data(Data::from(tracker.clone())) }) .bind(&addr) .unwrap_or_else(|_| panic!("Failed to bind to the host: {}", &addr)); diff --git a/host/src/conf.rs b/host/src/conf.rs index 8b7ffcb..736962a 100644 --- a/host/src/conf.rs +++ b/host/src/conf.rs @@ -6,7 +6,7 @@ pub struct AppConf { } impl AppConf { - pub fn new() -> AppConf { + pub fn new() -> Self { Self { db_name: std::env::var("DB_NAME").unwrap_or_else(|_| "test".into()), api_host: std::env::var("API_HOST").unwrap_or_else(|_| "0.0.0.0".into()), diff --git a/host/src/main.rs b/host/src/main.rs index 85160f7..eda4249 100644 --- a/host/src/main.rs +++ b/host/src/main.rs @@ -1,11 +1,17 @@ +use domain::events::ActivityEvent; use host::composition::Composition; use host::conf::AppConf; +use messaging::kafka::{IKafkaFactory, KafkaConfig, KafkaFactory}; +use std::sync::Arc; #[actix_web::main] async fn main() -> std::io::Result<()> { std::env::set_var("RUST_LOG", "debug"); env_logger::init(); + let config = KafkaConfig::new(); + let kafka_factory: Arc> = Arc::new(KafkaFactory::new(config)); + let conf = AppConf::new(); - Composition::new(&conf).await?.server.await + Composition::new(&conf, kafka_factory).await?.server.await } diff --git a/infra/Cargo.toml b/infra/Cargo.toml index 1b77c43..ab866b1 100644 --- a/infra/Cargo.toml +++ b/infra/Cargo.toml @@ -7,4 +7,5 @@ edition = "2021" async-trait = { version = "0.1.73", features = [] } mongodb = "2.7.0" domain = { path = "../domain" } -domain_impl = { path = "../domain_impl" } \ No newline at end of file +domain_impl = { path = "../domain_impl" } +messaging = { path = "../messaging" } \ No newline at end of file diff --git a/infra/src/lib.rs b/infra/src/lib.rs index 7951b49..2be822a 100644 --- a/infra/src/lib.rs +++ b/infra/src/lib.rs @@ -1,2 +1,4 @@ #[path = "repositories.rs"] pub mod repositories; +#[path = "tracker.rs"] +pub mod tracker; diff --git a/infra/src/repositories.rs b/infra/src/repositories.rs index 971206f..af4ac3b 100644 --- a/infra/src/repositories.rs +++ b/infra/src/repositories.rs @@ -11,9 +11,9 @@ pub struct UserRepository { } impl UserRepository { - pub fn new(client: Client, db_name: &str) -> UserRepository { + pub fn new(client: Client, db_name: &str) -> Self { let collection: Collection = client.database(db_name).collection("users"); - UserRepository { collection } + Self { collection } } } diff --git a/infra/src/tracker.rs b/infra/src/tracker.rs new file mode 100644 index 0000000..5cd1533 --- /dev/null +++ b/infra/src/tracker.rs @@ -0,0 +1,22 @@ +use async_trait::async_trait; +use domain::events::{ActivityEvent, IActivityTracker}; +use messaging::kafka::IKafkaProducer; +use std::sync::Arc; + +#[derive(Clone)] +pub struct ActivityTracker { + producer: Arc>, +} + +impl ActivityTracker { + pub fn new(producer: Arc>) -> ActivityTracker { + ActivityTracker { producer } + } +} + +#[async_trait] +impl IActivityTracker for ActivityTracker { + async fn track(&self, activity: &ActivityEvent) { + self.producer.produce(activity).await; + } +} diff --git a/integrtion-tests/Cargo.toml b/integrtion-tests/Cargo.toml index 12fed57..bb26c48 100644 --- a/integrtion-tests/Cargo.toml +++ b/integrtion-tests/Cargo.toml @@ -8,6 +8,7 @@ app = { path = "../app" } domain = { path = "../domain" } domain_impl = { path = "../domain_impl" } infra = { path = "../infra" } +messaging = { path = "../messaging" } host = { path = "../host" } rand = "0.8.5" diff --git a/integrtion-tests/tests/gen.rs b/integrtion-tests/tests/gen.rs index 4f0c14d..b679c7a 100644 --- a/integrtion-tests/tests/gen.rs +++ b/integrtion-tests/tests/gen.rs @@ -1,7 +1,8 @@ +use app::models::TrackActivityRequest; use rand::Rng; use uuid::Uuid; -pub struct Gen {} +pub struct Gen; impl Gen { pub fn random_string() -> String { @@ -12,4 +13,9 @@ impl Gen { let mut rng = rand::thread_rng(); rng.gen() } + + pub fn random_i32() -> i32 { + let mut rng = rand::thread_rng(); + rng.gen() + } } diff --git a/integrtion-tests/tests/sut.rs b/integrtion-tests/tests/sut.rs index 1585cf8..feb1047 100644 --- a/integrtion-tests/tests/sut.rs +++ b/integrtion-tests/tests/sut.rs @@ -1,16 +1,20 @@ use actix_web::rt; -use app::models::{CreateUserRequest, CreatedUserIdResponse, UserResponse}; +use app::models::{CreateUserRequest, CreatedUserIdResponse, TrackActivityRequest, UserResponse}; +use domain::events::ActivityEvent; use host::composition::Composition; use host::conf::AppConf; +use messaging::fake_kafka::FakeKafkaFactory; +use messaging::kafka::IKafkaFactory; use reqwest::StatusCode; +use std::sync::Arc; -#[derive(Clone)] pub struct Sut { base_url: String, + kafka_factory: Arc>, } impl Sut { - pub async fn new() -> Sut { + pub async fn new() -> Self { let conf = AppConf { api_host: "127.0.0.1".to_string(), api_port: 0, @@ -18,12 +22,20 @@ impl Sut { db_name: "test".to_string(), }; - let info = Composition::new(&conf).await.unwrap(); + let kafka_factory: Arc> = + Arc::new(FakeKafkaFactory::new()); + let info = Composition::new(&conf, kafka_factory.clone()) + .await + .unwrap(); + let base_url = format!("http://{}:{}", info.addrs[0].ip(), info.addrs[0].port()); rt::spawn(info.server); - Self { base_url } + Self { + base_url, + kafka_factory: kafka_factory.clone(), + } } pub async fn get_user(&self, id: String) -> Result { @@ -41,7 +53,11 @@ impl Sut { } } - pub async fn create_user(self, name: String, age: u8) -> Result { + pub async fn create_user( + &self, + name: String, + age: u8, + ) -> Result { let client = reqwest::Client::new(); let uri = format!("{}/user", self.base_url); @@ -60,4 +76,22 @@ impl Sut { Err(response.text().await.unwrap()) } } + + pub async fn track_activity(&self, activity: &TrackActivityRequest) -> Result<(), String> { + let client = reqwest::Client::new(); + let uri = format!("{}/track_activity", self.base_url); + + let response = client.post(uri).json(&activity).send().await.unwrap(); + + if response.status() == StatusCode::OK { + Ok(()) + } else { + Err(response.text().await.unwrap()) + } + } + + pub fn get_activity_event(&self) -> ActivityEvent { + let consumer = self.kafka_factory.create_consumer(); + consumer.poll() + } } diff --git a/integrtion-tests/tests/user_tests.rs b/integrtion-tests/tests/user_tests.rs index b50726b..d2307f9 100644 --- a/integrtion-tests/tests/user_tests.rs +++ b/integrtion-tests/tests/user_tests.rs @@ -1,6 +1,9 @@ mod gen; mod sut; +use app::models::TrackActivityRequest; +use domain::events::ActivityEvent; + use crate::gen::Gen; use crate::sut::Sut; @@ -21,17 +24,43 @@ async fn user_created() { let sut = Sut::new().await; - let created_user = sut - .clone() - .create_user(name.to_string(), age) - .await - .unwrap(); + let created_user = sut.create_user(name.to_string(), age).await.unwrap(); let user_id = created_user.id; - let user = sut.clone().get_user(user_id.clone()).await.unwrap(); + let user = sut.get_user(user_id.clone()).await.unwrap(); assert_eq!(user.id, user_id); assert_eq!(user.name, name); assert_eq!(user.age, age); } + +#[actix_rt::test] +async fn open_tracked() { + let path = Gen::random_string(); + let sut = Sut::new().await; + + sut.track_activity(&TrackActivityRequest::Open { path }) + .await + .unwrap(); + + let event = sut.get_activity_event(); + + assert!(matches!(event, ActivityEvent::Open { p: path })); +} + +#[actix_rt::test] +async fn click_tracked() { + let x = Gen::random_i32(); + let y = Gen::random_i32(); + let path = Gen::random_string(); + let sut = Sut::new().await; + + sut.track_activity(&TrackActivityRequest::Click { x, y }) + .await + .unwrap(); + + let event = sut.get_activity_event(); + + assert!(matches!(event, ActivityEvent::Click { x, y })); +} diff --git a/messaging/Cargo.toml b/messaging/Cargo.toml new file mode 100644 index 0000000..f4564a8 --- /dev/null +++ b/messaging/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "messaging" +version = "0.0.1" +edition = "2021" + + +[dependencies] +rdkafka = { version = "0.35.0", features = ["cmake-build"] } +tokio = { version = "1.33.0", features = ["full"] } +serde_json = "1.0.108" +serde = { version = "1.0.189", features = ["derive"] } +async-trait = "0.1.74" +log = { version = "0.4.20", features = [] } \ No newline at end of file diff --git a/messaging/src/fake_kafka.rs b/messaging/src/fake_kafka.rs new file mode 100644 index 0000000..f1fb3f5 --- /dev/null +++ b/messaging/src/fake_kafka.rs @@ -0,0 +1,88 @@ +use crate::kafka::{IKafkaConsumer, IKafkaFactory, IKafkaProducer}; +use async_trait::async_trait; +use serde::{Deserialize, Serialize}; +use std::fmt::Debug; +use std::sync::mpsc::{sync_channel, Receiver, SyncSender}; +use std::sync::Arc; + +pub struct FakeKafkaFactory { + sender: Arc>, + receiver: Arc>, +} + +impl Default for FakeKafkaFactory { + fn default() -> Self { + Self::new() + } +} + +pub struct FakeKafkaProducer { + sender: Arc>, +} +pub struct FakeKafkaConsumer { + receiver: Arc>, +} + +unsafe impl Send for FakeKafkaConsumer {} +unsafe impl Sync for FakeKafkaConsumer {} + +impl Deserialize<'a> + Serialize + Send + Sync + Debug> IKafkaFactory + for FakeKafkaFactory +{ + fn create_producer(&self) -> Arc> { + let producer = FakeKafkaProducer::new(self.sender.clone()); + Arc::new(producer) + } + + fn create_consumer(&self) -> Arc> { + let consumer = FakeKafkaConsumer::new(self.receiver.clone()); + Arc::new(consumer) + } +} + +#[async_trait] +impl IKafkaProducer for FakeKafkaProducer { + async fn produce(&self, message: &T) -> Option<()> { + let json = serde_json::to_string(message).ok()?; + self.sender.send(json).ok()?; + Some(()) + } +} + +#[async_trait] +impl Deserialize<'a> + Debug> IKafkaConsumer for FakeKafkaConsumer { + async fn consume(&self, _handler: &(dyn Fn(T) + Sync)) { + unimplemented!(); + } + + fn poll(&self) -> T { + let message = self.receiver.recv().unwrap(); + serde_json::from_str::(&message).unwrap() + } +} + +impl FakeKafkaFactory { + #[allow(clippy::arc_with_non_send_sync)] + pub fn new() -> Self { + let (sender, receiver): (SyncSender, Receiver) = sync_channel(10); + let sender_arc = Arc::new(sender); + let receiver_arc = Arc::new(receiver); + + Self { + sender: sender_arc, + receiver: receiver_arc, + } + } +} + +impl FakeKafkaProducer { + pub fn new(sender: Arc>) -> Self { + Self { sender } + } +} + +impl FakeKafkaConsumer { + pub fn new(receiver: Arc>) -> Self { + Self { receiver } + } +} diff --git a/messaging/src/kafka.rs b/messaging/src/kafka.rs new file mode 100644 index 0000000..f18f600 --- /dev/null +++ b/messaging/src/kafka.rs @@ -0,0 +1,181 @@ +use async_trait::async_trait; +use log::{error, info, warn}; +use rdkafka::config::RDKafkaLogLevel; +use rdkafka::consumer::{Consumer, ConsumerContext, Rebalance, StreamConsumer}; +use rdkafka::error::KafkaResult; +use rdkafka::producer::{FutureProducer, FutureRecord}; +use rdkafka::{ClientConfig, ClientContext, Message, TopicPartitionList}; +use serde::{Deserialize, Serialize}; +use std::fmt::Debug; +use std::sync::Arc; +use std::time::Duration; + +struct CustomContext; + +impl ClientContext for CustomContext {} + +impl ConsumerContext for CustomContext { + fn pre_rebalance(&self, rebalance: &Rebalance) { + info!("Pre rebalance {:?}", rebalance); + } + + fn post_rebalance(&self, rebalance: &Rebalance) { + info!("Post rebalance {:?}", rebalance); + } + + fn commit_callback(&self, result: KafkaResult<()>, _offsets: &TopicPartitionList) { + info!("Committing offsets: {:?}", result); + } +} + +#[derive(Clone)] +pub struct KafkaConfig { + brokers: String, + topic: String, + group_id: String, +} + +impl KafkaConfig { + pub fn new() -> Self { + Self { + brokers: std::env::var("KAFKA_BROKERS").unwrap_or_else(|_| "localhost:9092".into()), + topic: std::env::var("KAFKA_TOPIC").unwrap_or_else(|_| "messages".into()), + group_id: std::env::var("KAFKA_GROUP_ID").unwrap_or_else(|_| "local".into()), + } + } +} + +impl Default for KafkaConfig { + fn default() -> Self { + Self::new() + } +} + +pub trait IKafkaFactory Deserialize<'a> + Serialize + Send + Sync + Debug> { + fn create_producer(&self) -> Arc>; + fn create_consumer(&self) -> Arc>; +} + +#[async_trait] +pub trait IKafkaProducer: Sync + Send +where + T: Serialize + Send + Sync, +{ + async fn produce(&self, message: &T) -> Option<()>; +} + +#[async_trait] +pub trait IKafkaConsumer Deserialize<'a> + Debug> { + async fn consume(&self, handler: &(dyn Fn(T) + Sync)); + fn poll(&self) -> T; +} + +pub struct KafkaProducer { + producer: FutureProducer, + topic: String, +} + +impl KafkaProducer { + pub fn new(config: KafkaConfig) -> Self { + let producer: FutureProducer = ClientConfig::new() + .set("bootstrap.servers", config.brokers) + .set("message.timeout.ms", "5000") + .create() + .expect("Producer creation error"); + + let topic = config.topic; + + Self { producer, topic } + } +} + +#[async_trait] +impl IKafkaProducer for KafkaProducer { + async fn produce(&self, message: &T) -> Option<()> { + let mut buffer: Vec = Vec::new(); + serde_json::to_writer(&mut buffer, message).ok()?; + + let record = FutureRecord::to(&self.topic).key("").payload(&buffer); + + self.producer + .send(record, Duration::from_secs(0)) + .await + .ok()?; + + Some(()) + } +} + +pub struct KafkaConsumer { + consumer: StreamConsumer, +} + +impl KafkaConsumer { + pub fn new(config: KafkaConfig) -> KafkaConsumer { + let context = CustomContext; + + let consumer: StreamConsumer = ClientConfig::new() + .set("group.id", config.group_id) + .set("bootstrap.servers", config.brokers) + .set("enable.partition.eof", "false") + .set("session.timeout.ms", "6000") + .set("enable.auto.commit", "true") + .set_log_level(RDKafkaLogLevel::Debug) + .create_with_context(context) + .expect("Consumer creation failed"); + + consumer + .subscribe(&[&config.topic]) + .expect("Can't subscribe to specified topics"); + + Self { consumer } + } +} + +#[async_trait] +impl Deserialize<'a> + Debug> IKafkaConsumer for KafkaConsumer { + async fn consume(&self, handler: &(dyn Fn(T) + Sync)) { + loop { + match self.consumer.recv().await { + Err(e) => warn!("Kafka error: {}", e), + Ok(m) => { + match m.payload() { + None => error!("empty payload"), + Some(p) => match serde_json::from_slice::(p) { + Err(e) => warn!("Serialization error: {}", e), + Ok(message) => handler(message), + }, + }; + } + }; + } + } + + fn poll(&self) -> T { + todo!() + } +} + +pub struct KafkaFactory { + config: KafkaConfig, +} + +impl KafkaFactory { + pub fn new(config: KafkaConfig) -> KafkaFactory { + Self { config } + } +} + +impl Deserialize<'a> + Serialize + Send + Sync + Debug> IKafkaFactory + for KafkaFactory +{ + fn create_producer(&self) -> Arc> { + let producer = KafkaProducer::new(self.config.clone()); + Arc::new(producer) + } + + fn create_consumer(&self) -> Arc> { + let consumer = KafkaConsumer::new(self.config.clone()); + Arc::new(consumer) + } +} diff --git a/messaging/src/lib.rs b/messaging/src/lib.rs new file mode 100644 index 0000000..f7e8c2b --- /dev/null +++ b/messaging/src/lib.rs @@ -0,0 +1,5 @@ +#[path = "kafka.rs"] +pub mod kafka; + +#[path = "fake_kafka.rs"] +pub mod fake_kafka;