Connections
kafka
Commentary
added in 0.0.5
Connects to an Apache Kafka cluster.
Automatic topic creation
By default for convenience, any topics ShadowTraffic writes to will be automatically created if they don't exist. This makes it easier to iterate on your generators without flipping back and forth between ShadowTraffic and the Kafka admin tools.
Topics will be created with the broker default partition count and replication factor, but you can control these with tablePolicy
. 1
Manual topic control
If you don't want ShadowTraffic to control your topics, set topicPolicy
to manual
. 2
Built-in serializers
All listed serializers correspond to their Apache Kafka equivalent.
One additional serializer, io.shadowtraffic.kafka.serdes.JsonSerializer
, has been provided for convenience.
The full list of built-in serializers is:
org.apache.kafka.common.serialization.StringSerializer
org.apache.kafka.common.serialization.ByteArraySerializer
org.apache.kafka.common.serialization.IntegerSerializer
org.apache.kafka.common.serialization.LongSerializer
org.apache.kafka.common.serialization.DoubleSerializer
org.apache.kafka.common.serialization.FloatSerializer
org.apache.kafka.common.serialization.ShortSerializer
org.apache.kafka.common.serialization.ByteBufferSerializer
org.apache.kafka.common.serialization.BytesSerializer
org.apache.kafka.common.serialization.UuidSerializer
io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer
io.confluent.kafka.serializers.KafkaAvroSerializer
io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer
io.shadowtraffic.kafka.serdes.JsonSerializer
Schema Registry
To use Schema Registry, simply set schema.registry.url
in the connection map when using any Confluent-provided serializers, like KafkaAvroSerializer
. You can also set the accompanying basic.auth.user.info
config if the Schema Registry instance requires authentication. 3
Custom serializers
ShadowTraffic also supports custom serializers. To use them, create a class that implements the org.apache.kafka.common.serialization.Serializer
interface.
The generic type you use for that interface depends on the kind of data you are generating. If you're generating a primitive type, like {"value" {"_gen" "boolean"}}
, you should use Serializer<Boolean>
- likewise for whatever other primitive types you're using.
But if you're generating a nested type, like:
{"value": {"foo": {"_gen": "boolean"}}}
...implement Serializer<Map<Object, Object>>
.
To use your serializer, create a jar with all the dependencies the serializer needs, then run ShadowTraffic with your serializer on the classpath, like so:
docker run --net=host --env-file license.env -v ./your-serializer.jar:/home/serde.jar -v ./your-config.json:/home/config.json --entrypoint=java shadowtraffic/shadowtraffic:latest -cp '/home/shadowtraffic.jar:/home/serde.jar' io.shadowtraffic.generator.entrypoint --config /home/config.json
Examples
Configuring the connection
Basic configuration for the Kafka connection. You must choose default serializers, but you can override these for each generator.
{
"connections": {
"dev-kafka": {
"kind": "kafka",
"producerConfigs": {
"bootstrap.servers": "localhost:9092",
"key.serializer": "io.shadowtraffic.kafka.serdes.JsonSerializer",
"value.serializer": "io.shadowtraffic.kafka.serdes.JsonSerializer"
}
}
}
}
Overriding serializers
Use producerConfigs
at the generator-level to override the producer properties. This example overrides the default JSON serializer for the value and uses a String serializer instead.
{
"generators": [
{
"topic": "sandbox",
"value": {
"_gen": "sequentialString",
"expr": "id-~d"
},
"producerConfigs": {
"value.serializer": "org.apache.kafka.common.serialization.StringSerializer"
}
}
],
"connections": {
"kafka": {
"kind": "kafka",
"producerConfigs": {
"bootstrap.servers": "localhost:9092",
"key.serializer": "io.shadowtraffic.kafka.serdes.JsonSerializer",
"value.serializer": "io.shadowtraffic.kafka.serdes.JsonSerializer"
}
}
}
}
Overriding topic properties
Use topicPolicy
in the connections map to override how topics are automatically created. This example overrides the default partition and replication factor set on the brokers.
{
"kafka": {
"kind": "kafka",
"topicPolicy": {
"policy": "create",
"partitions": 6,
"replicationFactor": 2
},
"producerConfigs": {
"bootstrap.servers": "localhost:9092",
"key.serializer": "io.shadowtraffic.kafka.serdes.JsonSerializer",
"value.serializer": "io.shadowtraffic.kafka.serdes.JsonSerializer"
}
}
}
Drop and recreate topics
Use the policy dropAndCreate
to drop the topics and recreate them before each run. This can be useful if you're using a stream processor off the back of Kafka.
{
"kafka": {
"kind": "kafka",
"topicPolicy": {
"policy": "dropAndCreate"
},
"producerConfigs": {
"bootstrap.servers": "localhost:9092",
"key.serializer": "io.shadowtraffic.kafka.serdes.JsonSerializer",
"value.serializer": "io.shadowtraffic.kafka.serdes.JsonSerializer"
}
}
}
Manual topic creation
Use the policy manual
to prevent ShadowTraffic from automatically creating topics for you. When you do this, your topics must already exist before ShadowTraffic tries to write to them.
{
"kafka": {
"kind": "kafka",
"topicPolicy": {
"policy": "manual"
},
"producerConfigs": {
"bootstrap.servers": "localhost:9092",
"key.serializer": "io.shadowtraffic.kafka.serdes.JsonSerializer",
"value.serializer": "io.shadowtraffic.kafka.serdes.JsonSerializer"
}
}
}
Connecting to Schema Registry
Use schema.registry.url
and any additional configuration to use serializers that communicate with Schema Registry.
{
"kafka": {
"kind": "kafka",
"producerConfigs": {
"bootstrap.servers": "localhost:9092",
"schema.registry.url": "http://localhost:8081",
"key.serializer": "io.confluent.kafka.serializers.KafkaAvroSerializer",
"value.serializer": "io.confluent.kafka.serializers.KafkaAvroSerializer"
}
}
}
Connecting to Confluent Cloud
Use additional security configuration to connect to hosted Kafka providers, like Confluent Cloud.
For the broker, use security.protocol
, sasl.jaas.config
, and sasl.mechanism
.
For Schema Registry, use basic.auth.credentials.source
and basic.auth.user.info
{
"confluent": {
"kind": "kafka",
"producerConfigs": {
"bootstrap.servers": "xxx.confluent.cloud:9092",
"schema.registry.url": "https://xxx.confluent.cloud",
"basic.auth.credentials.source": "USER_INFO",
"value.serializer": "io.shadowtraffic.kafka.serdes.JsonSerializer",
"key.serializer": "io.shadowtraffic.kafka.serdes.JsonSerializer",
"basic.auth.user.info": "xxx:xxx",
"sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username='xxx' password='xxx';",
"sasl.mechanism": "PLAIN",
"security.protocol": "SASL_SSL"
}
}
}
Connecting to Aiven
To connect to Aiven, you need to download the CA certicate for your cluster and generate keystore/truststore files. Aiven has a good article explaining how to do this with their access key, access certificate, and CA certificates. When you're done, be sure to mount your keystore/truststore files into your container to ShadowTraffic can access them, like so:
docker run ... -v $(pwd)/ssl:/home/ssl
Where ssl
is a local directory containing your authentication files.
To authenticate with a client certicate, use these connection settings:
{
"connections": {
"aiven": {
"kind": "kafka",
"producerConfigs": {
"ssl.truststore.location": "/home/ssl/client.truststore.jks",
"bootstrap.servers": "xxxx.aivencloud.com:16550",
"ssl.key.password": "xxxxx",
"ssl.keystore.password": "xxxxx",
"ssl.keystore.location": "/home/ssl/client.keystore.p12",
"ssl.truststore.password": "yyyyyy",
"ssl.keystore.type": "PKCS12",
"value.serializer": "io.shadowtraffic.kafka.serdes.JsonSerializer",
"key.serializer": "io.shadowtraffic.kafka.serdes.JsonSerializer",
"ssl.truststore.type": "JKS",
"security.protocol": "SSL"
}
}
}
}
To authenticate with SASL, use these connection settings. Get your username and password from the Aiven console.
{
"connections": {
"aiven": {
"kind": "kafka",
"producerConfigs": {
"ssl.truststore.location": "/home/ssl/client.truststore.jks",
"bootstrap.servers": "xxxx.aivencloud.com:16561",
"ssl.truststore.password": "yyyyyy",
"value.serializer": "io.shadowtraffic.kafka.serdes.JsonSerializer",
"key.serializer": "io.shadowtraffic.kafka.serdes.JsonSerializer",
"ssl.truststore.type": "JKS",
"sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username='xxx' password='xxx';",
"sasl.mechanism": "SCRAM-SHA-256",
"security.protocol": "SASL_SSL"
}
}
}
}
Automatic schema creation
When you use the JSON Schema or Avro serializers, ShadowTraffic will by default automatically attempt to guess your schema and submit to Schema Registry. This is helpful if you're developing new streams and don't want to fuss around with maintaining both your data generation logic and schema.
For example, if you run this configuration in ShadowTraffic:
{
"generators": [
{
"topic": "sandbox",
"value": {
"id": {
"_gen": "uuid"
},
"state": {
"_gen": "boolean"
}
}
}
],
"connections": {
"kafka": {
"kind": "kafka",
"producerConfigs": {
"bootstrap.servers": "localhost:9092",
"schema.registry.url": "http://localhost:8081",
"key.serializer": "io.confluent.kafka.serializers.KafkaAvroSerializer",
"value.serializer": "io.confluent.kafka.serializers.KafkaAvroSerializer"
}
}
}
}
It will create this schema in Schema Registry for you:
{
"type": "record",
"name": "sandboxValue",
"fields": [
{
"name": "id",
"type": "string"
},
{
"name": "state",
"type": "boolean"
}
]
}
If you already have your desired schema, or want to override the schema ShadowTraffic picked automatically, you can supply it explicitly. 4 5
Using Avro schema files
If you already have Avro schemas in their own files, use the loadJsonFile
function to load them directly and supply the schemas explicitly with avroSchemaHint
.
{
"topic": "sandbox",
"key": {
"_gen": "boolean"
},
"value": {
"_gen": "sequentialString",
"expr": "id-~d"
},
"localConfigs": {
"avroSchemaHint": {
"key": {
"_gen": "loadJsonFile",
"file": "/path/to/keySchema.avsc"
},
"value": {
"_gen": "loadJsonFile",
"file": "/path/to/ValueSchema.avsc"
}
}
}
}
Using Avro type unions
If your Avro schema has a type union, be sure to explicitly supply the type of the value in the generator.
In this example, id
can either be string
or null
. To satisfy the Avro encoding, the generated value is wrapped with {"string": ...}
.
{
"topic": "sandbox",
"value": {
"id": {
"string": {
"_gen": "sequentialString",
"expr": "id-~d"
}
}
},
"localConfigs": {
"avroSchemaHint": {
"value": {
"type": "record",
"name": "recordValue",
"fields": [
{
"name": "id",
"type": [
"null",
"string"
],
"default": null
}
]
}
}
}
}
Using Protobuf schema files
If you're serializing your events with Protobuf, use the protobufSchemaHint
local configuration to supply the schema explicitly.
Using JSON Schema files
If you're serializing your events with JSON Schema and want to explicitly supply your schema (instead of allowing ShadowTraffic to automatically generate a schema for you), use the jsonSchemaHint
local configuration.
Setting record headers
Use the optional headers
field to set record headers. headers
must be an array of maps, with each map containing a key
and value
field.
Both key
and value
must be strings, the latter of which will be encoded as UTF-8 bytes. If value
is not a string, ShadowTraffic will attempt to coerce it into one.
{
"topic": "sandbox",
"headers": [
{
"key": "region",
"value": {
"_gen": "oneOf",
"choices": [
"us-east-1",
"us-west-2"
]
}
}
],
"value": {
"id": {
"_gen": "uuid"
}
}
}
Changing the log level
By default, the underlying Kafka client library will log messages at the INFO
level. You can override that using logLevel
set to FATAL
, WARN
, ERROR
, DEBUG
, TRACE
, or explicitly set to INFO
.
{
"connections": {
"kafka": {
"kind": "kafka",
"logLevel": "DEBUG",
"producerConfigs": {
"bootstrap.servers": "localhost:9092",
"key.serializer": "io.confluent.kafka.serializers.KafkaAvroSerializer",
"value.serializer": "io.confluent.kafka.serializers.KafkaAvroSerializer"
}
}
}
}
Specification
Connection JSON schema
{
"type": "object",
"properties": {
"kind": {
"type": "string",
"const": "kafka"
},
"producerConfigs": {
"type": "object",
"properties": {
"compression": {
"type": "string"
},
"bootstrap.servers": {
"type": "string"
},
"schema.registry.url": {
"type": "string"
},
"linger.ms": {
"type": "string"
},
"value.serializer": {
"type": "string"
},
"key.serializer": {
"type": "string"
},
"batch.size": {
"type": "string"
},
"acks": {
"type": "string"
},
"sasl.jaas.config": {
"type": "string"
},
"sasl.mechanism": {
"type": "string"
},
"security.protocol": {
"type": "string"
}
},
"required": [
"bootstrap.servers",
"key.serializer",
"value.serializer"
]
},
"topicPolicy": {
"type": "object",
"properties": {
"policy": {
"type": "string",
"enum": [
"manual",
"create",
"dropAndCreate"
]
},
"partitions": {
"type": "integer",
"minimum": 1
},
"replicationFactor": {
"type": "integer",
"minimum": 1
}
},
"required": [
"policy"
]
},
"logLevel": {
"type": "string",
"enum": [
"INFO",
"WARN",
"ERROR",
"FATAL",
"DEBUG",
"TRACE"
]
}
},
"required": [
"producerConfigs"
]
}
Generator JSON schema
{
"type": "object",
"properties": {
"connection": {
"type": "string"
},
"name": {
"type": "string"
},
"topic": {
"type": "string"
},
"key": {},
"value": {},
"headers": {
"type": "array",
"items": {
"type": "object",
"properties": {
"key": {
"oneOf": [
{
"type": "string"
},
{
"type": "object",
"properties": {
"_gen": {
"type": "string"
}
},
"required": [
"_gen"
]
}
]
},
"value": {
"oneOf": [
{
"type": "string"
},
{
"type": "object",
"properties": {
"_gen": {
"type": "string"
}
},
"required": [
"_gen"
]
}
]
}
},
"required": [
"key",
"value"
]
}
},
"localConfigs": {
"type": "object",
"properties": {
"throttleMs": {
"oneOf": [
{
"type": "number",
"minimum": 0
},
{
"type": "object",
"properties": {
"_gen": {
"type": "string"
}
},
"required": [
"_gen"
]
}
]
},
"maxEvents": {
"oneOf": [
{
"type": "integer",
"minimum": 0
},
{
"type": "object",
"properties": {
"_gen": {
"type": "string"
}
},
"required": [
"_gen"
]
}
]
},
"kafkaKeyProtobufHint": {
"type": "object",
"properties": {
"schemaFile": {
"type": "string"
},
"message": {
"type": "string"
}
},
"required": [
"schemaFile",
"message"
]
},
"jsonSchemaHint": {
"type": "object"
},
"maxBytes": {
"type": "integer",
"minimum": 1
},
"discard": {
"type": "object",
"properties": {
"rate": {
"type": "number",
"minimum": 0,
"maximum": 1
}
},
"required": [
"rate"
]
},
"key": {
"type": "object",
"properties": {
"null": {
"type": "object",
"properties": {
"rate": {
"oneOf": [
{
"type": "number",
"minimum": 0,
"maximum": 1
},
{
"type": "object",
"properties": {
"_gen": {
"type": "string"
}
},
"required": [
"_gen"
]
}
]
}
},
"required": [
"rate"
]
}
}
},
"repeat": {
"type": "object",
"properties": {
"rate": {
"type": "number",
"minimum": 0,
"maximum": 1
},
"times": {
"oneOf": [
{
"type": "integer",
"minimum": 0
},
{
"type": "object",
"properties": {
"_gen": {
"type": "string"
}
},
"required": [
"_gen"
]
}
]
}
},
"required": [
"rate",
"times"
]
},
"value": {
"type": "object",
"properties": {
"null": {
"type": "object",
"properties": {
"rate": {
"oneOf": [
{
"type": "number",
"minimum": 0,
"maximum": 1
},
{
"type": "object",
"properties": {
"_gen": {
"type": "string"
}
},
"required": [
"_gen"
]
}
]
}
},
"required": [
"rate"
]
}
}
},
"protobufSchemaHint": {
"type": "object",
"patternProperties": {
"^.*$": {
"type": "object",
"properties": {
"schemaFile": {
"type": "string"
},
"message": {
"type": "string"
}
},
"required": [
"schemaFile",
"message"
]
}
}
},
"maxHistoryEvents": {
"type": "integer",
"minimum": 0
},
"maxMs": {
"type": "integer",
"minimum": 0
},
"time": {
"type": "integer"
},
"events": {
"type": "object",
"properties": {
"exactly": {
"oneOf": [
{
"type": "integer",
"minimum": 0
},
{
"type": "object",
"properties": {
"_gen": {
"type": "string"
}
},
"required": [
"_gen"
]
}
]
}
}
},
"delay": {
"type": "object",
"properties": {
"rate": {
"type": "number",
"minimum": 0,
"maximum": 1
},
"ms": {
"oneOf": [
{
"type": "integer",
"minimum": 0
},
{
"type": "object",
"properties": {
"_gen": {
"type": "string"
}
},
"required": [
"_gen"
]
}
]
}
},
"required": [
"rate",
"ms"
]
},
"history": {
"type": "object",
"properties": {
"events": {
"type": "object",
"properties": {
"max": {
"type": "integer",
"minimum": 0
}
}
}
}
},
"avroSchemaHint": {
"type": "object"
},
"throttle": {
"type": "object",
"properties": {
"ms": {
"oneOf": [
{
"type": "number",
"minimum": 0
},
{
"type": "object",
"properties": {
"_gen": {
"type": "string"
}
},
"required": [
"_gen"
]
}
]
}
}
},
"throughput": {
"oneOf": [
{
"type": "integer",
"minimum": 1
},
{
"type": "object",
"properties": {
"_gen": {
"type": "string"
}
},
"required": [
"_gen"
]
}
]
},
"timeMultiplier": {
"oneOf": [
{
"type": "number"
},
{
"type": "object",
"properties": {
"_gen": {
"type": "string"
}
},
"required": [
"_gen"
]
}
]
},
"kafkaValueProtobufHint": {
"type": "object",
"properties": {
"schemaFile": {
"type": "string"
},
"message": {
"type": "string"
}
},
"required": [
"schemaFile",
"message"
]
}
}
},
"producerConfigs": {
"type": "object",
"properties": {
"key.serializer": {
"type": "string"
},
"value.serializer": {
"type": "string"
}
}
}
},
"required": [
"topic"
],
"anyOf": [
{
"required": [
"key"
]
},
{
"required": [
"value"
]
}
]
}