A mock for the Kinesis API, intended for local testing.
There are a few ways to start kinesis-mock.
It is available as a docker image in the GitHub Container Registry:
docker pull ghcr.io/etspaceman/kinesis-mock:0.4.1
docker run -p 4567:4567 -p 4568:4568 ghcr.io/etspaceman/kinesis-mock:0.4.1
It is available on NPM as an executable service.
npm i kinesis-local
npx kinesis-local
You can also leverage the following executable options in the release assets:
File | Description | Launching |
---|---|---|
main.js |
Executable NodeJS file that can be run in any NodeJS enabled environment | node ./main.js |
main.js.map |
Source mappings for main.js | |
server.json |
self-signed certificate for TLS. Should be included in the same area as main.js |
Below is the available configuration for the service. Note that it is not recommended to edit the ports in the docker environment (rather you can map these ports to a local one).
Variable | Data Type | Default Value | Notes |
---|---|---|---|
INITIALIZE_STREAMS | String | A comma-delimited string of stream names, its optional corresponding shard count and an optional region to initialize during startup. If the shard count is not provided, the default shard count of 4 is used. If the region is not provided, the default region is used. For example: "my-first-stream:1,my-other-stream::us-west-2,my-last-stream:1" | |
KINESIS_MOCK_TLS_PORT | Int | 4567 | Https Only |
KINESIS_MOCK_PLAIN_PORT | Int | 4568 | Http Only |
KINESIS_MOCK_KEYSTORE_PASSWORD | Int | Password for the JKS KeyStore (only for JVM, not JS) | |
KINESIS_MOCK_KEYMANAGER_PASSWORD | Int | Password for the JKS KeyManager (only for JVM, not JS) | |
KINESIS_MOCK_CERT_PASSWORD | Int | Password used for self-signed certificate (only for JS, not JVM) | |
KINESIS_MOCK_CERT_PATH | Int | server.json | Path to certificate file (only for JS, not JVM) |
CREATE_STREAM_DURATION | Duration | 500ms | |
DELETE_STREAM_DURATION | Duration | 500ms | |
REGISTER_STREAM_CONSUMER_DURATION | Duration | 500ms | |
START_STREAM_ENCRYPTION_DURATION | Duration | 500ms | |
STOP_STREAM_ENCRYPTION_DURATION | Duration | 500ms | |
DEREGISTER_STREAM_CONSUMER_DURATION | Duration | 500ms | |
MERGE_SHARDS_DURATION | Duration | 500ms | |
SPLIT_SHARD_DURATION | Duration | 500ms | |
UPDATE_SHARD_COUNT_DURATION | Duration | 500ms | |
UPDATE_STREAM_MODE_DURATION | Duration | 500ms | |
SHARD_LIMIT | Int | 50 | |
ON_DEMAND_STREAM_COUNT_LIMIT | Int | 10 | |
AWS_ACCOUNT_ID | String | "000000000000" | |
AWS_REGION | String | "us-east-1" | Default region in use for operations. E.g. if a region is not provided by the INITIALIZE_STREAMS values. |
LOG_LEVEL | String | "INFO" | Sets the log-level for kinesis-mock specific logs |
ROOT_LOG_LEVEL | String | "ERROR" | Sets the log-level for all dependencies |
LOAD_DATA_IF_EXISTS | Boolean | true | Loads data from the configured persisted data file if it exists |
SHOULD_PERSIST_DATA | Boolean | false | Persists data to disk. Used to keep data during restarts of the service |
PERSIST_PATH | String | "data" | Path to persist data to. If it doesn't start with "/", the path is considered relative to the present working directory. |
PERSIST_FILE_NAME | String | "kinesis-data.json" | File name for persisted data |
PERSIST_INTERVAL | Duration | 5s | Delay between data persistence |
You can configure the LOG_LEVEL
of the mock with the following levels in mind:
ERROR
- Unhandled errors in the serviceWARN
- Handled errors in the service (e.g. bad requests)INFO
- High-level, low-noise informational messages (default)DEBUG
- Low-level, high-noise informational messagesTRACE
- Log data bodies going in / out of the service
The image exposes 2 ports for interactions:
- 4567 (https)
- 4568 (http)
For an example docker-compose setup which uses this image, check out the docker-compose.yml file.
There are examples configuring the KPL, KCL and AWS SDK to use this mock in the integration tests.
import software.amazon.awssdk.auth.credentials.{AwsCredentials,AwsCredentialsProvider}
import software.amazon.awssdk.http.SdkHttpConfigurationOption
import software.amazon.awssdk.http.async.SdkAsyncHttpClient
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient
import software.amazon.awssdk.services.kinesis.model._
import software.amazon.awssdk.utils.AttributeMap
object MyApp {
// A mock credentials provider
final case class AwsCreds(accessKey: String, secretKey: String)
extends AwsCredentials
with AwsCredentialsProvider {
override def accessKeyId(): String = accessKey
override def secretAccessKey(): String = secretKey
override def resolveCredentials(): AwsCredentials = this
}
object AwsCreds {
val LocalCreds: AwsCreds =
AwsCreds("mockKinesisAccessKey", "mockKinesisSecretKey")
}
// The kinesis-mock uses a self-signed certificate
private val trustAllCertificates =
AttributeMap
.builder()
.put(
SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES,
java.lang.Boolean.TRUE
)
.build()
def nettyClient: SdkAsyncHttpClient =
NettyNioAsyncHttpClient
.builder()
.buildWithDefaults(trustAllCertificates)
val kinesisClient: KinesisAsyncClient =
KinesisAsyncClient
.builder()
.httpClient(nettyClient)
.region(Region.US_EAST_1)
.credentialsProvider(AwsCreds.LocalCreds)
.endpointOverride(URI.create(s"https://localhost:4567"))
.build()
}
import com.amazonaws.auth.{AWSCredentials, AWSCredentialsProvider}
object MyApp {
// A mock credentials provider
final case class AwsCreds(accessKey: String, secretKey: String)
extends AWSCredentials
with AWSCredentialsProvider {
override def getAWSAccessKeyId: String = accessKey
override def getAWSSecretKey: String = secretKey
override def getCredentials: AWSCredentials = this
override def refresh(): Unit = ()
}
object AwsCreds {
val LocalCreds: AwsCreds =
AwsCreds("mockKinesisAccessKey", "mockKinesisSecretKey")
}
val kplProducer = new KinesisProducer(
new KinesisProducerConfiguration()
.setCredentialsProvider(AwsCreds.LocalCreds)
.setRegion(Regions.US_EAST_1.getName)
.setKinesisEndpoint("localhost")
.setKinesisPort(4567L)
.setCloudwatchEndpoint("localhost")
.setCloudwatchPort(4566L) // Using localstack's Cloudwatch port
.setVerifyCertificate(false)
)
}
import software.amazon.awssdk.auth.credentials.{AwsCredentials,AwsCredentialsProvider}
import software.amazon.awssdk.http.SdkHttpConfigurationOption
import software.amazon.awssdk.http.async.SdkAsyncHttpClient
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient
import software.amazon.awssdk.services.kinesis.model._
import software.amazon.awssdk.utils.AttributeMap
import software.amazon.kinesis.checkpoint.CheckpointConfig
import software.amazon.kinesis.common._
import software.amazon.kinesis.coordinator.{CoordinatorConfig, Scheduler}
import software.amazon.kinesis.leases.LeaseManagementConfig
import software.amazon.kinesis.lifecycle.LifecycleConfig
import software.amazon.kinesis.lifecycle.events._
import software.amazon.kinesis.metrics.MetricsConfig
import software.amazon.kinesis.processor._
import software.amazon.kinesis.retrieval.polling.PollingConfig
import software.amazon.kinesis.retrieval.RetrievalConfig
object MyApp {
// A mock credentials provider
final case class AwsCreds(accessKey: String, secretKey: String)
extends AwsCredentials
with AwsCredentialsProvider {
override def accessKeyId(): String = accessKey
override def secretAccessKey(): String = secretKey
override def resolveCredentials(): AwsCredentials = this
}
object AwsCreds {
val LocalCreds: AwsCreds =
AwsCreds("mockKinesisAccessKey", "mockKinesisSecretKey")
}
// The kinesis-mock uses a self-signed certificate
private val trustAllCertificates =
AttributeMap
.builder()
.put(
SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES,
java.lang.Boolean.TRUE
)
.build()
def nettyClient: SdkAsyncHttpClient =
NettyNioAsyncHttpClient
.builder()
.buildWithDefaults(trustAllCertificates)
val kinesisClient: KinesisAsyncClient =
KinesisAsyncClient
.builder()
.httpClient(nettyClient)
.region(Region.US_EAST_1)
.credentialsProvider(AwsCreds.LocalCreds)
.endpointOverride(URI.create(s"https://localhost:4567"))
.build()
val cloudwatchClient: CloudWatchAsyncClient -
CloudWatchAsyncClient
.builder()
.httpClient(nettyClient)
.region(Region.US_EAST_1)
.credentialsProvider(AwsCreds.LocalCreds)
.endpointOverride(URI.create(s"https://localhost:4566")) // localstack port
.build()
val dynamoClient: DynamoDbAsyncClient -
DynamoDbAsyncClient
.builder()
.httpClient(nettyClient)
.region(Region.US_EAST_1)
.credentialsProvider(AwsCreds.LocalCreds)
.endpointOverride(URI.create(s"http://localhost:8000")) // dynamodb-local port
.build()
object KCLRecordProcessor extends ShardRecordProcessor {
override def initialize(x: InitializationInput): Unit = ()
override def processRecords(x: ProcessRecordsInput): Unit = println(s"GOT RECORDS: $x")
override def leaseLost(x: LeaseLostInput): Unit = ()
override def shardEnded(x: ShardEndedInput): Unit = ()
override def shutdownRequested(x: ShutdownRequestedInput): Unit = ()
}
object KCLRecordProcessorFactory extends ShardRecordProcessorFactory {
override def shardRecordProcessor(): ShardRecordProcessor =
KCLRecordProcessor
override def shardRecordProcessor(
streamIdentifier: StreamIdentifier
): ShardRecordProcessor = KCLRecordProcessor
}
val appName = "some-app-name"
val workerId = "some-worker-id"
val streamName = "some-stream-name"
// kinesis-mock only supports polling consumers today
val retrievalSpecificConfig = new PollingConfig(streamName, kinesisClient)
// Consumer can be executed from this by running scheduler.run()
val scheduler = new Scheduler(
new CheckpointConfig(),
new CoordinatorConfig(appName)
.parentShardPollIntervalMillis(1000L),
new LeaseManagementConfig(
appName,
dynamoClient,
kinesisClient,
workerId
).shardSyncIntervalMillis(1000L),
new LifecycleConfig(),
new MetricsConfig(cloudwatchClient, appName),
new ProcessorConfig(KCLRecordProcessorFactory),
new RetrievalConfig(
kinesisClient,
streamName,
appName
).initialPositionInStreamExtended(
InitialPositionInStreamExtended.newInitialPosition(
InitialPositionInStream.TRIM_HORIZON
)
).retrievalSpecificConfig(retrievalSpecificConfig)
.retrievalFactory(retrievalSpecificConfig.retrievalFactory())
)
}
- Does not currently support SubscribeToShard due to lack of push-promise support (https://github.com/http4s/http4s/issues/4624)