Przetwarzanie strumieniowe przy użyciu platformy Apache Kafka i usługi Azure Databricks

W tym artykule opisano, jak można używać platformy Apache Kafka jako źródła lub ujścia podczas uruchamiania obciążeń przesyłania strumieniowego ze strukturą w usłudze Azure Databricks.

Aby uzyskać więcej informacji na temat platformy Kafka, zobacz dokumentację platformy Kafka.

Odczytywanie danych z platformy Kafka

Poniżej przedstawiono przykład przesyłania strumieniowego z platformy Kafka:

df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "latest")
  .load()
)

Usługa Azure Databricks obsługuje również semantyka odczytu wsadowego dla źródeł danych platformy Kafka, jak pokazano w poniższym przykładzie:

df = (spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
)

W przypadku ładowania przyrostowego wsadowego usługa Databricks zaleca używanie platformy Kafka z usługą Trigger.AvailableNow. Zobacz Konfigurowanie przyrostowego przetwarzania wsadowego.

W środowisku Databricks Runtime 13.3 LTS i nowszym usługa Azure Databricks udostępnia funkcję SQL do odczytywania danych platformy Kafka. Przesyłanie strumieniowe za pomocą języka SQL jest obsługiwane tylko w tabelach delta live lub w tabelach przesyłania strumieniowego w usłudze Databricks SQL. Zobacz read_kafka funkcji wartości tabeli.

Konfigurowanie czytnika przesyłania strumieniowego ze strukturą platformy Kafka

Usługa Azure Databricks udostępnia kafka słowo kluczowe jako format danych do konfigurowania połączeń z platformą Kafka w wersji 0.10 lub nowszej.

Poniżej przedstawiono najbardziej typowe konfiguracje platformy Kafka:

Istnieje wiele sposobów określania tematów do zasubskrybowania. Należy podać tylko jeden z następujących parametrów:

Opcja Wartość Opis
subskrybowanie Rozdzielona przecinkami lista tematów. Lista tematów do subskrybowania.
subscribePattern Ciąg wyrażenia regularnego języka Java. Wzorzec używany do subskrybowania tematów.
przypisywanie Ciąg {"topicA":[0,1],"topic":[2,4]}JSON . Określone tematyPartition do korzystania.

Inne istotne konfiguracje:

Opcja Wartość Wartość domyślna opis
kafka.bootstrap.servers Rozdzielona przecinkami lista host:port. empty [Wymagane] Konfiguracja platformy Kafka bootstrap.servers . Jeśli okaże się, że nie ma danych z platformy Kafka, najpierw sprawdź listę adresów brokera. Jeśli lista adresów brokera jest niepoprawna, może nie występować żadne błędy. Dzieje się tak, ponieważ klient platformy Kafka zakłada, że brokerzy staną się dostępne w końcu i w przypadku błędów sieci ponawiania próby na zawsze.
failOnDataLoss Usługa true lub false. true [Opcjonalnie] Czy zapytanie nie powiodło się, jeśli jest możliwe, że dane zostały utracone. Zapytania mogą trwale nie odczytać danych z platformy Kafka ze względu na wiele scenariuszy, takich jak usunięte tematy, obcięcie tematu przed przetworzeniem itd. Staramy się oszacować konserwatywnie, czy dane mogły być utracone, czy nie. Czasami może to spowodować fałszywe alarmy. Ustaw tę opcję na false wartość , jeśli nie działa zgodnie z oczekiwaniami lub chcesz, aby zapytanie kontynuowało przetwarzanie pomimo utraty danych.
minPartitions Liczba całkowita >= 0, 0 = wyłączona. 0 (wyłączone) [Opcjonalnie] Minimalna liczba partycji do odczytu z platformy Kafka. Platformę Spark można skonfigurować tak, aby używała dowolnej minimalnej liczby partycji do odczytu z platformy Kafka przy użyciu minPartitions opcji . Zwykle platforma Spark ma mapowanie 1–1 tematów platformy KafkaPartitions na partycje platformy Spark zużywane z platformy Kafka. Jeśli ustawisz minPartitions opcję na wartość większą niż temat platformy KafkaPartitions, platforma Spark podzieli duże partycje platformy Kafka na mniejsze elementy. Tę opcję można ustawić w okresach szczytowych obciążeń, niesymetryczności danych i w miarę zwiększania szybkości przetwarzania strumienia. Wiąże się to z kosztem inicjowania użytkowników platformy Kafka w każdym wyzwalaczu, co może mieć wpływ na wydajność w przypadku używania protokołu SSL podczas nawiązywania połączenia z platformą Kafka.
kafka.group.id Identyfikator grupy odbiorców platformy Kafka. nie ustawiono [Opcjonalnie] Identyfikator grupy do użycia podczas odczytywania z platformy Kafka. Użyj tego z ostrożnością. Domyślnie każde zapytanie generuje unikatowy identyfikator grupy do odczytywania danych. Gwarantuje to, że każde zapytanie ma własną grupę odbiorców, która nie ma wpływu na żadnego innego użytkownika, i dlatego może odczytywać wszystkie partycje subskrybowanych tematów. W niektórych scenariuszach (na przykład autoryzacja oparta na grupach platformy Kafka) możesz użyć określonych autoryzowanych identyfikatorów grup do odczytywania danych. Opcjonalnie można ustawić identyfikator grupy. Jednak zrób to z skrajną ostrożnością, ponieważ może to spowodować nieoczekiwane zachowanie.

* Współbieżnie uruchomione zapytania (zarówno wsadowe, jak i przesyłane strumieniowo) z tym samym identyfikatorem grupy prawdopodobnie zakłócają każdą kwerendę w celu odczytu tylko części danych.
* Może to również wystąpić, gdy zapytania są uruchamiane/uruchamiane ponownie w krótkim odstępie czasu. Aby zminimalizować takie problemy, ustaw konfigurację session.timeout.ms klienta platformy Kafka na bardzo małą.
startOffsets najwcześniejsza , najnowsza latest [Opcjonalnie] Punkt początkowy, gdy zapytanie jest uruchamiane, "najwcześniejsze", czyli od najwcześniejszych przesunięć, lub ciąg json określający przesunięcie początkowe dla każdej części tematu. W formacie json -2 jako przesunięcie może służyć do odwoływania się do najwcześniejszego, od -1 do najnowszego. Uwaga: w przypadku zapytań wsadowych najnowsza wersja (niejawnie lub przy użyciu -1 w formacie json) jest niedozwolona. W przypadku zapytań przesyłanych strumieniowo ma to zastosowanie tylko wtedy, gdy zostanie uruchomione nowe zapytanie, a wznowienie zawsze będzie pobierane z miejsca, w którym zapytanie zostało przerwane. Nowo odnalezione partycje podczas zapytania będą uruchamiane najwcześniej.

Aby uzyskać informacje o innych opcjonalnych konfiguracjach, zobacz Przewodnik integracji ze strukturą platformy Kafka.

Schemat rekordów platformy Kafka

Schemat rekordów platformy Kafka to:

Kolumna Type
key dane binarne
wartość dane binarne
topic string
partycji int
offset długi
timestamp długi
timestampType int

Obiekty key i value są zawsze deserializowane jako tablice bajtów za pomocą .ByteArrayDeserializer Użyj operacji ramki danych (takich jak cast("string")) w celu jawnego deserializacji kluczy i wartości.

Zapisywanie danych na platformie Kafka

Poniżej przedstawiono przykład przesyłania strumieniowego zapisu na platformie Kafka:

(df
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .start()
)

Usługa Azure Databricks obsługuje również semantyka zapisu wsadowego na ujściach danych platformy Kafka, jak pokazano w poniższym przykładzie:

(df
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .save()
)

Konfigurowanie modułu zapisywania przesyłania strumieniowego ze strukturą platformy Kafka

Ważne

Środowisko Databricks Runtime 13.3 LTS i nowsze zawierają nowszą wersję kafka-clients biblioteki, która domyślnie umożliwia zapisywanie idempotentne. Jeśli ujście platformy Kafka używa wersji 2.8.0 lub nowszej ze skonfigurowanymi listami ACL, ale bez IDEMPOTENT_WRITE włączenia tej opcji zapis kończy się niepowodzeniem z komunikatem org.apache.kafka.common.KafkaException:Cannot execute transactional method because we are in an error stateo błędzie .

Rozwiąż ten błąd, uaktualniając do platformy Kafka w wersji 2.8.0 lub nowszej lub przez ustawienie .option(“kafka.enable.idempotence”, “false”) podczas konfigurowania składnika zapisywania przesyłania strumieniowego ze strukturą.

Schemat dostarczony do elementu DataStreamWriter współdziała z ujściem platformy Kafka. Możesz użyć następujących pól:

Nazwa kolumny Wymagane lub opcjonalne Typ
key optional STRING lub BINARY
value wymagane STRING lub BINARY
headers optional ARRAY
topic opcjonalne (ignorowane, jeśli topic jest ustawiona jako opcja zapisywania) STRING
partition optional INT

Poniżej przedstawiono typowe opcje ustawione podczas zapisywania na platformie Kafka:

Opcja Wartość Wartość domyślna opis
kafka.boostrap.servers Rozdzielona przecinkami lista <host:port> Brak [Wymagane] Konfiguracja platformy Kafka bootstrap.servers .
topic STRING nie ustawiono [Opcjonalnie] Ustawia temat dla wszystkich wierszy do zapisania. Ta opcja zastępuje dowolną kolumnę tematu, która istnieje w danych.
includeHeaders BOOLEAN false [Opcjonalnie] Określa, czy w wierszu mają być uwzględniane nagłówki platformy Kafka.

Aby uzyskać informacje o innych opcjonalnych konfiguracjach, zobacz Przewodnik integracji ze strukturą platformy Kafka.

Pobieranie metryk platformy Kafka

Możesz uzyskać średnią, minimalną i maksymalną liczbę przesunięć, które zapytanie przesyłane strumieniowo znajduje się za najnowszym dostępnym przesunięciem wśród wszystkich subskrybowanych tematów z avgOffsetsBehindLatestmetrykami , maxOffsetsBehindLatesti minOffsetsBehindLatest . Zobacz Interaktywne odczytywanie metryk.

Uwaga

Dostępne w środowisku Databricks Runtime 9.1 lub nowszym.

Uzyskaj szacowaną łączną liczbę bajtów, z których proces zapytania nie korzysta z subskrybowanych tematów, sprawdzając wartość estimatedTotalBytesBehindLatest. To oszacowanie jest oparte na partiach, które zostały przetworzone w ciągu ostatnich 300 sekund. Przedział czasu, na podstawie którego jest szacowany, można zmienić, ustawiając opcję bytesEstimateWindowLength na inną wartość. Aby na przykład ustawić go na 10 minut:

df = (spark.readStream
  .format("kafka")
  .option("bytesEstimateWindowLength", "10m") # m for minutes, you can also use "600s" for 600 seconds
)

Jeśli używasz strumienia w notesie, możesz zobaczyć te metryki na karcie Nieprzetworzone dane na pulpicie nawigacyjnym postępu zapytania przesyłania strumieniowego:

{
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[topic]]",
    "metrics" : {
      "avgOffsetsBehindLatest" : "4.0",
      "maxOffsetsBehindLatest" : "4",
      "minOffsetsBehindLatest" : "4",
      "estimatedTotalBytesBehindLatest" : "80.0"
    },
  } ]
}

Łączenie usługi Azure Databricks z platformą Kafka przy użyciu protokołu SSL

Aby włączyć połączenia SSL z platformą Kafka, postępuj zgodnie z instrukcjami w dokumentacji Platformy Confluent Szyfrowanie i uwierzytelnianie przy użyciu protokołu SSL. Konfiguracje opisane w tym miejscu można podać z prefiksem kafka., jako opcje. Na przykład należy określić lokalizację magazynu zaufania we właściwości kafka.ssl.truststore.location.

Usługa Databricks zaleca:

  • Przechowywanie certyfikatów w magazynie obiektów w chmurze. Dostęp do certyfikatów można ograniczyć tylko do klastrów, które mogą uzyskiwać dostęp do platformy Kafka. Zobacz Zarządzanie danymi za pomocą wykazu aparatu Unity.
  • Przechowuj hasła certyfikatu jako wpisy tajne w zakresie wpisów tajnych.

W poniższym przykładzie użyto lokalizacji magazynu obiektów i wpisów tajnych usługi Databricks w celu włączenia połączenia SSL:

df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", ...)
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.ssl.truststore.location", <truststore-location>)
  .option("kafka.ssl.keystore.location", <keystore-location>)
  .option("kafka.ssl.keystore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<keystore-password-key-name>))
  .option("kafka.ssl.truststore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<truststore-password-key-name>))
)

Połączenie platformę Kafka w usłudze HDInsight w usłudze Azure Databricks

  1. Utwórz klaster platformy Kafka usługi HDInsight.

    Aby uzyskać instrukcje, zobacz Połączenie do platformy Kafka w usłudze HDInsight za pośrednictwem usługi Azure Virtual Network.

  2. Skonfiguruj brokerów platformy Kafka, aby anonsować poprawny adres.

    Postępuj zgodnie z instrukcjami w temacie Konfigurowanie platformy Kafka pod kątem reklam IP. Jeśli samodzielnie zarządzasz platformą Kafka w usłudze Azure Virtual Machines, upewnij się, że advertised.listeners konfiguracja brokerów jest ustawiona na wewnętrzny adres IP hostów.

  3. Utwórz klaster usługi Azure Databricks.

  4. Zaimów klaster Platformy Kafka do klastra usługi Azure Databricks.

    Postępuj zgodnie z instrukcjami w temacie Równorzędne sieci wirtualne.

Uwierzytelnianie jednostki usługi przy użyciu identyfikatora Entra firmy Microsoft (dawniej Azure Active Directory) i usługi Azure Event Hubs

Usługa Azure Databricks obsługuje uwierzytelnianie zadań platformy Spark za pomocą usług Event Hubs. To uwierzytelnianie odbywa się za pośrednictwem protokołu OAuth z identyfikatorem Entra firmy Microsoft (dawniej Azure Active Directory).

Diagram uwierzytelniania usługi AAD

Usługa Azure Databricks obsługuje uwierzytelnianie identyfikatora Entra firmy Microsoft z identyfikatorem klienta i wpisem tajnym w następujących środowiskach obliczeniowych:

  • Środowisko Databricks Runtime 12.2 LTS lub nowsze w środowisku obliczeniowym skonfigurowanym z trybem dostępu pojedynczego użytkownika.
  • Środowisko Databricks Runtime 14.3 LTS lub nowsze w środowisku obliczeniowym skonfigurowanym z trybem dostępu współdzielonego.
  • Potoki tabel na żywo delty skonfigurowane bez wykazu aparatu Unity.

Usługa Azure Databricks nie obsługuje uwierzytelniania identyfikatora Entra firmy Microsoft z certyfikatem w żadnym środowisku obliczeniowym ani potoków tabel delta live tables skonfigurowanych za pomocą wykazu aparatu Unity.

To uwierzytelnianie nie działa w przypadku udostępnionych klastrów ani tabel różnicowych w wykazie aparatu Unity.

Konfigurowanie platformy Kafka przesyłania strumieniowego ze strukturą Połączenie or

Aby przeprowadzić uwierzytelnianie przy użyciu identyfikatora Entra firmy Microsoft, potrzebne są następujące wartości:

  • Identyfikator dzierżawy. Tę pozycję można znaleźć na karcie usługi Microsoft Entra ID .

  • ClientID (znany również jako identyfikator aplikacji).

  • Wpis tajny klienta. Po utworzeniu tej funkcji należy dodać go jako wpis tajny do obszaru roboczego usługi Databricks. Aby dodać ten wpis tajny, zobacz Zarządzanie wpisami tajnymi.

  • Temat usługi EventHubs. Listę tematów można znaleźć w sekcji Event Hubs w sekcji Jednostki na określonej stronie Przestrzeni nazw usługi Event Hubs. Aby pracować z wieloma tematami, możesz ustawić rolę zarządzanie dostępem i tożsamościami na poziomie usługi Event Hubs.

  • Serwer usługi EventHubs. Możesz to znaleźć na stronie przeglądu określonej przestrzeni nazw usługi Event Hubs:

    Przestrzeń nazw usługi Event Hubs

Ponadto, aby używać identyfikatora Entra, musimy poinformować platformę Kafka, aby korzystała z mechanizmu SASL OAuth (SASL jest protokołem ogólnym, a protokół OAuth jest typem "mechanizmu" SASL):

  • kafka.security.protocol powinna być SASL_SSL
  • kafka.sasl.mechanism powinna być OAUTHBEARER
  • kafka.sasl.login.callback.handler.class powinna być w pełni kwalifikowaną nazwą klasy Java z wartością kafkashaded dla procedury obsługi wywołania zwrotnego logowania zacienionej klasy kafka. Zobacz poniższy przykład, aby uzyskać dokładną klasę.

Przykład

Następnie przyjrzyjmy się uruchomionego przykładowi:

Python

# This is the only section you need to modify for auth purposes!
# ------------------------------
tenant_id = "..."
client_id = "..."
client_secret = dbutils.secrets.get("your-scope", "your-secret-name")

event_hubs_server = "..."
event_hubs_topic = "..."
# -------------------------------

sasl_config = f'kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="{client_id}" clientSecret="{client_secret}" scope="https://{event_hubs_server}/.default" ssl.protocol="SSL";'

kafka_options = {
# Port 9093 is the EventHubs Kafka port
"kafka.bootstrap.servers": f"{event_hubs_server}:9093",
"kafka.sasl.jaas.config": sasl_config,
"kafka.sasl.oauthbearer.token.endpoint.url": f"https://login.microsoft.com/{tenant_id}/oauth2/v2.0/token",
"subscribe": event_hubs_topic,

# You should not need to modify these
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "OAUTHBEARER",
"kafka.sasl.login.callback.handler.class": "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
}

df = spark.readStream.format("kafka").options(**kafka_options)

display(df)

Scala

// This is the only section you need to modify for auth purposes!
// -------------------------------
val tenantId = "..."
val clientId = "..."
val clientSecret = dbutils.secrets.get("your-scope", "your-secret-name")

val eventHubsServer = "..."
val eventHubsTopic = "..."
// -------------------------------

val saslConfig = s"""kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="$clientId" clientSecret="$clientSecret" scope="https://$eventHubsServer/.default" ssl.protocol="SSL";"""

val kafkaOptions = Map(
// Port 9093 is the EventHubs Kafka port
"kafka.bootstrap.servers" -> s"$eventHubsServer:9093",
"kafka.sasl.jaas.config" -> saslConfig,
"kafka.sasl.oauthbearer.token.endpoint.url" -> s"https://login.microsoft.com/$tenantId/oauth2/v2.0/token",
"subscribe" -> eventHubsTopic,

// You should not need to modify these
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.mechanism" -> "OAUTHBEARER",
"kafka.sasl.login.callback.handler.class" -> "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
)

val scalaDF = spark.readStream
  .format("kafka")
  .options(kafkaOptions)
  .load()

display(scalaDF)

Obsługa potencjalnych błędów

  • Opcje przesyłania strumieniowego nie są obsługiwane.

    Jeśli spróbujesz użyć tego mechanizmu uwierzytelniania w potoku Tabele na żywo delty skonfigurowanego z wykazem aparatu Unity, może zostać wyświetlony następujący błąd:

    Nieobsługiwany błąd przesyłania strumieniowego

    Aby rozwiązać ten błąd, użyj obsługiwanej konfiguracji obliczeniowej. Zobacz Uwierzytelnianie jednostki usługi za pomocą identyfikatora Entra firmy Microsoft (dawniej Azure Active Directory) i usługi Azure Event Hubs.

  • Nie można utworzyć nowego KafkaAdminClientelementu .

    Jest to błąd wewnętrzny zgłaszany przez platformę Kafka, jeśli którakolwiek z następujących opcji uwierzytelniania jest niepoprawna:

    • Identyfikator klienta (znany również jako identyfikator aplikacji)
    • Identyfikator dzierżawy
    • Serwer Usługi EventHubs

    Aby rozwiązać ten problem, sprawdź, czy wartości są poprawne dla tych opcji.

    Ponadto ten błąd może zostać wyświetlony, jeśli zmodyfikujesz opcje konfiguracji podane domyślnie w przykładzie (które zostały poproszone o niezmodyfikowanie), takie jak kafka.security.protocol.

  • Nie są zwracane żadne rekordy

    Jeśli próbujesz wyświetlić lub przetworzyć ramkę danych, ale nie otrzymujesz wyników, w interfejsie użytkownika zobaczysz następujące informacje.

    Brak komunikatu o wynikach

    Ten komunikat oznacza, że uwierzytelnianie zakończyło się pomyślnie, ale usługa EventHubs nie zwróciła żadnych danych. Niektóre możliwe (choć w żaden sposób wyczerpujące) przyczyny są następujące:

    • Określono niewłaściwy temat usługi EventHubs .
    • Domyślną opcją startingOffsets konfiguracji platformy Kafka jest latest, a obecnie nie otrzymujesz żadnych danych za pośrednictwem tematu. Możesz rozpocząć startingOffsetstoearliest odczytywanie danych, zaczynając od najwcześniejszych przesunięć platformy Kafka.