Skip to content

Latest commit

 

History

History
435 lines (297 loc) · 12.8 KB

README.md

File metadata and controls

435 lines (297 loc) · 12.8 KB

Playground - DDD Aggregates w/ CDC and CQRS

This repository is packed with concepts, drawings, slides, and code related to a working "playground" building DDD Aggregates via a CDC-CQRS data stream.

The Presentation Slides outline a solution journey from story (requirements) to solution (architecture & design) to code. The goal of this demo is to give context to code and provide a pragmatic approach for designing sofware using DDD as a key driver.

This demo highlights a CDC-CQRS pipeline between a normalized relational database, MySQL, as the command database and a de-normalized NoSQL database, MongoDB, as the query database resulting in the creation of DDD Aggregates via Debezium & Kafka-Streams.

The Snacks Unlimited "Store" source code is centered around three microservices:

  • snack-order-commands
  • snack-order-processor
  • snack-customer-orders.

These services are implemented as Spring-Boot applications in Java. The snack-order-commands exposes API REST endpoints which persist item-details, shipping-info, and payment in their respective tables on MySQL database.

Debezium tails the MySQL bin logs to capture any events in both these tables and publishes messages to Kafka topics.

These topics are consumed by snack-order-processor which is a Kafka-Streams application that joins data from these topics to create an Order-Aggregate (CustomerOrder) object which is then published to a customer-order-aggregate topic.

This topic is consumed by MongoDB Sink Connector and the data is persisted in MongoDB which is served by snack-customer-ordersservice.

References

The Story

Snacks Unlimited Order Management System.

Below is a break down of a fictitious but relatable problem and the solution journey to building a scalable, flexible system using CQRS with CDC and Kadka.

Problem

Domain Storytelling

Event Blueprinting

Solution

DDD Context Map

C4 Modeling - Context View

C4 Modeling - Component View

Implementation Kafka Streams Topology


Playground Environment

Spinning up Playground "Platform"

The "platform" consists of:

  • Confluent Platform (Docker) - Kafka, Kafka Connect
  • MySQL and "Adminer" (manage MySQL via browser)
  • MongoDB and Mongo Express (manage Mongo via browser)

Start and Stop the Platform

$ ./platform kafka|redpanda start
$ ./platform kafka|redpanda stop

Spinning up Playground "Apps"

This will start the following Spring Boot applications:

  • snack-order-commands (Write / MySQL)
  • snack-customer-orders (Read / MongoDB)
  • snack-order-processor (Anti-corruption / Kafka Streams)

NOTE: Ensure the docker images for the apps have been build by running the following in each service project.

$ ./gradlew clean docker

Start & Stop the Apps

$ ./apps start
$ ./apps stop

Creating the Debezium CDC MySQL Source Connector (order-command-db)

  • In Postman run the POST Create MySQL CDC Connector Request

Creating the MongoDB Sink Connector (order aggregate store)

  • In Postman run the POST Create MongoDB Sink Connector Request

DEMO: E2E Script

See Snacks Unlimited - Postman Collection.

--- DEMO START ---

  • 1 - Create Kafka Connectors

  • 2 - Create Customer Order (Snack Customer)

    • POST Item Details
    • POST Shipping Location
    • POST Payment
  • 3 - Fulfill Orders (Sam the Wizard)

    • GET PAID Orders
    • PUT Order Fulfillment
  • 4 - Ship Orders (Minions Delivery Team)

    • GET FULFILLED Orders
    • PUT Order Shipment

--- DEMO COMPLETE ---

  • List Kafka Topics
  • Consume Kafka Topic Records
  • MySQL Database
  • MongoDB Database

Development Notes (Step-by-Step)

Build: snack-order-commands (Write)

This is a transactional service that triggers the overall CQRS workflow by initiating order commands:

  • POST order items (api/item)
  • POST shipping details (api/shipping)
  • POST payment details (api/payment)

Step 1: Create a Spring Boot project with Gradle, Web, JPA, MySQL

$ spring init -d=web,jpa,mysql --build=gradle --type=gradle-project-kotlin snack-order-commands

Create the base packages:

  • api.rest
  • data
  • exception
  • service

Step 2: Create the Skeleton OrderController (REST endpoints)

  • see src/main/java/api/rest/OrderWriteController.java

Step 3: Build Components, Tests, Config

Build: snack-order-processor (DDD Aggregate)

This service consumes from the CDC Topics (triggered from order-write-service) and joins them into an Order Aggregrate for reading.

Step 1: Create the Spring Boot Base Project

$ spring init -d=web --build=gradle --type=gradle-project-kotlin snack-order-processor

Add in the Kafka dependencies in build.gradle.kts:

implementation("org.springframework.kafka:spring-kafka")
implementation("org.apache.kafka:kafka-streams")

testImplementation("org.springframework.boot:spring-boot-starter-test")
testImplementation("org.springframework.kafka:spring-kafka-test")

Create the base packages:

  • config (Kafka Configuration)
  • model.domain
  • model.inbound
  • stream (Kafka Stream topology and Serdes configuration)

Step 2: Create Skeleton Model

Step 3: Configure Kafka and Streams

Step 4: Build Out Components, Tests

Step 5: Setup Kafka Streams Topology Visualization

  • Add the following line to OrderAggregateStream:
See TopologyController class
  • (OPTIONAL) Add this to application.yml to enable actuator endpoints.
management:
  endpoints:å
    web:
      exposure:
        include: "*"

Build: snack-customer-orders (Read)

This is a query service that reads from a MongoDB datastore and represents the read segregated responsibilities of CQRS.

Query commands:

  • GET api/orders/{orderId}

Step 1: Create a Spring Boot project with Gradle, Web, MongoDB

$ spring init -d=web,data-mongodb --build=gradle --type=gradle-project-kotlin snack-customer-orders

Create the base packages:

  • api.rest
  • api.model
  • data.repository
  • service

Step 2: Create the Skeleton OrderController (REST endpoints)

  • see src/main/java/api/rest/OrderWriteController.java

Step 3: Build Components, Tests, Config

Setup: Connectors

Step 1: Debezium CDC Connector

With the Platform Running (See Above).

POST http://localhost:8083/connectors
--
{
    "name": "order-command-db-connector",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "topic.prefix": "order-command-server",
        "database.hostname": "mysql_db_server",
        "database.port": "3306",
        "database.user": "order-command-user",
        "database.password": "password",
        "database.server.id": "142401",
        "database.server.name": "order-command-server",
        "database.whitelist": "order-command-db",
        "table.whitelist": "order-command-db.payment, order-command-db.shipping_location,order-command-db.item_detail",
        "schema.history.internal.kafka.bootstrap.servers": "broker:29092",
        "schema.history.internal.kafka.topic": "dbhistory.order-command-db",
        "include.schema.changes": "true",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
    }
}

Step 2: MongoDB Sink Connector

With the Platform Running (See Above).

POST http://localhost:8083/connectors
--

{
    "name": "order-app-mongo-sink-connector",
    "config": {
        "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
        "topics": "customer-order-aggregate",
        "connection.uri": "mongodb://mongo-user:password@mongodb_server:27017",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": false,
        "database": "customer_order_db",
        "collection": "customerOrder",
        "document.id.strategy.overwrite.existing": "true",
        "document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInKeyStrategy",
        "transforms": "hk,hv",
        "transforms.hk.type": "org.apache.kafka.connect.transforms.HoistField$Key",
        "transforms.hk.field": "_id",
        "transforms.hv.type": "org.apache.kafka.connect.transforms.HoistField$Value",
        "transforms.hv.field": "customerOrder"
    }
}


Development Troubleshooting Notes

UnsatisfiedDependencyException

Iorg.springframework.beans.factory.UnsatisfiedDependencyException: Error creating bean with name 'dataSourceScriptDatabaseInitializer'

This is due to the JPA Data Source not being configured correctly. This can be fixed in the following way:

  • Add the data source configuration to application.yml.
spring:
  datasource:
    url: jdbc:mysql://localhost:13306/order-command-db
    username: order-command-user
    password: password
    driver-class-name: com.mysql.cj.jdbc.Driver

  jpa:
    generate-ddl: true
    show-sql: true

    hibernate:
      ddl-auto: update
      show-sql: true
    properties:
      hibernate:
        format_sql: true
        dialect: org.hibernate.dialect.MySQLDialect

Dependency error configuring in-memory database for @DataJpaTest

Add this to your build.gradle.kts:

testImplementation("com.h2database:h2")

DDL error when running @DataJpaTest

This annotation starts up an in-memory database to run the JPA repository tests.

In application.yml you do not need this:

#        dialect: org.hibernate.dialect.MySQLDialect

Debezium MySQLConnector Error: Access denied; you need the SUPER, RELOAD, or FLUSH_TABLES privilege for this operation

Need to grant privileges for the Debezium MySQLConnector

 GRANT ALL PRIVILEGES ON *.* TO 'order-command-user';

My Discoveries (What I Learned)

Item Discovery Notes
Using @DataJpaTest for JPA Repository Tests This annotation bootstraps the test with an in-memory H2 SQL database.
Leverage Spring profiles for ./gradlew bootRun, ./apps start (local-docker), local-embedded for testing See _src/main/resources/application.yml
Access privileges for CDC Connector Check out ./docker/mysql/init/init-db.sql to grant privileges to the order-command-user.