Skip to content

unbroken-dome/kafka-streams-testing

Repository files navigation

Kafka Streams Test Library

This library contains some tools to test Kafka Streams topologies.

It is inspired by the Mocked Streams Scala project, but is implemented in Java and offers some JUnit 4/5 sugar and Hamcrest matchers.

Like Mocked Streams, it makes use of the ProcessorTopologyTestDriver class that is part of Kafka’s own test libraries. This driver allows testing most features of Kafka Streams without having an actual Kafka instance running, which makes it very fast and suitable for unit tests. The ProcessorTopologyTestDriver is encapsulated behind a convenient interface named MockStreams, which is the main interaction point for tests.

Setup

The library is available on JCenter. Click the "set me up" button on the JCenter page to see instructions for setting up the JCenter repository in your build.

Import the library as follows:

Gradle
dependencies {
    testImplementation 'org.unbroken-dome.kafka-streams-testing:kafka-streams-testing:1.0.0'
}

Maven

<dependency>
    <groupId>org.unbroken-dome.kafka-streams-testing</groupId>
    <artifactId>kafka-streams-testing</artifactId>
    <version>1.0.0</version>
</dependency>

Using the JUnit 5 Extension

The library contains a JUnit 5 extension that allows for setting up and testing Kafka Streams topologies in a clean way.

// Register the MockStreamsExtension with the test
@ExtendWith(MockStreamsExtension.class)
public class StreamsTest {

    // All methods annotated with @Topology will be used to set up
    // the topology under test with a Kafka StreamsBuilder. You can
    // have multiple @Topology methods, even in @Nested classes.
    @Topology
    public void topology(StreamsBuilder builder) {
        // This is the "word count" example from the Kafka Streams documentation.
        // Usually you would point this method at your production code that sets
        // up the topology to test.
        builder.stream("plaintext-input", Consumed.with(Serdes.String(), Serdes.String()))
                .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))
                .groupBy((key, word) -> word, Serialized.with(Serdes.String(), Serdes.String()))
                .count(Materialized
                        .<String, Long, KeyValueStore<Bytes, byte[]>>as("Counts")
                        .withKeySerde(Serdes.String())
                        .withValueSerde(Serdes.Long()))
                .toStream()
                .to("wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
    }

    // The extension allows us to inject a MockStreams instance into any
    // test or configuration method.
    @BeforeEach
    public void generateInputs(MockStreams mockStreams) {
        // Use the MockStreams instance to send some input records to the topic
        mockStreams.input("plaintext-input", Serdes.String(), Serdes.String())
                .record(null, "all streams lead to kafka")
                .record(null, "hello kafka streams");
    }

    @Test
    public void testStoreContents(MockStreams mockStreams) {
        // We can use verifyKeyValueStore to make assertions on the contents of
        // a KeyValueStore
        mockStreams.<String, Long>verifyKeyValueStore("Counts", (store) -> {
            assertThat(store.get("all"), equalTo(1L));
            assertThat(store.get("streams"), equalTo(2L));
            assertThat(store.get("lead"), equalTo(1L));
            assertThat(store.get("to"), equalTo(1L));
            assertThat(store.get("kafka"), equalTo(2L));
            assertThat(store.get("hello"), equalTo(1L));
        });
    }

    @Test
    public void testStoreVerificationWithMatcher(MockStreams mockStreams) {
        // The same test but with the KeyValueStoreMatchers.hasEntryFor matcher,
        // which allows for more informative error messages
        mockStreams.<String, Long>verifyKeyValueStore("Counts", (store) -> {
            assertThat(store, hasEntryFor("all", equalTo(1L)));
            assertThat(store, hasEntryFor("streams", equalTo(2L)));
            assertThat(store, hasEntryFor("lead", equalTo(1L)));
            assertThat(store, hasEntryFor("to", equalTo(1L)));
            assertThat(store, hasEntryFor("kafka", equalTo(2L)));
            assertThat(store, hasEntryFor("hello", equalTo(1L)));
        });
    }
}

Using the JUnit 4 Test Rule

For JUnit 4 tests, the MockStreams instance is made available through a custom test rule, and is set up using a builder-style syntax:

public class MockStreamsRuleTest {

    @Rule
    public final MockStreamsRule mockStreams = MockStreamsRule.builder()
            .withTopology(streamsBuilder -> {
                // Use the StreamsBuilder to set up the topology
            })
            .build();

    @Test
    public void testTopicVerification() {
        // Use MockStreams instance to send inputs and verify the results
    }
}

Releases

No releases published

Packages

No packages published

Languages