Options

This page describes configuration options for reading from and writing to Apache Kafka using Structured Streaming on Azure Databricks.

The Azure Databricks Kafka connector is built on top of the Apache Spark Kafka connector and supports all standard Kafka configuration options. Any option prefixed with kafka. is passed through directly to the underlying Kafka client. For example, .option("kafka.max.poll.records", "500") sets the Kafka consumer's max.poll.records property. See the Kafka configuration documentation for the full list of available Kafka properties.
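
For example, a minimal read sketch, assuming the ambient spark session of a Databricks notebook; the broker addresses and topic name are placeholders:

    # kafka.max.poll.records is stripped of its kafka. prefix and passed to the
    # Kafka consumer as max.poll.records.
    df = (spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "host1:9092,host2:9092")
        .option("kafka.max.poll.records", "500")
        .option("subscribe", "events")
        .load())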

For a complete list of Structured Streaming source and sink options, see Kafka and the Structured Streaming + Kafka Integration Guide.

Required options

For details on required options, see Kafka.

The following option is required for both reading and writing:

  • kafka.bootstrap.servers: A comma-separated list of host:port addresses for Kafka brokers. Sets the Kafka client's bootstrap.servers property.

If you see no data arriving from Kafka, check the broker address list for incorrect addresses. An incorrect broker address list might not produce any errors: Kafka clients assume the brokers will become available eventually and retry forever when they receive network errors.

For Kafka reads, you must also specify exactly one of the following options to identify which topics to consume:

  • subscribe: A comma-separated list of topics to read.
  • subscribePattern: A Java regular expression matching the topics to read.
  • assign: A JSON string specifying the exact topic-partitions to consume, for example {"orders": [0, 1]}.
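
For example, three read sketches, one per selector; the broker address, topic names, and partition assignments are placeholders:

    # Exactly one of subscribe, subscribePattern, or assign per query.
    by_list = (spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", "host1:9092")
        .option("subscribe", "orders,payments")    # explicit topic list
        .load())

    by_pattern = (spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", "host1:9092")
        .option("subscribePattern", "orders-.*")   # Java regex over topic names
        .load())

    by_assignment = (spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", "host1:9092")
        .option("assign", '{"orders": [0, 1]}')    # specific topic-partitions
        .load())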

When writing to Kafka, you can set the topic option to specify a single destination topic for all rows. If you don't set this option, the DataFrame must include a topic column that names the destination topic for each row.
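
For example, a write sketch that relies on the topic column fallback, assuming a streaming DataFrame df with value and region columns; the region-based topic naming, broker address, and checkpoint path are illustrative assumptions:

    from pyspark.sql import functions as F

    # No topic option is set, so each row is routed by its topic column.
    query = (df
        .withColumn("topic", F.concat(F.lit("events-"), F.col("region")))
        .selectExpr("topic", "CAST(value AS STRING)")
        .writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "host1:9092")
        .option("checkpointLocation", "/tmp/checkpoints/kafka-sink")
        .start())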

Common reader options

The following options are commonly used when reading from Kafka:

  • minPartitions: The minimum number of Spark partitions to read from Kafka. By default, Spark maps Kafka topic-partitions to Spark partitions one-to-one; set this higher to split large Kafka partitions into smaller pieces.
  • maxRecordsPerPartition: The maximum number of records per Spark partition.
  • failOnDataLoss: Whether to fail the query when it is possible that data was lost (for example, because topics were deleted or offsets are out of range).
  • maxOffsetsPerTrigger: The maximum number of offsets processed per trigger interval. The limit is split proportionally across the topic-partitions being read.
  • startingOffsets: The offset at which the query starts reading: earliest, latest, or a JSON string specifying a starting offset for each topic-partition.
  • endingOffsets: Where a batch query stops reading: latest or a JSON string specifying an ending offset for each topic-partition. Applies only to batch queries.
  • groupIdPrefix: A custom prefix for the auto-generated consumer group ID.
  • kafka.group.id: The consumer group ID to use while reading from Kafka. Use this with caution because it can cause unexpected behavior. By default, each query generates a unique group ID for reading data. This ensures that each query has its own consumer group that avoids interference from other consumers, and can therefore read all partitions of its subscribed topics. In some scenarios, such as Kafka group-based authorization, you might need to use specific authorized group IDs to read data. Queries with the same group ID might interfere with each other and each read only part of the data. Interference can occur when you run concurrent batch and streaming workloads, or when you start and restart queries in quick succession. To minimize such issues, set the Kafka consumer configuration session.timeout.ms to a very small value.
  • includeHeaders: Whether to include Kafka message headers in the output.
  • bytesEstimateWindowLength: The time window used to estimate the remaining bytes behind the latest offsets, reported through the estimatedTotalBytesBehindLatest metric.
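
A read sketch combining several of these options; the broker address, topic name, and tuning values are placeholders:

    df = (spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", "host1:9092")
        .option("subscribe", "events")
        .option("startingOffsets", "earliest")     # start from the beginning of the topic
        .option("maxOffsetsPerTrigger", "100000")  # cap offsets consumed per micro-batch
        .option("failOnDataLoss", "false")         # warn instead of failing on possible data loss
        .option("minPartitions", "64")             # split Kafka partitions into more Spark tasks
        .load())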

Common writer options

The following options are commonly used when writing to Kafka:

  • topic: Sets the topic for all rows. This option takes precedence over any topic column in the data.
  • includeHeaders: Whether to include Kafka headers in the row.
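
For example, a write sketch using both options, assuming a streaming DataFrame df whose headers column has the type ARRAY<STRUCT<key: STRING, value: BINARY>>; the broker address, topic, and checkpoint path are placeholders:

    query = (df
        .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers")
        .writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "host1:9092")
        .option("topic", "events")         # overrides any topic column in df
        .option("includeHeaders", "true")  # write the headers column as record headers
        .option("checkpointLocation", "/tmp/checkpoints/kafka-headers")
        .start())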

Important

Databricks Runtime 13.3 LTS and above includes a newer version of the kafka-clients library that enables idempotent writes by default. If your Kafka cluster uses version 2.8.0 or below with ACLs configured but without the IDEMPOTENT_WRITE ACL granted, writes fail. Resolve this by upgrading your Kafka cluster to version 2.8.0 or above, or by setting .option("kafka.enable.idempotence", "false").
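
For example, the workaround applied to an otherwise unchanged writer sketch; the broker address, topic, and checkpoint path are placeholders:

    query = (df
        .selectExpr("CAST(value AS STRING)")
        .writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "host1:9092")
        .option("topic", "events")
        .option("kafka.enable.idempotence", "false")  # for pre-2.8.0 brokers with ACLs
        .option("checkpointLocation", "/tmp/checkpoints/kafka-sink")
        .start())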

Authentication options

Azure Databricks supports multiple authentication methods for Kafka, including Unity Catalog service credentials, SASL/SSL, and cloud-specific options for AWS MSK, Azure Event Hubs, and Google Cloud Managed Kafka.

Azure Databricks recommends that you use Unity Catalog service credentials for authentication to cloud-managed Kafka services:

  • databricks.serviceCredential: The name of a Unity Catalog service credential used to authenticate to cloud-managed Kafka services (AWS MSK, Azure Event Hubs, or Google Cloud Managed Kafka). Available in Databricks Runtime 16.1 and above.
  • databricks.serviceCredential.scope: The OAuth scope for the service credential. Set this only if Azure Databricks cannot automatically infer the scope for your Kafka service.

When you use a Unity Catalog service credential, you do not need to specify SASL/SSL options like kafka.sasl.mechanism, kafka.sasl.jaas.config, or kafka.security.protocol.
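
For example, a read sketch authenticated with a service credential; the credential name, broker address, and topic are placeholders:

    df = (spark.readStream.format("kafka")
        .option("databricks.serviceCredential", "my_kafka_credential")
        .option("kafka.bootstrap.servers", "host1:9092")
        .option("subscribe", "events")
        .load())
    # No kafka.security.protocol or kafka.sasl.* options are needed here.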

Common SASL/SSL options include:

  • kafka.security.protocol: The protocol used to communicate with brokers (for example, SASL_SSL, SSL, or PLAINTEXT).
  • kafka.sasl.mechanism: The SASL mechanism (for example, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, OAUTHBEARER, or AWS_MSK_IAM).
  • kafka.sasl.jaas.config: The JAAS login configuration string.
  • kafka.sasl.login.callback.handler.class: The fully qualified class name of a login callback handler for SASL authentication.
  • kafka.sasl.client.callback.handler.class: The fully qualified class name of a client callback handler for SASL authentication.
  • kafka.ssl.truststore.location: The location of the SSL trust store file.
  • kafka.ssl.truststore.password: The password for the SSL trust store file.
  • kafka.ssl.keystore.location: The location of the SSL key store file.
  • kafka.ssl.keystore.password: The password for the SSL key store file.
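
For example, a read sketch using SASL/SCRAM over TLS; the broker address, topic, username, and password are placeholders, and in practice you would read the credentials from a secret scope rather than hard-coding them. The kafkashaded prefix on the login module class reflects how Databricks Runtime shades the Kafka client classes:

    df = (spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", "host1:9093")
        .option("kafka.security.protocol", "SASL_SSL")
        .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
        .option("kafka.sasl.jaas.config",
                'kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule '
                'required username="USERNAME" password="PASSWORD";')
        .option("subscribe", "events")
        .load())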

For complete authentication setup instructions, see Authentication.

Additional resources