Partage via


Authentification

Le connecteur Azure Databricks Kafka prend en charge plusieurs méthodes d’authentification pour la connexion à Kafka. Cet article décrit certaines des méthodes d’authentification les plus courantes sur Databricks. Vous trouverez la liste complète des méthodes d’authentification prises en charge dans la documentation Kafka.

Authentification du principal de service avec l’ID Microsoft Entra et Azure Event Hubs

Azure Databricks prend en charge l’authentification des travaux Spark avec les services Event Hubs. Cette authentification se fait par le biais d’OAuth avec Microsoft Entra ID.

Diagramme d’authentification AAD

Se connecter avec les informations d’identification du service catalogue Unity

Depuis la publication de Databricks Runtime 16.1, Azure Databricks prend en charge les informations d’identification du service Unity Catalog pour l’authentification de l’accès à AWS Managed Streaming pour Azure Event Hubs. Databricks recommande cette approche, en particulier lors de l’exécution du streaming Kafka sur des clusters partagés ou un calcul serverless.

Pour utiliser les informations d’identification d’un service catalogue Unity pour l’authentification, procédez comme suit :

  • Créez un identifiant de service pour Unity Catalog. Si vous n’êtes pas familiarisé avec ce processus, consultez Créer des informations d’identification de service pour obtenir des instructions sur la création d’un processus.
    • Vérifiez que le connecteur d’accès attaché à vos informations d’identification de service dispose des autorisations nécessaires pour se connecter à Azure Event Hubs.
  • Indiquez le nom de vos informations d’identification du service catalogue Unity en tant qu’option source dans votre configuration Kafka. Définissez l’option databricks.serviceCredential sur le nom de votre identifiant de service.

L’exemple suivant configure Kafka en tant que source à l’aide d’informations d’identification de service :

Python

kafka_options = {
  "kafka.bootstrap.servers": "<bootstrap-hostname>:9092",
  "subscribe": "<topic>",
  "databricks.serviceCredential": "<service-credential-name>",
  # Optional: set this only if Databricks can't infer the scope for your Kafka service.
  # "databricks.serviceCredential.scope": "https://<event-hubs-server>/.default",
}

df = spark.readStream.format("kafka").options(**kafka_options).load()

Scala

val kafkaOptions = Map(
  "kafka.bootstrap.servers" -> "<bootstrap-hostname>:9092",
  "subscribe" -> "<topic>",
  "databricks.serviceCredential" -> "<service-credential-name>",
  // Optional: set this only if Databricks can't infer the scope for your Kafka service.
  // "databricks.serviceCredential.scope" -> "https://<event-hubs-server>/.default",
)

val df = spark.readStream.format("kafka").options(kafkaOptions).load()

SQL

SELECT * FROM read_kafka(
  bootstrapServers => '<bootstrap-hostname>:9092',
  subscribe => '<topic>',
  serviceCredential => '<service-credential-name>'
);

Remarque : Lorsque vous utilisez des informations d’identification du service catalogue Unity pour vous connecter à Kafka, les options suivantes ne sont plus nécessaires :

  • kafka.sasl.mechanism
  • kafka.sasl.jaas.config
  • kafka.security.protocol
  • kafka.sasl.client.callback.handler.class
  • kafka.sasl.oauthbearer.token.endpoint.url

Se connecter avec un ID client et un secret

Azure Databricks prend en charge l’authentification Microsoft Entra ID avec un ID client et une clé secrète client dans les environnements Compute suivants :

  • Databricks Runtime 12.2 LTS et versions ultérieures sur le calcul configuré avec le mode d’accès dédié (anciennement mode d’accès utilisateur unique).
  • Databricks Runtime 14.3 LTS et versions ultérieures sur le calcul configuré avec le mode d’accès standard (anciennement mode d’accès partagé).
  • Pipelines déclaratifs Spark Lakeflow configurés sans catalogue Unity.

Azure Databricks ne prend pas en charge l’authentification Microsoft Entra ID avec un certificat dans un environnement de calcul ou dans les pipelines déclaratifs Spark Lakeflow configurés avec le catalogue Unity.

Cette authentification ne fonctionne pas sur le calculateur avec le mode d'accès standard ni sur les pipelines déclaratifs Spark de Lakeflow du catalogue Unity.

Pour effectuer l’authentification avec l’ID Microsoft Entra, vous devez avoir les valeurs suivantes :

  • ID de locataire. Vous pouvez le trouver dans l’onglet des services Microsoft Entra ID.

  • Un ID client (également appelé ID d’application).

  • Une clé secrète client. Une fois que vous l’avez, vous devez l’ajouter en tant que clé secrète à votre espace de travail Databricks. Pour ajouter cette clé secrète, consultez Gestion des secrets.

  • Une rubrique EventHubs. Vous pouvez trouver la liste des rubriques dans la section Event Hubs sous la section Entités, sur une page spécifique d’un Espace de noms Event Hubs. Pour utiliser plusieurs rubriques, vous pouvez définir un rôle IAM au niveau d’Event Hubs.

  • Un serveur EventHubs. Vous pouvez le trouver sur la page de présentation de votre espace de noms Event Hubs spécifique :

    Espace de noms Event Hubs

En outre, pour utiliser Entra ID, nous devons indiquer à Kafka d’utiliser le mécanisme OAuth SASL (SASL est un protocole générique, et OAuth est un type de « mécanisme » SASL) :

  • kafka.security.protocol doit être SASL_SSL
  • kafka.sasl.mechanism doit être OAUTHBEARER
  • kafka.sasl.login.callback.handler.class doit être un nom complet de la classe Java avec la valeur kafkashaded sur le gestionnaire de rappel de connexion de notre classe Kafka ombrée. Consultez l’exemple suivant pour connaître la classe exacte.

L’exemple suivant configure Kafka pour se connecter à Azure Event Hubs à l’aide de l’authentification Microsoft Entra ID avec un ID client et un secret :

Python

# This is the only section you need to modify for auth purposes!
# ------------------------------
tenant_id = "..."
client_id = "..."
client_secret = dbutils.secrets.get("your-scope", "your-secret-name")

event_hubs_server = "..."
event_hubs_topic = "..."
# -------------------------------

sasl_config = f'kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="{client_id}" clientSecret="{client_secret}" scope="https://{event_hubs_server}/.default" ssl.protocol="SSL";'

kafka_options = {
    "kafka.bootstrap.servers": f"{event_hubs_server}:9093", # Port 9093 is the EventHubs Kafka port
    "kafka.sasl.jaas.config": sasl_config,
    "kafka.sasl.oauthbearer.token.endpoint.url": f"https://login.microsoft.com/{tenant_id}/oauth2/v2.0/token",
    "subscribe": event_hubs_topic,

    # You should not need to modify these
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.mechanism": "OAUTHBEARER",
    "kafka.sasl.login.callback.handler.class": "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
}

df = spark.readStream.format("kafka").options(**kafka_options)

display(df)

Scala

// This is the only section you need to modify for auth purposes!
// -------------------------------
val tenantId = "..."
val clientId = "..."
val clientSecret = dbutils.secrets.get("your-scope", "your-secret-name")

val eventHubsServer = "..."
val eventHubsTopic = "..."
// -------------------------------

val saslConfig = s"""kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="$clientId" clientSecret="$clientSecret" scope="https://$eventHubsServer/.default" ssl.protocol="SSL";"""

val kafkaOptions = Map(
  "kafka.bootstrap.servers" -> s"$eventHubsServer:9093", // Port 9093 is the EventHubs Kafka port
  "kafka.sasl.jaas.config" -> saslConfig,
  "kafka.sasl.oauthbearer.token.endpoint.url" -> s"https://login.microsoft.com/$tenantId/oauth2/v2.0/token",
  "subscribe" -> eventHubsTopic,

  // You should not need to modify these
  "kafka.security.protocol" -> "SASL_SSL",
  "kafka.sasl.mechanism" -> "OAUTHBEARER",
  "kafka.sasl.login.callback.handler.class" -> "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
)

val scalaDF = spark.readStream
  .format("kafka")
  .options(kafkaOptions)
  .load()

display(scalaDF)

SQL

CREATE OR REFRESH STREAMING TABLE <table_name>
AS
SELECT * FROM STREAM read_kafka(
  bootstrapServers => '<event-hubs-server>:9093',
  subscribe => '<event-hubs-topic>',
  `kafka.security.protocol` => 'SASL_SSL',
  `kafka.sasl.mechanism` => 'OAUTHBEARER',
  `kafka.sasl.jaas.config` => 'kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="<client-id>" clientSecret="<client-secret>" scope="https://<event-hubs-server>/.default" ssl.protocol="SSL";',
  `kafka.sasl.oauthbearer.token.endpoint.url` => 'https://login.microsoft.com/<tenant-id>/oauth2/v2.0/token',
  `kafka.sasl.login.callback.handler.class` => 'kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler'
);

Utiliser SSL pour connecter Azure Databricks à Kafka

Pour activer les connexions SSL à Kafka, suivez les instructions fournies dans la documentation de Confluent Chiffrement et authentification avec SSL. Vous pouvez fournir les configurations décrites ici, préfixées par kafka., comme options. Par exemple, l’emplacement du magasin de confiance est spécifié avec la propriété kafka.ssl.truststore.location.

Si vous utilisez SSL, Databricks vous recommande :

  • Stockez vos certificats dans un volume de catalogue Unity. Les utilisateurs qui ont accès à la lecture à partir du volume pourront utiliser vos certificats Kafka.
  • Stockez vos mots de passe de certificat en tant que secrets dans une étendue de secrets.

L’exemple suivant utilise des emplacements de stockage d’objets et des secrets Databricks pour activer une connexion SSL :

Python

df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", ...)
  .option("kafka.security.protocol", "SSL")
  .option("kafka.ssl.truststore.location", <truststore-location>)
  .option("kafka.ssl.keystore.location", <keystore-location>)
  .option("kafka.ssl.keystore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<keystore-password-key-name>))
  .option("kafka.ssl.truststore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<truststore-password-key-name>))
)

Scala

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", ...)
  .option("kafka.security.protocol", "SSL")
  .option("kafka.ssl.truststore.location", <truststore-location>)
  .option("kafka.ssl.keystore.location", <keystore-location>)
  .option("kafka.ssl.keystore.password", dbutils.secrets.get(scope = <certificate-scope-name>, key = <keystore-password-key-name>))
  .option("kafka.ssl.truststore.password", dbutils.secrets.get(scope = <certificate-scope-name>, key = <truststore-password-key-name>))

SQL

SELECT * FROM read_kafka(
  bootstrapServers => '<bootstrap-server>:9092',
  subscribe => '<topic>',
  `kafka.security.protocol` => 'SSL',
  `kafka.ssl.truststore.location` => '<truststore-location>',
  `kafka.ssl.keystore.location` => '<keystore-location>',
  `kafka.ssl.keystore.password` => secret('<certificate-scope-name>', '<keystore-password-key-name>'),
  `kafka.ssl.truststore.password` => secret('<certificate-scope-name>', '<truststore-password-key-name>')
);

Connecter Kafka sur HDInsight à Azure Databricks

  1. Créez un cluster Kafka sur HDInsight.

    Pour obtenir des instructions, consultez Se connecter à Kafka sur HDInsight via un réseau virtuel Azure.

  2. Configurez les répartiteurs Kafka pour qu’ils publient l’adresse correcte.

    Suivez les instructions fournies dans Configurer Kafka pour la publication d’adresses IP. Si vous gérez Kafka vous-même sur des machines virtuelles Azure, assurez-vous que la configuration advertised.listeners des courtiers est définie sur l’adresse IP interne des hôtes.

  3. Créez un cluster Azure Databricks

  4. Appairez le cluster Kafka au cluster Azure Databricks.

    Suivez les instructions fournies dans Appairer des réseaux virtuels.

Gestion des erreurs potentielles

  • Échec de la création d’un nouveau KafkaAdminClient

    Cette erreur Kafka interne est levée si l’une des options d’authentification suivantes est incorrecte :

    • ID client (également appelé ID d’application)
    • ID du locataire
    • Serveur Event Hubs

    Pour résoudre cette erreur, vérifiez que les valeurs sont correctes pour ces options. En outre, vous pouvez voir cette erreur si vous modifiez les options de configuration fournies par défaut dans l’exemple (telles que kafka.security.protocol).

  • Aucun enregistrement retourné

    Si vous essayez d’afficher ou de traiter votre DataFrame, mais que vous n’obtenez pas de résultats, vous verrez ce qui suit dans l’interface utilisateur.

    Message Aucun résultat

    Ce message signifie que l’authentification a réussi, mais EventHubs n’a retourné aucune donnée. Parmi les causes possibles (non exhaustives) :

    • Vous avez spécifié une rubrique EventHubs incorrecte.
    • L’option de configuration Kafka par défaut est startingOffsetslatest, et vous ne recevez actuellement aucune donnée via la rubrique. Vous pouvez définir startingOffsets sur earliest pour commencer à lire des données à partir des décalages les plus anciens de Kafka.