Delen via


Authenticatie

De Azure Databricks Kafka-connector ondersteunt meerdere verificatiemethoden voor het maken van verbinding met Kafka. In dit artikel worden enkele van de meest voorkomende verificatiemethoden voor Databricks beschreven. De volledige lijst met ondersteunde verificatiemethoden vindt u in de Kafka-documentatie.

Verificatie van service-principal met Microsoft Entra ID en Azure Event Hubs

Azure Databricks ondersteunt de verificatie van Spark-taken met Event Hubs-services. Deze verificatie wordt uitgevoerd via OAuth met Microsoft Entra-id.

AAD-verificatiediagram

Verbinding maken met de Unity Catalog-service met behulp van service-referenties

Sinds de release van Databricks Runtime 16.1 ondersteunt Azure Databricks servicereferenties voor Unity Catalog voor verificatie van toegang tot AWS Managed Streaming voor Azure Event Hubs. Databricks raadt deze aanpak aan, met name bij het uitvoeren van Kafka-streaming op gedeelde clusters of serverloze compute.

Voer de volgende stappen uit om een Unity Catalog-servicereferentie te gebruiken voor verificatie:

  • Maak een nieuwe Unity Catalog-servicereferentie. Als u niet bekend bent met dit proces, raadpleeg dan Servicereferenties maken voor instructies over het aanmaken van servicereferenties.
    • Zorg ervoor dat de toegangsconnector die is gekoppeld aan uw servicereferentie, over de benodigde machtigingen beschikt om verbinding te maken met Azure Event Hubs.
  • Geef de naam van uw Unity Catalog-servicereferentie op als bronoptie in uw Kafka-configuratie. Stel de optie databricks.serviceCredential in op de naam van uw servicereferentie.

In het volgende voorbeeld wordt Kafka als bron geconfigureerd met behulp van een servicereferentie:

Python

kafka_options = {
  "kafka.bootstrap.servers": "<bootstrap-hostname>:9092",
  "subscribe": "<topic>",
  "databricks.serviceCredential": "<service-credential-name>",
  # Optional: set this only if Databricks can't infer the scope for your Kafka service.
  # "databricks.serviceCredential.scope": "https://<event-hubs-server>/.default",
}

df = spark.readStream.format("kafka").options(**kafka_options).load()

Scala

val kafkaOptions = Map(
  "kafka.bootstrap.servers" -> "<bootstrap-hostname>:9092",
  "subscribe" -> "<topic>",
  "databricks.serviceCredential" -> "<service-credential-name>",
  // Optional: set this only if Databricks can't infer the scope for your Kafka service.
  // "databricks.serviceCredential.scope" -> "https://<event-hubs-server>/.default",
)

val df = spark.readStream.format("kafka").options(kafkaOptions).load()

SQL

SELECT * FROM read_kafka(
  bootstrapServers => '<bootstrap-hostname>:9092',
  subscribe => '<topic>',
  serviceCredential => '<service-credential-name>'
);

Opmerking: wanneer u een Unity Catalog-servicereferentie gebruikt om verbinding te maken met Kafka, zijn de volgende opties niet meer nodig:

  • kafka.sasl.mechanism
  • kafka.sasl.jaas.config
  • kafka.security.protocol
  • kafka.sasl.client.callback.handler.class
  • kafka.sasl.oauthbearer.token.endpoint.url

Verbinding maken met een client-id en -geheim

Azure Databricks biedt ondersteuning voor Microsoft Entra ID-verificatie met een client-id en geheim in de volgende rekenomgevingen:

  • Databricks Runtime 12.2 LTS en hoger op compute dat is geconfigureerd met de exclusieve toegangsmodus (voorheen de modus voor toegang van één gebruiker).
  • Databricks Runtime 14.3 LTS en hoger op systemen die zijn geconfigureerd met standaard toegangsmodus (voorheen gedeelde toegangsmodus).
  • Lakeflow Spark-declaratieve pijplijnen die zijn geconfigureerd zonder Unity Catalog.

Azure Databricks biedt geen ondersteuning voor Microsoft Entra ID-verificatie met een certificaat in een rekenomgeving of in Lakeflow Spark-declaratieve pijplijnen die zijn geconfigureerd met Unity Catalog.

Deze verificatie werkt niet op berekeningen met de standaardtoegangsmodus of op Unity Catalog Lakeflow Spark-declaratieve pijplijnen.

Als u verificatie wilt uitvoeren met Microsoft Entra-id, moet u de volgende waarden hebben:

  • Een tenant-id. U vindt dit op het tabblad Services van Microsoft Entra ID .

  • Een clientID (ook wel toepassings-id genoemd).

  • Een clientgeheim. Zodra u dit hebt, moet u het toevoegen als een geheim aan uw Databricks-werkruimte. Zie Geheimbeheer om dit geheim toe te voegen.

  • Een EventHubs-onderwerp. U vindt een lijst met onderwerpen in de sectie Event Hubs onder de sectie Entiteiten op een specifieke Event Hubs-naamruimte pagina. Als u met meerdere onderwerpen wilt werken, kunt u de IAM-rol instellen op Event Hubs-niveau.

  • Een EventHubs-server. U vindt dit op de overzichtspagina van uw specifieke Event Hubs-naamruimte:

    Event Hubs-naamruimte

Daarnaast moeten we Kafka vertellen dat we Entra ID moeten gebruiken en het OAuth SASL-mechanisme moeten toepassen (SASL is een algemeen protocol en OAuth is een type SASL-mechanisme):

  • kafka.security.protocol moet zijn SASL_SSL
  • kafka.sasl.mechanism moet zijn OAUTHBEARER
  • kafka.sasl.login.callback.handler.class moet een volledig gekwalificeerde naam van de Java-klasse zijn met een waarde van kafkashaded voor de callback-handler voor aanmelding van onze gearceerde Kafka-klasse. Zie het volgende voorbeeld voor de exacte klasse.

In het volgende voorbeeld wordt Kafka geconfigureerd om verbinding te maken met Azure Event Hubs met behulp van Microsoft Entra ID-verificatie met een client-id en geheim:

Python

# This is the only section you need to modify for auth purposes!
# ------------------------------
tenant_id = "..."
client_id = "..."
client_secret = dbutils.secrets.get("your-scope", "your-secret-name")

event_hubs_server = "..."
event_hubs_topic = "..."
# -------------------------------

sasl_config = f'kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="{client_id}" clientSecret="{client_secret}" scope="https://{event_hubs_server}/.default" ssl.protocol="SSL";'

kafka_options = {
    "kafka.bootstrap.servers": f"{event_hubs_server}:9093", # Port 9093 is the EventHubs Kafka port
    "kafka.sasl.jaas.config": sasl_config,
    "kafka.sasl.oauthbearer.token.endpoint.url": f"https://login.microsoft.com/{tenant_id}/oauth2/v2.0/token",
    "subscribe": event_hubs_topic,

    # You should not need to modify these
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.mechanism": "OAUTHBEARER",
    "kafka.sasl.login.callback.handler.class": "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
}

df = spark.readStream.format("kafka").options(**kafka_options)

display(df)

Scala

// This is the only section you need to modify for auth purposes!
// -------------------------------
val tenantId = "..."
val clientId = "..."
val clientSecret = dbutils.secrets.get("your-scope", "your-secret-name")

val eventHubsServer = "..."
val eventHubsTopic = "..."
// -------------------------------

val saslConfig = s"""kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="$clientId" clientSecret="$clientSecret" scope="https://$eventHubsServer/.default" ssl.protocol="SSL";"""

val kafkaOptions = Map(
  "kafka.bootstrap.servers" -> s"$eventHubsServer:9093", // Port 9093 is the EventHubs Kafka port
  "kafka.sasl.jaas.config" -> saslConfig,
  "kafka.sasl.oauthbearer.token.endpoint.url" -> s"https://login.microsoft.com/$tenantId/oauth2/v2.0/token",
  "subscribe" -> eventHubsTopic,

  // You should not need to modify these
  "kafka.security.protocol" -> "SASL_SSL",
  "kafka.sasl.mechanism" -> "OAUTHBEARER",
  "kafka.sasl.login.callback.handler.class" -> "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
)

val scalaDF = spark.readStream
  .format("kafka")
  .options(kafkaOptions)
  .load()

display(scalaDF)

SQL

CREATE OR REFRESH STREAMING TABLE <table_name>
AS
SELECT * FROM STREAM read_kafka(
  bootstrapServers => '<event-hubs-server>:9093',
  subscribe => '<event-hubs-topic>',
  `kafka.security.protocol` => 'SASL_SSL',
  `kafka.sasl.mechanism` => 'OAUTHBEARER',
  `kafka.sasl.jaas.config` => 'kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="<client-id>" clientSecret="<client-secret>" scope="https://<event-hubs-server>/.default" ssl.protocol="SSL";',
  `kafka.sasl.oauthbearer.token.endpoint.url` => 'https://login.microsoft.com/<tenant-id>/oauth2/v2.0/token',
  `kafka.sasl.login.callback.handler.class` => 'kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler'
);

SSL gebruiken om Azure Databricks te verbinden met Kafka

Volg de instructies in de Confluent-documentatie Encryption and Authentication met SSLom SSL-verbindingen met Kafka in te schakelen. U kunt de configuraties opgeven die daar worden beschreven, voorafgegaan door kafka., als opties. De locatie van de truststore wordt bijvoorbeeld opgegeven met de eigenschap kafka.ssl.truststore.location.

Als u SSL gaat gebruiken, raadt Databricks u het volgende aan:

In het volgende voorbeeld worden objectopslaglocaties en Databricks-geheimen gebruikt om een SSL-verbinding in te schakelen:

Python

df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", ...)
  .option("kafka.security.protocol", "SSL")
  .option("kafka.ssl.truststore.location", <truststore-location>)
  .option("kafka.ssl.keystore.location", <keystore-location>)
  .option("kafka.ssl.keystore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<keystore-password-key-name>))
  .option("kafka.ssl.truststore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<truststore-password-key-name>))
)

Scala

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", ...)
  .option("kafka.security.protocol", "SSL")
  .option("kafka.ssl.truststore.location", <truststore-location>)
  .option("kafka.ssl.keystore.location", <keystore-location>)
  .option("kafka.ssl.keystore.password", dbutils.secrets.get(scope = <certificate-scope-name>, key = <keystore-password-key-name>))
  .option("kafka.ssl.truststore.password", dbutils.secrets.get(scope = <certificate-scope-name>, key = <truststore-password-key-name>))

SQL

SELECT * FROM read_kafka(
  bootstrapServers => '<bootstrap-server>:9092',
  subscribe => '<topic>',
  `kafka.security.protocol` => 'SSL',
  `kafka.ssl.truststore.location` => '<truststore-location>',
  `kafka.ssl.keystore.location` => '<keystore-location>',
  `kafka.ssl.keystore.password` => secret('<certificate-scope-name>', '<keystore-password-key-name>'),
  `kafka.ssl.truststore.password` => secret('<certificate-scope-name>', '<truststore-password-key-name>')
);

Kafka in HDInsight verbinden met Azure Databricks

  1. Maak een HDInsight Kafka-cluster.

    Zie Verbinding maken met Kafka in HDInsight via een virtueel Azure-netwerk voor instructies.

  2. Configureer de Kafka-brokers om het juiste adres te adverteren.

    Volg de instructies in Kafka configureren voor IP-reclame. Als u Kafka zelf beheert op virtuele Azure-machines, moet u ervoor zorgen dat de advertised.listeners configuratie van de brokers is ingesteld op het interne IP-adres van de hosts.

  3. Maak een Azure Databricks-cluster.

  4. Koppel het Kafka-cluster aan het Azure Databricks-cluster.

    Volg de instructies in virtuele peernetwerken.

Mogelijke fouten afhandelen

  • Kan geen nieuwe maken KafkaAdminClient

    Deze interne Kafka-fout wordt gegenereerd als een van de volgende verificatieopties onjuist is:

    • Client-id (ook wel toepassings-id genoemd)
    • Tenant-id
    • Event Hubs-server

    Als u de fout wilt oplossen, controleert u of de waarden juist zijn voor deze opties. Bovendien ziet u deze fout mogelijk als u de standaardconfiguratieopties in het voorbeeld wijzigt (zoals kafka.security.protocol).

  • Er zijn geen records geretourneerd

    Als u uw DataFrame probeert weer te geven of te verwerken, maar geen resultaten krijgt, ziet u het volgende in de gebruikersinterface.

    Geen resultaten gevonden

    Dit bericht betekent dat de verificatie is geslaagd, maar EventHubs geen gegevens heeft geretourneerd. Een aantal mogelijke (hoewel niet volledig) redenen zijn:

    • U hebt het verkeerde EventHubs-onderwerp opgegeven.
    • De standaard Kafka-configuratieoptie voor startingOffsets is latest, en u ontvangt momenteel nog geen gegevens via de topic. U kunt startingOffsets instellen op earliest om gegevens te lezen vanaf de vroegste offsets van Kafka.