Connections
s3
Commentary
added in 0.2.0
Connects to Amazon S3. The following environment variables must be set:
- AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY, or AWS_SESSION_TOKEN.
- AWS_REGION
You can set Docker environment variables with either -e or --env-file, similar to how the license environment variables are passed.
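Before starting the container, it can help to confirm that the credentials are complete. The following is a minimal Python sketch and not part of ShadowTraffic. The helper name missing_aws_env is hypothetical, and it reads the requirement above literally (either the key pair or a session token, plus a region), which is an assumption about how the variables combine.

```python
import os


def missing_aws_env(env=None):
    """Return the names of required AWS variables that are unset or empty.

    Assumption: either the access key pair or a session token satisfies the
    credential requirement, and AWS_REGION is always required.
    """
    env = os.environ if env is None else env
    missing = []

    has_key_pair = bool(env.get("AWS_ACCESS_KEY_ID")) and bool(env.get("AWS_SECRET_ACCESS_KEY"))
    has_session_token = bool(env.get("AWS_SESSION_TOKEN"))

    if not (has_key_pair or has_session_token):
        # Report whichever half of the key pair is absent.
        for name in ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"):
            if not env.get(name):
                missing.append(name)

    if not env.get("AWS_REGION"):
        missing.append("AWS_REGION")

    return missing
```

Calling missing_aws_env() with no arguments inspects the current process environment; passing a dict makes it easy to check the contents of an env file before handing it to --env-file.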
The target bucket must exist prior to writing data.
A new object will be created following the default batch rate, which can be overridden by time, elements, or serialized bytes.
You can also control the object format and, optionally, compression.
Objects are created with the key name <key-prefix>-<ulid>.<file-suffix>, where ulid is a monotonically increasing ULID. This means all objects in the bucket are sortable by key name.
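To see why ULID-based key names sort in creation order, here is a minimal Python sketch of a monotonic ULID generator. It illustrates the general ULID layout (48-bit millisecond timestamp followed by 80 bits of randomness, encoded in Crockford base32) and is not ShadowTraffic's own implementation. The names MonotonicUlid, encode_ulid, and object_key are hypothetical.

```python
import os
import time

# Crockford base32 alphabet. Its characters are in ascending ASCII order,
# so comparing encoded strings matches comparing the underlying numbers.
CROCKFORD = "0123456789ABCDEFGHJKMNPQRSTVWXYZ"


def encode_ulid(timestamp_ms, randomness):
    """Encode a 48-bit timestamp and an 80-bit random part as 26 characters."""
    value = (timestamp_ms << 80) | randomness
    chars = []
    for _ in range(26):
        chars.append(CROCKFORD[value & 0x1F])
        value >>= 5
    return "".join(reversed(chars))


class MonotonicUlid:
    """Generate ULIDs that strictly increase, even within one millisecond."""

    def __init__(self):
        self._last_ms = -1
        self._last_rand = 0

    def generate(self, now_ms=None):
        ms = int(time.time() * 1000) if now_ms is None else now_ms
        if ms <= self._last_ms:
            # Same (or earlier) millisecond: keep the timestamp, bump the random part.
            ms = self._last_ms
            self._last_rand += 1
        else:
            # New millisecond: draw fresh randomness. Dropping one bit leaves
            # headroom so that increments do not overflow 80 bits.
            self._last_rand = int.from_bytes(os.urandom(10), "big") >> 1
            self._last_ms = ms
        return encode_ulid(ms, self._last_rand)


def object_key(key_prefix, ulid, file_suffix):
    """Build a key following the documented <key-prefix>-<ulid>.<file-suffix> pattern."""
    return f"{key_prefix}-{ulid}.{file_suffix}"
```

Because the timestamp occupies the most significant bits, a plain lexicographic sort of the keys returns them in the order they were generated.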
You can also connect to S3-compatible services like Tigris or MinIO.
Examples
Configuring the connection
This connection doesn't require any other mandatory configuration.
{
  "connections": {
    "s3-staging-org": {
      "kind": "s3"
    }
  }
}
Setting the batch rate
By default, a new object will be created every 500 ms or 5000 elements, whichever happens first. Optionally, you can also create a new object once a certain number of serialized bytes has accumulated.
To override these:
- use lingerMs to set the limit on time
- use batchElements to set it on number of events
- use batchBytes to set it on size
{
  "connections": {
    "s3-staging-org": {
      "kind": "s3",
      "batchConfigs": {
        "lingerMs": 2000,
        "batchElements": 10000,
        "batchBytes": 5242880
      }
    }
  }
}
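The "whichever happens first" rule can be pictured as a simple predicate over the current batch. This is a minimal Python sketch under stated assumptions: should_flush is a hypothetical name, the thresholds are compared with >=, and the byte limit is only checked when it is configured. It mirrors the behavior described above rather than ShadowTraffic's internal code.

```python
def should_flush(elapsed_ms, elements, serialized_bytes,
                 linger_ms=500, batch_elements=5000, batch_bytes=None):
    """Return True once any configured batch limit has been reached.

    Defaults follow the documented values: 500 ms or 5000 elements,
    with no byte limit unless batch_bytes is set.
    """
    if elapsed_ms >= linger_ms:
        return True  # time limit (lingerMs)
    if elements >= batch_elements:
        return True  # element limit (batchElements)
    if batch_bytes is not None and serialized_bytes >= batch_bytes:
        return True  # size limit (batchBytes), only when configured
    return False
```

With the overrides from the example above (2000 ms, 10000 elements, 5242880 bytes), a batch holding 6000 elements after one second would keep accumulating, whereas under the defaults it would already have been written.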
Setting the key and format
Use bucketConfigs to set a keyPrefix and format for the object. keyPrefix is a fully qualified path that may contain slashes (e.g. /my/folder/object-).
format can be one of json, jsonl, log, or parquet.
Additionally:
- pretty set to true will cause json to pretty print.
- explodeJsonlArrays set to true will cause jsonl arrays to span one element per line.
- format set to log means that data must be a string.
- compression can optionally be set to gzip.
{
  "generators": [
    {
      "bucket": "sandbox",
      "bucketConfigs": {
        "keyPrefix": "foo-",
        "format": "jsonl"
      },
      "data": {
        "a": {
          "_gen": "uuid"
        },
        "b": {
          "_gen": "boolean"
        }
      }
    }
  ],
  "connections": {
    "s3-staging-org": {
      "kind": "s3"
    }
  }
}
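To make the jsonl options more concrete, here is a minimal Python sketch of one reading of explodeJsonlArrays: when an event is an array, each element is written on its own line instead of the whole array sharing one line. The function render_jsonl is hypothetical and only approximates the described layout; the exact bytes ShadowTraffic writes may differ.

```python
import json


def render_jsonl(events, explode_arrays=False):
    """Serialize events as JSON Lines, optionally exploding array events."""
    lines = []
    for event in events:
        if explode_arrays and isinstance(event, list):
            # One line per array element.
            lines.extend(json.dumps(item) for item in event)
        else:
            # One line per event, arrays included.
            lines.append(json.dumps(event))
    return "\n".join(lines) + "\n"


events = [{"a": 1}, [{"b": 2}, {"b": 3}]]
print(render_jsonl(events), end="")
print(render_jsonl(events, explode_arrays=True), end="")
```

The first call prints two lines (an object and an array); the second prints three lines, one per object.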
Setting the compression
You can optionally compress the object content with compression. Currently, gzip is the only supported value.
{
  "bucket": "sandbox",
  "bucketConfigs": {
    "keyPrefix": "bar-",
    "format": "jsonl",
    "compression": "gzip"
  },
  "data": {
    "a": {
      "_gen": "boolean"
    },
    "b": {
      "_gen": "uuid"
    }
  }
}
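On the reading side, a gzip-compressed jsonl object can be decoded with the Python standard library. This is a minimal sketch that assumes the whole object body is a single gzip stream of UTF-8 JSON Lines; read_gzip_jsonl is a hypothetical helper, and fetching the bytes from S3 is left out.

```python
import gzip
import json


def read_gzip_jsonl(body):
    """Decode a gzip-compressed JSON Lines payload into a list of values."""
    text = gzip.decompress(body).decode("utf-8")
    # Skip blank lines so a trailing newline does not produce an error.
    return [json.loads(line) for line in text.splitlines() if line.strip()]
```

Pass it the raw bytes of a downloaded object; each element of the returned list corresponds to one generated event.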
Connecting to Tigris
Set endpoint to the Tigris global endpoint. Contrary to the Tigris docs, you must set the AWS_REGION environment variable to an existing region, such as us-east-1, instead of auto. The particular region doesn't matter. This is a quirk of the underlying AWS library that ShadowTraffic uses.
{
  "connections": {
    "s3-staging-org": {
      "kind": "s3",
      "connectionConfigs": {
        "endpoint": "https://fly.storage.tigris.dev"
      }
    }
  }
}
Connecting to MinIO
Set endpoint to the MinIO server and set the respective AWS_* variables to connect to the instance. You will also need to enable path-style access, which can be done with the pathStyleAccess parameter below.
{
  "connections": {
    "s3-staging-org": {
      "kind": "s3",
      "connectionConfigs": {
        "endpoint": "http://minio.example.com:5938",
        "pathStyleAccess": true
      }
    }
  }
}
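Path-style access changes where the bucket name appears in the request URL. The following minimal Python sketch contrasts the two S3 addressing styles. The helper object_url is hypothetical, and the bucket and key are illustrative values only.

```python
from urllib.parse import urlsplit, urlunsplit


def object_url(endpoint, bucket, key, path_style):
    """Build the URL of an object under path-style or virtual-hosted-style addressing."""
    parts = urlsplit(endpoint)
    if path_style:
        # Path style: the bucket is the first path segment.
        return urlunsplit((parts.scheme, parts.netloc, f"/{bucket}/{key}", "", ""))
    # Virtual-hosted style: the bucket becomes a subdomain of the endpoint host.
    return urlunsplit((parts.scheme, f"{bucket}.{parts.netloc}", f"/{key}", "", ""))


print(object_url("http://minio.example.com:5938", "sandbox", "foo.jsonl", path_style=True))
print(object_url("http://minio.example.com:5938", "sandbox", "foo.jsonl", path_style=False))
```

Virtual-hosted-style requests rely on a per-bucket hostname resolving to the server, which a self-hosted endpoint often does not provide. Path-style requests only need the endpoint itself to resolve.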
Changing the log level
By default, the underlying S3 client library logs messages at the INFO level. You can override that by setting logLevel to FATAL, WARN, ERROR, DEBUG, or TRACE, or by setting it explicitly to INFO.
Setting the log level to DEBUG is especially useful if you're experiencing unexpected permission issues connecting to S3, because it logs the raw request/response pairs.
{
  "connections": {
    "s3-staging-org": {
      "kind": "s3",
      "logLevel": "DEBUG"
    }
  }
}
Adjusting concurrency
When ShadowTraffic writes to S3, it does so asynchronously: it generates some events and uses a number of threads to write them in parallel.
This mostly works great, but sometimes you might want to adjust that degree of concurrency. This is especially helpful if you're generating very large objects.
You can change it by using the following two optional parameters under writerConfigs:
- bufferDepth: the maximum number of events to buffer for parallel writers to consume from. By default, this number is 10000.
- concurrency: the number of parallel writers. By default, this number is the number of cores ShadowTraffic has access to.
{
  "connections": {
    "s3": {
      "kind": "s3",
      "writerConfigs": {
        "bufferDepth": 5000,
        "concurrency": 6
      }
    }
  }
}
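The relationship between the two parameters can be pictured as a bounded buffer feeding a pool of writers. This is a minimal Python sketch of that general pattern, not ShadowTraffic's implementation: write_all and write_fn are hypothetical names, and the defaults simply echo the documented ones.

```python
import os
import queue
import threading


def write_all(events, write_fn, buffer_depth=10000, concurrency=None):
    """Push events through a bounded buffer consumed by parallel writers."""
    workers = concurrency or os.cpu_count() or 1
    buffer = queue.Queue(maxsize=buffer_depth)  # plays the role of bufferDepth
    stop = object()  # sentinel telling a writer to exit

    def writer():
        while True:
            item = buffer.get()
            if item is stop:
                break
            write_fn(item)

    threads = [threading.Thread(target=writer) for _ in range(workers)]
    for thread in threads:
        thread.start()

    for event in events:
        # Blocks when the buffer is full, so the producer cannot run far ahead.
        buffer.put(event)

    for _ in threads:
        buffer.put(stop)
    for thread in threads:
        thread.join()
```

A smaller buffer caps how much generated data waits in memory, which matters most when each object is large. Fewer writers reduce the number of simultaneous uploads.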
Specification
Connection JSON schema
{
  "type": "object",
  "properties": {
    "kind": {
      "type": "string",
      "const": "s3"
    },
    "batchConfigs": {
      "type": "object",
      "properties": {
        "lingerMs": {
          "type": "integer",
          "minimum": 0
        },
        "batchElements": {
          "type": "integer",
          "minimum": 0
        },
        "batchBytes": {
          "type": "integer",
          "minimum": 0
        }
      }
    },
    "writerConfigs": {
      "type": "object",
      "properties": {
        "bufferDepth": {
          "type": "integer",
          "minimum": 1
        },
        "concurrency": {
          "type": "integer",
          "minimum": 1
        }
      }
    },
    "logLevel": {
      "type": "string",
      "enum": [
        "INFO",
        "WARN",
        "ERROR",
        "FATAL",
        "DEBUG",
        "TRACE"
      ]
    },
    "connectionConfigs": {
      "type": "object",
      "properties": {
        "endpoint": {
          "type": "string"
        },
        "pathStyleAccess": {
          "type": "boolean"
        }
      }
    }
  }
}
Generator JSON schema
{
  "type": "object",
  "properties": {
    "connection": {
      "type": "string"
    },
    "name": {
      "type": "string"
    },
    "bucket": {
      "type": "string"
    },
    "data": {
      "type": "object"
    },
    "localConfigs": {
      "type": "object",
      "properties": {
        "throttleMs": {
          "oneOf": [
            {
              "type": "number",
              "minimum": 0
            },
            {
              "type": "object",
              "properties": {
                "_gen": {
                  "type": "string"
                }
              },
              "required": [
                "_gen"
              ]
            }
          ]
        },
        "maxEvents": {
          "oneOf": [
            {
              "type": "integer",
              "minimum": 0
            },
            {
              "type": "object",
              "properties": {
                "_gen": {
                  "type": "string"
                }
              },
              "required": [
                "_gen"
              ]
            }
          ]
        },
        "kafkaKeyProtobufHint": {
          "type": "object",
          "properties": {
            "schemaFile": {
              "type": "string"
            },
            "message": {
              "type": "string"
            }
          },
          "required": [
            "schemaFile",
            "message"
          ]
        },
        "jsonSchemaHint": {
          "type": "object"
        },
        "maxBytes": {
          "type": "integer",
          "minimum": 1
        },
        "discard": {
          "type": "object",
          "properties": {
            "rate": {
              "type": "number",
              "minimum": 0,
              "maximum": 1
            },
            "retainHistory": {
              "type": "boolean"
            }
          },
          "required": [
            "rate"
          ]
        },
        "repeat": {
          "type": "object",
          "properties": {
            "rate": {
              "type": "number",
              "minimum": 0,
              "maximum": 1
            },
            "times": {
              "oneOf": [
                {
                  "type": "integer",
                  "minimum": 0
                },
                {
                  "type": "object",
                  "properties": {
                    "_gen": {
                      "type": "string"
                    }
                  },
                  "required": [
                    "_gen"
                  ]
                }
              ]
            }
          },
          "required": [
            "rate",
            "times"
          ]
        },
        "protobufSchemaHint": {
          "type": "object",
          "patternProperties": {
            "^.*$": {
              "type": "object",
              "properties": {
                "schemaFile": {
                  "type": "string"
                },
                "message": {
                  "type": "string"
                }
              },
              "required": [
                "schemaFile",
                "message"
              ]
            }
          }
        },
        "maxHistoryEvents": {
          "type": "integer",
          "minimum": 0
        },
        "maxMs": {
          "type": "integer",
          "minimum": 0
        },
        "time": {
          "type": "integer"
        },
        "events": {
          "type": "object",
          "properties": {
            "exactly": {
              "oneOf": [
                {
                  "type": "integer",
                  "minimum": 0
                },
                {
                  "type": "object",
                  "properties": {
                    "_gen": {
                      "type": "string"
                    }
                  },
                  "required": [
                    "_gen"
                  ]
                }
              ]
            }
          }
        },
        "delay": {
          "type": "object",
          "properties": {
            "rate": {
              "type": "number",
              "minimum": 0,
              "maximum": 1
            },
            "ms": {
              "oneOf": [
                {
                  "type": "integer",
                  "minimum": 0
                },
                {
                  "type": "object",
                  "properties": {
                    "_gen": {
                      "type": "string"
                    }
                  },
                  "required": [
                    "_gen"
                  ]
                }
              ]
            }
          },
          "required": [
            "rate",
            "ms"
          ]
        },
        "history": {
          "type": "object",
          "properties": {
            "events": {
              "type": "object",
              "properties": {
                "max": {
                  "type": "integer",
                  "minimum": 0
                }
              }
            }
          }
        },
        "avroSchemaHint": {
          "type": "object"
        },
        "throttle": {
          "type": "object",
          "properties": {
            "ms": {
              "oneOf": [
                {
                  "type": "number",
                  "minimum": 0
                },
                {
                  "type": "object",
                  "properties": {
                    "_gen": {
                      "type": "string"
                    }
                  },
                  "required": [
                    "_gen"
                  ]
                }
              ]
            }
          }
        },
        "throughput": {
          "oneOf": [
            {
              "type": "integer",
              "minimum": 1
            },
            {
              "type": "object",
              "properties": {
                "_gen": {
                  "type": "string"
                }
              },
              "required": [
                "_gen"
              ]
            }
          ]
        },
        "timeMultiplier": {
          "oneOf": [
            {
              "type": "number"
            },
            {
              "type": "object",
              "properties": {
                "_gen": {
                  "type": "string"
                }
              },
              "required": [
                "_gen"
              ]
            }
          ]
        },
        "kafkaValueProtobufHint": {
          "type": "object",
          "properties": {
            "schemaFile": {
              "type": "string"
            },
            "message": {
              "type": "string"
            }
          },
          "required": [
            "schemaFile",
            "message"
          ]
        }
      }
    },
    "bucketConfigs": {
      "type": "object",
      "properties": {
        "keyPrefix": {
          "type": "string"
        },
        "format": {
          "type": "string",
          "enum": [
            "json",
            "jsonl",
            "parquet",
            "log"
          ]
        },
        "pretty": {
          "type": "boolean"
        },
        "compression": {
          "type": "string",
          "enum": [
            "gzip"
          ]
        },
        "subdir": {
          "oneOf": [
            {
              "type": "string"
            },
            {
              "type": "object",
              "properties": {
                "_gen": {
                  "type": "string"
                }
              },
              "required": [
                "_gen"
              ]
            }
          ]
        }
      },
      "required": [
        "keyPrefix",
        "format"
      ]
    }
  },
  "required": [
    "bucket",
    "data",
    "bucketConfigs"
  ]
}
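As a quick sanity check before running a configuration, the required fields and enumerations in the generator schema can be verified by hand. The following minimal Python sketch covers only those parts of the schema and is not a full JSON Schema validator. The names generator_errors and ALLOWED_FORMATS are hypothetical.

```python
ALLOWED_FORMATS = {"json", "jsonl", "parquet", "log"}


def generator_errors(generator):
    """Return a list of problems found in an S3 generator definition."""
    errors = [
        f"missing required field: {name}"
        for name in ("bucket", "data", "bucketConfigs")
        if name not in generator
    ]

    bucket_configs = generator.get("bucketConfigs")
    if isinstance(bucket_configs, dict):
        # keyPrefix and format are required inside bucketConfigs.
        for name in ("keyPrefix", "format"):
            if name not in bucket_configs:
                errors.append(f"missing required field: bucketConfigs.{name}")

        fmt = bucket_configs.get("format")
        if fmt is not None and fmt not in ALLOWED_FORMATS:
            errors.append(f"unsupported format: {fmt}")

        compression = bucket_configs.get("compression")
        if compression is not None and compression != "gzip":
            errors.append(f"unsupported compression: {compression}")
    elif bucket_configs is not None:
        errors.append("bucketConfigs must be an object")

    return errors
```

An empty list means the generator passed these particular checks; it does not guarantee that the rest of the schema is satisfied.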