Connections

s3

Commentary

added in 0.2.0

Connects to Amazon S3. The following environment variables must be set:

  • AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY, or AWS_SESSION_TOKEN.
  • AWS_REGION

You can set Docker environment variables with either -e or --env-file, similar to how the license environment variables are passed.

The target bucket must exist prior to writing data.

A new object is created at the default batch rate, which can be overridden by time, number of elements, or serialized bytes.

You can also control the object format and, optionally, compression.

Objects are created with the key name <key-prefix>-<ulid>.<file-suffix>, where ulid is a monotonically increasing ULID. This means all objects in the bucket are sortable by key name.
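
To see why ULID-based key names sort in creation order, here is a minimal Python sketch. It is not ShadowTraffic's implementation: MonotonicUlid and object_key are hypothetical names, and the layout (a 48-bit millisecond timestamp followed by 80 random bits, encoded in Crockford base32) is assumed from the public ULID spec.

```python
import os
import time

# Crockford base32 alphabet used by ULIDs. Its characters are in ascending
# ASCII order, so comparing encoded strings matches comparing the numbers.
CROCKFORD = "0123456789ABCDEFGHJKMNPQRSTVWXYZ"


def encode_ulid(value: int) -> str:
    """Encode a 128-bit integer as a 26-character ULID string."""
    chars = []
    for _ in range(26):
        chars.append(CROCKFORD[value & 31])
        value >>= 5
    return "".join(reversed(chars))


class MonotonicUlid:
    """Hypothetical generator whose IDs keep increasing within one millisecond."""

    def __init__(self) -> None:
        self._last_ms = -1
        self._last_rand = 0

    def next(self, now_ms=None) -> str:
        ms = int(time.time() * 1000) if now_ms is None else now_ms
        if ms <= self._last_ms:
            # Same (or earlier) millisecond: keep the timestamp, bump the random part.
            ms = self._last_ms
            self._last_rand += 1
        else:
            # New millisecond: fresh randomness. The top bit is cleared so that
            # later increments cannot overflow the 80-bit field.
            self._last_rand = int.from_bytes(os.urandom(10), "big") >> 1
        self._last_ms = ms
        return encode_ulid((ms << 80) | self._last_rand)


def object_key(key_prefix: str, ulid: str, file_suffix: str) -> str:
    # Mirrors the documented pattern <key-prefix>-<ulid>.<file-suffix>.
    return f"{key_prefix}-{ulid}.{file_suffix}"


gen = MonotonicUlid()
# Three keys in the same millisecond, then one in a later millisecond.
keys = [object_key("events", gen.next(now_ms=t), "jsonl") for t in (1000, 1000, 1000, 1001)]
print(keys == sorted(keys))  # True: creation order equals lexicographic order
```

Because every key shares the same prefix and the ULID part has a fixed width, a plain lexicographic listing of the bucket returns objects in the order they were written.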

You can also connect to S3-compatible services like Tigris or MinIO.


Examples

Configuring the connection

This connection doesn't require any other mandatory configuration.

{
  "connections": {
    "s3-staging-org": {
      "kind": "s3"
    }
  }
}

Set the batch rate

By default, a new object is created every 500 ms or every 5000 elements, whichever comes first. Optionally, you can also create a new object once a given number of serialized bytes has accumulated.

To override these:

  • use lingerMs to set the limit on time
  • use batchElements to set it on number of events
  • use batchBytes to set it on size

{
  "connections": {
    "s3-staging-org": {
      "kind": "s3",
      "batchConfigs": {
        "lingerMs": 2000,
        "batchElements": 10000,
        "batchBytes": 5242880
      }
    }
  }
}
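
The "whichever comes first" rule can be pictured with a small Python model. This is an illustrative sketch, not ShadowTraffic's code: the Batcher class and its method names are hypothetical, the defaults mirror the documented 500 ms and 5000 elements, and the byte limit is assumed to be disabled unless set. The model only checks the time limit when an element arrives, whereas a real writer would presumably also flush on a timer.

```python
import json


class Batcher:
    """Hypothetical model of the lingerMs, batchElements, and batchBytes limits."""

    def __init__(self, linger_ms=500, batch_elements=5000, batch_bytes=None):
        self.linger_ms = linger_ms
        self.batch_elements = batch_elements
        self.batch_bytes = batch_bytes  # None means no size limit (an assumption)
        self._items = []
        self._bytes = 0
        self._started_ms = None

    def add(self, event, now_ms):
        """Buffer one event; return the finished batch if a limit was reached, else None."""
        if self._started_ms is None:
            self._started_ms = now_ms
        self._items.append(event)
        self._bytes += len(json.dumps(event).encode("utf-8"))

        too_old = now_ms - self._started_ms >= self.linger_ms
        too_many = len(self._items) >= self.batch_elements
        too_big = self.batch_bytes is not None and self._bytes >= self.batch_bytes
        if too_old or too_many or too_big:
            batch = self._items
            self._items, self._bytes, self._started_ms = [], 0, None
            return batch
        return None


batcher = Batcher(linger_ms=2000, batch_elements=3)
results = [batcher.add({"n": n}, now_ms=n * 100) for n in range(4)]
# The element limit (3) is reached long before the 2000 ms time limit.
print([len(b) if b else None for b in results])  # [None, None, 3, None]
```

Each returned batch corresponds to one object written to the bucket, so raising any of the three limits yields fewer, larger objects.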

Setting the key and format

Use bucketConfigs to set a keyPrefix and format for the object. keyPrefix is a fully qualified path that may contain slashes (e.g. /my/folder/object-).

format can be one of json, jsonl, log, or parquet.

Additionally:

  • pretty set to true pretty-prints json output.
  • explodeJsonlArrays set to true writes each element of a jsonl array on its own line.
  • format set to log requires data to be a string.
  • compression can optionally be set to gzip.

{
  "generators": [
    {
      "bucket": "sandbox",
      "bucketConfigs": {
        "keyPrefix": "foo-",
        "format": "jsonl"
      },
      "data": {
        "a": {
          "_gen": "uuid"
        },
        "b": {
          "_gen": "boolean"
        }
      }
    }
  ],
  "connections": {
    "s3-staging-org": {
      "kind": "s3"
    }
  }
}

Setting the compression

You can optionally compress the object content by setting compression. Currently, only gzip is supported.

{
  "bucket": "sandbox",
  "bucketConfigs": {
    "keyPrefix": "bar-",
    "format": "jsonl",
    "compression": "gzip"
  },
  "data": {
    "a": {
      "_gen": "boolean"
    },
    "b": {
      "_gen": "uuid"
    }
  }
}
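
On the consuming side, an object written with format jsonl and compression gzip can be decoded with the Python standard library alone. The sketch below rests on assumptions: decode_gzip_jsonl is a hypothetical helper, the object body has already been downloaded into memory as bytes, and each line is assumed to hold one JSON document. The sample payload is built locally to imitate the generator above; it is not taken from a real bucket.

```python
import gzip
import json


def decode_gzip_jsonl(body: bytes) -> list:
    """Decompress a gzipped JSON Lines payload and parse one document per line."""
    text = gzip.decompress(body).decode("utf-8")
    return [json.loads(line) for line in text.splitlines() if line.strip()]


# Stand-in for a downloaded object: two events shaped like the generator's data.
sample_events = [
    {"a": True, "b": "6f1c1b1e-0000-4000-8000-000000000001"},
    {"a": False, "b": "6f1c1b1e-0000-4000-8000-000000000002"},
]
sample_body = gzip.compress(
    "\n".join(json.dumps(e) for e in sample_events).encode("utf-8")
)

decoded = decode_gzip_jsonl(sample_body)
print(decoded == sample_events)  # True
```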

Connecting to Tigris

Set endpoint to the Tigris global endpoint. Contrary to what the Tigris docs recommend, you must set the AWS_REGION environment variable to an existing region, such as us-east-1, instead of auto. Which region you choose doesn't matter. This is a quirk of the underlying AWS library that ShadowTraffic uses.

{
  "connections": {
    "s3-staging-org": {
      "kind": "s3",
      "connectionConfigs": {
        "endpoint": "https://fly.storage.tigris.dev"
      }
    }
  }
}

Connecting to MinIO

Set endpoint to the MinIO server and set the respective AWS_* variables to connect to the instance. You will also need to enable path-style access, which can be done with the pathStyleAccess parameter below.

{
  "connections": {
    "s3-staging-org": {
      "kind": "s3",
      "connectionConfigs": {
        "endpoint": "http://minio.example.com:5938",
        "pathStyleAccess": true
      }
    }
  }
}
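
Path-style access changes where the bucket name appears in the request URL, which is why self-hosted servers such as MinIO often need it. The following Python sketch illustrates the two S3 addressing styles. object_url is a hypothetical helper and the bucket and key are made up; it is only a picture of the difference, not how ShadowTraffic builds requests.

```python
from urllib.parse import quote, urlsplit, urlunsplit


def object_url(endpoint: str, bucket: str, key: str, path_style: bool) -> str:
    """Build the URL an S3 client would address for a given bucket and key."""
    parts = urlsplit(endpoint)
    if path_style:
        # Path-style: the bucket is the first path segment.
        netloc, path = parts.netloc, f"/{bucket}/{quote(key)}"
    else:
        # Virtual-hosted-style: the bucket becomes a subdomain of the endpoint host.
        netloc, path = f"{bucket}.{parts.netloc}", f"/{quote(key)}"
    return urlunsplit((parts.scheme, netloc, path, "", ""))


endpoint = "http://minio.example.com:5938"
print(object_url(endpoint, "sandbox", "foo-01ABC.jsonl", path_style=True))
# http://minio.example.com:5938/sandbox/foo-01ABC.jsonl
print(object_url(endpoint, "sandbox", "foo-01ABC.jsonl", path_style=False))
# http://sandbox.minio.example.com:5938/foo-01ABC.jsonl
```

The virtual-hosted form requires DNS to resolve a per-bucket subdomain, which a single-host MinIO deployment typically does not provide.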

Changing the log level

By default, the underlying S3 client library logs messages at the INFO level. You can override that by setting logLevel to FATAL, ERROR, WARN, DEBUG, or TRACE, or set it explicitly to INFO.

It can be especially useful to set logging to DEBUG if you're experiencing unexpected permission issues connecting to S3 because it will log the raw request/response pairs.

{
  "connections": {
    "s3-staging-org": {
      "kind": "s3",
      "logLevel": "DEBUG"
    }
  }
}

Specification

Connection JSON schema

{
  "type": "object",
  "properties": {
    "kind": {
      "type": "string",
      "const": "s3"
    },
    "batchConfigs": {
      "type": "object",
      "properties": {
        "lingerMs": {
          "type": "integer",
          "minimum": 0
        },
        "batchElements": {
          "type": "integer",
          "minimum": 0
        },
        "batchBytes": {
          "type": "integer",
          "minimum": 0
        }
      }
    },
    "logLevel": {
      "type": "string",
      "enum": [
        "INFO",
        "WARN",
        "ERROR",
        "FATAL",
        "DEBUG",
        "TRACE"
      ]
    },
    "connectionConfigs": {
      "type": "object",
      "properties": {
        "endpoint": {
          "type": "string"
        },
        "pathStyleAccess": {
          "type": "boolean"
        }
      }
    }
  }
}
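
As a quick way to catch mistakes before starting a run, the constraints in the connection schema above can be approximated in plain Python. This is a hand-written sketch under stated assumptions, not a full JSON Schema validator and not part of ShadowTraffic: connection_errors and its message wording are hypothetical, and it assumes each nested section, when present, is a JSON object.

```python
# Values copied from the logLevel enum in the schema above.
LOG_LEVELS = ("INFO", "WARN", "ERROR", "FATAL", "DEBUG", "TRACE")
BATCH_KEYS = ("lingerMs", "batchElements", "batchBytes")


def _is_int(value) -> bool:
    # bool is a subclass of int in Python, so exclude it explicitly.
    return isinstance(value, int) and not isinstance(value, bool)


def connection_errors(conn: dict) -> list:
    """Return human-readable problems; an empty list means none were found."""
    errors = []
    # The schema constrains kind to "s3" but does not list it as required.
    if "kind" in conn and conn["kind"] != "s3":
        errors.append('kind must be "s3"')
    batch = conn.get("batchConfigs", {})
    for key in BATCH_KEYS:
        if key in batch and not (_is_int(batch[key]) and batch[key] >= 0):
            errors.append(f"batchConfigs.{key} must be an integer >= 0")
    if "logLevel" in conn and conn["logLevel"] not in LOG_LEVELS:
        errors.append("logLevel must be one of " + ", ".join(LOG_LEVELS))
    cc = conn.get("connectionConfigs", {})
    if "endpoint" in cc and not isinstance(cc["endpoint"], str):
        errors.append("connectionConfigs.endpoint must be a string")
    if "pathStyleAccess" in cc and not isinstance(cc["pathStyleAccess"], bool):
        errors.append("connectionConfigs.pathStyleAccess must be a boolean")
    return errors


print(connection_errors({"kind": "s3", "logLevel": "DEBUG"}))  # []
for problem in connection_errors(
    {"kind": "s3", "batchConfigs": {"lingerMs": -1}, "logLevel": "VERBOSE"}
):
    print(problem)
```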

Generator JSON schema

{
  "type": "object",
  "properties": {
    "connection": {
      "type": "string"
    },
    "name": {
      "type": "string"
    },
    "bucket": {
      "type": "string"
    },
    "data": {
      "type": "object"
    },
    "localConfigs": {
      "type": "object",
      "properties": {
        "throttleMs": {
          "oneOf": [
            {
              "type": "number",
              "minimum": 0
            },
            {
              "type": "object",
              "properties": {
                "_gen": {
                  "type": "string"
                }
              },
              "required": [
                "_gen"
              ]
            }
          ]
        },
        "maxEvents": {
          "oneOf": [
            {
              "type": "integer",
              "minimum": 0
            },
            {
              "type": "object",
              "properties": {
                "_gen": {
                  "type": "string"
                }
              },
              "required": [
                "_gen"
              ]
            }
          ]
        },
        "kafkaKeyProtobufHint": {
          "type": "object",
          "properties": {
            "schemaFile": {
              "type": "string"
            },
            "message": {
              "type": "string"
            }
          },
          "required": [
            "schemaFile",
            "message"
          ]
        },
        "jsonSchemaHint": {
          "type": "object"
        },
        "maxBytes": {
          "type": "integer",
          "minimum": 1
        },
        "discard": {
          "type": "object",
          "properties": {
            "rate": {
              "type": "number",
              "minimum": 0,
              "maximum": 1
            },
            "retainHistory": {
              "type": "boolean"
            }
          },
          "required": [
            "rate"
          ]
        },
        "repeat": {
          "type": "object",
          "properties": {
            "rate": {
              "type": "number",
              "minimum": 0,
              "maximum": 1
            },
            "times": {
              "oneOf": [
                {
                  "type": "integer",
                  "minimum": 0
                },
                {
                  "type": "object",
                  "properties": {
                    "_gen": {
                      "type": "string"
                    }
                  },
                  "required": [
                    "_gen"
                  ]
                }
              ]
            }
          },
          "required": [
            "rate",
            "times"
          ]
        },
        "protobufSchemaHint": {
          "type": "object",
          "patternProperties": {
            "^.*$": {
              "type": "object",
              "properties": {
                "schemaFile": {
                  "type": "string"
                },
                "message": {
                  "type": "string"
                }
              },
              "required": [
                "schemaFile",
                "message"
              ]
            }
          }
        },
        "maxHistoryEvents": {
          "type": "integer",
          "minimum": 0
        },
        "maxMs": {
          "type": "integer",
          "minimum": 0
        },
        "time": {
          "type": "integer"
        },
        "events": {
          "type": "object",
          "properties": {
            "exactly": {
              "oneOf": [
                {
                  "type": "integer",
                  "minimum": 0
                },
                {
                  "type": "object",
                  "properties": {
                    "_gen": {
                      "type": "string"
                    }
                  },
                  "required": [
                    "_gen"
                  ]
                }
              ]
            }
          }
        },
        "delay": {
          "type": "object",
          "properties": {
            "rate": {
              "type": "number",
              "minimum": 0,
              "maximum": 1
            },
            "ms": {
              "oneOf": [
                {
                  "type": "integer",
                  "minimum": 0
                },
                {
                  "type": "object",
                  "properties": {
                    "_gen": {
                      "type": "string"
                    }
                  },
                  "required": [
                    "_gen"
                  ]
                }
              ]
            }
          },
          "required": [
            "rate",
            "ms"
          ]
        },
        "history": {
          "type": "object",
          "properties": {
            "events": {
              "type": "object",
              "properties": {
                "max": {
                  "type": "integer",
                  "minimum": 0
                }
              }
            }
          }
        },
        "avroSchemaHint": {
          "type": "object"
        },
        "throttle": {
          "type": "object",
          "properties": {
            "ms": {
              "oneOf": [
                {
                  "type": "number",
                  "minimum": 0
                },
                {
                  "type": "object",
                  "properties": {
                    "_gen": {
                      "type": "string"
                    }
                  },
                  "required": [
                    "_gen"
                  ]
                }
              ]
            }
          }
        },
        "throughput": {
          "oneOf": [
            {
              "type": "integer",
              "minimum": 1
            },
            {
              "type": "object",
              "properties": {
                "_gen": {
                  "type": "string"
                }
              },
              "required": [
                "_gen"
              ]
            }
          ]
        },
        "timeMultiplier": {
          "oneOf": [
            {
              "type": "number"
            },
            {
              "type": "object",
              "properties": {
                "_gen": {
                  "type": "string"
                }
              },
              "required": [
                "_gen"
              ]
            }
          ]
        },
        "kafkaValueProtobufHint": {
          "type": "object",
          "properties": {
            "schemaFile": {
              "type": "string"
            },
            "message": {
              "type": "string"
            }
          },
          "required": [
            "schemaFile",
            "message"
          ]
        }
      }
    },
    "bucketConfigs": {
      "type": "object",
      "properties": {
        "keyPrefix": {
          "type": "string"
        },
        "format": {
          "type": "string",
          "enum": [
            "json",
            "jsonl",
            "parquet",
            "log"
          ]
        },
        "pretty": {
          "type": "boolean"
        },
        "compression": {
          "type": "string",
          "enum": [
            "gzip"
          ]
        },
        "subdir": {
          "oneOf": [
            {
              "type": "string"
            },
            {
              "type": "object",
              "properties": {
                "_gen": {
                  "type": "string"
                }
              },
              "required": [
                "_gen"
              ]
            }
          ]
        }
      },
      "required": [
        "keyPrefix",
        "format"
      ]
    }
  },
  "required": [
    "bucket",
    "data",
    "bucketConfigs"
  ]
}
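
The required keys and enumerated values in the generator schema can be checked the same way. The Python sketch below is an approximation under stated assumptions: generator_errors is a hypothetical helper, it covers only the top-level required keys and the bucketConfigs section, and it assumes bucketConfigs, when present, is a JSON object. It does not inspect localConfigs.

```python
# Values copied from the bucketConfigs enums in the schema above.
FORMATS = ("json", "jsonl", "parquet", "log")
COMPRESSIONS = ("gzip",)


def generator_errors(gen: dict) -> list:
    """Return human-readable problems; an empty list means none were found."""
    errors = []
    for key in ("bucket", "data", "bucketConfigs"):
        if key not in gen:
            errors.append(f"missing required key: {key}")

    bucket_configs = gen.get("bucketConfigs", {})
    if "bucketConfigs" in gen:
        for key in ("keyPrefix", "format"):
            if key not in bucket_configs:
                errors.append(f"missing required key: bucketConfigs.{key}")
    if "format" in bucket_configs and bucket_configs["format"] not in FORMATS:
        errors.append("bucketConfigs.format must be one of " + ", ".join(FORMATS))
    if "compression" in bucket_configs and bucket_configs["compression"] not in COMPRESSIONS:
        errors.append("bucketConfigs.compression must be one of " + ", ".join(COMPRESSIONS))
    if "pretty" in bucket_configs and not isinstance(bucket_configs["pretty"], bool):
        errors.append("bucketConfigs.pretty must be a boolean")
    return errors


ok = {
    "bucket": "sandbox",
    "bucketConfigs": {"keyPrefix": "bar-", "format": "jsonl", "compression": "gzip"},
    "data": {"a": {"_gen": "boolean"}, "b": {"_gen": "uuid"}},
}
print(generator_errors(ok))  # []
for problem in generator_errors({"bucket": "sandbox", "bucketConfigs": {"format": "csv"}}):
    print(problem)
```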