Załaduj dane za pomocą platformy Delta Live Tables

Dane można załadować z dowolnego źródła danych obsługiwanego przez platformę Apache Spark w usłudze Azure Databricks przy użyciu platformy Delta Live Tables. Zestawy danych (tabele i widoki) można zdefiniować w tabelach platformy Delta Live Tables względem dowolnego zapytania zwracającego Spark DataFrame, w tym DataFrames i Pandas przesyłania strumieniowego i biblioteki dla Spark DataFrames. W przypadku zadań pozyskiwania danych usługa Databricks zaleca używanie tabel przesyłania strumieniowego w większości przypadków użycia. Tabele przesyłania strumieniowego są dobre do pozyskiwania danych z magazynu obiektów w chmurze przy użyciu modułu automatycznego ładowania lub z magistrali komunikatów, takich jak Kafka. W poniższych przykładach przedstawiono kilka typowych wzorców.

Ważne

Nie wszystkie źródła danych mają obsługę języka SQL. Notesy SQL i Python można mieszać w potoku Delta Live Tables, aby używać języka SQL dla wszystkich operacji poza pozyskiwaniem.

Aby uzyskać szczegółowe informacje na temat pracy z bibliotekami, które nie są domyślnie spakowane na platformie Delta Live Tables, zobacz Zależności potoku.

Załaduj pliki z magazynu obiektów w chmurze

Usługa Databricks zaleca używanie automatycznego modułu ładującego z tabelami Delta Live Tables w przypadku większości zadań pozyskiwania danych z magazynu obiektów w chmurze. Automatyczne ładowanie i tabele na żywo delty są przeznaczone do przyrostowego i idempotentnego ładowania stale rosnących danych w miarę ich napływu do magazynu w chmurze. W poniższych przykładach użyto narzędzia Auto Loader do tworzenia zestawów danych z plików CSV i JSON:

Uwaga

Aby załadować pliki z automatycznym modułem ładującym w potoku z obsługą wykazu aparatu Unity, należy użyć lokalizacji zewnętrznych. Aby dowiedzieć się więcej o korzystaniu z rozwiązania Unity Catalog z tabelami Delta Live Tables, zobacz Używanie ozwiązania Unity Catalog z potokami platformy Delta Live Tables.

Python

@dlt.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/databricks-datasets/retail-org/customers/")
  )

@dlt.table
def sales_orders_raw():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("/databricks-datasets/retail-org/sales_orders/")
  )

SQL

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv")

CREATE OR REFRESH STREAMING TABLE sales_orders_raw
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/sales_orders/", "json")

Zobacz What is Auto Loader? and Auto Loader SQL syntax (Co to jest moduł automatycznego ładowania? i składnia SQL modułu ładującego automatycznego).

Ostrzeżenie

Jeśli używasz automatycznego modułu ładującego z powiadomieniami o plikach i uruchamiasz pełne odświeżanie potoku lub tabeli przesyłania strumieniowego, musisz ręcznie wyczyścić zasoby. Aby wykonać oczyszczanie, możesz użyć elementu CloudFilesResourceManager w notesie.

Załaduj dane z magistrali komunikatów

Potoki delty tabel na żywo można skonfigurować do pozyskiwania danych z magistrali komunikatów przy użyciu tabel przesyłania strumieniowego. Usługa Databricks zaleca łączenie tabel przesyłania strumieniowego z ciągłym wykonywaniem i ulepszonym skalowaniem automatycznym w celu zapewnienia najbardziej wydajnego pozyskiwania na potrzeby ładowania z magistrali komunikatów o małych opóźnieniach. Zobacz Co to jest rozszerzone skalowanie automatyczne?.

Na przykład poniższy kod konfiguruje tabelę przesyłania strumieniowego w celu pozyskiwania danych z platformy Kafka:

import dlt

@dlt.table
def kafka_raw():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "<server:ip>")
      .option("subscribe", "topic1")
      .option("startingOffsets", "latest")
      .load()
  )

Operacje podrzędne można zapisywać w czystym języku SQL, aby wykonywać przekształcenia przesyłania strumieniowego na tych danych, jak w poniższym przykładzie:

CREATE OR REFRESH STREAMING TABLE streaming_silver_table
AS SELECT
  *
FROM
  STREAM(LIVE.kafka_raw)
WHERE ...

Przykład pracy z usługą Event Hubs można znaleźć w temacie Use Azure Event Hubs as a Delta Live Tables data source (Używanie usługi Azure Event Hubs jako źródła danych delty tabel na żywo).

Zobacz Konfigurowanie źródeł danych przesyłanych strumieniowo.

Załaduj dane z systemów zewnętrznych

Usługa Delta Live Tables obsługuje ładowanie danych z dowolnego źródła danych obsługiwanego przez usługę Azure Databricks. Zobacz Połączenie do źródeł danych. Możesz również załadować dane zewnętrzne przy użyciu usługi Lakehouse Federation dla obsługiwanych źródeł danych. Ponieważ federacja usługi Lakehouse wymaga środowiska Databricks Runtime w wersji 13.1 lub nowszej, aby można było używać federacji usługi Lakehouse, potok musi być skonfigurowany do korzystania z kanału w wersji zapoznawczej.

Niektóre źródła danych nie mają równoważnej obsługi w języku SQL. Jeśli nie możesz użyć federacji lakehouse z jednym z tych źródeł danych, możesz użyć autonomicznego notesu języka Python do pozyskiwania danych ze źródła. Ten notes można następnie dodać jako bibliotekę źródłową z notesami SQL w celu utworzenia potoku delta live tables. Poniższy przykład deklaruje zmaterializowany widok, aby uzyskać dostęp do bieżącego stanu danych w zdalnej tabeli PostgreSQL:

import dlt

@dlt.table
def postgres_raw():
  return (
    spark.read
      .format("postgresql")
      .option("dbtable", table_name)
      .option("host", database_host_url)
      .option("port", 5432)
      .option("database", database_name)
      .option("user", username)
      .option("password", password)
      .load()
  )

Załaduj małe lub statyczne zestawy danych z magazynu obiektów w chmurze

Małe lub statyczne zestawy danych można załadować przy użyciu składni ładowania platformy Apache Spark. Funkcja Delta Live Tables obsługuje wszystkie formaty plików obsługiwane przez platformę Apache Spark w usłudze Azure Databricks. Aby uzyskać pełną listę, zobacz Opcje formatowania danych.

W poniższych przykładach pokazano ładowanie kodu JSON w celu utworzenia tabel delta live tables:

Python

@dlt.table
def clickstream_raw():
  return (spark.read.format("json").load("/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"))

SQL

CREATE OR REFRESH LIVE TABLE clickstream_raw
AS SELECT * FROM json.`/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json`;

Uwaga

Konstrukcja SELECT * FROM format.`path`; SQL jest wspólna dla wszystkich środowisk SQL w usłudze Azure Databricks. Jest to zalecany wzorzec bezpośredniego dostępu do plików przy użyciu języka SQL z tabelami delta live.

Bezpieczny dostęp do poświadczeń magazynu za pomocą wpisów tajnych w potoku

Wpisów tajnych usługi Azure Databricks można używać do przechowywania poświadczeń, takich jak klucze dostępu lub hasła. Aby skonfigurować wpis tajny w potoku, użyj właściwości Spark w konfiguracji klastra ustawień potoku. Zobacz Konfigurowanie ustawień obliczeniowych.

W poniższym przykładzie użyto wpisu tajnego do przechowywania klucza dostępu wymaganego do odczytu danych wejściowych z konta magazynu usługi Azure Data Lake Storage Gen2 (ADLS Gen2) przy użyciu funkcji automatycznego ładowania. Tej samej metody można użyć do skonfigurowania dowolnego wpisu tajnego wymaganego przez potok, na przykład kluczy platformy AWS w celu uzyskania dostępu do usługi S3 lub hasła do magazynu metadanych Apache Hive.

Aby dowiedzieć się więcej na temat pracy z usługą Azure Data Lake Storage Gen2, zobacz Połączenie do usługi Azure Data Lake Storage Gen2 i usługi Blob Storage.

Uwaga

Należy dodać spark.hadoop. prefiks do spark_conf klucza konfiguracji, który ustawia wartość wpisu tajnego.

{
    "id": "43246596-a63f-11ec-b909-0242ac120002",
    "clusters": [
      {
        "spark_conf": {
          "spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
        },
        "autoscale": {
          "min_workers": 1,
          "max_workers": 5,
          "mode": "ENHANCED"
        }
      }
    ],
    "development": true,
    "continuous": false,
    "libraries": [
      {
        "notebook": {
          "path": "/Users/user@databricks.com/DLT Notebooks/Delta Live Tables quickstart"
        }
      }
    ],
    "name": "DLT quickstart using ADLS2"
}

Replace

  • <storage-account-name> z nazwą konta magazynu usługi ADLS Gen2.
  • <scope-name> z nazwą zakresu wpisu tajnego usługi Azure Databricks.
  • <secret-name> z nazwą klucza zawierającego klucz dostępu do konta usługi Azure Storage.
import dlt

json_path = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path-to-input-dataset>"
@dlt.create_table(
  comment="Data ingested from an ADLS2 storage account."
)
def read_from_ADLS2():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load(json_path)
  )

Replace

  • <container-name> z nazwą kontenera konta usługi Azure Storage, który przechowuje dane wejściowe.
  • <storage-account-name> z nazwą konta magazynu usługi ADLS Gen2.
  • <path-to-input-dataset> ze ścieżką do wejściowego zestawu danych.