

kafka

Commentary

added in 0.0.5

Connects to an Apache Kafka cluster.

Automatic topic creation

By default, ShadowTraffic automatically creates any topic it writes to that doesn't already exist. This makes it easier to iterate on your generators without switching back and forth between ShadowTraffic and the Kafka admin tools.

Topics are created with the brokers' default partition count and replication factor, but you can override both with topicPolicy (see "Overriding topic properties" below).

Manual topic control

If you don't want ShadowTraffic to manage your topics, set the policy field of topicPolicy to manual (see "Manual topic creation" below).

Built-in serializers

Each listed serializer is the standard Apache Kafka or Confluent class of the same name.

One additional serializer, io.shadowtraffic.kafka.serdes.JsonSerializer, has been provided for convenience.

The full list of built-in serializers is:

org.apache.kafka.common.serialization.StringSerializer
org.apache.kafka.common.serialization.ByteArraySerializer
org.apache.kafka.common.serialization.IntegerSerializer
org.apache.kafka.common.serialization.LongSerializer
org.apache.kafka.common.serialization.DoubleSerializer
org.apache.kafka.common.serialization.FloatSerializer
org.apache.kafka.common.serialization.ShortSerializer
org.apache.kafka.common.serialization.ByteBufferSerializer
org.apache.kafka.common.serialization.BytesSerializer
org.apache.kafka.common.serialization.UuidSerializer
io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer
io.confluent.kafka.serializers.KafkaAvroSerializer
io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer
io.shadowtraffic.kafka.serdes.JsonSerializer

Schema Registry

To use Schema Registry, set schema.registry.url in producerConfigs whenever you use a Confluent-provided serializer, such as KafkaAvroSerializer (see "Connecting to Schema Registry" below). If your Schema Registry instance requires authentication, also set basic.auth.user.info, together with basic.auth.credentials.source set to USER_INFO (see "Connecting to Confluent Cloud" below).
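As an illustration of that rule, here is a minimal Python sketch that flags a producerConfigs map which uses a Confluent serializer but has no schema.registry.url. The function names are hypothetical, and treating every class under io.confluent.kafka.serializers as Schema Registry-backed is an assumption based on the built-in list above; this is not ShadowTraffic's own validation.

```python
from typing import Dict

# Assumption: every built-in Confluent serializer lives under this package.
CONFLUENT_SERIALIZER_PREFIX = "io.confluent.kafka.serializers."


def uses_schema_registry(producer_configs: Dict[str, str]) -> bool:
    """True if the key or value serializer is a Confluent (Schema Registry) serializer."""
    return any(
        producer_configs.get(side, "").startswith(CONFLUENT_SERIALIZER_PREFIX)
        for side in ("key.serializer", "value.serializer")
    )


def check_schema_registry(producer_configs: Dict[str, str]) -> None:
    """Hypothetical pre-flight check: raise if schema.registry.url is missing."""
    if uses_schema_registry(producer_configs) and "schema.registry.url" not in producer_configs:
        raise ValueError("schema.registry.url is required for Confluent serializers")


check_schema_registry({
    "bootstrap.servers": "localhost:9092",
    "schema.registry.url": "http://localhost:8081",
    "key.serializer": "io.confluent.kafka.serializers.KafkaAvroSerializer",
    "value.serializer": "io.confluent.kafka.serializers.KafkaAvroSerializer",
})  # passes silently
```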

Custom serializers

ShadowTraffic also supports custom serializers. To use them, create a class that implements the org.apache.kafka.common.serialization.Serializer interface.

The generic type you use for that interface depends on the kind of data you are generating. If you're generating a primitive type, like {"value": {"_gen": "boolean"}}, use Serializer<Boolean>, and likewise for any other primitive type you generate.

But if you're generating a nested type, like:

{"value": {"foo": {"_gen": "boolean"}}}

...implement Serializer<Map<Object, Object>>.
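ShadowTraffic loads custom serializers as JVM classes, so a real implementation must be written in Java or another JVM language. Purely to illustrate the contract, the Python sketch below mirrors the shape of the Java interface's serialize(String topic, T data) method for the nested-map case: a map goes in, bytes come out. The class name and the choice of JSON as the wire format are hypothetical.

```python
import json
from typing import Any, Dict, Optional


class MapJsonSerializer:
    """Illustrative Python analogue of a Java Serializer<Map<Object, Object>>.

    Mirrors serialize(String topic, T data) -> byte[]; the JSON encoding is
    an arbitrary choice for this sketch, not something ShadowTraffic requires.
    """

    def serialize(self, topic: str, data: Optional[Dict[Any, Any]]) -> Optional[bytes]:
        if data is None:
            return None  # Kafka serializers conventionally map null to null
        # Compact separators keep the payload small.
        return json.dumps(data, separators=(",", ":")).encode("utf-8")


# The generator {"value": {"foo": {"_gen": "boolean"}}} yields maps such as {"foo": True}.
print(MapJsonSerializer().serialize("sandbox", {"foo": True}))  # b'{"foo":true}'
```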

To use your serializer, create a jar with all the dependencies the serializer needs, then run ShadowTraffic with your serializer on the classpath, like so:

docker run --net=host --env-file license.env -v ./your-serializer.jar:/home/serde.jar -v ./your-config.json:/home/config.json  --entrypoint=java shadowtraffic/shadowtraffic:latest -cp '/home/shadowtraffic.jar:/home/serde.jar' io.shadowtraffic.generator.entrypoint --config /home/config.json

Examples

Configuring the connection

Basic configuration for the Kafka connection. You must choose default serializers, but you can override these for each generator.

{
"connections": {
"dev-kafka": {
"kind": "kafka",
"producerConfigs": {
"bootstrap.servers": "localhost:9092",
"key.serializer": "io.shadowtraffic.kafka.serdes.JsonSerializer",
"value.serializer": "io.shadowtraffic.kafka.serdes.JsonSerializer"
}
}
}
}

Overriding producer configuration

To change how the underlying Kafka producer behaves, you can supply additional configuration to producerConfigs at either the connection or generator level. You can use any key/value pair the Kafka Java producer accepts.

When custom producer settings are supplied at both the connection and generator level, the generator's values take priority.

In this example, the connection specifies that the value side of the Kafka record should use the JSON serializer. But the generator overrides this, specifying that it should use the String serializer instead.

{
"generators": [
{
"topic": "sandbox",
"value": {
"_gen": "sequentialString",
"expr": "id-~d"
},
"producerConfigs": {
"value.serializer": "org.apache.kafka.common.serialization.StringSerializer"
}
}
],
"connections": {
"kafka": {
"kind": "kafka",
"producerConfigs": {
"bootstrap.servers": "localhost:9092",
"key.serializer": "io.shadowtraffic.kafka.serdes.JsonSerializer",
"value.serializer": "io.shadowtraffic.kafka.serdes.JsonSerializer"
}
}
}
}
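The precedence rule amounts to a shallow map merge in which generator keys win. A minimal Python sketch, assuming both levels are flat key/value maps as in the example above (effective_producer_configs is a hypothetical name, not part of ShadowTraffic):

```python
from typing import Dict, Optional


def effective_producer_configs(
    connection: Dict[str, str], generator: Optional[Dict[str, str]] = None
) -> Dict[str, str]:
    """Shallow merge: keys set on the generator override the connection's."""
    return {**connection, **(generator or {})}


connection = {
    "bootstrap.servers": "localhost:9092",
    "key.serializer": "io.shadowtraffic.kafka.serdes.JsonSerializer",
    "value.serializer": "io.shadowtraffic.kafka.serdes.JsonSerializer",
}
generator = {"value.serializer": "org.apache.kafka.common.serialization.StringSerializer"}

merged = effective_producer_configs(connection, generator)
print(merged["value.serializer"])  # org.apache.kafka.common.serialization.StringSerializer
print(merged["key.serializer"])    # io.shadowtraffic.kafka.serdes.JsonSerializer
```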

Overriding topic properties

Use topicPolicy in the connection map to override how topics are automatically created. This example overrides the brokers' default partition count and replication factor.

{
"kafka": {
"kind": "kafka",
"topicPolicy": {
"policy": "create",
"partitions": 6,
"replicationFactor": 2
},
"producerConfigs": {
"bootstrap.servers": "localhost:9092",
"key.serializer": "io.shadowtraffic.kafka.serdes.JsonSerializer",
"value.serializer": "io.shadowtraffic.kafka.serdes.JsonSerializer"
}
}
}

Drop and recreate topics

Use the policy dropAndCreate to drop your topics and recreate them before each run. This is useful when a stream processor consumes from these topics and you want each run to start from empty topics.

{
"kafka": {
"kind": "kafka",
"topicPolicy": {
"policy": "dropAndCreate"
},
"producerConfigs": {
"bootstrap.servers": "localhost:9092",
"key.serializer": "io.shadowtraffic.kafka.serdes.JsonSerializer",
"value.serializer": "io.shadowtraffic.kafka.serdes.JsonSerializer"
}
}
}

Manual topic creation

Use the policy manual to prevent ShadowTraffic from automatically creating topics for you. When you do this, your topics must already exist before ShadowTraffic tries to write to them.

{
"kafka": {
"kind": "kafka",
"topicPolicy": {
"policy": "manual"
},
"producerConfigs": {
"bootstrap.servers": "localhost:9092",
"key.serializer": "io.shadowtraffic.kafka.serdes.JsonSerializer",
"value.serializer": "io.shadowtraffic.kafka.serdes.JsonSerializer"
}
}
}
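To make the difference between the three policies concrete, here is a small Python model of what each one does to a set of topic names before a run. apply_topic_policy is hypothetical and only restates the behavior described above; it does not talk to Kafka, and it ignores partition counts and replication factors.

```python
from typing import List, Set, Tuple


def apply_topic_policy(policy: str, existing: Set[str], needed: Set[str]) -> Tuple[Set[str], List[str]]:
    """Return (topics after setup, actions taken) for one run."""
    topics = set(existing)
    actions: List[str] = []
    if policy == "manual":
        # ShadowTraffic never creates topics; they must already exist.
        missing = needed - topics
        if missing:
            raise ValueError(f"topics must already exist: {sorted(missing)}")
    elif policy == "create":
        # Default: create only the topics that are missing.
        for topic in sorted(needed - topics):
            topics.add(topic)
            actions.append(f"create {topic}")
    elif policy == "dropAndCreate":
        # Drop each topic (if present) and recreate it, so every run starts empty.
        for topic in sorted(needed):
            if topic in topics:
                actions.append(f"drop {topic}")
            topics.add(topic)
            actions.append(f"create {topic}")
    else:
        raise ValueError(f"unknown policy: {policy}")
    return topics, actions


_, actions = apply_topic_policy("dropAndCreate", {"sandbox"}, {"sandbox"})
print(actions)  # ['drop sandbox', 'create sandbox']
```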

Connecting to Schema Registry

Set schema.registry.url, plus any authentication settings your instance needs, to use serializers that communicate with Schema Registry.

{
"kafka": {
"kind": "kafka",
"producerConfigs": {
"bootstrap.servers": "localhost:9092",
"schema.registry.url": "http://localhost:8081",
"key.serializer": "io.confluent.kafka.serializers.KafkaAvroSerializer",
"value.serializer": "io.confluent.kafka.serializers.KafkaAvroSerializer"
}
}
}

Connecting to Confluent Cloud

Use additional security configuration to connect to hosted Kafka providers, like Confluent Cloud.

For the broker, use security.protocol, sasl.jaas.config, and sasl.mechanism.

For Schema Registry, use basic.auth.credentials.source and basic.auth.user.info.

{
"confluent": {
"kind": "kafka",
"producerConfigs": {
"bootstrap.servers": "xxx.confluent.cloud:9092",
"schema.registry.url": "https://xxx.confluent.cloud",
"basic.auth.credentials.source": "USER_INFO",
"value.serializer": "io.shadowtraffic.kafka.serdes.JsonSerializer",
"key.serializer": "io.shadowtraffic.kafka.serdes.JsonSerializer",
"basic.auth.user.info": "xxx:xxx",
"sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username='xxx' password='xxx';",
"sasl.mechanism": "PLAIN",
"security.protocol": "SASL_SSL"
}
}
}
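The sasl.jaas.config value follows a fixed pattern: the login module class, the keyword required, the credentials, and a trailing semicolon. The Python helpers below are a hypothetical sketch that assembles that string, plus the basic.auth.user.info value (key:secret), so credentials don't have to be pasted into a long literal by hand. The mechanism-to-module table covers only the PLAIN and SCRAM modules used on this page, and the quote check is a simplification rather than full JAAS escaping.

```python
from typing import Dict

# Login modules for the mechanisms used on this page (PLAIN for Confluent Cloud,
# SCRAM for Aiven). Other mechanisms are intentionally not covered.
LOGIN_MODULES = {
    "PLAIN": "org.apache.kafka.common.security.plain.PlainLoginModule",
    "SCRAM-SHA-256": "org.apache.kafka.common.security.scram.ScramLoginModule",
    "SCRAM-SHA-512": "org.apache.kafka.common.security.scram.ScramLoginModule",
}


def sasl_jaas_config(mechanism: str, username: str, password: str) -> str:
    """Build a sasl.jaas.config string in the single-quoted style used above."""
    if "'" in username or "'" in password:
        raise ValueError("credentials containing single quotes need escaping")
    module = LOGIN_MODULES[mechanism]
    return f"{module} required username='{username}' password='{password}';"


def sasl_ssl_configs(mechanism: str, username: str, password: str) -> Dict[str, str]:
    """The three broker security settings shared by the SASL examples on this page."""
    return {
        "security.protocol": "SASL_SSL",
        "sasl.mechanism": mechanism,
        "sasl.jaas.config": sasl_jaas_config(mechanism, username, password),
    }


def schema_registry_auth(api_key: str, api_secret: str) -> Dict[str, str]:
    """Schema Registry basic auth using the USER_INFO credentials source."""
    return {
        "basic.auth.credentials.source": "USER_INFO",
        "basic.auth.user.info": f"{api_key}:{api_secret}",
    }


print(sasl_jaas_config("PLAIN", "xxx", "xxx"))
# org.apache.kafka.common.security.plain.PlainLoginModule required username='xxx' password='xxx';
```

Merging sasl_ssl_configs(...) and schema_registry_auth(...) into producerConfigs reproduces the security-related keys of the Confluent Cloud example; the SCRAM entries produce the sasl.jaas.config form used in the Aiven SASL example below.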

Connecting to Aiven

To connect to Aiven, download the CA certificate for your cluster and generate keystore/truststore files. Aiven has a good article explaining how to do this with their access key, access certificate, and CA certificate. When you're done, mount your keystore/truststore files into your container so ShadowTraffic can access them, like so:

docker run ... -v $(pwd)/ssl:/home/ssl

Where ssl is a local directory containing your authentication files.

To authenticate with a client certificate, use these connection settings:

{
"connections": {
"aiven": {
"kind": "kafka",
"producerConfigs": {
"ssl.truststore.location": "/home/ssl/client.truststore.jks",
"bootstrap.servers": "xxxx.aivencloud.com:16550",
"ssl.key.password": "xxxxx",
"ssl.keystore.password": "xxxxx",
"ssl.keystore.location": "/home/ssl/client.keystore.p12",
"ssl.truststore.password": "yyyyyy",
"ssl.keystore.type": "PKCS12",
"value.serializer": "io.shadowtraffic.kafka.serdes.JsonSerializer",
"key.serializer": "io.shadowtraffic.kafka.serdes.JsonSerializer",
"ssl.truststore.type": "JKS",
"security.protocol": "SSL"
}
}
}
}

To authenticate with SASL, use these connection settings. Get your username and password from the Aiven console.

{
"connections": {
"aiven": {
"kind": "kafka",
"producerConfigs": {
"ssl.truststore.location": "/home/ssl/client.truststore.jks",
"bootstrap.servers": "xxxx.aivencloud.com:16561",
"ssl.truststore.password": "yyyyyy",
"value.serializer": "io.shadowtraffic.kafka.serdes.JsonSerializer",
"key.serializer": "io.shadowtraffic.kafka.serdes.JsonSerializer",
"ssl.truststore.type": "JKS",
"sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username='xxx' password='xxx';",
"sasl.mechanism": "SCRAM-SHA-256",
"security.protocol": "SASL_SSL"
}
}
}
}

Automatic schema creation

When you use the JSON Schema or Avro serializers, ShadowTraffic by default infers a schema from your generator and registers it with Schema Registry. This is helpful when you're developing new streams and don't want to maintain your data generation logic and your schema side by side.

For example, if you run this configuration in ShadowTraffic:

{
"generators": [
{
"topic": "sandbox",
"value": {
"id": {
"_gen": "uuid"
},
"state": {
"_gen": "boolean"
}
}
}
],
"connections": {
"kafka": {
"kind": "kafka",
"producerConfigs": {
"bootstrap.servers": "localhost:9092",
"schema.registry.url": "http://localhost:8081",
"key.serializer": "io.confluent.kafka.serializers.KafkaAvroSerializer",
"value.serializer": "io.confluent.kafka.serializers.KafkaAvroSerializer"
}
}
}
}

It will create this schema in Schema Registry for you:

{
"type": "record",
"name": "sandboxValue",
"fields": [
{
"name": "id",
"type": "string"
},
{
"name": "state",
"type": "boolean"
}
]
}
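A rough Python approximation of that inference is sketched below. Only the string and boolean mappings and the <topic>Value record name are confirmed by the example above; mapping integers to long, floats to double, nested maps to nested records named after their field, and using Key as the suffix for the key side are assumptions, and ShadowTraffic's real inference may differ.

```python
from typing import Any, Dict


def _avro_type(field_name: str, value: Any) -> Any:
    # bool must be checked before int, because bool is a subclass of int in Python.
    if isinstance(value, bool):
        return "boolean"
    if isinstance(value, str):
        return "string"  # uuid values are generated as strings
    if isinstance(value, int):
        return "long"  # assumption
    if isinstance(value, float):
        return "double"  # assumption
    if isinstance(value, dict):
        return _record(field_name, value)  # assumption: nested record named after the field
    raise TypeError(f"no Avro mapping in this sketch for {type(value).__name__}")


def _record(name: str, value: Dict[str, Any]) -> Dict[str, Any]:
    return {
        "type": "record",
        "name": name,
        "fields": [{"name": k, "type": _avro_type(k, v)} for k, v in value.items()],
    }


def infer_avro_schema(topic: str, sample: Dict[str, Any], side: str = "Value") -> Dict[str, Any]:
    """Infer a record schema from one generated sample, named <topic><side>."""
    return _record(f"{topic}{side}", sample)


sample = {"id": "0b6c1e52-2f0a-4c7e-9d1e-3f4a5b6c7d8e", "state": True}
print(infer_avro_schema("sandbox", sample))  # equal to the sandboxValue schema shown above
```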

If you already have your desired schema, or want to override the one ShadowTraffic inferred, you can supply it explicitly with the avroSchemaHint or jsonSchemaHint local configuration (see the examples below).

Using Avro schema files

If you already have Avro schemas in their own files, use the loadJsonFile function to load them directly and supply the schemas explicitly with avroSchemaHint.

{
"topic": "sandbox",
"key": {
"_gen": "boolean"
},
"value": {
"_gen": "sequentialString",
"expr": "id-~d"
},
"localConfigs": {
"avroSchemaHint": {
"key": {
"_gen": "loadJsonFile",
"file": "/path/to/keySchema.avsc"
},
"value": {
"_gen": "loadJsonFile",
"file": "/path/to/ValueSchema.avsc"
}
}
}
}

Using Avro type unions

If your Avro schema has a type union, be sure to explicitly supply the type of the value in the generator.

In this example, id can either be string or null. To satisfy the Avro encoding, the generated value is wrapped with {"string": ...}.

{
"topic": "sandbox",
"value": {
"id": {
"string": {
"_gen": "sequentialString",
"expr": "id-~d"
}
}
},
"localConfigs": {
"avroSchemaHint": {
"value": {
"type": "record",
"name": "recordValue",
"fields": [
{
"name": "id",
"type": [
"null",
"string"
],
"default": null
}
]
}
}
}
}
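The wrapping follows Avro's JSON encoding of unions: null is written as plain null, and any other value is written as a single-entry map whose key names the union branch. A minimal Python sketch of choosing that branch (wrap_union is hypothetical, and its Python-to-Avro type table is an assumption covering only a few primitives):

```python
from typing import Any, List

# Assumed mapping from Python types to Avro primitive branch names.
PY_TO_AVRO = {str: "string", bool: "boolean", int: "long", float: "double"}


def wrap_union(value: Any, union: List[str]) -> Any:
    """Encode a value for an Avro union the way the generator above does by hand."""
    if value is None:
        if "null" not in union:
            raise ValueError("union does not allow null")
        return None  # the null branch is not wrapped
    branch = PY_TO_AVRO.get(type(value))
    if branch not in union:
        raise ValueError(f"no {branch!r} branch in union {union}")
    return {branch: value}


print(wrap_union("id-0", ["null", "string"]))  # {'string': 'id-0'}
```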

Resolving Avro schema references

If your Avro schema has external schema references and you want them registered with Schema Registry first, you can supply them through avroSchemas at the connection level.

These schemas will be registered in the order listed, with the specified subject name, and be available to use in any generator.

In this example, there are two schemas: Parent and MyChild. Parent references MyChild by name, which works because MyChild is registered under the subject MySubject before any data is generated.

{
"generators": [
{
"topic": "sandbox",
"value": {
"top": {
"_gen": "uuid"
},
"child": {
"bottom": {
"_gen": "uuid"
}
}
},
"localConfigs": {
"avroSchemaHint": {
"value": {
"type": "record",
"name": "Parent",
"fields": [
{
"name": "top",
"type": "string"
},
{
"name": "child",
"type": "MyChild"
}
]
}
}
}
}
],
"connections": {
"kafka": {
"kind": "kafka",
"producerConfigs": {
"bootstrap.servers": "localhost:9092",
"schema.registry.url": "http://localhost:8081",
"key.serializer": "io.confluent.kafka.serializers.KafkaAvroSerializer",
"value.serializer": "io.confluent.kafka.serializers.KafkaAvroSerializer"
},
"avroSchemas": [
{
"subject": "MySubject",
"schema": {
"type": "record",
"name": "MyChild",
"fields": [
{
"name": "bottom",
"type": "string"
}
]
}
}
]
}
}
}
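Because registration happens in list order, a schema can only reference names registered before it. The Python sketch below checks that property for an avroSchemas list and a generator's avroSchemaHint. It is a simplification: it matches bare names without namespaces, and it ignores named types defined inline within the same schema. unresolved_references is a hypothetical helper, not part of ShadowTraffic.

```python
from typing import Any, Dict, Iterator, List, Tuple

PRIMITIVES = {"null", "boolean", "int", "long", "float", "double", "bytes", "string"}


def referenced_names(schema: Any) -> Iterator[str]:
    """Yield every named-type reference in a (simplified) Avro schema."""
    if isinstance(schema, str):
        if schema not in PRIMITIVES:
            yield schema
    elif isinstance(schema, list):  # a union: check each branch
        for branch in schema:
            yield from referenced_names(branch)
    elif isinstance(schema, dict):
        kind = schema.get("type")
        if kind == "record":
            for field in schema.get("fields", []):
                yield from referenced_names(field["type"])
        elif kind == "array":
            yield from referenced_names(schema["items"])
        elif kind == "map":
            yield from referenced_names(schema["values"])
        elif kind not in ("enum", "fixed"):
            yield from referenced_names(kind)  # e.g. {"type": "MyChild"}


def unresolved_references(
    avro_schemas: List[Dict[str, Any]], generator_schema: Any
) -> List[Tuple[str, str]]:
    """Return (subject or "<generator>", missing name) pairs, in registration order."""
    registered = set()
    problems = []
    for entry in avro_schemas:
        for name in referenced_names(entry["schema"]):
            if name not in registered:
                problems.append((entry["subject"], name))
        if isinstance(entry["schema"], dict) and "name" in entry["schema"]:
            registered.add(entry["schema"]["name"])
    for name in referenced_names(generator_schema):
        if name not in registered:
            problems.append(("<generator>", name))
    return problems


child = {"type": "record", "name": "MyChild", "fields": [{"name": "bottom", "type": "string"}]}
parent = {"type": "record", "name": "Parent",
          "fields": [{"name": "top", "type": "string"}, {"name": "child", "type": "MyChild"}]}

print(unresolved_references([{"subject": "MySubject", "schema": child}], parent))  # []
print(unresolved_references([], parent))  # [('<generator>', 'MyChild')]
```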

Using Protobuf schema files

If you're serializing your events with Protobuf, use the protobufSchemaHint local configuration to supply the schema explicitly.

Using JSON Schema files

If you're serializing your events with JSON Schema and want to explicitly supply your schema (instead of allowing ShadowTraffic to automatically generate a schema for you), use the jsonSchemaHint local configuration.

Setting record headers

Use the optional headers field to set record headers. headers must be an array of maps, with each map containing a key and value field.

Both key and value must be strings, the latter of which will be encoded as UTF-8 bytes. If value is not a string, ShadowTraffic will attempt to coerce it into one.

{
"topic": "sandbox",
"headers": [
{
"key": "region",
"value": {
"_gen": "oneOf",
"choices": [
"us-east-1",
"us-west-2"
]
}
}
],
"value": {
"id": {
"_gen": "uuid"
}
}
}
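On the wire, each header becomes a string key paired with the UTF-8 bytes of its value. The Python sketch below shows that conversion after generators have been evaluated. encode_headers is hypothetical, and using JSON text (so True becomes "true") to coerce non-string values is an assumption; ShadowTraffic's own coercion may format values differently.

```python
import json
from typing import Any, Dict, List, Tuple


def encode_headers(headers: List[Dict[str, Any]]) -> List[Tuple[str, bytes]]:
    """Turn evaluated {"key": ..., "value": ...} maps into (str, bytes) pairs."""
    encoded = []
    for header in headers:
        key = header["key"]
        if not isinstance(key, str):
            raise TypeError(f"header key must be a string, got {type(key).__name__}")
        value = header["value"]
        if not isinstance(value, str):
            value = json.dumps(value)  # assumption: coerce non-strings via JSON text
        encoded.append((key, value.encode("utf-8")))
    return encoded


# After the oneOf generator above picks a region:
print(encode_headers([{"key": "region", "value": "us-east-1"}]))  # [('region', b'us-east-1')]
```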

Changing the log level

By default, the underlying Kafka client library logs messages at the INFO level. To change this, set logLevel on the connection to TRACE, DEBUG, INFO, WARN, ERROR, or FATAL.

{
"connections": {
"kafka": {
"kind": "kafka",
"logLevel": "DEBUG",
"producerConfigs": {
"bootstrap.servers": "localhost:9092",
"key.serializer": "io.shadowtraffic.kafka.serdes.JsonSerializer",
"value.serializer": "io.shadowtraffic.kafka.serdes.JsonSerializer"
}
}
}
}

Specification

Connection JSON schema

{
"type": "object",
"properties": {
"kind": {
"type": "string",
"const": "kafka"
},
"producerConfigs": {
"type": "object",
"properties": {
"compression.type": {
"type": "string"
},
"bootstrap.servers": {
"type": "string"
},
"schema.registry.url": {
"type": "string"
},
"linger.ms": {
"type": "string"
},
"value.serializer": {
"type": "string"
},
"key.serializer": {
"type": "string"
},
"batch.size": {
"type": "string"
},
"acks": {
"type": "string"
},
"sasl.jaas.config": {
"type": "string"
},
"sasl.mechanism": {
"type": "string"
},
"security.protocol": {
"type": "string"
}
},
"required": [
"bootstrap.servers",
"key.serializer",
"value.serializer"
]
},
"topicPolicy": {
"type": "object",
"properties": {
"policy": {
"type": "string",
"enum": [
"manual",
"create",
"dropAndCreate"
]
},
"partitions": {
"type": "integer",
"minimum": 1
},
"replicationFactor": {
"type": "integer",
"minimum": 1
}
},
"required": [
"policy"
]
},
"avroSchemas": {
"type": "array",
"items": {
"type": "object",
"properties": {
"subject": {
"type": "string"
},
"schema": {}
},
"required": [
"subject",
"schema"
]
}
},
"logLevel": {
"type": "string",
"enum": [
"INFO",
"WARN",
"ERROR",
"FATAL",
"DEBUG",
"TRACE"
]
}
},
"required": [
"producerConfigs"
]
}
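To show how the required fields and enumerations above fit together, here is a hand-written Python check covering a subset of this schema. connection_errors is a hypothetical helper; a real JSON Schema validator would enforce the full schema, including the property types this sketch skips.

```python
from typing import Any, Dict, List

REQUIRED_PRODUCER_KEYS = ("bootstrap.servers", "key.serializer", "value.serializer")
TOPIC_POLICIES = {"manual", "create", "dropAndCreate"}
LOG_LEVELS = {"INFO", "WARN", "ERROR", "FATAL", "DEBUG", "TRACE"}


def _positive_int(x: Any) -> bool:
    return isinstance(x, int) and not isinstance(x, bool) and x >= 1


def connection_errors(conn: Dict[str, Any]) -> List[str]:
    """Return human-readable problems; an empty list means these checks passed."""
    errors = []
    if "kind" in conn and conn["kind"] != "kafka":
        errors.append('kind must be "kafka"')
    producer = conn.get("producerConfigs")
    if not isinstance(producer, dict):
        errors.append("producerConfigs is required and must be an object")
    else:
        errors += [f"producerConfigs.{k} is required" for k in REQUIRED_PRODUCER_KEYS if k not in producer]
    policy = conn.get("topicPolicy")
    if policy is not None:
        if policy.get("policy") not in TOPIC_POLICIES:
            errors.append(f"topicPolicy.policy must be one of {sorted(TOPIC_POLICIES)}")
        for field in ("partitions", "replicationFactor"):
            if field in policy and not _positive_int(policy[field]):
                errors.append(f"topicPolicy.{field} must be an integer >= 1")
    for i, entry in enumerate(conn.get("avroSchemas", [])):
        errors += [f"avroSchemas[{i}].{k} is required" for k in ("subject", "schema") if k not in entry]
    if "logLevel" in conn and conn["logLevel"] not in LOG_LEVELS:
        errors.append(f"logLevel must be one of {sorted(LOG_LEVELS)}")
    return errors


print(connection_errors({
    "kind": "kafka",
    "topicPolicy": {"policy": "create", "partitions": 6, "replicationFactor": 2},
    "producerConfigs": {
        "bootstrap.servers": "localhost:9092",
        "key.serializer": "io.shadowtraffic.kafka.serdes.JsonSerializer",
        "value.serializer": "io.shadowtraffic.kafka.serdes.JsonSerializer",
    },
}))  # []
```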

Generator JSON schema

{
"type": "object",
"properties": {
"connection": {
"type": "string"
},
"name": {
"type": "string"
},
"topic": {
"type": "string"
},
"key": {},
"value": {},
"headers": {
"type": "array",
"items": {
"type": "object",
"properties": {
"key": {
"oneOf": [
{
"type": "string"
},
{
"type": "object",
"properties": {
"_gen": {
"type": "string"
}
},
"required": [
"_gen"
]
}
]
},
"value": {
"oneOf": [
{
"type": "string"
},
{
"type": "object",
"properties": {
"_gen": {
"type": "string"
}
},
"required": [
"_gen"
]
}
]
}
},
"required": [
"key",
"value"
]
}
},
"localConfigs": {
"type": "object",
"properties": {
"throttleMs": {
"oneOf": [
{
"type": "number",
"minimum": 0
},
{
"type": "object",
"properties": {
"_gen": {
"type": "string"
}
},
"required": [
"_gen"
]
}
]
},
"maxEvents": {
"oneOf": [
{
"type": "integer",
"minimum": 0
},
{
"type": "object",
"properties": {
"_gen": {
"type": "string"
}
},
"required": [
"_gen"
]
}
]
},
"kafkaKeyProtobufHint": {
"type": "object",
"properties": {
"schemaFile": {
"type": "string"
},
"message": {
"type": "string"
}
},
"required": [
"schemaFile",
"message"
]
},
"jsonSchemaHint": {
"type": "object"
},
"maxBytes": {
"type": "integer",
"minimum": 1
},
"discard": {
"type": "object",
"properties": {
"rate": {
"type": "number",
"minimum": 0,
"maximum": 1
}
},
"required": [
"rate"
]
},
"key": {
"type": "object",
"properties": {
"null": {
"type": "object",
"properties": {
"rate": {
"oneOf": [
{
"type": "number",
"minimum": 0,
"maximum": 1
},
{
"type": "object",
"properties": {
"_gen": {
"type": "string"
}
},
"required": [
"_gen"
]
}
]
}
},
"required": [
"rate"
]
}
}
},
"repeat": {
"type": "object",
"properties": {
"rate": {
"type": "number",
"minimum": 0,
"maximum": 1
},
"times": {
"oneOf": [
{
"type": "integer",
"minimum": 0
},
{
"type": "object",
"properties": {
"_gen": {
"type": "string"
}
},
"required": [
"_gen"
]
}
]
}
},
"required": [
"rate",
"times"
]
},
"value": {
"type": "object",
"properties": {
"null": {
"type": "object",
"properties": {
"rate": {
"oneOf": [
{
"type": "number",
"minimum": 0,
"maximum": 1
},
{
"type": "object",
"properties": {
"_gen": {
"type": "string"
}
},
"required": [
"_gen"
]
}
]
}
},
"required": [
"rate"
]
}
}
},
"protobufSchemaHint": {
"type": "object",
"patternProperties": {
"^.*$": {
"type": "object",
"properties": {
"schemaFile": {
"type": "string"
},
"message": {
"type": "string"
}
},
"required": [
"schemaFile",
"message"
]
}
}
},
"maxHistoryEvents": {
"type": "integer",
"minimum": 0
},
"maxMs": {
"type": "integer",
"minimum": 0
},
"time": {
"type": "integer"
},
"events": {
"type": "object",
"properties": {
"exactly": {
"oneOf": [
{
"type": "integer",
"minimum": 0
},
{
"type": "object",
"properties": {
"_gen": {
"type": "string"
}
},
"required": [
"_gen"
]
}
]
}
}
},
"delay": {
"type": "object",
"properties": {
"rate": {
"type": "number",
"minimum": 0,
"maximum": 1
},
"ms": {
"oneOf": [
{
"type": "integer",
"minimum": 0
},
{
"type": "object",
"properties": {
"_gen": {
"type": "string"
}
},
"required": [
"_gen"
]
}
]
}
},
"required": [
"rate",
"ms"
]
},
"history": {
"type": "object",
"properties": {
"events": {
"type": "object",
"properties": {
"max": {
"type": "integer",
"minimum": 0
}
}
}
}
},
"avroSchemaHint": {
"type": "object"
},
"throttle": {
"type": "object",
"properties": {
"ms": {
"oneOf": [
{
"type": "number",
"minimum": 0
},
{
"type": "object",
"properties": {
"_gen": {
"type": "string"
}
},
"required": [
"_gen"
]
}
]
}
}
},
"throughput": {
"oneOf": [
{
"type": "integer",
"minimum": 1
},
{
"type": "object",
"properties": {
"_gen": {
"type": "string"
}
},
"required": [
"_gen"
]
}
]
},
"timeMultiplier": {
"oneOf": [
{
"type": "number"
},
{
"type": "object",
"properties": {
"_gen": {
"type": "string"
}
},
"required": [
"_gen"
]
}
]
},
"kafkaValueProtobufHint": {
"type": "object",
"properties": {
"schemaFile": {
"type": "string"
},
"message": {
"type": "string"
}
},
"required": [
"schemaFile",
"message"
]
}
}
},
"producerConfigs": {
"type": "object",
"properties": {
"key.serializer": {
"type": "string"
},
"value.serializer": {
"type": "string"
}
}
}
},
"required": [
"topic"
],
"anyOf": [
{
"required": [
"key"
]
},
{
"required": [
"value"
]
}
]
}
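The top-level constraints of this schema (a required topic, plus at least one of key or value via anyOf) and the required fields of each header can be checked the same way. generator_errors is a hypothetical Python sketch covering only those rules, not the nested localConfigs options.

```python
from typing import Any, Dict, List


def generator_errors(gen: Dict[str, Any]) -> List[str]:
    """Check a few top-level rules from the generator schema; [] means they pass."""
    errors = []
    if not isinstance(gen.get("topic"), str):
        errors.append("topic is required and must be a string")
    # anyOf: [{"required": ["key"]}, {"required": ["value"]}]
    if "key" not in gen and "value" not in gen:
        errors.append("at least one of key or value is required")
    for i, header in enumerate(gen.get("headers", [])):
        errors += [f"headers[{i}].{f} is required" for f in ("key", "value") if f not in header]
    return errors


print(generator_errors({"topic": "sandbox", "value": {"id": {"_gen": "uuid"}}}))  # []
print(generator_errors({"topic": "sandbox"}))  # ['at least one of key or value is required']
```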