Udostępnij przez


Wykonywanie zapytań dotyczących danych przesyłanych strumieniowo

Usługi Azure Databricks można używać do wykonywania zapytań dotyczących źródeł danych przesyłanych strumieniowo przy użyciu przesyłania strumieniowego ze strukturą. Usługa Azure Databricks zapewnia szeroką obsługę obciążeń przesyłania strumieniowego w językach Python i Scala oraz obsługuje większość funkcji przesyłania strumieniowego ze strukturą za pomocą języka SQL.

W poniższych przykładach pokazano użycie ujścia pamięci do ręcznej inspekcji danych przesyłanych strumieniowo podczas interaktywnego programowania w notesach. Ze względu na limity liczby wierszy wyjściowych w interfejsie notesu, możesz nie widzieć wszystkich danych odczytywanych przez zapytania przesyłane strumieniowo. W przypadku obciążeń produkcyjnych należy wyzwalać zapytania przesyłane strumieniowo tylko przez zapisanie ich w tabeli docelowej lub systemie zewnętrznym.

Uwaga

Obsługa języka SQL dla interakcyjnych zapytań na danych przesyłanych strumieniowo jest ograniczona do notatników działających na uniwersalnych zasobach obliczeniowych. Można również użyć języka SQL podczas deklarowania tabel strumieniowych w usłudze Databricks SQL lub Lakeflow Spark Declarative Pipelines. Zobacz Tabele przesyłania strumieniowego i potoki deklaratywne platformy Spark w usłudze Lakeflow.

Wykonywanie zapytań dotyczących danych z systemów przesyłania strumieniowego

Usługa Azure Databricks udostępnia czytniki danych przesyłanych strumieniowo dla następujących systemów przesyłania strumieniowego:

  • Kafka
  • Kineza
  • PubSub
  • Pulsar

Podczas inicjowania zapytań względem tych systemów należy podać szczegóły konfiguracji, które różnią się w zależności od skonfigurowanego środowiska i wybranego systemu. Zobacz Łączniki standardowe w programie Lakeflow Connect.

Typowe obciążenia związane z systemami przesyłania strumieniowego obejmują przesył danych do Lakehouse i przetwarzanie strumieniowe w celu przesyłania danych do systemów zewnętrznych. Aby uzyskać więcej informacji na temat obciążeń przesyłania strumieniowego, zobacz Pojęcia dotyczące przesyłania strumieniowego ze strukturą.

W poniższych przykładach pokazano interakcyjne przesyłanie strumieniowe odczytane z platformy Kafka:

Python

display(spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "latest")
  .load()
)

SQL

SELECT * FROM STREAM read_kafka(
  bootstrapServers => '<server:ip>',
  subscribe => '<topic>',
  startingOffsets => 'latest'
);

Wysyłanie zapytania do tabeli jako do strumienia danych.

Usługa Azure Databricks domyślnie tworzy wszystkie tabele przy użyciu usługi Delta Lake. Podczas wykonywania zapytania transmisji strumieniowej względem tabeli Delta zapytanie automatycznie pobiera nowe rekordy po zatwierdzeniu wersji tabeli. Domyślnie zapytania przesyłane strumieniowo oczekują, że tabele źródłowe będą zawierać tylko dołączone rekordy. Jeśli musisz pracować z danymi przesyłanymi strumieniowo, które zawierają aktualizacje i usunięcia, Databricks zaleca korzystanie z deklaratywnych potoków Lakeflow Spark i AUTO CDC ... INTO. Zobacz Interfejsy API AUTO CDC: upraszczają przechwytywanie zmian danych za pomocą potoków.

W poniższych przykładach pokazano wykonywanie strumieniowego odczytu w trybie interaktywnym z tabeli.

Python

display(spark.readStream.table("table_name"))

SQL

SELECT * FROM STREAM table_name

Wykonywanie zapytań dotyczących danych w magazynie obiektów w chmurze za pomocą modułu automatycznego ładowania

Możesz przesyłać strumieniowo dane z magazynu obiektów w chmurze przy użyciu modułu automatycznego ładowania, łącznika danych w chmurze usługi Azure Databricks. Można używać łącznika z plikami przechowywanymi w woluminach Unity Catalog lub w innych lokalizacjach magazynowania obiektów w chmurze. Usługa Databricks zaleca używanie woluminów do zarządzania dostępem do danych w magazynie obiektów w chmurze. Zobacz Łączenie ze źródłami danych i usługami zewnętrznymi.

Azure Databricks optymalizuje ten łącznik do strumieniowego pobierania danych z magazynu obiektów w chmurze, przechowywanych w popularnych formatach: ustrukturyzowane, częściowo ustrukturyzowane i niestrukturyzowane. Usługa Databricks zaleca przechowywanie pozyskanych danych w niemal nieprzetworzonym formacie, aby zmaksymalizować przepływność i zminimalizować potencjalną utratę danych z powodu uszkodzonych rekordów lub zmian schematu.

Aby uzyskać więcej zaleceń dotyczących pozyskiwania danych z magazynu obiektów w chmurze, zobacz Łączniki standardowe w programie Lakeflow Connect.

W poniższych przykładach pokazano interakcyjne przesyłanie strumieniowe odczytane z katalogu plików JSON w woluminie:

Python

display(spark.readStream.format("cloudFiles").option("cloudFiles.format", "json").load("/Volumes/catalog/schema/volumes/path/to/files"))

SQL

SELECT * FROM STREAM read_files('/Volumes/catalog/schema/volumes/path/to/files', format => 'json')