Skip to main content

Connections

kafka

Commentary

added in 0.0.5

Connects to an Apache Kafka cluster.

Automatic topic creation

By default for convenience, any topics ShadowTraffic writes to will be automatically created if they don't exist. This makes it easier to iterate on your generators without flipping back and forth between ShadowTraffic and the Kafka admin tools.

You can control the properties of how the topic is created with tablePolicy. See the examples.

Manual topic control

If you don't want ShadowTraffic to control your topics, set topicPolicy to manual. See the examples.

Built-in serializers

All listed serializers correspond to their Apache Kafka equivalent.

One additional serializer, io.shadowtraffic.kafka.serdes.JsonSerializer, has been provided for convenience.

The full list of built-in serializers is:

org.apache.kafka.common.serialization.StringSerializer
org.apache.kafka.common.serialization.ByteArraySerializer
org.apache.kafka.common.serialization.IntegerSerializer
org.apache.kafka.common.serialization.LongSerializer
org.apache.kafka.common.serialization.DoubleSerializer
org.apache.kafka.common.serialization.FloatSerializer
org.apache.kafka.common.serialization.ShortSerializer
org.apache.kafka.common.serialization.ByteBufferSerializer
org.apache.kafka.common.serialization.BytesSerializer
org.apache.kafka.common.serialization.UuidSerializer
io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer
io.confluent.kafka.serializers.KafkaAvroSerializer
io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer
io.shadowtraffic.kafka.serdes.JsonSerializer

Custom serializers

You may also use custom serializers. Create a class that implements the org.apache.kafka.common.serialization.Serializer interface.

The generic type you use for that interface depends on the kind of data you are generating. If you're generating a primitive type, like {"value" {"_gen" "boolean"}}, you should use Serializer<Boolean> - likewise for whatever other primitive types you're using.

But if you're generating a nested type, like:

{"value" {"foo" {"_gen" "boolean"}}}

...implement Serializer<Map<Object, Object>>.

To use your serializer, create a jar with all the dependencies the serializer needs, then run ShadowTraffic with your serializer on the classpath, like so:

docker run --net=host --env-file license.env -v ./your-serializer.jar:/home/serde.jar -v ./your-config.json:/home/config.json  --entrypoint=java shadowtraffic/shadowtraffic:latest -cp '/home/shadowtraffic.jar:/home/serde.jar' io.shadowtraffic.generator.entrypoint --config /home/config.json

Schema Registry

Set schema.registry.url when using any Confluent-provided serializers, like KafkaAvroSerializer, since they require Schema Registry to function.


Examples

Configuring the connection

Basic configuration for the Kafka connection. You must choose default serializers, but you can override these for each generator.

{
"connections": {
"dev-kafka": {
"kind": "kafka",
"producerConfigs": {
"bootstrap.servers": "localhost:9092",
"key.serializer": "io.shadowtraffic.kafka.serdes.JsonSerializer",
"value.serializer": "io.shadowtraffic.kafka.serdes.JsonSerializer"
}
}
}
}

Overriding serializers

Use producerConfigs at the generator-level to override the producer properties. This example overrides the default JSON serializer for the value and uses a String serializer instead.

{
"generators": [
{
"topic": "sandbox",
"value": {
"_gen": "sequentialString",
"expr": "id-~d"
},
"producerConfigs": {
"value.serializer": "org.apache.kafka.common.serialization.StringSerializer"
}
}
],
"connections": {
"kafka": {
"kind": "kafka",
"producerConfigs": {
"bootstrap.servers": "localhost:9092",
"key.serializer": "io.shadowtraffic.kafka.serdes.JsonSerializer",
"value.serializer": "io.shadowtraffic.kafka.serdes.JsonSerializer"
}
}
}
}

Overriding topic properties

Use topicPolicy in the connections map to override how topics are automatically created. This example overrides the default partition and replication factor set on the brokers.

{
"kafka": {
"kind": "kafka",
"topicPolicy": {
"policy": "create",
"partitions": 6,
"replicationFactor": 2
},
"producerConfigs": {
"bootstrap.servers": "localhost:9092",
"key.serializer": "io.shadowtraffic.kafka.serdes.JsonSerializer",
"value.serializer": "io.shadowtraffic.kafka.serdes.JsonSerializer"
}
}
}

Drop and recreate topics

Use the policy dropAndCreate to drop the topics and recreate them before each run. This can be useful if you're using a stream processor off the back of Kafka.

{
"kafka": {
"kind": "kafka",
"topicPolicy": {
"policy": "dropAndCreate"
},
"producerConfigs": {
"bootstrap.servers": "localhost:9092",
"key.serializer": "io.shadowtraffic.kafka.serdes.JsonSerializer",
"value.serializer": "io.shadowtraffic.kafka.serdes.JsonSerializer"
}
}
}

Manual topic creation

Use the policy manual to prevent ShadowTraffic from automatically creating topics for you. When you do this, your topics must already exist before ShadowTraffic tries to write to them.

{
"kafka": {
"kind": "kafka",
"topicPolicy": {
"policy": "manual"
},
"producerConfigs": {
"bootstrap.servers": "localhost:9092",
"key.serializer": "io.shadowtraffic.kafka.serdes.JsonSerializer",
"value.serializer": "io.shadowtraffic.kafka.serdes.JsonSerializer"
}
}
}

Connecting to Confluent Cloud

Use additional security configuration to connect to hosted Kafka providers, like Confluent Cloud.

{
"kafka": {
"kind": "kafka",
"producerConfigs": {
"bootstrap.servers": "xxx.confluent.cloud:9092",
"schema.registry.url": "https://xxx.confluent.cloud",
"basic.auth.credentials.source": "USER_INFO",
"value.serializer": "io.shadowtraffic.kafka.serdes.JsonSerializer",
"key.serializer": "io.shadowtraffic.kafka.serdes.JsonSerializer",
"basic.auth.user.info": "xxx:xxx",
"sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username='xxx' password='xxx';",
"sasl.mechanism": "PLAIN",
"security.protocol": "SASL_SSL"
}
}
}

Specification

Connection JSON schema

{
"type": "object",
"properties": {
"kind": {
"type": "string",
"const": "kafka"
},
"producerConfigs": {
"type": "object",
"properties": {
"compression": {
"type": "string"
},
"bootstrap.servers": {
"type": "string"
},
"schema.registry.url": {
"type": "string"
},
"linger.ms": {
"type": "string"
},
"value.serializer": {
"type": "string"
},
"key.serializer": {
"type": "string"
},
"batch.size": {
"type": "string"
},
"acks": {
"type": "string"
},
"sasl.jaas.config": {
"type": "string"
},
"sasl.mechanism": {
"type": "string",
"enum": [
"PLAIN",
"SCRAM-SHA-256",
"SCRAM-SHA-512"
]
},
"security.protocol": {
"type": "string",
"enum": [
"SASL_SSL",
"SASL_PLAINTEXT"
]
}
},
"required": [
"bootstrap.servers",
"key.serializer",
"value.serializer"
]
},
"topicPolicy": {
"type": "object",
"properties": {
"policy": {
"type": "string",
"enum": [
"manual",
"create",
"dropAndCreate"
]
},
"partitions": {
"type": "integer",
"minimum": 1
},
"replicationFactor": {
"type": "integer",
"minimum": 1
}
},
"required": [
"policy"
]
}
},
"required": [
"producerConfigs"
]
}

Generator JSON schema

{
"type": "object",
"properties": {
"connection": {
"type": "string"
},
"topic": {
"type": "string"
},
"localConfigs": {
"type": "object",
"properties": {
"maxEvents": {
"oneOf": [
{
"type": "integer",
"minimum": 0
},
{
"type": "object",
"properties": {
"_gen": {
"type": "string"
}
},
"required": [
"_gen"
]
}
]
},
"discard": {
"type": "object",
"properties": {
"rate": {
"type": "number",
"minimum": 0,
"maximum": 1
}
},
"required": [
"rate"
]
},
"key": {
"type": "object",
"properties": {
"null": {
"rate": {
"type": {
"type": "number",
"minimum": 0,
"maximum": 1
}
}
}
}
},
"repeat": {
"type": "object",
"properties": {
"rate": {
"type": "number",
"minimum": 0,
"maximum": 1
},
"times": {
"oneOf": [
{
"type": "integer",
"minimum": 0
},
{
"type": "object",
"properties": {
"_gen": {
"type": "string"
}
},
"required": [
"_gen"
]
}
]
}
},
"required": [
"rate",
"times"
]
},
"value": {
"type": "object",
"properties": {
"null": {
"rate": {
"type": {
"type": "number",
"minimum": 0,
"maximum": 1
}
}
}
}
},
"events": {
"type": "object",
"properties": {
"exactly": {
"oneOf": [
{
"type": "integer",
"minimum": 0
},
{
"type": "object",
"properties": {
"_gen": {
"type": "string"
}
},
"required": [
"_gen"
]
}
]
}
}
},
"delay": {
"type": "object",
"properties": {
"rate": {
"type": "number",
"minimum": 0,
"maximum": 1
},
"ms": {
"oneOf": [
{
"type": "integer",
"minimum": 0
},
{
"type": "object",
"properties": {
"_gen": {
"type": "string"
}
},
"required": [
"_gen"
]
}
]
}
},
"required": [
"rate",
"ms"
]
},
"history": {
"type": "object",
"properties": {
"events": {
"type": "object",
"properties": {
"max": {
"type": "integer",
"minimum": 0
}
}
}
}
},
"throttle": {
"type": "object",
"properties": {
"ms": {
"oneOf": [
{
"type": "number",
"minimum": 0
},
{
"type": "object"
}
]
}
}
}
}
},
"producerConfigs": {
"type": "object",
"properties": {
"key.serializer": {
"type": "string"
},
"value.serializer": {
"type": "string"
}
}
}
},
"required": [
"topic"
],
"anyOf": [
{
"required": [
"key"
]
},
{
"required": [
"value"
]
}
]
}