Nota
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
El conector de Kafka de Azure Databricks admite varios métodos de autenticación para conectarse a Kafka. En este artículo se describen algunos de los métodos de autenticación más comunes en Databricks. La lista completa de métodos de autenticación admitidos se puede encontrar en la documentación de Kafka.
Conexión a Azure Event Hubs con una entidad de servicio
Azure Databricks admite la autenticación de trabajos de Spark con servicios de Event Hubs. Esta autenticación se realiza mediante OAuth con Microsoft Entra ID.
Conéctate con las credenciales del servicio del catálogo de Unity
Desde el lanzamiento de Databricks Runtime 16.1, Azure Databricks admite credenciales de servicio de Catálogo de Unity para autenticarse en Azure Event Hubs. Databricks recomienda este enfoque, especialmente cuando se ejecuta el streaming de Kafka en clústeres compartidos o proceso sin servidor.
Para usar una credencial de servicio del catálogo de Unity para la autenticación, realice los pasos siguientes:
- Cree una nueva credencial de servicio de "Unity Catalog". Si no está familiarizado con este proceso, consulte Creación de credenciales de servicio para obtener instrucciones sobre cómo crear una.
- Asegúrese de que el conector de acceso asociado a la credencial de servicio tiene los permisos necesarios para conectarse a Azure Event Hubs.
- Proporcione el nombre de la credencial del servicio catálogo de Unity como opción de origen en la configuración de Kafka. Establezca la opción
databricks.serviceCredentialen el nombre de la credencial de servicio.
En el ejemplo siguiente se configura Kafka como origen mediante una credencial de servicio:
Python
kafka_options = {
"kafka.bootstrap.servers": "<bootstrap-hostname>:9092",
"subscribe": "<topic>",
"databricks.serviceCredential": "<service-credential-name>",
# Optional: set this only if Databricks can't infer the scope for your Kafka service.
# "databricks.serviceCredential.scope": "https://<event-hubs-server>/.default",
}
df = spark.readStream.format("kafka").options(**kafka_options).load()
Scala
val kafkaOptions = Map(
"kafka.bootstrap.servers" -> "<bootstrap-hostname>:9092",
"subscribe" -> "<topic>",
"databricks.serviceCredential" -> "<service-credential-name>",
// Optional: set this only if Databricks can't infer the scope for your Kafka service.
// "databricks.serviceCredential.scope" -> "https://<event-hubs-server>/.default",
)
val df = spark.readStream.format("kafka").options(kafkaOptions).load()
SQL
SELECT * FROM read_kafka(
bootstrapServers => '<bootstrap-hostname>:9092',
subscribe => '<topic>',
serviceCredential => '<service-credential-name>'
);
Nota: Cuando se usa una credencial de servicio de catálogo de Unity para conectarse a Kafka, ya no se necesitan las siguientes opciones:
kafka.sasl.mechanismkafka.sasl.jaas.configkafka.security.protocolkafka.sasl.client.callback.handler.classkafka.sasl.oauthbearer.token.endpoint.url
Conexión con un identificador de cliente y un secreto
Azure Databricks admite la autenticación de Microsoft Entra ID con un identificador de cliente y un secreto en los siguientes entornos de proceso:
- Databricks Runtime 12.2 LTS y versiones posteriores en recursos de cómputo configurados con el modo de acceso dedicado (anteriormente modo de acceso exclusivo para un solo usuario).
- Databricks Runtime 14.3 LTS y versiones posteriores en computadoras configuradas con el modo de acceso estándar (anteriormente modo de acceso compartido).
- Canalizaciones declarativas de Spark de Lakeflow configuradas sin Unity Catalog.
Azure Databricks no admite la autenticación de ID de Microsoft Entra con un certificado en ningún entorno de computación ni en las canalizaciones declarativas de Spark de Lakeflow configuradas con Unity Catalog.
Esta autenticación no funciona en la informática con modo de acceso estándar ni en las canalizaciones declarativas de Spark de Unity Catalog Lakeflow.
Para realizar la autenticación con el identificador de Microsoft Entra, debe tener los siguientes valores:
Un identificador de inquilino. Puede encontrarlo en la pestaña Servicios de Microsoft Entra ID.
Un clientID (también conocido como Application ID).
Un secreto de cliente. Una vez que lo tenga, debe agregarlo como secreto al área de trabajo de Databricks. Para agregar este secreto, consulte Administración de secretos.
Un tema de EventHubs. Puede encontrar una lista de temas en la sección Event Hubs en la sección Entidades de una página específica de espacio de nombres de Event Hubs. Para trabajar con varios temas, puede establecer el rol IAM en el nivel de Event Hubs.
Un servidor de EventHubs. Puede encontrarlo en la página de información general del espacio de nombres de Event Hubs específico:
Además, para usar Entra ID, es necesario indicar a Kafka que use el mecanismo SASL de OAuth (SASL es un protocolo genérico y OAuth es un tipo de "mecanismo" de SASL):
-
kafka.security.protocoldebe serSASL_SSL -
kafka.sasl.mechanismdebe serOAUTHBEARER -
kafka.sasl.login.callback.handler.classdebe ser un nombre completo de la clase Java con un valor dekafkashadedpara el controlador de devolución de llamada de autorización de nuestra clase Kafka sombreada. Consulte el siguiente ejemplo para ver la clase exacta.
En el ejemplo siguiente se configura Kafka para conectarse a Azure Event Hubs mediante Microsoft Entra ID con autenticación de un identificador de cliente y un secreto:
Python
# This is the only section you need to modify for auth purposes
# ------------------------------
tenant_id = "..."
client_id = "..."
client_secret = dbutils.secrets.get("your-scope", "your-secret-name")
event_hubs_server = "..."
event_hubs_topic = "..."
# -------------------------------
sasl_config = f'kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="{client_id}" clientSecret="{client_secret}" scope="https://{event_hubs_server}/.default" ssl.protocol="SSL";'
kafka_options = {
"kafka.bootstrap.servers": f"{event_hubs_server}:9093", # Port 9093 is the EventHubs Kafka port
"kafka.sasl.jaas.config": sasl_config,
"kafka.sasl.oauthbearer.token.endpoint.url": f"https://login.microsoft.com/{tenant_id}/oauth2/v2.0/token",
"subscribe": event_hubs_topic,
# You should not need to modify these
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "OAUTHBEARER",
"kafka.sasl.login.callback.handler.class": "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
}
df = spark.readStream.format("kafka").options(**kafka_options)
display(df)
Scala
// This is the only section you need to modify for auth purposes
// -------------------------------
val tenantId = "..."
val clientId = "..."
val clientSecret = dbutils.secrets.get("your-scope", "your-secret-name")
val eventHubsServer = "..."
val eventHubsTopic = "..."
// -------------------------------
val saslConfig = s"""kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="$clientId" clientSecret="$clientSecret" scope="https://$eventHubsServer/.default" ssl.protocol="SSL";"""
val kafkaOptions = Map(
"kafka.bootstrap.servers" -> s"$eventHubsServer:9093", // Port 9093 is the EventHubs Kafka port
"kafka.sasl.jaas.config" -> saslConfig,
"kafka.sasl.oauthbearer.token.endpoint.url" -> s"https://login.microsoft.com/$tenantId/oauth2/v2.0/token",
"subscribe" -> eventHubsTopic,
// You should not need to modify these
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.mechanism" -> "OAUTHBEARER",
"kafka.sasl.login.callback.handler.class" -> "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
)
val scalaDF = spark.readStream
.format("kafka")
.options(kafkaOptions)
.load()
display(scalaDF)
SQL
CREATE OR REFRESH STREAMING TABLE <table_name>
AS
SELECT * FROM STREAM read_kafka(
bootstrapServers => '<event-hubs-server>:9093',
subscribe => '<event-hubs-topic>',
`kafka.security.protocol` => 'SASL_SSL',
`kafka.sasl.mechanism` => 'OAUTHBEARER',
`kafka.sasl.jaas.config` => 'kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="<client-id>" clientSecret="<client-secret>" scope="https://<event-hubs-server>/.default" ssl.protocol="SSL";',
`kafka.sasl.oauthbearer.token.endpoint.url` => 'https://login.microsoft.com/<tenant-id>/oauth2/v2.0/token',
`kafka.sasl.login.callback.handler.class` => 'kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler'
);
Uso de SASL/PLAIN para autenticarse
Para conectarse a Kafka mediante la autenticación SASL/PLAIN (nombre de usuario y contraseña), configure las siguientes opciones. Use el nombre de clase sombreado PlainLoginModule :
Python
kafka_options = {
"kafka.bootstrap.servers": "<bootstrap-server>:9093",
"subscribe": "<topic>",
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "PLAIN",
"kafka.sasl.jaas.config":
'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";',
}
df = spark.readStream.format("kafka").options(**kafka_options).load()
Scala
val kafkaOptions = Map(
"kafka.bootstrap.servers" -> "<bootstrap-server>:9093",
"subscribe" -> "<topic>",
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.mechanism" -> "PLAIN",
"kafka.sasl.jaas.config" ->
"""kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";""",
)
val df = spark.readStream.format("kafka").options(kafkaOptions).load()
SQL
SELECT * FROM STREAM read_kafka(
bootstrapServers => '<bootstrap-server>:9093',
subscribe => '<topic>',
`kafka.security.protocol` => 'SASL_SSL',
`kafka.sasl.mechanism` => 'PLAIN',
`kafka.sasl.jaas.config` => 'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";'
);
Azure Databricks recomienda almacenar la contraseña como un secreto en lugar de incluirla directamente en el código. Para obtener más información, consulte Gestión de secretos.
Uso de SASL/SCRAM para autenticarse
Para conectarse a Kafka mediante SASL/SCRAM (SCRAM-SHA-256 o SCRAM-SHA-512), configure las siguientes opciones. Use el nombre de clase sombreado ScramLoginModule :
Python
kafka_options = {
"kafka.bootstrap.servers": "<bootstrap-server>:9093",
"subscribe": "<topic>",
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "SCRAM-SHA-512",
"kafka.sasl.jaas.config":
'kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="<password>";',
}
df = spark.readStream.format("kafka").options(**kafka_options).load()
Scala
val kafkaOptions = Map(
"kafka.bootstrap.servers" -> "<bootstrap-server>:9093",
"subscribe" -> "<topic>",
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.mechanism" -> "SCRAM-SHA-512",
"kafka.sasl.jaas.config" ->
"""kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="<password>";""",
)
val df = spark.readStream.format("kafka").options(kafkaOptions).load()
SQL
SELECT * FROM STREAM read_kafka(
bootstrapServers => '<bootstrap-server>:9093',
subscribe => '<topic>',
`kafka.security.protocol` => 'SASL_SSL',
`kafka.sasl.mechanism` => 'SCRAM-SHA-512',
`kafka.sasl.jaas.config` => 'kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="<password>";'
);
Nota:
Reemplace por SCRAM-SHA-512SCRAM-SHA-256 si el clúster de Kafka está configurado para usar SCRAM-SHA-256.
Azure Databricks recomienda almacenar la contraseña como un secreto en lugar de incluirla directamente en el código. Para obtener más información, consulte Gestión de secretos.
Uso de SSL para conectar Azure Databricks a Kafka
Para habilitar las conexiones SSL/TLS a Kafka, establezca kafka.security.protocol en SSL y proporcione las opciones de configuración del almacén de confianza y del almacén de claves, con el prefijo kafka.. Para las conexiones SSL que solo requieren autenticación de servidor (TLS unidireccional), necesita un almacén de confianza. Para mTLS (mutual TLS), donde el agente de Kafka también autentica al cliente, necesita un almacén de certificados y un almacén de claves.
Están disponibles las siguientes opciones de SSL/TLS. Para obtener la lista completa de propiedades SSL, consulte la documentación de configuración de SSL de Apache Kafka y cifrado y autenticación con SSL en la documentación de Confluent.
| Opción | Descripción |
|---|---|
kafka.security.protocol |
Establézcalo en SSL para habilitar el cifrado TLS. |
kafka.ssl.truststore.location |
Ruta de acceso al archivo del almacén de confianza que contiene certificados de CA confiables. |
kafka.ssl.truststore.password |
Contraseña del archivo de almacén de confianza. |
kafka.ssl.truststore.type |
Formato de archivo del almacén de confianza (valor predeterminado: JKS). |
kafka.ssl.keystore.location |
Ruta de acceso al archivo de almacén de claves que contiene el certificado de cliente y la clave privada (necesaria para mTLS). |
kafka.ssl.keystore.password |
Contraseña del archivo de almacén de claves. |
kafka.ssl.key.password |
Contraseña de la clave privada en el almacén de llaves. |
kafka.ssl.endpoint.identification.algorithm |
Algoritmo de verificación de nombre de host. Tiene como valor predeterminado https. Para deshabilitar, definir como una cadena vacía. |
Si usa SSL, Databricks recomienda lo siguiente:
- Almacene los certificados en un volumen de Catálogo de Unity. Los usuarios que tienen acceso para leer desde el volumen pueden usar los certificados de Kafka. Para obtener más información, vea ¿Qué son los volúmenes del Unity Catalog?.
- Almacene las contraseñas de certificado como secretos en un ámbito secreto. Para obtener más información, consulte Administración de ámbitos secretos.
En el ejemplo siguiente se usan ubicaciones de almacenamiento de objetos y secretos de Databricks para habilitar una conexión SSL:
Python
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<bootstrap-server>:9093")
.option("kafka.security.protocol", "SSL")
.option("kafka.ssl.truststore.location", <truststore-location>)
.option("kafka.ssl.keystore.location", <keystore-location>)
.option("kafka.ssl.keystore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<keystore-password-key-name>))
.option("kafka.ssl.truststore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<truststore-password-key-name>))
)
Scala
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<bootstrap-server>:9093")
.option("kafka.security.protocol", "SSL")
.option("kafka.ssl.truststore.location", <truststore-location>)
.option("kafka.ssl.keystore.location", <keystore-location>)
.option("kafka.ssl.keystore.password", dbutils.secrets.get(scope = <certificate-scope-name>, key = <keystore-password-key-name>))
.option("kafka.ssl.truststore.password", dbutils.secrets.get(scope = <certificate-scope-name>, key = <truststore-password-key-name>))
SQL
SELECT * FROM read_kafka(
bootstrapServers => '<bootstrap-server>:9093',
subscribe => '<topic>',
`kafka.security.protocol` => 'SSL',
`kafka.ssl.truststore.location` => '<truststore-location>',
`kafka.ssl.keystore.location` => '<keystore-location>',
`kafka.ssl.keystore.password` => secret('<certificate-scope-name>', '<keystore-password-key-name>'),
`kafka.ssl.truststore.password` => secret('<certificate-scope-name>', '<truststore-password-key-name>')
);
Conexión de Kafka en HDInsight a Azure Databricks
Cree un clúster de Kafka de HDInsight.
Consulte Conexión con Apache Kafka en HDInsight mediante una instancia de Azure Virtual Network para obtener instrucciones.
Configure los agentes de Kafka para que anuncien la dirección correcta.
Siga las instrucciones que se indican en Configuración de Kafka para anunciar direcciones IP. Si administra Kafka usted mismo en Azure Virtual Machines, asegúrese de que la configuración
advertised.listenersde los agentes esté establecida en la dirección IP interna de los hosts.Cree un clúster de Azure Databricks.
Empareje el clúster de Kafka con el clúster de Azure Databricks.
Siga las instrucciones que se indican en Emparejamiento de redes virtuales.
Uso de nombres de clase de Kafka sombreados de Databricks
Azure Databricks agrupa versiones propietarias y sombreadas de las bibliotecas cliente de Kafka. Todos los nombres de clase de cliente de Kafka a los que se hace referencia en las opciones de configuración de autenticación deben usar el prefijo de nombre de clase sombreado en lugar del nombre de clase de código abierto estándar. Esto se aplica a cualquier clase a la que se hace referencia en opciones como kafka.sasl.jaas.config, kafka.sasl.login.callback.handler.classy kafka.sasl.client.callback.handler.class.
Usar nombres de clase no sombreados da como resultado un error RESTRICTED_STREAMING_OPTION_PERMISSION_ENFORCED. Consulte las preguntas más frecuentes para obtener más información.
Control de posibles errores
No se pudo crear un nuevo
KafkaAdminClientEste error interno de Kafka se produce si alguna de las siguientes opciones de autenticación es incorrecta:
- Id. de cliente (también conocido como id. de aplicación)
- Id. de inquilino
- Servidor de Event Hubs
Para resolver el error, compruebe que los valores son correctos para estas opciones. Además, es posible que vea este error si modifica las opciones de configuración proporcionadas de forma predeterminada en el ejemplo (por ejemplo
kafka.security.protocol, ).No se devuelven registros
Si está intentando mostrar o procesar el DataFrame, pero no obtiene resultados, verá lo siguiente en la interfaz de usuario.
Este mensaje significa que la autenticación se realizó correctamente, pero EventHubs no devolvió ningún dato. Algunas razones posibles (aunque no exhaustivas) son:
- Ha especificado el tema de EventHubs incorrecto.
- La opción de configuración predeterminada de Kafka para
startingOffsetseslatesty, actualmente, no se recibe ningún dato a su través. Se puede configurarstartingOffsetsenearliestpara empezar a leer datos a partir de los offsets más antiguos de Kafka.