Uruchamianie pierwszego obciążenia przesyłania strumieniowego ze strukturą

Ten artykuł zawiera przykłady kodu i wyjaśnienie podstawowych pojęć niezbędnych do uruchamiania pierwszych zapytań przesyłania strumieniowego ze strukturą w usłudze Azure Databricks. Możesz użyć przesyłania strumieniowego ze strukturą na potrzeby obciążeń przetwarzania przyrostowego i niemal w czasie rzeczywistym.

Przesyłanie strumieniowe ze strukturą to jedna z kilku technologii, które zasilają tabele przesyłania strumieniowego w tabelach delta live. Usługa Databricks zaleca używanie tabel delta Live Tables dla wszystkich nowych obciążeń ETL, pozyskiwania i przesyłania strumieniowego ze strukturą. Zobacz Co to jest delta live tables?.

Uwaga

Podczas gdy tabele delta live zapewniają nieco zmodyfikowaną składnię deklarowania tabel przesyłania strumieniowego, ogólna składnia konfigurowania odczytów i przekształceń przesyłania strumieniowego dotyczy wszystkich przypadków użycia przesyłania strumieniowego w usłudze Azure Databricks. Delta Live Tables upraszcza również przesyłanie strumieniowe, zarządzając informacjami o stanie, metadanymi i wieloma konfiguracjami.

Odczytywanie ze strumienia danych

Przesyłanie strumieniowe ze strukturą umożliwia przyrostowe pozyskiwanie danych z obsługiwanych źródeł danych. Niektóre z najbardziej typowych źródeł danych używanych w obciążeniach przesyłania strumieniowego ze strukturą usługi Azure Databricks obejmują następujące elementy:

  • Pliki danych w magazynie obiektów w chmurze
  • Magistrale komunikatów i kolejki
  • Delta Lake

Usługa Databricks zaleca używanie automatycznego modułu ładującego do pozyskiwania strumieniowego z magazynu obiektów w chmurze. Automatyczne ładowanie obsługuje większość formatów plików obsługiwanych przez przesyłanie strumieniowe ze strukturą. Zobacz Co to jest moduł automatycznego ładowania?.

Każde źródło danych udostępnia wiele opcji umożliwiających określenie sposobu ładowania partii danych. Podczas konfigurowania czytnika główne opcje mogą wymagać ustawienia w następujących kategoriach:

  • Opcje określające źródło danych lub format (na przykład typ pliku, ograniczniki i schemat).
  • Opcje służące do konfigurowania dostępu do systemów źródłowych (na przykład ustawień portów i poświadczeń).
  • Opcje określające miejsce rozpoczęcia w strumieniu (na przykład przesunięcie platformy Kafka lub odczytanie wszystkich istniejących plików).
  • Opcje kontrolujące ilość przetwarzanych danych w każdej partii (na przykład maksymalne przesunięcia, pliki lub bajty na partię).

Używanie automatycznego modułu ładującego do odczytywania danych przesyłanych strumieniowo z magazynu obiektów

W poniższym przykładzie pokazano ładowanie danych JSON za pomocą modułu automatycznego ładującego, które używa cloudFiles do oznaczania formatu i opcji. Opcja schemaLocation umożliwia wnioskowanie i ewolucję schematu. Wklej następujący kod w komórce notesu usługi Databricks i uruchom komórkę, aby utworzyć ramkę danych przesyłania strumieniowego o nazwie raw_df:

file_path = "/databricks-datasets/structured-streaming/events"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"

raw_df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", checkpoint_path)
    .load(file_path)
)

Podobnie jak inne operacje odczytu w usłudze Azure Databricks, konfigurowanie odczytu przesyłania strumieniowego w rzeczywistości nie powoduje załadowania danych. Musisz wyzwolić akcję na danych przed rozpoczęciem strumienia.

Uwaga

Wywołanie display() ramki danych przesyłania strumieniowego uruchamia zadanie przesyłania strumieniowego. W przypadku większości przypadków użycia przesyłania strumieniowego ze strukturą akcja, która wyzwala strumień, powinna zapisywać dane w ujściu. Zobacz Przygotowywanie kodu przesyłania strumieniowego ze strukturą do środowiska produkcyjnego.

Wykonywanie transformacji przesyłania strumieniowego

Przesyłanie strumieniowe ze strukturą obsługuje większość przekształceń dostępnych w usługach Azure Databricks i Spark SQL. Modele MLflow można nawet załadować jako funkcje zdefiniowane przez użytkownika i przewidywać przesyłanie strumieniowe jako transformację.

Poniższy przykład kodu wykonuje prostą transformację, aby wzbogacić pozyskane dane JSON o dodatkowe informacje przy użyciu funkcji Spark SQL:

from pyspark.sql.functions import col, current_timestamp

transformed_df = (raw_df.select(
    "*",
    col("_metadata.file_path").alias("source_file"),
    current_timestamp().alias("processing_time")
    )
)

transformed_df Wynik zawiera instrukcje zapytania dotyczące ładowania i przekształcania każdego rekordu w miarę ich nadejścia w źródle danych.

Uwaga

Przesyłanie strumieniowe ze strukturą traktuje źródła danych jako niezwiązane lub nieskończone zestawy danych. W związku z tym niektóre przekształcenia nie są obsługiwane w obciążeniach przesyłania strumieniowego ze strukturą, ponieważ wymagają one sortowania nieskończonej liczby elementów.

Większość agregacji i wielu sprzężeń wymaga zarządzania informacjami o stanie za pomocą znaków wodnych, okien i trybu danych wyjściowych. Zobacz Stosowanie znaków wodnych, aby kontrolować progi przetwarzania danych.

Zapisywanie w ujściu danych

Ujście danych jest celem operacji zapisu przesyłania strumieniowego. Typowe ujścia używane w obciążeniach przesyłania strumieniowego usługi Azure Databricks obejmują następujące elementy:

  • Delta Lake
  • Magistrale komunikatów i kolejki
  • Bazy danych par klucz-wartość

Podobnie jak w przypadku źródeł danych, większość ujść danych zapewnia wiele opcji kontrolowania sposobu zapisywania danych w systemie docelowym. Podczas konfigurowania modułu zapisywania główne opcje mogą być konieczne do ustawienia w następujących kategoriach:

  • Tryb danych wyjściowych (domyślnie dołączaj).
  • Lokalizacja punktu kontrolnego (wymagana dla każdego składnika zapisywania).
  • Interwały wyzwalaczy; Zobacz Konfigurowanie interwałów wyzwalacza przesyłania strumieniowego ze strukturą.
  • Opcje określające ujście lub format danych (na przykład typ pliku, ograniczniki i schemat).
  • Opcje służące do konfigurowania dostępu do systemów docelowych (na przykład ustawień portów i poświadczeń).

Wykonywanie przyrostowego zapisu wsadowego w usłudze Delta Lake

Poniższy przykład zapisuje w usłudze Delta Lake przy użyciu określonej ścieżki pliku i punktu kontrolnego.

Ważne

Zawsze upewnij się, że określono unikatową lokalizację punktu kontrolnego dla każdego skonfigurowanego składnika zapisywania przesyłania strumieniowego. Punkt kontrolny zapewnia unikatową tożsamość strumienia, śledząc wszystkie przetworzone rekordy i informacje o stanie skojarzone z zapytaniem przesyłanym strumieniowo.

Ustawienie availableNow wyzwalacza powoduje, że przesyłanie strumieniowe ze strukturą przetwarza wszystkie wcześniej nieprzetworzone rekordy z źródłowego zestawu danych, a następnie zamyka, aby można było bezpiecznie wykonać następujący kod bez obaw o pozostawienie strumienia uruchomionego:

target_path = "/tmp/ss-tutorial/"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"

transformed_df.writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", checkpoint_path)
    .option("path", target_path)
    .start()

W tym przykładzie żadne nowe rekordy nie docierają do naszego źródła danych, więc powtórzenie wykonywania tego kodu nie powoduje pozyskiwania nowych rekordów.

Ostrzeżenie

Wykonywanie przesyłania strumieniowego ze strukturą może uniemożliwić automatyczne zakończenie zamykania zasobów obliczeniowych. Aby uniknąć nieoczekiwanych kosztów, pamiętaj o przerwaniu zapytań przesyłania strumieniowego.

Przygotowywanie kodu przesyłania strumieniowego ze strukturą do środowiska produkcyjnego

Usługa Databricks zaleca używanie tabel delta Live Tables w przypadku większości obciążeń przesyłania strumieniowego ze strukturą. Poniższe zalecenia stanowią punkt wyjścia do przygotowywania obciążeń przesyłania strumieniowego ze strukturą do środowiska produkcyjnego:

  • Usuń niepotrzebny kod z notesów, które będą zwracać wyniki, takie jak display i count.
  • Nie uruchamiaj obciążeń przesyłania strumieniowego ze strukturą w klastrach interaktywnych; zawsze zaplanuj strumienie jako zadania.
  • Aby ułatwić automatyczne odzyskiwanie zadań przesyłania strumieniowego, skonfiguruj zadania z nieskończonymi ponownymi próbami.
  • Nie używaj automatycznego skalowania dla obciążeń ze strukturą przesyłania strumieniowego.

Aby uzyskać więcej zaleceń, zobacz Zagadnienia dotyczące produkcji przesyłania strumieniowego ze strukturą.

Odczytywanie danych z usługi Delta Lake, przekształcanie i zapisywanie w usłudze Delta Lake

Usługa Delta Lake ma rozbudowaną obsługę pracy z przesyłaniem strumieniowym ze strukturą jako źródłem i ujściem. Zobacz Delta table streaming reads and writes (Odczyty i zapisy w tabeli delty).

W poniższym przykładzie pokazano przykładową składnię umożliwiającą przyrostowe ładowanie wszystkich nowych rekordów z tabeli delty, łączenie ich z migawką innej tabeli delty i zapisywanie ich w tabeli delty:

(spark.readStream
    .table("<table-name1>")
    .join(spark.read.table("<table-name2>"), on="<id>", how="left")
    .writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", "<checkpoint-path>")
    .toTable("<table-name3>")
)

Musisz mieć odpowiednie uprawnienia skonfigurowane do odczytu tabel źródłowych i zapisu w tabelach docelowych oraz określonej lokalizacji punktu kontrolnego. Wypełnij wszystkie parametry oznaczone nawiasami kątowymi (<>) przy użyciu odpowiednich wartości dla źródeł danych i ujść.

Uwaga

Delta Live Tables zapewnia w pełni deklaratywną składnię tworzenia potoków usługi Delta Lake i automatycznie zarządza właściwościami, takimi jak wyzwalacze i punkty kontrolne. Zobacz Co to jest delta live tables?.

Odczytywanie danych z platformy Kafka, przekształcanie i zapisywanie na platformie Kafka

Platforma Apache Kafka i inne magistrale obsługi komunikatów zapewniają jedne z najniższych opóźnień dostępnych dla dużych zestawów danych. Za pomocą usługi Azure Databricks można zastosować przekształcenia do danych pozyskanych z platformy Kafka, a następnie zapisywać dane z powrotem na platformie Kafka.

Uwaga

Zapisywanie danych w magazynie obiektów w chmurze zwiększa dodatkowe obciążenie związane z opóźnieniami. Jeśli chcesz przechowywać dane z magistrali komunikatów w usłudze Delta Lake, ale wymagają najmniejszego możliwego opóźnienia w przypadku obciążeń przesyłanych strumieniowo, usługa Databricks zaleca skonfigurowanie oddzielnych zadań przesyłania strumieniowego w celu pozyskiwania danych do usługi Lakehouse i stosowania przekształceń niemal w czasie rzeczywistym na potrzeby ujścia magistrali komunikatów podrzędnych.

Poniższy przykład kodu przedstawia prosty wzorzec wzbogacania danych z platformy Kafka przez dołączenie ich do danych w tabeli delty, a następnie zapisanie z powrotem na platformie Kafka:

(spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<server:ip>")
    .option("subscribe", "<topic>")
    .option("startingOffsets", "latest")
    .load()
    .join(spark.read.table("<table-name>"), on="<id>", how="left")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<server:ip>")
    .option("topic", "<topic>")
    .option("checkpointLocation", "<checkpoint-path>")
    .start()
)

Musisz mieć odpowiednie uprawnienia skonfigurowane do uzyskiwania dostępu do usługi Kafka. Wypełnij wszystkie parametry oznaczone nawiasami kątowymi (<>) przy użyciu odpowiednich wartości dla źródeł danych i ujść. Zobacz Przetwarzanie strumieniowe przy użyciu platform Apache Kafka i Azure Databricks.