azureBlobStorage

Commentary

added in 0.6.1

Connects to Azure Blob Storage.

Credentials are read from the access keys embedded in a connectionString. Consider using env to avoid putting credentials into your configuration file.
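
As a rough sketch of the env suggestion, the Python below assembles a connection block whose connectionString is a function call instead of a literal secret. The "var" parameter name and the AZURE_CONNECTION_STRING variable name are assumptions that this page does not confirm, so check the env function reference before relying on them.

```python
import json

# Assumption: ShadowTraffic's env function takes the variable name under "var".
# AZURE_CONNECTION_STRING is a hypothetical environment variable name.
connection_string = {"_gen": "env", "var": "AZURE_CONNECTION_STRING"}

config = {
    "connections": {
        "azure": {
            "kind": "azureBlobStorage",
            "connectionConfigs": {"connectionString": connection_string},
        }
    }
}

# Render the JSON that would go into the configuration file.
rendered = json.dumps(config, indent=2)
print(rendered)
```

The point of the design is that the rendered file carries only the name of the variable, so it can be committed without exposing the access key.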

The target container must exist prior to writing data.

A new blob will be created following the default batch rate, which can be overridden by time, elements, or serialized bytes.

In addition, setting any of the batch rates to 0 will cause ShadowTraffic to flush each file immediately.

Blobs are created with the key name <key-prefix>-<ulid>.<file-suffix>, where ulid is a monotonically increasing ULID. This means all blobs in the container are sortable by key name.
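
To see why such key names sort in creation order, here is a minimal sketch of a monotonic ULID generator in Python. It is an illustration of the general ULID layout (a 48-bit millisecond timestamp followed by 80 random bits, encoded in Crockford base32), not ShadowTraffic's actual implementation; the class and function names are hypothetical.

```python
import os
import time

CROCKFORD = "0123456789ABCDEFGHJKMNPQRSTVWXYZ"  # Crockford base32, in ASCII order


def encode(value: int, length: int) -> str:
    """Encode an integer as fixed-width Crockford base32."""
    chars = []
    for _ in range(length):
        chars.append(CROCKFORD[value & 31])
        value >>= 5
    return "".join(reversed(chars))


class MonotonicUlid:
    """Illustrative generator: timestamp part first, random part second."""

    def __init__(self) -> None:
        self.last_ms = -1
        self.last_rand = 0

    def next(self, now_ms: int) -> str:
        if now_ms <= self.last_ms:
            # Same (or earlier) millisecond: increment the random part to keep order.
            self.last_rand += 1
        else:
            self.last_ms = now_ms
            # Drop one bit so that later increments cannot overflow 80 bits.
            self.last_rand = int.from_bytes(os.urandom(10), "big") >> 1
        return encode(self.last_ms, 10) + encode(self.last_rand, 16)


def blob_key(key_prefix: str, ulid: str, file_suffix: str) -> str:
    """Compose a key following the <key-prefix>-<ulid>.<file-suffix> pattern."""
    return f"{key_prefix}-{ulid}.{file_suffix}"


gen = MonotonicUlid()
start = int(time.time() * 1000)
# Two keys within the same millisecond, then one a millisecond later.
keys = [blob_key("foo", gen.next(ms), "jsonl") for ms in (start, start, start + 1)]
print(keys == sorted(keys))
```

Because every ULID has the same width and the alphabet is in ASCII order, plain string sorting of the keys matches the order in which they were generated.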

You can choose from a range of serialization formats and compression types.

If you wish, instead of writing just one blob at a time, you can also write multiple blobs in one shot.


Examples

Configuring the connection

Always set a connectionString so that ShadowTraffic targets the right storage account.

{
  "connections": {
    "azure": {
      "kind": "azureBlobStorage",
      "connectionConfigs": {
        "connectionString": "xxxxxx"
      }
    }
  }
}

Set the batch rate

By default, a new blob is created every 500 ms or 5000 elements, whichever happens first. You can also optionally create a new blob once a certain number of serialized bytes has accumulated.

To override these:

  • use lingerMs to set the limit on time
  • use batchElements to set it on number of events
  • use batchBytes to set it on size

{
  "connections": {
    "azure": {
      "kind": "azureBlobStorage",
      "connectionConfigs": {
        "connectionString": "xxxxxx"
      },
      "batchConfigs": {
        "lingerMs": 2000,
        "batchElements": 10000,
        "batchBytes": 5242880
      }
    }
  }
}
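
The "whichever happens first" rule can be sketched in a few lines of Python. This is a simplified model under stated assumptions, not ShadowTraffic's code: time is only checked when an event arrives, and the names BatchConfigs, should_roll, and simulate are hypothetical. A threshold of 0 makes every event roll, which lines up with the instant-flush behavior.

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass
class BatchConfigs:
    linger_ms: int = 500               # documented default
    batch_elements: int = 5000         # documented default
    batch_bytes: Optional[int] = None  # optional; no default is documented


def should_roll(cfg: BatchConfigs, elapsed_ms: int, elements: int, size_bytes: int) -> bool:
    """Return True once any configured threshold has been reached."""
    if elapsed_ms >= cfg.linger_ms:
        return True
    if elements >= cfg.batch_elements:
        return True
    return cfg.batch_bytes is not None and size_bytes >= cfg.batch_bytes


def simulate(cfg: BatchConfigs, events: List[Tuple[int, int]]) -> List[int]:
    """events holds (arrival_ms, size_bytes); returns the event count per blob."""
    blobs = []
    batch_start = None
    count = size = 0
    for arrival_ms, nbytes in events:
        if batch_start is None:
            batch_start = arrival_ms  # the clock starts with the first event
        count += 1
        size += nbytes
        if should_roll(cfg, arrival_ms - batch_start, count, size):
            blobs.append(count)
            batch_start, count, size = None, 0, 0
    if count:
        blobs.append(count)  # flush whatever is left at the end
    return blobs


events = [(t * 200, 100) for t in range(10)]  # one 100-byte event every 200 ms
print(simulate(BatchConfigs(), events))                  # time limit is reached first
print(simulate(BatchConfigs(batch_elements=3), events))  # element limit is reached first
print(simulate(BatchConfigs(batch_bytes=0), events))     # 0 rolls after every event
```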

Instantly flush files

To disable buffering, set any of lingerMs, batchElements, or batchBytes to 0. This forces each event to be written to its own blob immediately, with a new blob rolled right after.

{
  "connections": {
    "azure": {
      "kind": "azureBlobStorage",
      "connectionConfigs": {
        "connectionString": "xxxxxx"
      },
      "batchConfigs": {
        "batchBytes": 0
      }
    }
  }
}

Set blob content

Use container to set the container, containerConfigs to set the blob format, and data to set the content.

{
  "generators": [
    {
      "container": "sandbox",
      "containerConfigs": {
        "keyPrefix": "foo-",
        "format": "jsonl"
      },
      "data": {
        "a": {
          "_gen": "uuid"
        },
        "b": {
          "_gen": "boolean"
        }
      },
      "localConfigs": {
        "throttleMs": 200
      }
    }
  ],
  "connections": {
    "azure": {
      "kind": "azureBlobStorage",
      "connectionConfigs": {
        "connectionString": "xxxxxx"
      }
    }
  }
}

Set the key and format

format can be one of json, jsonl, log, or parquet.

Additionally:

  • pretty set to true will cause json to pretty print.
  • explodeJsonlArrays set to true will cause jsonl arrays to span one element per line.
  • format set to log means that data must be a string.
  • compression can optionally be set to gzip.

{
  "generators": [
    {
      "container": "sandbox",
      "containerConfigs": {
        "keyPrefix": "foo-",
        "format": "json",
        "pretty": true,
        "compression": "gzip"
      },
      "data": {
        "a": {
          "_gen": "boolean"
        }
      }
    }
  ]
}
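
For a feel of what these options mean on the wire, the Python sketch below serializes two sample events with the standard library. It only illustrates the general ideas of JSON Lines, pretty printing, and gzip; how ShadowTraffic lays out multiple events inside a single json blob is not specified on this page, so the sketch pretty prints one event only.

```python
import gzip
import json

# Sample events shaped like the generator's data above (values are made up).
events = [{"a": True}, {"a": False}]

# jsonl: one compact JSON document per line.
jsonl = "\n".join(json.dumps(event) for event in events) + "\n"

# pretty: the same JSON, indented across several lines for readability.
pretty = json.dumps(events[0], indent=2)

# gzip: compression applied to the serialized bytes.
compressed = gzip.compress(jsonl.encode("utf-8"))

print(jsonl, end="")
print(pretty)
print(gzip.decompress(compressed).decode("utf-8") == jsonl)
```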

Writing to subdirectories

When a generator writes to an Azure container, by default it writes blobs to the specified key prefix. Sometimes, though, you may want a generator to write to multiple subdirectories on top of that prefix.

To do that, set subdir in the containerConfigs key. Typically this will be a variable that changes over time.

In this example, three forks are launched, which write the following blobs:

  • sandbox/a/foo-<n>.jsonl
  • sandbox/b/foo-<n>.jsonl
  • sandbox/c/foo-<n>.jsonl

Note that each unique value for subdir will get its own blob-roll milestones. In other words, subdirectory a will roll new blobs based on time/size/events at a rate independent from b and c.

{
  "generators": [
    {
      "container": "sandbox",
      "fork": {
        "key": [
          "a",
          "b",
          "c"
        ]
      },
      "containerConfigs": {
        "subdir": {
          "_gen": "var",
          "var": "forkKey"
        },
        "keyPrefix": "foo-",
        "format": "jsonl"
      },
      "data": {
        "a": {
          "_gen": "uuid"
        },
        "b": {
          "_gen": "boolean"
        }
      },
      "localConfigs": {
        "throttleMs": 200
      }
    }
  ],
  "connections": {
    "azure": {
      "kind": "azureBlobStorage",
      "connectionConfigs": {
        "connectionString": "xxxxxx"
      }
    }
  }
}
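
The independence of roll milestones per subdirectory can be pictured as one buffer per subdir value. The Python below is a minimal sketch that tracks only an element threshold (time and size would follow the same pattern); the threshold value and all names are hypothetical and chosen to keep the demo short.

```python
from collections import defaultdict

BATCH_ELEMENTS = 2  # hypothetical small threshold for the demo

pending = defaultdict(list)  # subdir -> events buffered for the current blob
rolled = defaultdict(list)   # subdir -> completed blobs, each a list of events


def write(subdir: str, event: dict) -> None:
    """Buffer an event under its subdir and roll that subdir's blob when full."""
    pending[subdir].append(event)
    if len(pending[subdir]) >= BATCH_ELEMENTS:
        rolled[subdir].append(pending.pop(subdir))


# Subdirectory "a" is busier than "b" and "c", so only "a" rolls blobs here.
for subdir in ["a", "a", "b", "a", "c", "a"]:
    write(subdir, {"from": subdir})

print({name: len(blobs) for name, blobs in rolled.items()})
print({name: len(events) for name, events in pending.items()})
```

Traffic in one subdirectory never advances the counters of another, which is why a can roll twice while b and c are still filling their first blob.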

Writing multiple blobs

Sometimes, you might want to write to multiple blobs on each generator iteration.

To do that, specify multiBlob: a map of string to container configuration overrides. You must specify each keyPrefix, and you can optionally specify individual subdir values.

When multiBlob is enabled, data must be a map whose keys match those in multiBlob. The values under each key are written according to the spec in multiBlob.

For example, the configuration below will write 10 blobs: 5 to sandbox/a/foo-*.json, and 5 to sandbox/b/bar-*.json.

{
  "generators": [
    {
      "container": "sandbox",
      "containerConfigs": {
        "format": "json",
        "multiBlob": {
          "a": {
            "keyPrefix": "foo-",
            "subdir": "a"
          },
          "b": {
            "keyPrefix": "bar-",
            "subdir": "b"
          }
        }
      },
      "data": {
        "a": {
          "_gen": "repeatedly",
          "n": 3,
          "target": {
            "_gen": "oneOf",
            "choices": [
              1,
              2,
              3
            ]
          }
        },
        "b": {
          "_gen": "repeatedly",
          "n": 3,
          "target": {
            "_gen": "oneOf",
            "choices": [
              4,
              5,
              6
            ]
          }
        }
      },
      "localConfigs": {
        "maxEvents": 5
      }
    }
  ],
  "connections": {
    "azure": {
      "kind": "azureBlobStorage",
      "connectionConfigs": {
        "connectionString": "xxxxxx"
      }
    }
  }
}
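
The key-matching rule between data and multiBlob can be sketched as a small routing function in Python. This is one interpretation of "keys match" (exact equality of the two key sets), and the route function is a hypothetical helper for illustration, not part of ShadowTraffic.

```python
def route(multi_blob: dict, data: dict) -> dict:
    """Pair each value in data with the multiBlob spec stored under the same key."""
    if set(data) != set(multi_blob):
        raise ValueError(
            f"data keys {sorted(data)} do not match multiBlob keys {sorted(multi_blob)}"
        )
    routed = {}
    for name, spec in multi_blob.items():
        # subdir is optional; keyPrefix is required for every multiBlob entry.
        parts = (spec.get("subdir"), spec["keyPrefix"])
        location = "/".join(part for part in parts if part)
        routed[location] = data[name]
    return routed


multi_blob = {
    "a": {"keyPrefix": "foo-", "subdir": "a"},
    "b": {"keyPrefix": "bar-", "subdir": "b"},
}
event = {"a": [1, 3, 2], "b": [4, 4, 6]}  # one made-up generator iteration

targets = route(multi_blob, event)
print(targets)
```

Each generator iteration therefore yields one value per multiBlob entry, and each value lands under its own prefix and subdirectory.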

Adjusting concurrency

When ShadowTraffic writes to Azure Blob Storage, it does so asynchronously: it generates some events and uses a number of threads to write them in parallel.

This mostly works great, but sometimes you might want to adjust that degree of concurrency. This is especially helpful if you're generating very large objects.

You can change it by using the following two optional parameters under writerConfigs:

  • bufferDepth: the maximum number of events to buffer for parallel writers to consume from. By default, this number is 50.
  • concurrency: the number of parallel writers. By default, this number is the number of cores ShadowTraffic has access to.

{
  "connections": {
    "azure": {
      "kind": "azureBlobStorage",
      "connectionConfigs": {
        "connectionString": "xxxxxx"
      },
      "writerConfigs": {
        "bufferDepth": 16000,
        "concurrency": 8
      }
    }
  }
}
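
One way to picture how the two parameters interact is a bounded queue feeding a pool of writer threads. The Python below is a sketch of that general pattern under stated assumptions, not ShadowTraffic's internals: the run function is hypothetical, and appending to a list stands in for an upload.

```python
import os
import queue
import threading


def run(events, buffer_depth=50, concurrency=None):
    """Push events through a bounded buffer consumed by parallel writers."""
    concurrency = concurrency or os.cpu_count() or 1
    buffer = queue.Queue(maxsize=buffer_depth)  # producer blocks when this is full
    written = []
    lock = threading.Lock()

    def writer():
        while True:
            event = buffer.get()
            if event is None:  # sentinel: no more events for this writer
                return
            with lock:
                written.append(event)  # stand-in for writing to blob storage

    workers = [threading.Thread(target=writer) for _ in range(concurrency)]
    for worker in workers:
        worker.start()
    for event in events:
        buffer.put(event)  # waits while buffer_depth events are already pending
    for _ in workers:
        buffer.put(None)   # one sentinel per writer
    for worker in workers:
        worker.join()
    return written


written = run(range(200), buffer_depth=16, concurrency=8)
print(sorted(written) == list(range(200)))
```

In this model a smaller buffer caps how many pending events sit in memory at once, which is the lever that matters when each event is very large.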

Specification

Connection JSON schema

{
  "type": "object",
  "properties": {
    "kind": {
      "type": "string",
      "const": "azureBlobStorage"
    },
    "batchConfigs": {
      "type": "object",
      "properties": {
        "lingerMs": {
          "type": "integer",
          "minimum": 0
        },
        "batchElements": {
          "type": "integer",
          "minimum": 0
        },
        "batchBytes": {
          "type": "integer",
          "minimum": 0
        }
      }
    },
    "writerConfigs": {
      "type": "object",
      "properties": {
        "bufferDepth": {
          "type": "integer",
          "minimum": 1
        },
        "concurrency": {
          "type": "integer",
          "minimum": 1
        }
      }
    },
    "connectionConfigs": {
      "type": "object",
      "properties": {
        "connectionString": {
          "type": "string"
        }
      },
      "required": [
        "connectionString"
      ]
    }
  },
  "required": [
    "connectionConfigs"
  ]
}
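
If you want a quick sanity check before launching, the constraints in the connection schema above can be approximated by hand in Python. This is a hedged sketch that covers only the rules visible in that schema and is not a full JSON Schema validator; check_connection is a hypothetical helper.

```python
def _is_int_at_least(value, minimum: int) -> bool:
    """True for integers (excluding booleans) that are >= minimum."""
    return isinstance(value, int) and not isinstance(value, bool) and value >= minimum


def check_connection(conn: dict) -> list:
    """Return a list of problems; an empty list means every check passed."""
    problems = []
    if "kind" in conn and conn["kind"] != "azureBlobStorage":
        problems.append("kind must be azureBlobStorage")

    connection_configs = conn.get("connectionConfigs")
    if not isinstance(connection_configs, dict):
        problems.append("connectionConfigs is required")
    elif not isinstance(connection_configs.get("connectionString"), str):
        problems.append("connectionConfigs.connectionString must be a string")

    # (section, minimum allowed value, integer keys) taken from the schema above.
    rules = (
        ("batchConfigs", 0, ("lingerMs", "batchElements", "batchBytes")),
        ("writerConfigs", 1, ("bufferDepth", "concurrency")),
    )
    for section, minimum, keys in rules:
        values = conn.get(section, {})
        if not isinstance(values, dict):
            problems.append(f"{section} must be an object")
            continue
        for key in keys:
            if key in values and not _is_int_at_least(values[key], minimum):
                problems.append(f"{section}.{key} must be an integer >= {minimum}")
    return problems


good = {
    "kind": "azureBlobStorage",
    "connectionConfigs": {"connectionString": "xxxxxx"},
    "batchConfigs": {"batchBytes": 0},
}
bad = {"kind": "azureBlobStorage", "writerConfigs": {"concurrency": 0}}

print(check_connection(good))
print(check_connection(bad))
```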

Generator JSON schema

{
  "type": "object",
  "properties": {
    "connection": {
      "type": "string"
    },
    "name": {
      "type": "string"
    },
    "container": {
      "type": "string"
    },
    "data": {
      "type": "object"
    },
    "localConfigs": {
      "type": "object",
      "properties": {
        "throttleMs": {
          "oneOf": [
            {
              "type": "number",
              "minimum": 0
            },
            {
              "type": "object",
              "properties": {
                "_gen": {
                  "type": "string"
                }
              },
              "required": [
                "_gen"
              ]
            }
          ]
        },
        "maxEvents": {
          "oneOf": [
            {
              "type": "integer",
              "minimum": 0
            },
            {
              "type": "object",
              "properties": {
                "_gen": {
                  "type": "string"
                }
              },
              "required": [
                "_gen"
              ]
            }
          ]
        },
        "kafkaKeyProtobufHint": {
          "type": "object",
          "properties": {
            "schemaFile": {
              "type": "string"
            },
            "message": {
              "type": "string"
            }
          },
          "required": [
            "schemaFile",
            "message"
          ]
        },
        "jsonSchemaHint": {
          "type": "object"
        },
        "maxBytes": {
          "type": "integer",
          "minimum": 1
        },
        "discard": {
          "type": "object",
          "properties": {
            "rate": {
              "type": "number",
              "minimum": 0,
              "maximum": 1
            },
            "retainHistory": {
              "type": "boolean"
            }
          },
          "required": [
            "rate"
          ]
        },
        "repeat": {
          "type": "object",
          "properties": {
            "rate": {
              "type": "number",
              "minimum": 0,
              "maximum": 1
            },
            "times": {
              "oneOf": [
                {
                  "type": "integer",
                  "minimum": 0
                },
                {
                  "type": "object",
                  "properties": {
                    "_gen": {
                      "type": "string"
                    }
                  },
                  "required": [
                    "_gen"
                  ]
                }
              ]
            }
          },
          "required": [
            "rate",
            "times"
          ]
        },
        "protobufSchemaHint": {
          "type": "object",
          "patternProperties": {
            "^.*$": {
              "type": "object",
              "properties": {
                "schemaFile": {
                  "type": "string"
                },
                "message": {
                  "type": "string"
                }
              },
              "required": [
                "schemaFile",
                "message"
              ]
            }
          }
        },
        "maxHistoryEvents": {
          "type": "integer",
          "minimum": 0
        },
        "maxMs": {
          "type": "integer",
          "minimum": 0
        },
        "time": {
          "type": "integer"
        },
        "events": {
          "type": "object",
          "properties": {
            "exactly": {
              "oneOf": [
                {
                  "type": "integer",
                  "minimum": 0
                },
                {
                  "type": "object",
                  "properties": {
                    "_gen": {
                      "type": "string"
                    }
                  },
                  "required": [
                    "_gen"
                  ]
                }
              ]
            }
          }
        },
        "delay": {
          "type": "object",
          "properties": {
            "rate": {
              "type": "number",
              "minimum": 0,
              "maximum": 1
            },
            "ms": {
              "oneOf": [
                {
                  "type": "integer",
                  "minimum": 0
                },
                {
                  "type": "object",
                  "properties": {
                    "_gen": {
                      "type": "string"
                    }
                  },
                  "required": [
                    "_gen"
                  ]
                }
              ]
            }
          },
          "required": [
            "rate",
            "ms"
          ]
        },
        "history": {
          "type": "object",
          "properties": {
            "events": {
              "type": "object",
              "properties": {
                "max": {
                  "type": "integer",
                  "minimum": 0
                }
              }
            }
          }
        },
        "avroSchemaHint": {
          "type": "object"
        },
        "throttle": {
          "type": "object",
          "properties": {
            "ms": {
              "oneOf": [
                {
                  "type": "number",
                  "minimum": 0
                },
                {
                  "type": "object",
                  "properties": {
                    "_gen": {
                      "type": "string"
                    }
                  },
                  "required": [
                    "_gen"
                  ]
                }
              ]
            }
          }
        },
        "throughput": {
          "oneOf": [
            {
              "type": "integer",
              "minimum": 1
            },
            {
              "type": "object",
              "properties": {
                "_gen": {
                  "type": "string"
                }
              },
              "required": [
                "_gen"
              ]
            }
          ]
        },
        "timeMultiplier": {
          "oneOf": [
            {
              "type": "number"
            },
            {
              "type": "object",
              "properties": {
                "_gen": {
                  "type": "string"
                }
              },
              "required": [
                "_gen"
              ]
            }
          ]
        },
        "kafkaValueProtobufHint": {
          "type": "object",
          "properties": {
            "schemaFile": {
              "type": "string"
            },
            "message": {
              "type": "string"
            }
          },
          "required": [
            "schemaFile",
            "message"
          ]
        }
      }
    },
    "containerConfigs": {
      "oneOf": [
        {
          "type": "object",
          "properties": {
            "keyPrefix": {
              "oneOf": [
                {
                  "type": "string"
                },
                {
                  "type": "object",
                  "properties": {
                    "_gen": {
                      "type": "string"
                    }
                  },
                  "required": [
                    "_gen"
                  ]
                }
              ]
            },
            "format": {
              "type": "string",
              "enum": [
                "json",
                "jsonl",
                "parquet",
                "log"
              ]
            },
            "pretty": {
              "type": "boolean"
            },
            "compression": {
              "type": "string",
              "enum": [
                "gzip"
              ]
            },
            "subdir": {
              "oneOf": [
                {
                  "type": "string"
                },
                {
                  "type": "object",
                  "properties": {
                    "_gen": {
                      "type": "string"
                    }
                  },
                  "required": [
                    "_gen"
                  ]
                }
              ]
            }
          },
          "required": [
            "keyPrefix",
            "format"
          ]
        },
        {
          "type": "object",
          "properties": {
            "format": {
              "type": "string",
              "enum": [
                "json",
                "jsonl",
                "parquet",
                "log"
              ]
            },
            "pretty": {
              "type": "boolean"
            },
            "compression": {
              "type": "string",
              "enum": [
                "gzip"
              ]
            },
            "subdir": {
              "oneOf": [
                {
                  "type": "string"
                },
                {
                  "type": "object",
                  "properties": {
                    "_gen": {
                      "type": "string"
                    }
                  },
                  "required": [
                    "_gen"
                  ]
                }
              ]
            },
            "multiBlob": {
              "type": "object",
              "additionalProperties": {
                "type": "object",
                "properties": {
                  "keyPrefix": {
                    "oneOf": [
                      {
                        "type": "string"
                      },
                      {
                        "type": "object",
                        "properties": {
                          "_gen": {
                            "type": "string"
                          }
                        },
                        "required": [
                          "_gen"
                        ]
                      }
                    ]
                  },
                  "subdir": {
                    "oneOf": [
                      {
                        "type": "string"
                      },
                      {
                        "type": "object",
                        "properties": {
                          "_gen": {
                            "type": "string"
                          }
                        },
                        "required": [
                          "_gen"
                        ]
                      }
                    ]
                  }
                },
                "required": [
                  "keyPrefix"
                ]
              }
            }
          },
          "required": [
            "format",
            "multiBlob"
          ]
        }
      ]
    }
  },
  "required": [
    "container",
    "data",
    "containerConfigs"
  ]
}