Sdílet prostřednictvím


Připojení k Apache Kafka

Tento článek popisuje, jak použít Apache Kafka jako zdroj nebo jímku při spouštění úloh strukturovaného streamování v Azure Databricks.

Další informace o platformě Kafka najdete v dokumentaci k Apache Kafka.

Čtení dat ze systému Kafka

Azure Databricks poskytuje kafka klíčové slovo jako datový formát pro konfiguraci připojení k Kafka. Následuje příklad pro streamované čtení:

Python

df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "latest")
  .load()
)

Scala

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "latest")
  .load()

SQL

CREATE OR REFRESH STREAMING TABLE <table_name> AS
SELECT * FROM STREAM read_kafka(
  bootstrapServers => '<server:ip>',
  subscribe => '<topic>'
);

Azure Databricks podporuje také sémantiku dávkového čtení, jak je znázorněno v následujícím příkladu:

Python

df = (spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
)

Scala

val df = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()

SQL

SELECT * FROM read_kafka(
  bootstrapServers => '<server:ip>',
  subscribe => '<topic>',
  startingOffsets => 'earliest',
  endingOffsets => 'latest'
);

Pro přírůstkové dávkové načítání doporučuje Databricks používat Kafka s Trigger.AvailableNow. Viz AvailableNow: Přírůstkové dávkové zpracování.

V Databricks Runtime 13.3 LTS a novějších poskytuje Azure Databricks také funkci SQL pro čtení dat Kafka. Streamování s SQL se podporuje jenom v deklarativních kanálech Sparku Lakeflow nebo s streamovanými tabulkami v Databricks SQL. Viz tabulkovou funkci read_kafka.

Konfigurace čtečky strukturovaného streamování Kafka

Pro zdroj Kafka musí být nastavená následující možnost pro dávkové i streamované dotazy:

Možnost Hodnota Description
kafka.bootstrap.servers Čárkami oddělený seznam hostitel:port Bootstrap servery clusteru Kafka

Kromě toho je nutné vybrat jednu z následujících možností pro určení, k jakým tématům se chcete přihlásit k odběru:

Možnost Hodnota Description
subscribe Seznam témat oddělených čárkami. Seznam témat pro přihlášení k odběru.
subscribePattern Řetězec regulárního výrazu Java. Vzor použitý k přihlášení k odběru témat.
assign Řetězec JSON {"topicA":[0,1],"topic":[2,4]}. Určité části tématu ke spotřebě.

Úplný seznam dostupných možností najdete na stránce Možnosti .

Schéma pro záznamy Kafka

Záznamy vrácené čtenářem strukturovaného streamování Kafka budou mít následující schéma:

Sloupec Typ
key binary
value binary
topic string
partition int
offset long
timestamp long
timestampType int

key a value jsou vždy deserializovány jako bajtová pole s ByteArrayDeserializer. K explicitní deserializaci klíčů a hodnot použijte operace datového rámce (například cast("string") nebo from_avro).

Zápis dat do Kafky

Následuje příklad streamování zápisu do Kafka:

Python

(df.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .start()
)

Scala

df.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .start()

Azure Databricks podporuje také sémantiku dávkového zápisu do datových jímek Kafka, jak je znázorněno v následujícím příkladu:

Python

(df.write
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .save()
)

Scala

df.write
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .save()

Konfigurace zapisovače strukturovaného streamování Kafka

Důležité

Databricks Runtime 13.3 LTS a vyšší obsahuje novější verzi kafka-clients knihovny, která ve výchozím nastavení umožňuje zápisy idempotentní. Pokud Kafka sink používá verzi 2.8.0 nebo nižší s nakonfigurovanými ACL, ale bez povolenou IDEMPOTENT_WRITE, zápis selže s chybovou zprávou org.apache.kafka.common.KafkaException:Cannot execute transactional method because we are in an error state.

Tuto chybu můžete vyřešit upgradem na Kafka verze 2.8.0 nebo vyšší nebo nastavením .option(“kafka.enable.idempotence”, “false”) při konfiguraci zapisovače strukturovaného streamování.

Níže jsou uvedené běžné možnosti nastavené při psaní do systému Kafka:

Možnost Hodnota Výchozí hodnota Description
kafka.boostrap.servers Čárkami oddělený seznam <host:port> žádné [Povinné] Konfigurace Kafka bootstrap.servers .
topic STRING nenastaveno [Volitelné] Nastaví téma pro zápis všech řádků. Tato možnost přepíše libovolný sloupec tématu, který v datech existuje.
includeHeaders BOOLEAN false [Volitelné] Zda se mají do řádku zahrnout hlavičky Kafka.

Úplný seznam dostupných možností najdete na stránce Možnosti .

Schéma pro zapisovač Kafka

Při zápisu dat do systému Kafka může zadaný datový rámec obsahovat následující pole:

Název sloupce Povinné či volitelné Typ
key volitelný STRING nebo BINARY
value povinné STRING nebo BINARY
headers volitelný ARRAY
topic volitelné (ignorováno, pokud je možnost topic nastavena jako writer) STRING
partition volitelný INT

Autentizace

Azure Databricks podporuje více metod ověřování pro Kafka, včetně přihlašovacích údajů ke službě Unity Catalog, SASL/SSL a možností specifických pro cloud pro AWS MSK, Azure Event Hubs a Google Cloud Managed Kafka. Viz Ověřování.

Načtěte metriky Kafka

Pomocí metrik avgOffsetsBehindLatest, maxOffsetsBehindLatest a minOffsetsBehindLatest můžete sledovat, jak daleko streamovací dotaz zaostává za zpracováním v systému Kafka. Tyto sestavy hlásí průměrné, maximální a minimální offsetové zpoždění ve všech předplacených particích tématu vzhledem k nejnovějším posunům v Kafka. Podívejte se na interaktivní čtení metrik.

Pokud chcete odhadnout, kolik dat dotaz ještě nespotřeboval, použijte metriku estimatedTotalBytesBehindLatest. Tato metrika odhaduje celkový počet bajtů zbývajících ve všech odběratelských oddílech na základě dávek zpracovaných v posledních 300 sekundách. Nastavením této možnosti bytesEstimateWindowLength můžete upravit časové období použité pro tento odhad. Pokud ho chcete například nastavit na 10 minut:

Python

df = (spark.readStream
  .format("kafka")
  .option("bytesEstimateWindowLength", "10m") # m for minutes, you can also use "600s" for 600 seconds
)

Scala

val df = spark.readStream
  .format("kafka")
  .option("bytesEstimateWindowLength", "10m") // m for minutes, you can also use "600s" for 600 seconds

Pokud spouštíte stream v poznámkovém bloku, můžete tyto metriky zobrazit v záložce Nezpracovaná data na řídicím panelu průběhu dotazu streamování.

{
  "sources": [
    {
      "description": "KafkaV2[Subscribe[topic]]",
      "metrics": {
        "avgOffsetsBehindLatest": "4.0",
        "maxOffsetsBehindLatest": "4",
        "minOffsetsBehindLatest": "4",
        "estimatedTotalBytesBehindLatest": "80.0"
      }
    }
  ]
}

Další informace najdete v tématu Monitorování dotazů strukturovaného streamování v Azure Databricks .

Příklad kódu: Kafka do Delta

Následující příklad ukazuje kompletní pracovní postup pro nepřetržitě streamovaná data z Kafka do tabulky Delta. Tento model je ideální pro úlohy příjmu dat téměř v reálném čase.

Tento příklad používá pevné schéma JSON. Pro jiné formáty, jako je Avro nebo Protobuf, použijte from_avro nebo from_protobuf. Můžete se také integrovat s repositářem schématu. Viz příklad s registrem schématu.

Python

from pyspark.sql.functions import from_json, col

# Define simple JSON schemas for key and value
key_schema = "user_id STRING"
value_schema = "event_type STRING, event_ts TIMESTAMP"

# Configure Kafka options with service credentials
kafka_options = {
  "kafka.bootstrap.servers": "<bootstrap-server>:9092",
  "subscribe": "<topic-name>",
  "databricks.serviceCredential": "<service-credential-name>",
}

# Read from Kafka and parse JSON
parsed_df = (spark.readStream
  .format("kafka")
  .options(**kafka_options)
  .load()
  .select(
    from_json(col("key").cast("string"), key_schema).alias("key"),
    from_json(col("value").cast("string"), value_schema).alias("value")
  )
  .select("key.*", "value.*")
)

# Write to Delta table
query = (parsed_df.writeStream
  .format("delta")
  .option("checkpointLocation", "/path/to/checkpoint")
  .trigger(processingTime="10 seconds")
  .toTable("catalog.schema.events_table")
)

query.awaitTermination()

Scala

import org.apache.spark.sql.functions.{from_json, col}
import org.apache.spark.sql.streaming.Trigger

// Define JSON schemas for key and value
val keySchema = "user_id STRING"
val valueSchema = "event_type STRING, event_ts TIMESTAMP"

// Configure Kafka options with service credentials
val kafkaOptions = Map(
  "kafka.bootstrap.servers" -> "<bootstrap-server>:9092",
  "subscribe" -> "<topic-name>",
  "databricks.serviceCredential" -> "<service-credential-name>"
)

// Read from Kafka and parse JSON
val parsedDF = spark.readStream
  .format("kafka")
  .options(kafkaOptions)
  .load()
  .select(
    from_json(col("key").cast("string"), keySchema).alias("key"),
    from_json(col("value").cast("string"), valueSchema).alias("value")
  )
  .select("key.*", "value.*")

// Write to Delta table
val query = parsedDF.writeStream
  .format("delta")
  .option("checkpointLocation", "/path/to/checkpoint")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .toTable("catalog.schema.events_table")

query.awaitTermination()

SQL

-- Create a streaming table from Kafka using read_kafka
CREATE OR REFRESH STREAMING TABLE catalog.schema.events_table AS
SELECT
  key::string:user_id AS user_id,
  value::string:event_type AS event_type,
  to_timestamp(value::string:event_ts) AS event_ts
FROM STREAM read_kafka(
  bootstrapServers => '<bootstrap-server>:9092',
  subscribe => '<topic-name>',
  serviceCredential => '<service-credential-name>'
);