Skip to content

Commit

Permalink
kafka consumer example
Browse files Browse the repository at this point in the history
  • Loading branch information
rdcm committed Nov 10, 2023
1 parent 642ec68 commit 8161c4e
Show file tree
Hide file tree
Showing 32 changed files with 745 additions and 32 deletions.
136 changes: 136 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ members = [
"host",
"infra",
"integrtion-tests",
"acl"
]

[workspace.dependencies]
Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ Web api application prototype written in rust

- Clean architecture design with CQRS pattern
- Crud api endpoints with actix-web and mongodb driver
- Kafka Consumer implemented with `rdkafka`
- Integration tests
- Workspaces usage
- OpenApi and Swagger-UI
Expand All @@ -15,6 +16,7 @@ Web api application prototype written in rust

## Structure

- `acl` - anti corruption layer implemented as kafka consumer
- `app` - application layer with endpoints
- `domain` - business logic contracts (queries/commands interfaces)
- `domain_impl` - implementation of buisiness logic contracts
Expand Down
11 changes: 11 additions & 0 deletions acl/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
[package]
name="acl"
version="0.0.1"
edition="2021"

[dependencies]
log = "0.4.20"
env_logger = "0.10.0"
tokio = { version = "1.33.0", features = ["full"] }
messaging = { path = "../messaging" }
domain = { path = "../domain" }
15 changes: 15 additions & 0 deletions acl/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# build image
FROM rust:1.73.0-slim-buster as build
WORKDIR /web-api
COPY . .
RUN apt-get update && apt-get install -y build-essential \
openssl libssl-dev \
zlib1g \
cmake

RUN --mount=type=cache,target=/web-api/target cargo build --release && cp target/release/acl /acl

# acl image
FROM debian:buster-slim as acl
COPY --from=build /acl /acl
CMD ["/acl"]
1 change: 1 addition & 0 deletions acl/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

19 changes: 19 additions & 0 deletions acl/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
use domain::events::ActivityEvent;
use log::info;
use messaging::kafka::{IKafkaFactory, KafkaConfig, KafkaFactory};
use std::sync::Arc;

#[tokio::main]
async fn main() {
std::env::set_var("RUST_LOG", "info");
env_logger::init();

let config = KafkaConfig::new();
let kafka_factory: Arc<dyn IKafkaFactory<ActivityEvent>> = Arc::new(KafkaFactory::new(config));
kafka_factory
.create_consumer()
.consume(&|message| {
info!("message: '{:?}'", message);
})
.await;
}
1 change: 1 addition & 0 deletions app/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@ actix-web = "4.4.0"
serde = "1.0.188"
serde_derive = "1.0.188"
domain = { path = "../domain" }
messaging = { path = "../messaging" }
utoipa = { version = "4", features = ["actix_extras"] }
30 changes: 29 additions & 1 deletion app/src/endpoints.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
use crate::models::{CreateUserRequest, CreatedUserIdResponse, ErrorResponse, UserResponse};
use crate::models::TrackActivityRequest::{Click, Open};
use crate::models::{
CreateUserRequest, CreatedUserIdResponse, ErrorResponse, TrackActivityRequest, UserResponse,
};
use actix_web::web::{Data, Json, Path};
use actix_web::{get, post, HttpResponse};
use domain::commands::{CreateUserCommand, ICommandHandler};
use domain::events::{ActivityEvent, IActivityTracker};
use domain::queries::{GetUserQuery, IQueryHandler, User};

/// Get user
Expand Down Expand Up @@ -65,3 +69,27 @@ pub async fn create_user(
None => HttpResponse::BadRequest().json(ErrorResponse { code: 102 }),
}
}

/// Track user activity
///
/// Track user activity
#[utoipa::path(
request_body = TrackActivityRequest,
responses(
(status = 200, description = "Activity successfully tracked"),
)
)]
#[post("/track_activity")]
pub async fn track_activity(
tracker: Data<dyn IActivityTracker>,
json: Json<TrackActivityRequest>,
) -> HttpResponse {
let request = json.into_inner();

match request {
Click { x, y } => tracker.track(&ActivityEvent::Click { x, y }).await,
Open { path } => tracker.track(&ActivityEvent::Open { p: path }).await,
}

HttpResponse::Ok().json(())
}
8 changes: 8 additions & 0 deletions app/src/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,11 @@ pub struct UserResponse {
pub struct ErrorResponse {
pub code: i32,
}

#[derive(Serialize, Deserialize, ToSchema)]
pub enum TrackActivityRequest {
#[serde(rename = "click")]
Click { x: i32, y: i32 },
#[serde(rename = "open")]
Open { path: String },
}
4 changes: 0 additions & 4 deletions build_image.sh

This file was deleted.

3 changes: 3 additions & 0 deletions build_images.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#!/bin/bash

docker-compose build
Loading

0 comments on commit 8161c4e

Please sign in to comment.