Connect to Lakebase

Important

This feature is in Public Preview.

Use Structured Streaming to write to Lakebase with built-in batching, automatic retries, and workspace-managed authentication.

When to use the Lakebase sink

Use the Lakebase sink for low-latency streaming writes to Lakebase. This sink doesn't require you to implement custom foreachBatch functions to handle batching, connection management, and error handling.

Common use cases include:

  • Update application databases in real time for operational dashboards or customer-facing features.
  • Sync continuously changing data, such as aggregated or filtered streaming results, into a transactional database.
  • Write the output of a Structured Streaming query into a Lakebase table with sub-second latency using real-time mode.

To sync data in the reverse direction, from Lakebase to Delta tables in the lakehouse, see Lakebase Change Data Feed.

Requirements

  • Databricks Runtime 18.3 or above
  • Classic compute with dedicated or standard access mode
  • A Lakebase database

Connect to a database

The Lakebase sink supports the following connection methods:

Lakebase tables registered with Unity Catalog

For Lakebase tables registered with Unity Catalog, the connector automatically manages the credentials and uses the identity of the user or service principal running the query. If the table doesn't exist, the connector creates the table.

To register a Lakebase database with Unity Catalog, see Register a Lakebase database in Unity Catalog.

To write to a Lakebase table, use the upsertkey option and the .toTable() method with a fully qualified table name in catalog.schema.table format:

Python

(df.writeStream
  .format("postgresql")
  .outputMode("update")
  .option("upsertkey", "<primary-key-column>")
  .option("checkpointLocation", "/checkpoints/<query-name>")
  .toTable("<catalog>.<schema>.<table>")
)

Scala

df.writeStream
  .format("postgresql")
  .outputMode("update")
  .option("upsertkey", "<primary-key-column>")
  .option("checkpointLocation", "/checkpoints/<query-name>")
  .toTable("<catalog>.<schema>.<table>")

Lakebase tables not registered with Unity Catalog

For Lakebase tables not registered with Unity Catalog, the connector automatically manages the credentials and uses the identity of the user or service principal running the query.

To write to a Lakebase table, use the endpoint, database, dbtable, and upsertkey options:

Python

(df.writeStream
  .format("postgresql")
  .outputMode("update")
  .option("endpoint", "<project-id>.<branch-id>.<endpoint-id>")
  .option("database", "<database>")
  .option("dbtable", "<schema>.<table>")
  .option("upsertkey", "<primary-key-column>")
  .option("checkpointLocation", "/checkpoints/<query-name>")
  .start()
)

Scala

df.writeStream
  .format("postgresql")
  .outputMode("update")
  .option("endpoint", "<project-id>.<branch-id>.<endpoint-id>")
  .option("database", "<database>")
  .option("dbtable", "<schema>.<table>")
  .option("upsertkey", "<primary-key-column>")
  .option("checkpointLocation", "/checkpoints/<query-name>")
  .start()

Configuration options

If you specify an unrecognized option, the sink raises a JDBC_STREAMING_SINK_INVALID_OPTIONS error.

The following options apply to all connection methods:

| Key | Default | Description |
| --- | --- | --- |
| batchinterval | 100 milliseconds | The maximum time to hold rows in the buffer before flushing. For example, "50 milliseconds". |
| batchsize | 1000 | The maximum number of rows for each database transaction. |
| checkpointLocation | None | Required. Path to the checkpoint directory. |
| upsertkey | None | A comma-separated list of column names that form the upsert key. For example, "id" or "user_id,event_type". The target table must have a PRIMARY KEY constraint on the specified columns. If you don't specify an upsert key, the sink infers it from the primary key of the target table. If the target table has no primary key, the sink inserts rows instead of upserting them. |

Lakebase tables not registered with Unity Catalog

The following options apply when you connect to a Lakebase table not registered with Unity Catalog:

| Key | Default | Description |
| --- | --- | --- |
| database | None | The target PostgreSQL database name. |
| dbtable | None | The target table name in schema.table format. If you don't specify a schema, the sink uses the public schema. |
| endpoint | None | The Lakebase endpoint in project_id.branch_id.endpoint_id format. |
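Because the sink rejects unrecognized options, it can help to check option names before you start the query. The following is a minimal sketch, not part of the connector: the helper names unrecognized_options and build_sink_options are hypothetical, and the check assumes option keys are matched exactly as spelled on this page.

```python
# Option keys documented on this page. The sink performs its own validation;
# this client-side check only catches typos earlier.
RECOGNIZED_OPTIONS = {
    "batchinterval",
    "batchsize",
    "checkpointLocation",
    "upsertkey",
    "database",
    "dbtable",
    "endpoint",
}


def unrecognized_options(options):
    """Return the option keys that are not documented for the Lakebase sink."""
    return sorted(key for key in options if key not in RECOGNIZED_OPTIONS)


def build_sink_options(**options):
    """Validate option names and return them as a dict of strings.

    Hypothetical helper. Raises ValueError for an undocumented key or a
    missing checkpointLocation, which this page marks as required.
    """
    unknown = unrecognized_options(options)
    if unknown:
        raise ValueError(f"Unrecognized sink options: {unknown}")
    if "checkpointLocation" not in options:
        raise ValueError("checkpointLocation is required")
    return {key: str(value) for key, value in options.items()}


sink_options = build_sink_options(
    checkpointLocation="/checkpoints/<query-name>",
    upsertkey="id",
    batchsize=500,
)
# The resulting dict can be passed to the writer with .options(**sink_options).
```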

Upsert behavior

When upsert keys exist, either specified with upsertkey or inferred by the sink from the table's primary keys, the sink upserts into the table with PostgreSQL's INSERT INTO ... ON CONFLICT (<upsert_key>) DO UPDATE SET ... syntax.

When no upsert keys exist, the sink performs inserts. A query's output mode has no effect on the upsert or insert behavior.

The upsertkey columns must:

  • Be a non-empty subset of the DataFrame columns.
  • Reference target table columns that are covered by a PRIMARY KEY constraint.
  • Be of a comparable type, such as a numeric or string type. Upsert keys do not support complex or struct types. The sink sorts rows by upsert key within each batch to prevent database deadlocks during concurrent writes.
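The requirements above can be illustrated with a small sketch. The functions parse_upsert_key and sort_rows_by_key are hypothetical and only mimic the documented behavior in plain Python. They are not the sink's implementation. Sorting matters because writers that lock rows in the same key order cannot form a lock cycle with each other.

```python
def parse_upsert_key(upsertkey, dataframe_columns):
    """Split a comma-separated upsertkey value and check it against the DataFrame columns."""
    key_columns = [name.strip() for name in upsertkey.split(",") if name.strip()]
    if not key_columns:
        raise ValueError("upsertkey must name at least one column")
    missing = [name for name in key_columns if name not in dataframe_columns]
    if missing:
        raise ValueError(f"upsertkey columns not in the DataFrame: {missing}")
    return key_columns


def sort_rows_by_key(rows, key_columns):
    """Order the rows of one batch by upsert key so every writer touches rows in the same order."""
    return sorted(rows, key=lambda row: tuple(row[name] for name in key_columns))


key_columns = parse_upsert_key("user_id, event_type", ["user_id", "event_type", "count"])
batch = [
    {"user_id": 2, "event_type": "click", "count": 1},
    {"user_id": 1, "event_type": "view", "count": 5},
    {"user_id": 1, "event_type": "click", "count": 3},
]
ordered_batch = sort_rows_by_key(batch, key_columns)
```

The sort only works for values that can be compared with each other, which is why the key columns must be of comparable types.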

The sink automatically quotes column names with double quotes ("), the PostgreSQL default. Quoting handles reserved keywords, mixed-case names, and special characters.

The sink does not quote table names and passes them as-is to the database. You must quote table names that contain special characters yourself, for example "my-schema"."my-table".
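To make the statement shape and quoting rules in this section concrete, the following sketch builds a comparable statement as a string. It is an approximation under stated assumptions: the function names are hypothetical, the use of EXCLUDED in the SET clause and of ? placeholders is assumed rather than documented, and the exact SQL that the sink generates might differ.

```python
def quote_identifier(name):
    """Wrap a column name in double quotes, doubling any embedded double quote."""
    return '"' + name.replace('"', '""') + '"'


def build_write_sql(table, columns, key_columns):
    """Build an INSERT statement, adding ON CONFLICT ... DO UPDATE when upsert keys exist.

    The table name is used as-is, mirroring the documented behavior that
    table names are not quoted automatically.
    """
    column_list = ", ".join(quote_identifier(name) for name in columns)
    placeholders = ", ".join("?" for _ in columns)
    sql = f"INSERT INTO {table} ({column_list}) VALUES ({placeholders})"
    if not key_columns:
        # No upsert key: plain insert.
        return sql
    conflict_target = ", ".join(quote_identifier(name) for name in key_columns)
    non_key = [quote_identifier(name) for name in columns if name not in key_columns]
    if not non_key:
        # Every column is part of the key, so there is nothing to update.
        return f"{sql} ON CONFLICT ({conflict_target}) DO NOTHING"
    assignments = ", ".join(f"{name} = EXCLUDED.{name}" for name in non_key)
    return f"{sql} ON CONFLICT ({conflict_target}) DO UPDATE SET {assignments}"


upsert_sql = build_write_sql('"my-schema"."my-table"', ["id", "Order"], ["id"])
insert_sql = build_write_sql("public.events", ["payload"], [])
```

In this sketch the caller passes the table name already quoted, while column names such as Order are quoted by the helper.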

Performance tuning

Batching and backpressure

The sink flushes the buffer when either of the following conditions is met:

  • The buffer reaches batchsize rows, which defaults to 1000.
  • The buffer age exceeds batchinterval, which defaults to 100 milliseconds.

When the database cannot keep up with the incoming data rate, the sink propagates backpressure upstream to the source.
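The two flush conditions can be modeled with a small buffer class. This is an illustrative simulation only: RowBuffer is a hypothetical name, time is passed in explicitly as milliseconds so the behavior is easy to follow, and the sink's real buffering is internal to the connector.

```python
class RowBuffer:
    """Toy model of a buffer that flushes on row count or on age."""

    def __init__(self, batchsize=1000, batchinterval_ms=100):
        self.batchsize = batchsize
        self.batchinterval_ms = batchinterval_ms
        self.rows = []
        self.first_row_at_ms = None  # arrival time of the oldest buffered row

    def add(self, row, now_ms):
        if not self.rows:
            self.first_row_at_ms = now_ms
        self.rows.append(row)

    def should_flush(self, now_ms):
        if not self.rows:
            return False
        if len(self.rows) >= self.batchsize:
            return True  # size condition
        return now_ms - self.first_row_at_ms > self.batchinterval_ms  # age condition

    def flush(self):
        """Return the buffered rows as one batch and reset the buffer."""
        batch, self.rows = self.rows, []
        self.first_row_at_ms = None
        return batch


buffer = RowBuffer(batchsize=3, batchinterval_ms=100)
buffer.add({"id": 1}, now_ms=0)
flush_at_50 = buffer.should_flush(now_ms=50)    # too few rows, too young
flush_at_101 = buffer.should_flush(now_ms=101)  # age exceeds batchinterval
```

A smaller batchinterval shortens the longest time a row can wait in the buffer, and a larger batchsize lets each transaction carry more rows.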

Latency and throughput guidance:

  • For low-latency workloads with real-time mode, decrease batchinterval to guarantee a shorter maximum time before flushing. See Real-time mode in Structured Streaming.
  • For high-throughput workloads, increase batchsize to reduce overhead for each transaction.

Connection behavior

The sink uses connection pooling on executors. By default, each task uses one database connection.

Databricks recommends that you use the default value of 1 task for each connection. If you increase the number of tasks for each connection, you might cause connection contention and increase latency for high-throughput connections.

To configure the ratio of tasks to connections, set the spark.databricks.sql.streaming.jdbc.tasksPerConnection Spark configuration. If the target database has a low connection limit, reduce the number of shuffle partitions or increase spark.databricks.sql.streaming.jdbc.tasksPerConnection.
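As a rough planning aid, you can estimate how many connections a query might open. The arithmetic below is a simplification, not a documented formula: it assumes connections are shared evenly across concurrently running tasks and ignores per-executor pooling effects. The function names are hypothetical.

```python
import math


def estimated_connections(concurrent_tasks, tasks_per_connection=1):
    """Estimate open connections, assuming tasks share connections evenly."""
    if concurrent_tasks < 0 or tasks_per_connection < 1:
        raise ValueError("concurrent_tasks must be >= 0 and tasks_per_connection >= 1")
    return math.ceil(concurrent_tasks / tasks_per_connection)


def min_tasks_per_connection(concurrent_tasks, connection_limit):
    """Smallest tasks-per-connection ratio that keeps the estimate within the limit."""
    if connection_limit < 1:
        raise ValueError("connection_limit must be >= 1")
    return max(1, math.ceil(concurrent_tasks / connection_limit))


# Example: 200 concurrent tasks against a database that allows 64 connections.
default_estimate = estimated_connections(200)      # one connection per task
ratio = min_tasks_per_connection(200, 64)          # ratio needed to fit the limit
tuned_estimate = estimated_connections(200, ratio)
```

If the required ratio is high, reducing the number of shuffle partitions might be the better choice, because sharing a connection among many tasks can add contention.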

The sink automatically retries transient JDBC errors, including connection failures, deadlocks, and rate limiting. If the sink exhausts all retries, the query fails.

Supported triggers and output modes

Triggers

This table shows support for Structured Streaming trigger types:

| Trigger | Supported |
| --- | --- |
| realTime | Yes |
| ProcessingTime | Yes |
| AvailableNow | Yes |
| Once | Yes |

Output modes

This table shows support for Structured Streaming output modes:

| Output mode | Supported |
| --- | --- |
| update | Yes |
| append | Yes. Behavior is identical to update. The query upserts when the target table has a primary key, otherwise the query inserts. See Upsert behavior. |
| complete | No |

Limitations

  • Serverless compute and Lakeflow Spark Declarative Pipelines are not supported.
  • Only Lakebase is supported as a write target. External PostgreSQL-compatible databases are not supported.