Notatka
Dostęp do tej strony wymaga autoryzacji. Może spróbować zalogować się lub zmienić katalogi.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Na tej stronie opisano opcje konfiguracji odczytu i zapisu na platformie Apache Kafka przy użyciu przesyłania strumieniowego ze strukturą w usłudze Azure Databricks.
Łącznik platformy Kafka usługi Azure Databricks jest oparty na łączniku platformy Apache Spark Kafka i obsługuje wszystkie standardowe opcje konfiguracji platformy Kafka. Każda opcja poprzedzona prefiksem kafka. jest przekazywana bezpośrednio do bazowego klienta platformy Kafka. Na przykład .option("kafka.max.poll.records", "500") ustawia właściwość konsumenta platformy max.poll.records Kafka. Zobacz dokumentację konfiguracji platformy Kafka , aby uzyskać pełną listę dostępnych właściwości platformy Kafka.
Aby uzyskać dodatkowe opcje źródła i ujścia przesyłania strumieniowego ze strukturą, które nie są wymienione na tej stronie, zobacz Przewodnik integracji ze strukturą przesyłania strumieniowego i platformy Kafka.
Wymagane opcje
W przypadku odczytu i zapisu wymagana jest następująca opcja:
| Option | Wartość | Opis |
|---|---|---|
kafka.bootstrap.servers |
Rozdzielona przecinkami lista host:port | Konfiguracja platformy Kafka bootstrap.servers . Jeśli okaże się, że nie ma danych z platformy Kafka, najpierw sprawdź listę adresów brokera. Jeśli lista adresów brokera jest niepoprawna, może nie występować żadne błędy. Dzieje się tak, ponieważ klient Kafka zakłada, że brokerzy ostatecznie znów będą dostępni i w przypadku błędów sieci będzie próbować ponawiać w nieskończoność. |
Podczas odczytywania z platformy Kafka należy również określić jedną z następujących opcji, aby zidentyfikować tematy, które mają być używane:
| Option | Wartość | Opis |
|---|---|---|
subscribe |
Rozdzielona przecinkami lista tematów | Lista tematów do subskrybowania. |
subscribePattern |
Ciąg wyrażenia regularnego języka Java | Wzorzec używany do subskrybowania tematów. |
assign |
Ciąg JSON {"topicA":[0,1],"topicB":[2,4]} |
Określone partycje tematu do korzystania. |
Podczas zapisywania na platformie Kafka możesz opcjonalnie ustawić opcję określania topic tematu docelowego dla wszystkich wierszy. Jeśli nie zostanie ustawiona, ramka danych musi zawierać kolumnę topic .
Typowe opcje czytnika
Podczas odczytywania z platformy Kafka są często używane następujące opcje:
| Option | Wartość | Wartość domyślna | Opis |
|---|---|---|---|
minPartitions |
INT |
none | Minimalna liczba partycji do odczytu z platformy Kafka. Zwykle platforma Spark tworzy jedną partycję na partycję tematu platformy Kafka. Ustawienie tego wyższego podziału dużych partycji platformy Kafka na mniejsze partycje platformy Spark w celu zwiększenia równoległości. Przydatne do obsługi niesymetryczności danych lub szczytowych obciążeń. Uwaga: włączenie tej funkcji ponownie inicjuje odbiorców platformy Kafka w każdym wyzwalaczu, co może mieć wpływ na wydajność podczas korzystania z protokołu SSL. |
maxRecordsPerPartition |
LONG |
none | Maksymalna liczba rekordów na partycję platformy Spark. Po ustawieniu platforma Spark dzieli partycje platformy Kafka, dzięki czemu każda partycja Platformy Spark ma co najwyżej te wiele rekordów. Można używać z parametrem minPartitions; jeśli obie są ustawione, platforma Spark używa zależnie od tego, które wyniki będą zawierać więcej partycji. |
failOnDataLoss |
BOOLEAN |
true |
Czy zapytanie nie powiodło się, jeśli jest możliwe, że dane zostały utracone. Zapytania mogą na stałe nie odczytywać danych z platformy Kafka z różnych powodów, takich jak usunięte wątki, przycięcie wątku przed jego przetworzeniem itp. Staramy się oszacować konserwatywnie, czy dane mogły być utracone, czy nie. Czasami może to spowodować fałszywe alarmy. Ustaw tę opcję na false, jeśli nie działa zgodnie z oczekiwaniami, lub chcesz, aby zapytanie kontynuowało przetwarzanie pomimo utraty danych. |
maxOffsetsPerTrigger |
LONG |
none | [Tylko przesyłanie strumieniowe] Limit szybkości maksymalnej liczby przesunięć przetworzonych na interwał wyzwalacza. Łączna liczba przesunięć jest proporcjonalnie podzielona między partycje tematu. W przypadku bardziej zaawansowanego sterowania przepływem można również użyć minOffsetsPerTrigger (minimalne przesunięcia przed wyzwoleniem) i maxTriggerDelay (maksymalny czas oczekiwania, wartość domyślna 15m). Aby uzyskać szczegółowe informacje, zobacz Przewodnik integracji platformy Spark Kafka . |
startingOffsets |
earliest, latestlub ciąg JSON |
latest |
Określa, gdzie rozpocząć odczytywanie po rozpoczęciu zapytania. Służy earliest do odczytywania od najwcześniejszych dostępnych przesunięć, latest aby odczytywać tylko nowe dane po uruchomieniu strumienia lub ciąg JSON w celu określenia przesunięcia początkowego dla każdej partycji tematu (na przykład {"topicA":{"0":23,"1":-2},"topicB":{"0":-2}}). W formacie JSON -2 odwołuje się do najwcześniejszych i -1 najnowszych.W przypadku zapytań przesyłanych strumieniowo ma to zastosowanie tylko w przypadku uruchomienia nowego zapytania; Wznawianie zawsze pobiera się od miejsca, w którym zapytanie zostało przerwane. Nowo odnalezione partycje zaczynają się od earliest.Uwaga: w przypadku zapytań wsadowych latest (niejawnie lub przy użyciu -1 w formacie JSON) nie jest dozwolone. Aby zamiast tego rozpocząć od określonego znacznika czasu, użyj polecenia startingTimestamp lub startingOffsetsByTimestamp. |
endingOffsets |
latest lub ciąg JSON |
latest |
[Tylko usługa Batch] Punkt końcowy po zakończeniu zapytania wsadowego. Służy latest do odczytywania najnowszych przesunięć lub ciągu JSON w celu określenia przesunięcia końcowego dla każdej partycji tematu (na przykład {"topicA":{"0":50,"1":-1},"topicB":{"0":-1}}). W formacie JSON -1 odwołuje się do najnowszego; -2 (najuchętniejsze) jest niedozwolone. Aby zakończyć zamiast tego określony znacznik czasu, użyj polecenia endingTimestamp lub endingOffsetsByTimestamp. |
groupIdPrefix |
STRING |
spark-kafka-source (przesyłanie strumieniowe) lub spark-kafka-relation (partia) |
Prefiks dla automatycznie generowanego identyfikatora grupy odbiorców. Łącznik automatycznie generuje unikatowy dla group.id każdego zapytania. Ta opcja dostosowuje prefiks tego wygenerowanego identyfikatora. Ignorowane, jeśli kafka.group.id jest ustawiona. |
kafka.group.id |
STRING |
none | Identyfikator grupy do użycia podczas odczytywania z platformy Kafka. Użyj tego z ostrożnością. Domyślnie każde zapytanie generuje unikatowy identyfikator grupy do odczytywania danych. Gwarantuje to, że każde zapytanie ma własną grupę odbiorców, która nie ma wpływu na żadnego innego użytkownika, i dlatego może odczytywać wszystkie partycje subskrybowanych tematów. W niektórych scenariuszach (na przykład autoryzacja oparta na grupach platformy Kafka) możesz użyć określonych autoryzowanych identyfikatorów grup do odczytywania danych. Opcjonalnie można ustawić identyfikator grupy. Jednak zrób to z skrajną ostrożnością, ponieważ może to spowodować nieoczekiwane zachowanie.
|
includeHeaders |
BOOLEAN |
false |
Określa, czy w danych wyjściowych mają być uwzględniane nagłówki komunikatów platformy Kafka. |
bytesEstimateWindowLength |
STRING |
300s |
[Tylko przesyłanie strumieniowe] Przedział czasu służący do szacowania pozostałych bajtów za pośrednictwem estimatedTotalBytesBehindLatest metryki. Akceptuje ciągi czasu trwania, takie jak 10m (10 minut) lub 600s (600 sekund). Zobacz Pobieranie metryk platformy Kafka. |
Typowe opcje modułu zapisywania
Podczas zapisywania na platformie Kafka są często używane następujące opcje:
| Option | Wartość | Wartość domyślna | Opis |
|---|---|---|---|
topic |
STRING |
none | Ustawia temat dla wszystkich wierszy. Spowoduje to zastąpienie dowolnej topic kolumny w danych. |
includeHeaders |
BOOLEAN |
false |
Określa, czy w wierszu mają być uwzględniane nagłówki platformy Kafka. |
Ważna
Środowisko Databricks Runtime 13.3 LTS i nowsze zawiera bardziej aktualną wersję biblioteki kafka-clients, która domyślnie umożliwia idempotentne zapisy. Jeśli ujście platformy Kafka używa wersji 2.8.0 lub nowszej ze skonfigurowanymi listami ACL, ale bez IDEMPOTENT_WRITE włączenia, zapisy nie powiedzą się. Rozwiąż ten problem, uaktualniając do platformy Kafka 2.8.0 lub nowszej albo ustawiając wartość .option("kafka.enable.idempotence", "false").
Opcje uwierzytelniania
Usługa Azure Databricks obsługuje wiele metod uwierzytelniania dla platformy Kafka, w tym poświadczeń usługi wykazu aparatu Unity, sasL/SSL i opcji specyficznych dla chmury dla usług AWS MSK, Azure Event Hubs i Google Cloud Managed Kafka.
Usługa Azure Databricks zaleca używanie poświadczeń usługi wykazu aparatu Unity do uwierzytelniania w usługach Kafka zarządzanych przez chmurę:
| Option | Wartość | Opis |
|---|---|---|
databricks.serviceCredential |
STRING |
Nazwa poświadczenia usługi wykazu aparatu Unity do uwierzytelniania w usługach Kafka zarządzanych przez chmurę (AWS MSK, Azure Event Hubs lub Google Cloud Managed Kafka). Dostępne w środowisku Databricks Runtime 16.1 lub nowszym. |
databricks.serviceCredential.scope |
STRING |
Zakres protokołu OAuth dla poświadczeń usługi. Ustaw tę opcję tylko wtedy, gdy usługa Azure Databricks nie może automatycznie wnioskować o zakresie usługi Kafka. |
W przypadku korzystania z poświadczeń usługi wykazu aparatu Unity nie trzeba określać opcji SASL/SSL, takich jak kafka.sasl.mechanism, kafka.sasl.jaas.configlub kafka.security.protocol.
Typowe opcje SASL/SSL obejmują:
| Option | Wartość | Opis |
|---|---|---|
kafka.security.protocol |
STRING |
Protokół używany do komunikowania się z brokerami (na przykład , SASL_SSL, SSLPLAINTEXT). |
kafka.sasl.mechanism |
STRING |
Mechanizm SASL (na przykład PLAIN, , SCRAM-SHA-256SCRAM-SHA-512, OAUTHBEARER, AWS_MSK_IAM). |
kafka.sasl.jaas.config |
STRING |
Ciąg konfiguracji logowania USŁUGI JAAS. |
kafka.sasl.login.callback.handler.class |
STRING |
W pełni kwalifikowana nazwa klasy programu obsługi wywołania zwrotnego logowania na potrzeby uwierzytelniania SASL. |
kafka.sasl.client.callback.handler.class |
STRING |
W pełni kwalifikowana nazwa klasy programu obsługi wywołania zwrotnego klienta na potrzeby uwierzytelniania SASL. |
kafka.ssl.truststore.location |
STRING |
Lokalizacja pliku magazynu zaufania SSL. |
kafka.ssl.truststore.password |
STRING |
Hasło do pliku magazynu zaufania SSL. |
kafka.ssl.keystore.location |
STRING |
Lokalizacja pliku magazynu kluczy SSL. |
kafka.ssl.keystore.password |
STRING |
Hasło do pliku magazynu kluczy SSL. |
Aby uzyskać pełne instrukcje dotyczące konfigurowania uwierzytelniania, zobacz Uwierzytelnianie.
Dodatkowe zasoby
- Przewodnik dotyczący przesyłania strumieniowego ze strukturą i integracji platformy Kafka (dokumentacja platformy Apache Spark)
- Konfiguracje platformy Apache Kafka