Skip to main content

Connections

databricks

Commentary

added in 1.17.0

Connects to a fully managed Databricks Delta Lake instance.

You can authenticate using either OAuth M2M [1] or Personal Access Tokens [2].

A new transaction will be written to Databricks every 20 ms or 500 rows, whichever happens first. You can override the row count and write timing with batchConfigs (batchElements and lingerMs, respectively). [3]

Automatic table creation

By default, for convenience, any tables ShadowTraffic writes to will be created automatically. This makes it easier to iterate on your generators without flipping back and forth between ShadowTraffic and Databricks. [4]

ShadowTraffic does this by scanning the structure of your generators and creating a suitable DDL, then executing it on your behalf.

If ShadowTraffic doesn't create the table exactly as you'd want it, you can override each column using the sqlHint function modifier [5], or the entire DDL with sqlDdlHint.

Manual table control

If you don't want ShadowTraffic to control your tables, you can turn this behavior off by setting tablePolicy to manual in the connection map. It'll then be up to you to make sure your tables exist before trying to write to them.


Examples

Connecting with M2M

Specify clientId and clientSecret to connect with Databricks OAuth M2M. ShadowTraffic will automatically refresh your token each time it expires.

{
"connections": {
"dbx": {
"kind": "databricks",
"connectionConfigs": {
"host": "https://xxx.cloud.databricks.com",
"clientId": "yyy",
"clientSecret": "zzz",
"warehouseId": "abc",
"catalog": "def",
"schema": "ghi"
}
}
}
}

Connecting with PAT

Alternatively, you can specify token in the connectionConfigs map to connect with a Databricks Personal Access Token (PAT).

{
"connections": {
"dbx": {
"kind": "databricks",
"connectionConfigs": {
"host": "https://xxx.cloud.databricks.com",
"token": "yyy",
"warehouseId": "abc",
"catalog": "def",
"schema": "ghi"
}
}
}
}

Setting the batch rate

Optionally, set batchConfigs to control how frequently transactions are written. In this example, a transaction is executed whenever 500 milliseconds pass or 5000 rows are accumulated—whichever comes first.

{
"connections": {
"dbx": {
"kind": "databricks",
"connectionConfigs": {
"host": "https://xxx.cloud.databricks.com",
"token": "yyy",
"warehouseId": "abc",
"catalog": "def",
"schema": "ghi"
},
"batchConfigs": {
"lingerMs": 500,
"batchElements": 5000
}
}
}
}

Setting statement timeouts

Set statementTimeout, in seconds, to specify how long any given SQL statement sent to Databricks may run before it is aborted.

{
"connections": {
"dbx": {
"kind": "databricks",
"connectionConfigs": {
"host": "https://xxx.cloud.databricks.com",
"token": "yyy",
"warehouseId": "abc",
"catalog": "def",
"schema": "ghi"
},
"statementTimeout": 20
}
}
}

Automatic table creation

By default, your tables don't need to be defined. ShadowTraffic will automatically create them for you (and clear out any existing data). You can explicitly configure this behavior by setting tablePolicy to dropAndCreate.

{
"generators": [
{
"table": "sandbox",
"row": {
"id": {
"_gen": "uuid"
},
"level": {
"_gen": "uniformDistribution",
"bounds": [
1,
10
]
},
"active": {
"_gen": "boolean"
}
}
}
],
"connections": {
"dbx": {
"kind": "databricks",
"connectionConfigs": {
"host": "https://xxx.cloud.databricks.com",
"token": "yyy",
"warehouseId": "abc",
"catalog": "def",
"schema": "ghi"
},
"tablePolicy": "dropAndCreate"
}
}
}

Overriding column types

Use the optional sqlHint function modifier on any generator to override how its column is defined.

{
"generators": [
{
"table": "sandbox",
"row": {
"id": {
"_gen": "uuid"
},
"level": {
"_gen": "uniformDistribution",
"bounds": [
1,
10
],
"sqlHint": "INTEGER"
},
"active": {
"_gen": "boolean"
}
}
}
],
"connections": {
"dbx": {
"kind": "databricks",
"connectionConfigs": {
"host": "https://xxx.cloud.databricks.com",
"token": "yyy",
"warehouseId": "abc",
"catalog": "def",
"schema": "ghi"
}
}
}
}

Overwriting existing tables

Instead of the default dropAndCreate table policy, you can use overwrite mode. overwrite will perform an initial zero-row overwrite insertion to clear out your table contents while leaving its metadata intact, and subsequently perform regular insertions.

{
"generators": [
{
"table": "sandbox",
"row": {
"name": {
"_gen": "string",
"expr": "#{Name.fullName}"
},
"age": {
"_gen": "normalDistribution",
"mean": 40,
"sd": 10,
"decimals": 0
}
}
}
],
"connections": {
"dbx": {
"kind": "databricks",
"connectionConfigs": {
"host": "https://xxx.cloud.databricks.com",
"token": "yyy",
"warehouseId": "abc",
"catalog": "def",
"schema": "ghi"
},
"tablePolicy": "overwrite"
}
}
}

Evolving schemas

If you want to operate on an existing table and dynamically change its columns, set evolveSchema to true. ShadowTraffic will attempt to alter your table and add any newly specified columns before appending rows.

{
"generators": [
{
"table": "sandbox",
"evolveSchema": true,
"row": {
"id": {
"_gen": "uuid"
},
"siteZone": {
"_gen": "oneOf",
"choices": [
"A",
"B",
"C"
]
},
"tier": {
"_gen": "oneOf",
"choices": [
"gold",
"silver",
"bronze"
]
}
}
}
],
"connections": {
"dbx": {
"kind": "databricks",
"connectionConfigs": {
"host": "https://xxx.cloud.databricks.com",
"token": "yyy",
"warehouseId": "abc",
"catalog": "def",
"schema": "ghi"
},
"tablePolicy": "overwrite"
}
}
}

Setting table tags

Set tags to supply a key/value map of tag metadata on the table.

{
"generators": [
{
"table": "sandbox",
"tags": {
"key-1": "value-1",
"key-2": "value-2"
},
"row": {
"id": {
"_gen": "uuid"
},
"level": {
"_gen": "uniformDistribution",
"bounds": [
1,
10
]
},
"active": {
"_gen": "boolean"
}
}
}
],
"connections": {
"dbx": {
"kind": "databricks",
"connectionConfigs": {
"host": "https://xxx.cloud.databricks.com",
"token": "yyy",
"warehouseId": "abc",
"catalog": "def",
"schema": "ghi"
}
}
}
}

Specification

Connection JSON schema

{
"type": "object",
"properties": {
"kind": {
"type": "string",
"const": "databricks"
},
"connectionConfigs": {
"type": "object",
"properties": {
"host": {
"type": "string"
},
"token": {
"type": "string"
},
"clientId": {
"type": "string"
},
"clientSecret": {
"type": "string"
},
"warehouseId": {
"type": "string"
},
"catalog": {
"type": "string"
},
"schema": {
"type": "string"
}
},
"required": [
"host",
"warehouseId",
"catalog",
"schema"
]
},
"tablePolicy": {
"type": "string",
"enum": [
"manual",
"dropAndCreate",
"overwrite"
]
},
"statementTimeout": {
"type": "integer",
"minimum": 0
},
"batchConfigs": {
"type": "object",
"properties": {
"lingerMs": {
"type": "integer",
"minimum": 0
},
"batchElements": {
"type": "integer",
"minimum": 0
}
}
}
},
"required": [
"connectionConfigs"
]
}

Generator JSON schema

{
"type": "object",
"properties": {
"connection": {
"type": "string"
},
"table": {
"type": "string"
},
"row": {
"type": "object"
},
"tags": {
"type": "object",
"additionalProperties": {
"type": "string"
}
},
"evolveSchema": {
"type": "boolean"
},
"localConfigs": {
"type": "object",
"properties": {
"throttleMs": {
"oneOf": [
{
"type": "number",
"minimum": 0
},
{
"type": "object",
"properties": {
"_gen": {
"type": "string"
}
},
"required": [
"_gen"
]
}
]
},
"maxEvents": {
"oneOf": [
{
"type": "integer",
"minimum": 0
},
{
"type": "object",
"properties": {
"_gen": {
"type": "string"
}
},
"required": [
"_gen"
]
}
]
},
"kafkaKeyProtobufHint": {
"type": "object",
"properties": {
"schemaFile": {
"type": "string"
},
"message": {
"type": "string"
}
},
"required": [
"schemaFile",
"message"
]
},
"jsonSchemaHint": {
"type": "object"
},
"maxBytes": {
"type": "integer",
"minimum": 1
},
"discard": {
"type": "object",
"properties": {
"rate": {
"type": "number",
"minimum": 0,
"maximum": 1
},
"retainHistory": {
"type": "boolean"
}
},
"required": [
"rate"
]
},
"repeat": {
"type": "object",
"properties": {
"rate": {
"type": "number",
"minimum": 0,
"maximum": 1
},
"times": {
"oneOf": [
{
"type": "integer",
"minimum": 0
},
{
"type": "object",
"properties": {
"_gen": {
"type": "string"
}
},
"required": [
"_gen"
]
}
]
}
},
"required": [
"rate",
"times"
]
},
"sqlDdlHint": {
"type": "string"
},
"protobufSchemaHint": {
"type": "object",
"patternProperties": {
"^.*$": {
"type": "object",
"properties": {
"schemaFile": {
"type": "string"
},
"message": {
"type": "string"
}
},
"required": [
"schemaFile",
"message"
]
}
}
},
"schemaRegistrySubject": {
"type": "object"
},
"maxHistoryEvents": {
"type": "integer",
"minimum": 0
},
"maxMs": {
"type": "integer",
"minimum": 0
},
"time": {
"type": "integer"
},
"events": {
"type": "object",
"properties": {
"exactly": {
"oneOf": [
{
"type": "integer",
"minimum": 0
},
{
"type": "object",
"properties": {
"_gen": {
"type": "string"
}
},
"required": [
"_gen"
]
}
]
}
}
},
"delay": {
"type": "object",
"properties": {
"rate": {
"type": "number",
"minimum": 0,
"maximum": 1
},
"ms": {
"oneOf": [
{
"type": "integer",
"minimum": 0
},
{
"type": "object",
"properties": {
"_gen": {
"type": "string"
}
},
"required": [
"_gen"
]
}
]
}
},
"required": [
"rate",
"ms"
]
},
"history": {
"type": "object",
"properties": {
"events": {
"type": "object",
"properties": {
"max": {
"type": "integer",
"minimum": 0
}
}
}
}
},
"avroSchemaHint": {
"type": "object"
},
"throttle": {
"type": "object",
"properties": {
"ms": {
"oneOf": [
{
"type": "number",
"minimum": 0
},
{
"type": "object",
"properties": {
"_gen": {
"type": "string"
}
},
"required": [
"_gen"
]
}
]
}
}
},
"throughput": {
"oneOf": [
{
"type": "integer",
"minimum": 1
},
{
"type": "object",
"properties": {
"_gen": {
"type": "string"
}
},
"required": [
"_gen"
]
}
]
},
"timeMultiplier": {
"oneOf": [
{
"type": "number"
},
{
"type": "object",
"properties": {
"_gen": {
"type": "string"
}
},
"required": [
"_gen"
]
}
]
},
"kafkaValueProtobufHint": {
"type": "object",
"properties": {
"schemaFile": {
"type": "string"
},
"message": {
"type": "string"
}
},
"required": [
"schemaFile",
"message"
]
}
}
}
},
"required": [
"table",
"row"
]
}