Authentication

The Azure Databricks Kafka connector supports multiple authentication methods for connecting to Kafka. This article covers some of the most common authentication methods on Databricks. The full list of supported authentication methods can be found in the Kafka documentation.

Service principal authentication with Microsoft Entra ID and Azure Event Hubs

Azure Databricks supports the authentication of Spark jobs with Event Hubs services. This authentication is done via OAuth with Microsoft Entra ID.

AAD Authentication diagram

Connect with Unity Catalog service credentials

Since the release of Databricks Runtime 16.1, Azure Databricks supports Unity Catalog service credentials for authenticating access to AWS Managed Streaming for Apache Kafka (MSK) and Azure Event Hubs. Databricks recommends this approach, particularly when running Kafka streaming on shared clusters or serverless compute.

To use a Unity Catalog service credential for authentication, perform the following steps:

  • Create a new Unity Catalog service credential. If you are not familiar with this process, see Create service credentials for instructions on creating one.
    • Ensure that the access connector attached to your service credential has the necessary permissions to connect to Azure Event Hubs.
  • Provide the name of your Unity Catalog service credential as a source option in your Kafka configuration. Set the option databricks.serviceCredential to the name of your service credential.

The following example configures Kafka as a source using a service credential:

Python

kafka_options = {
  "kafka.bootstrap.servers": "<bootstrap-hostname>:9092",
  "subscribe": "<topic>",
  "databricks.serviceCredential": "<service-credential-name>",
  # Optional: set this only if Databricks can't infer the scope for your Kafka service.
  # "databricks.serviceCredential.scope": "https://<event-hubs-server>/.default",
}

df = spark.readStream.format("kafka").options(**kafka_options).load()

Scala

val kafkaOptions = Map(
  "kafka.bootstrap.servers" -> "<bootstrap-hostname>:9092",
  "subscribe" -> "<topic>",
  "databricks.serviceCredential" -> "<service-credential-name>",
  // Optional: set this only if Databricks can't infer the scope for your Kafka service.
  // "databricks.serviceCredential.scope" -> "https://<event-hubs-server>/.default",
)

val df = spark.readStream.format("kafka").options(kafkaOptions).load()

SQL

SELECT * FROM read_kafka(
  bootstrapServers => '<bootstrap-hostname>:9092',
  subscribe => '<topic>',
  serviceCredential => '<service-credential-name>'
);

Note: When you use a Unity Catalog service credential to connect to Kafka, the following options are no longer needed:

  • kafka.sasl.mechanism
  • kafka.sasl.jaas.config
  • kafka.security.protocol
  • kafka.sasl.client.callback.handler.class
  • kafka.sasl.oauthbearer.token.endpoint.url
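
When migrating an existing configuration to a service credential, the options listed above can be dropped before the dictionary is passed to the reader. The following is a minimal sketch, not part of the connector: the helper name `to_service_credential_options` is hypothetical, and it only manipulates a plain Python dictionary.

```python
# Options that the note above lists as unnecessary with a service credential.
OBSOLETE_WITH_SERVICE_CREDENTIAL = {
    "kafka.sasl.mechanism",
    "kafka.sasl.jaas.config",
    "kafka.security.protocol",
    "kafka.sasl.client.callback.handler.class",
    "kafka.sasl.oauthbearer.token.endpoint.url",
}


def to_service_credential_options(options, credential_name):
    """Return a copy of `options` without the obsolete SASL/OAuth settings
    and with `databricks.serviceCredential` set to `credential_name`.

    Hypothetical helper for illustration; it does not contact Kafka.
    """
    cleaned = {
        key: value
        for key, value in options.items()
        if key not in OBSOLETE_WITH_SERVICE_CREDENTIAL
    }
    cleaned["databricks.serviceCredential"] = credential_name
    return cleaned


legacy_options = {
    "kafka.bootstrap.servers": "<bootstrap-hostname>:9092",
    "subscribe": "<topic>",
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.mechanism": "OAUTHBEARER",
}
migrated_options = to_service_credential_options(
    legacy_options, "<service-credential-name>"
)

# In a notebook, the result would then be passed to the reader:
# df = spark.readStream.format("kafka").options(**migrated_options).load()
```

The helper returns a new dictionary and leaves the original untouched, so the old configuration stays available for comparison while you test the migration.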

Connect with a client ID and secret

Azure Databricks supports Microsoft Entra ID authentication with a client ID and secret in the following compute environments:

  • Databricks Runtime 12.2 LTS and above on compute configured with dedicated access mode (formerly single user access mode).
  • Databricks Runtime 14.3 LTS and above on compute configured with standard access mode (formerly shared access mode).
  • Lakeflow Spark Declarative Pipelines configured without Unity Catalog.

Azure Databricks does not support Microsoft Entra ID authentication with a certificate in any compute environment, or in Lakeflow Spark Declarative Pipelines configured with Unity Catalog.

On compute with standard access mode, this authentication requires Databricks Runtime 14.3 LTS or above. It does not work on Unity Catalog Lakeflow Spark Declarative Pipelines.

To perform authentication with Microsoft Entra ID, you must have the following values:

  • A tenant ID. You can find this in the Microsoft Entra ID services tab.

  • A client ID (also known as Application ID).

  • A client secret. Once you have this, you should add it as a secret to your Databricks Workspace. To add this secret, see Secret management.

  • An Event Hubs topic. You can find a list of topics in the Event Hubs section under the Entities section on a specific Event Hubs Namespace page. To work with multiple topics, you can set the IAM role at the Event Hubs level.

  • An Event Hubs server. You can find this on the overview page of your specific Event Hubs namespace:

    Event Hubs namespace

Additionally, to use Entra ID, you must configure Kafka to use the OAuth SASL mechanism (SASL is a generic protocol, and OAuth is one type of SASL mechanism):

  • kafka.security.protocol should be SASL_SSL
  • kafka.sasl.mechanism should be OAUTHBEARER
  • kafka.sasl.login.callback.handler.class should be the fully qualified name of the Java class for the login callback handler. Because Azure Databricks uses a shaded Kafka client, the class name starts with kafkashaded. See the following example for the exact class.

The following example configures Kafka to connect to Azure Event Hubs using Microsoft Entra ID authentication with a client ID and secret:

Python

# This is the only section you need to modify for auth purposes!
# ------------------------------
tenant_id = "..."
client_id = "..."
client_secret = dbutils.secrets.get("your-scope", "your-secret-name")

event_hubs_server = "..."
event_hubs_topic = "..."
# -------------------------------

sasl_config = f'kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="{client_id}" clientSecret="{client_secret}" scope="https://{event_hubs_server}/.default" ssl.protocol="SSL";'

kafka_options = {
    "kafka.bootstrap.servers": f"{event_hubs_server}:9093", # Port 9093 is the EventHubs Kafka port
    "kafka.sasl.jaas.config": sasl_config,
    "kafka.sasl.oauthbearer.token.endpoint.url": f"https://login.microsoft.com/{tenant_id}/oauth2/v2.0/token",
    "subscribe": event_hubs_topic,

    # You should not need to modify these
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.mechanism": "OAUTHBEARER",
    "kafka.sasl.login.callback.handler.class": "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
}

df = spark.readStream.format("kafka").options(**kafka_options).load()

display(df)

Scala

// This is the only section you need to modify for auth purposes!
// -------------------------------
val tenantId = "..."
val clientId = "..."
val clientSecret = dbutils.secrets.get("your-scope", "your-secret-name")

val eventHubsServer = "..."
val eventHubsTopic = "..."
// -------------------------------

val saslConfig = s"""kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="$clientId" clientSecret="$clientSecret" scope="https://$eventHubsServer/.default" ssl.protocol="SSL";"""

val kafkaOptions = Map(
  "kafka.bootstrap.servers" -> s"$eventHubsServer:9093", // Port 9093 is the EventHubs Kafka port
  "kafka.sasl.jaas.config" -> saslConfig,
  "kafka.sasl.oauthbearer.token.endpoint.url" -> s"https://login.microsoft.com/$tenantId/oauth2/v2.0/token",
  "subscribe" -> eventHubsTopic,

  // You should not need to modify these
  "kafka.security.protocol" -> "SASL_SSL",
  "kafka.sasl.mechanism" -> "OAUTHBEARER",
  "kafka.sasl.login.callback.handler.class" -> "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
)

val scalaDF = spark.readStream
  .format("kafka")
  .options(kafkaOptions)
  .load()

display(scalaDF)

SQL

CREATE OR REFRESH STREAMING TABLE <table_name>
AS
SELECT * FROM STREAM read_kafka(
  bootstrapServers => '<event-hubs-server>:9093',
  subscribe => '<event-hubs-topic>',
  `kafka.security.protocol` => 'SASL_SSL',
  `kafka.sasl.mechanism` => 'OAUTHBEARER',
  `kafka.sasl.jaas.config` => 'kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="<client-id>" clientSecret="<client-secret>" scope="https://<event-hubs-server>/.default" ssl.protocol="SSL";',
  `kafka.sasl.oauthbearer.token.endpoint.url` => 'https://login.microsoft.com/<tenant-id>/oauth2/v2.0/token',
  `kafka.sasl.login.callback.handler.class` => 'kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler'
);
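
To avoid editing the fixed settings by accident, the Python example above can be wrapped in a small function that takes only the values you are expected to change. This is a sketch under the same assumptions as the example: the function name `build_entra_kafka_options` is hypothetical, the option values are copied from the example, and the arguments used below are placeholders.

```python
# Settings that the examples above mark as "You should not need to modify these".
FIXED_OAUTH_OPTIONS = {
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.mechanism": "OAUTHBEARER",
    "kafka.sasl.login.callback.handler.class": (
        "kafkashaded.org.apache.kafka.common.security.oauthbearer"
        ".secured.OAuthBearerLoginCallbackHandler"
    ),
}


def build_entra_kafka_options(tenant_id, client_id, client_secret,
                              event_hubs_server, event_hubs_topic):
    """Build the Kafka source options for Entra ID client-secret authentication.

    Hypothetical helper; it only assembles strings and does not contact
    Microsoft Entra ID or Event Hubs.
    """
    jaas_config = (
        "kafkashaded.org.apache.kafka.common.security.oauthbearer."
        "OAuthBearerLoginModule required "
        f'clientId="{client_id}" clientSecret="{client_secret}" '
        f'scope="https://{event_hubs_server}/.default" ssl.protocol="SSL";'
    )
    options = {
        # Port 9093 is the Event Hubs Kafka port.
        "kafka.bootstrap.servers": f"{event_hubs_server}:9093",
        "kafka.sasl.jaas.config": jaas_config,
        "kafka.sasl.oauthbearer.token.endpoint.url": (
            f"https://login.microsoft.com/{tenant_id}/oauth2/v2.0/token"
        ),
        "subscribe": event_hubs_topic,
    }
    options.update(FIXED_OAUTH_OPTIONS)
    return options


# Placeholder values for illustration only. In a notebook, read the secret with
# dbutils.secrets.get("your-scope", "your-secret-name") instead of a literal.
kafka_options = build_entra_kafka_options(
    tenant_id="<tenant-id>",
    client_id="<client-id>",
    client_secret="<client-secret>",
    event_hubs_server="<event-hubs-server>",
    event_hubs_topic="<event-hubs-topic>",
)

# df = spark.readStream.format("kafka").options(**kafka_options).load()
```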

Use SSL to connect Azure Databricks to Kafka

To enable SSL connections to Kafka, follow the instructions in the Confluent documentation Encryption and Authentication with SSL. You can provide the configurations described there, prefixed with kafka., as options. For example, the trust store location would be specified with the property kafka.ssl.truststore.location.
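
The prefixing rule can be illustrated with a short sketch. The helper name `prefix_kafka_options` is hypothetical and not part of the connector; it takes Kafka client property names as they appear in the Kafka or Confluent documentation and returns the option names expected by the Kafka source.

```python
def prefix_kafka_options(client_config):
    """Prefix plain Kafka client property names with `kafka.`.

    Keys that already start with `kafka.` are left unchanged.
    Hypothetical helper for illustration only.
    """
    return {
        (key if key.startswith("kafka.") else f"kafka.{key}"): value
        for key, value in client_config.items()
    }


ssl_client_config = {
    "security.protocol": "SSL",
    "ssl.truststore.location": "<truststore-location>",
}
ssl_options = prefix_kafka_options(ssl_client_config)
```

Options that belong to the Spark source itself, such as subscribe, are not Kafka client properties and should be added without the prefix.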

If you will be using SSL, Databricks recommends that you:

  • Store your certificates in a Unity Catalog volume. Users who have access to read from the volume will be able to use your Kafka certificates.
  • Store your certificate passwords as secrets in a secret scope.

The following example uses object storage locations and Databricks secrets to enable an SSL connection:

Python

df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", ...)
  .option("subscribe", "<topic>")
  .option("kafka.security.protocol", "SSL")
  .option("kafka.ssl.truststore.location", "<truststore-location>")
  .option("kafka.ssl.keystore.location", "<keystore-location>")
  # Read the certificate passwords from a secret scope instead of hard-coding them.
  .option("kafka.ssl.keystore.password", dbutils.secrets.get(scope="<certificate-scope-name>", key="<keystore-password-key-name>"))
  .option("kafka.ssl.truststore.password", dbutils.secrets.get(scope="<certificate-scope-name>", key="<truststore-password-key-name>"))
  .load()
)

Scala

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", ...)
  .option("subscribe", "<topic>")
  .option("kafka.security.protocol", "SSL")
  .option("kafka.ssl.truststore.location", "<truststore-location>")
  .option("kafka.ssl.keystore.location", "<keystore-location>")
  // Read the certificate passwords from a secret scope instead of hard-coding them.
  .option("kafka.ssl.keystore.password", dbutils.secrets.get(scope = "<certificate-scope-name>", key = "<keystore-password-key-name>"))
  .option("kafka.ssl.truststore.password", dbutils.secrets.get(scope = "<certificate-scope-name>", key = "<truststore-password-key-name>"))
  .load()

SQL

SELECT * FROM read_kafka(
  bootstrapServers => '<bootstrap-server>:9092',
  subscribe => '<topic>',
  `kafka.security.protocol` => 'SSL',
  `kafka.ssl.truststore.location` => '<truststore-location>',
  `kafka.ssl.keystore.location` => '<keystore-location>',
  `kafka.ssl.keystore.password` => secret('<certificate-scope-name>', '<keystore-password-key-name>'),
  `kafka.ssl.truststore.password` => secret('<certificate-scope-name>', '<truststore-password-key-name>')
);

Connect Kafka on HDInsight to Azure Databricks

  1. Create an HDInsight Kafka cluster.

    See Connect to Kafka on HDInsight through an Azure Virtual Network for instructions.

  2. Configure the Kafka brokers to advertise the correct address.

    Follow the instructions in Configure Kafka for IP advertising. If you manage Kafka yourself on Azure Virtual Machines, make sure that the advertised.listeners configuration of the brokers is set to the internal IP of the hosts.

  3. Create an Azure Databricks cluster.

  4. Peer the Kafka cluster to the Azure Databricks cluster.

    Follow the instructions in Peer virtual networks.
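
After peering, one way to check that the brokers are reachable from the Azure Databricks cluster is a plain TCP connection test run from a notebook. The following is a minimal sketch using only the Python standard library. The function names are hypothetical, every bootstrap entry is assumed to have the form host:port, and a successful connection shows network reachability only, not that the advertised listeners are configured correctly.

```python
import socket


def parse_bootstrap_servers(bootstrap_servers):
    """Split 'host1:9092,host2:9092' into [('host1', 9092), ('host2', 9092)].

    Assumes every entry contains an explicit port.
    """
    pairs = []
    for entry in bootstrap_servers.split(","):
        host, _, port = entry.strip().rpartition(":")
        pairs.append((host, int(port)))
    return pairs


def can_reach(host, port, timeout_seconds=3.0):
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout_seconds):
            return True
    except OSError:
        return False


# Example usage from a notebook attached to the peered cluster:
# for host, port in parse_bootstrap_servers("<broker-1>:9092,<broker-2>:9092"):
#     print(host, port, can_reach(host, port))
```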

Handling potential errors

  • Failed to create a new KafkaAdminClient

    This internal Kafka error is thrown if any of the following authentication options are incorrect:

    • Client ID (also known as Application ID)
    • Tenant ID
    • Event Hubs server

    To resolve the error, verify that the values are correct for these options. Additionally, you might see this error if you modify the configuration options provided by default in the example (such as kafka.security.protocol).

  • No records returned

    If you are trying to display or process your DataFrame but aren't getting results, you will see the following in the UI.

    No results message

    This message means that authentication succeeded, but Event Hubs didn't return any data. Some possible reasons (the list is not exhaustive) are:

    • You specified the wrong Event Hubs topic.
    • The default value of the startingOffsets option is latest, and no new data has arrived on the topic since the stream started. Set startingOffsets to earliest to read from the topic's earliest available offsets.
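
The checks described in this section can be sketched as a small pre-flight function that inspects the options dictionary before the stream starts. This is an illustrative helper, not a Databricks API: the name `diagnose_kafka_options` is hypothetical, and the expected values are copied from the client ID and secret example in this article. It cannot verify the client ID, tenant ID, or Event Hubs server.

```python
# Values from the client ID and secret example that should not be modified.
EXPECTED_FIXED_OPTIONS = {
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.mechanism": "OAUTHBEARER",
    "kafka.sasl.login.callback.handler.class": (
        "kafkashaded.org.apache.kafka.common.security.oauthbearer"
        ".secured.OAuthBearerLoginCallbackHandler"
    ),
}


def diagnose_kafka_options(options):
    """Return a list of human-readable findings for a Kafka options dict."""
    findings = []
    for key, expected in EXPECTED_FIXED_OPTIONS.items():
        actual = options.get(key)
        if actual != expected:
            findings.append(f"{key} is {actual!r}, expected {expected!r}")
    # startingOffsets defaults to latest when it is not set.
    if options.get("startingOffsets", "latest") == "latest":
        findings.append(
            "startingOffsets is latest; set it to earliest to read existing records"
        )
    return findings


good_options = dict(EXPECTED_FIXED_OPTIONS, startingOffsets="earliest")
bad_options = dict(EXPECTED_FIXED_OPTIONS)
bad_options["kafka.security.protocol"] = "SSL"

good_findings = diagnose_kafka_options(good_options)
bad_findings = diagnose_kafka_options(bad_options)
```

An empty list means none of the checked settings deviates from the example. Each finding names the option to review.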