Connections
kafka
Commentary
added in 0.0.5
Connects to an Apache Kafka cluster.
Automatic topic creation
By default for convenience, any topics ShadowTraffic writes to will be automatically created if they don't exist. This makes it easier to iterate on your generators without flipping back and forth between ShadowTraffic and the Kafka admin tools.
You can control the properties of how the topic is created with tablePolicy
. See the examples.
Manual topic control
If you don't want ShadowTraffic to control your topics, set topicPolicy
to manual
. See the examples.
Built-in serializers
All listed serializers correspond to their Apache Kafka equivalent.
One additional serializer, io.shadowtraffic.kafka.serdes.JsonSerializer
, has been provided for convenience.
The full list of built-in serializers is:
org.apache.kafka.common.serialization.StringSerializer
org.apache.kafka.common.serialization.ByteArraySerializer
org.apache.kafka.common.serialization.IntegerSerializer
org.apache.kafka.common.serialization.LongSerializer
org.apache.kafka.common.serialization.DoubleSerializer
org.apache.kafka.common.serialization.FloatSerializer
org.apache.kafka.common.serialization.ShortSerializer
org.apache.kafka.common.serialization.ByteBufferSerializer
org.apache.kafka.common.serialization.BytesSerializer
org.apache.kafka.common.serialization.UuidSerializer
io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer
io.confluent.kafka.serializers.KafkaAvroSerializer
io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer
io.shadowtraffic.kafka.serdes.JsonSerializer
Custom serializers
You may also use custom serializers. Create a class that implements the org.apache.kafka.common.serialization.Serializer
interface.
The generic type you use for that interface depends on the kind of data you are generating. If you're generating a primitive type, like {"value" {"_gen" "boolean"}}
, you should use Serializer<Boolean>
- likewise for whatever other primitive types you're using.
But if you're generating a nested type, like:
{"value" {"foo" {"_gen" "boolean"}}}
...implement Serializer<Map<Object, Object>>
.
To use your serializer, create a jar with all the dependencies the serializer needs, then run ShadowTraffic with your serializer on the classpath, like so:
docker run --net=host --env-file license.env -v ./your-serializer.jar:/home/serde.jar -v ./your-config.json:/home/config.json --entrypoint=java shadowtraffic/shadowtraffic:latest -cp '/home/shadowtraffic.jar:/home/serde.jar' io.shadowtraffic.generator.entrypoint --config /home/config.json
Schema Registry
Set schema.registry.url
when using any Confluent-provided serializers, like KafkaAvroSerializer
, since they require Schema Registry to function.
Examples
Configuring the connection
Basic configuration for the Kafka connection. You must choose default serializers, but you can override these for each generator.
{
"connections": {
"dev-kafka": {
"kind": "kafka",
"producerConfigs": {
"bootstrap.servers": "localhost:9092",
"key.serializer": "io.shadowtraffic.kafka.serdes.JsonSerializer",
"value.serializer": "io.shadowtraffic.kafka.serdes.JsonSerializer"
}
}
}
}
Overriding serializers
Use producerConfigs
at the generator-level to override the producer properties. This example overrides the default JSON serializer for the value and uses a String serializer instead.
{
"generators": [
{
"topic": "sandbox",
"value": {
"_gen": "sequentialString",
"expr": "id-~d"
},
"producerConfigs": {
"value.serializer": "org.apache.kafka.common.serialization.StringSerializer"
}
}
],
"connections": {
"kafka": {
"kind": "kafka",
"producerConfigs": {
"bootstrap.servers": "localhost:9092",
"key.serializer": "io.shadowtraffic.kafka.serdes.JsonSerializer",
"value.serializer": "io.shadowtraffic.kafka.serdes.JsonSerializer"
}
}
}
}
Overriding topic properties
Use topicPolicy
in the connections map to override how topics are automatically created. This example overrides the default partition and replication factor set on the brokers.
{
"kafka": {
"kind": "kafka",
"topicPolicy": {
"policy": "create",
"partitions": 6,
"replicationFactor": 2
},
"producerConfigs": {
"bootstrap.servers": "localhost:9092",
"key.serializer": "io.shadowtraffic.kafka.serdes.JsonSerializer",
"value.serializer": "io.shadowtraffic.kafka.serdes.JsonSerializer"
}
}
}
Drop and recreate topics
Use the policy dropAndCreate
to drop the topics and recreate them before each run. This can be useful if you're using a stream processor off the back of Kafka.
{
"kafka": {
"kind": "kafka",
"topicPolicy": {
"policy": "dropAndCreate"
},
"producerConfigs": {
"bootstrap.servers": "localhost:9092",
"key.serializer": "io.shadowtraffic.kafka.serdes.JsonSerializer",
"value.serializer": "io.shadowtraffic.kafka.serdes.JsonSerializer"
}
}
}
Manual topic creation
Use the policy manual
to prevent ShadowTraffic from automatically creating topics for you. When you do this, your topics must already exist before ShadowTraffic tries to write to them.
{
"kafka": {
"kind": "kafka",
"topicPolicy": {
"policy": "manual"
},
"producerConfigs": {
"bootstrap.servers": "localhost:9092",
"key.serializer": "io.shadowtraffic.kafka.serdes.JsonSerializer",
"value.serializer": "io.shadowtraffic.kafka.serdes.JsonSerializer"
}
}
}
Connecting to Confluent Cloud
Use additional security configuration to connect to hosted Kafka providers, like Confluent Cloud.
{
"kafka": {
"kind": "kafka",
"producerConfigs": {
"bootstrap.servers": "xxx.confluent.cloud:9092",
"schema.registry.url": "https://xxx.confluent.cloud",
"basic.auth.credentials.source": "USER_INFO",
"value.serializer": "io.shadowtraffic.kafka.serdes.JsonSerializer",
"key.serializer": "io.shadowtraffic.kafka.serdes.JsonSerializer",
"basic.auth.user.info": "xxx:xxx",
"sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username='xxx' password='xxx';",
"sasl.mechanism": "PLAIN",
"security.protocol": "SASL_SSL"
}
}
}
Specification
Connection JSON schema
{
"type": "object",
"properties": {
"kind": {
"type": "string",
"const": "kafka"
},
"producerConfigs": {
"type": "object",
"properties": {
"compression": {
"type": "string"
},
"bootstrap.servers": {
"type": "string"
},
"schema.registry.url": {
"type": "string"
},
"linger.ms": {
"type": "string"
},
"value.serializer": {
"type": "string"
},
"key.serializer": {
"type": "string"
},
"batch.size": {
"type": "string"
},
"acks": {
"type": "string"
},
"sasl.jaas.config": {
"type": "string"
},
"sasl.mechanism": {
"type": "string",
"enum": [
"PLAIN",
"SCRAM-SHA-256",
"SCRAM-SHA-512"
]
},
"security.protocol": {
"type": "string",
"enum": [
"SASL_SSL",
"SASL_PLAINTEXT"
]
}
},
"required": [
"bootstrap.servers",
"key.serializer",
"value.serializer"
]
},
"topicPolicy": {
"type": "object",
"properties": {
"policy": {
"type": "string",
"enum": [
"manual",
"create",
"dropAndCreate"
]
},
"partitions": {
"type": "integer",
"minimum": 1
},
"replicationFactor": {
"type": "integer",
"minimum": 1
}
},
"required": [
"policy"
]
}
},
"required": [
"producerConfigs"
]
}
Generator JSON schema
{
"type": "object",
"properties": {
"connection": {
"type": "string"
},
"topic": {
"type": "string"
},
"localConfigs": {
"type": "object",
"properties": {
"maxEvents": {
"oneOf": [
{
"type": "integer",
"minimum": 0
},
{
"type": "object",
"properties": {
"_gen": {
"type": "string"
}
},
"required": [
"_gen"
]
}
]
},
"discard": {
"type": "object",
"properties": {
"rate": {
"type": "number",
"minimum": 0,
"maximum": 1
}
},
"required": [
"rate"
]
},
"key": {
"type": "object",
"properties": {
"null": {
"rate": {
"type": {
"type": "number",
"minimum": 0,
"maximum": 1
}
}
}
}
},
"repeat": {
"type": "object",
"properties": {
"rate": {
"type": "number",
"minimum": 0,
"maximum": 1
},
"times": {
"oneOf": [
{
"type": "integer",
"minimum": 0
},
{
"type": "object",
"properties": {
"_gen": {
"type": "string"
}
},
"required": [
"_gen"
]
}
]
}
},
"required": [
"rate",
"times"
]
},
"value": {
"type": "object",
"properties": {
"null": {
"rate": {
"type": {
"type": "number",
"minimum": 0,
"maximum": 1
}
}
}
}
},
"events": {
"type": "object",
"properties": {
"exactly": {
"oneOf": [
{
"type": "integer",
"minimum": 0
},
{
"type": "object",
"properties": {
"_gen": {
"type": "string"
}
},
"required": [
"_gen"
]
}
]
}
}
},
"delay": {
"type": "object",
"properties": {
"rate": {
"type": "number",
"minimum": 0,
"maximum": 1
},
"ms": {
"oneOf": [
{
"type": "integer",
"minimum": 0
},
{
"type": "object",
"properties": {
"_gen": {
"type": "string"
}
},
"required": [
"_gen"
]
}
]
}
},
"required": [
"rate",
"ms"
]
},
"history": {
"type": "object",
"properties": {
"events": {
"type": "object",
"properties": {
"max": {
"type": "integer",
"minimum": 0
}
}
}
}
},
"throttle": {
"type": "object",
"properties": {
"ms": {
"oneOf": [
{
"type": "number",
"minimum": 0
},
{
"type": "object"
}
]
}
}
}
}
},
"producerConfigs": {
"type": "object",
"properties": {
"key.serializer": {
"type": "string"
},
"value.serializer": {
"type": "string"
}
}
}
},
"required": [
"topic"
],
"anyOf": [
{
"required": [
"key"
]
},
{
"required": [
"value"
]
}
]
}