Share via


Adatok betöltése Delta Live Tables használatával

Az Azure Databricksben az Apache Spark által támogatott bármely adatforrásból betölthet adatokat a Delta Live Tables használatával. A Delta Live Tablesben adatkészleteket (táblákat és nézeteket) definiálhat bármely olyan lekérdezéshez, amely Spark DataFrame-et ad vissza, beleértve a streamelési DataFrame-eket és a Pandas for Spark DataFrame-eket is. Az adatbetöltési feladatokhoz a Databricks a streamelési táblák használatát javasolja a legtöbb használati esetben. A streamelési táblák alkalmasak arra, hogy adatokat töltsenek be a felhőbeli objektumtárolóból az Automatikus betöltővel vagy az üzenetbuszokból, például a Kafkából. Az alábbi példák néhány gyakori mintát mutatnak be.

Fontos

Nem minden adatforrás rendelkezik SQL-támogatással. A Delta Live Tables-folyamatban SQL- és Python-jegyzetfüzetek kombinálásával SQL-t használhat a betöltésen túlmenően minden művelethez.

A Delta Live Tablesben alapértelmezés szerint nem csomagolt kódtárak használatával kapcsolatos részletekért lásd : Python-függőségek kezelése a Delta Live Tables-folyamatokhoz.

Fájlok betöltése a felhőobjektum-tárolóból

A Databricks az Automatikus betöltő használatát javasolja Delta Live Tables használatával a legtöbb adatbetöltési feladathoz a felhőobjektum-tárolóból. Az Automatikus betöltő és a Delta Live Table úgy lett kialakítva, hogy fokozatosan és idempotensen betöltse az egyre növekvő adatokat a felhőbeli tárolóba érkezve. Az alábbi példák az Automatikus betöltő használatával hoznak létre adatkészleteket CSV- és JSON-fájlokból:

Feljegyzés

Ahhoz, hogy az automatikus betöltővel töltsön be fájlokat egy Unity Catalog engedélyezett csővezetékbe, a külső helyek címet kell használnia. Ha többet szeretne tudni a Unity Catalog és a Delta Live Tables használatáról, olvassa el a Unity Catalog használata a Delta Live Tables-folyamatokkal című témakört.

Python

@dlt.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/databricks-datasets/retail-org/customers/")
  )

@dlt.table
def sales_orders_raw():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("/databricks-datasets/retail-org/sales_orders/")
  )

SQL

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv")

CREATE OR REFRESH STREAMING TABLE sales_orders_raw
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/sales_orders/", "json")

Lásd : Mi az automatikus betöltő? és az automatikus betöltő SQL-szintaxisa.

Figyelmeztetés

Ha az Automatikus betöltőt fájlértesítésekkel használja, és teljes frissítést futtat a folyamathoz vagy a streamelési táblához, manuálisan kell törölnie az erőforrásokat. A CloudFilesResourceManagert egy jegyzetfüzetben használhatja a törlés végrehajtásához.

Adatok betöltése üzenetbuszból

A Delta Live Tables-folyamatokat úgy konfigurálhatja, hogy streamelő táblákkal betöltse az adatokat az üzenetbuszokból. A Databricks azt javasolja, hogy a streamelési táblákat a folyamatos végrehajtással és a továbbfejlesztett automatikus skálázással kombinálja, hogy a leghatékonyabb betöltést biztosítsa az üzenetbuszokról érkező alacsony késésű betöltéshez. Lásd: A Delta Live Tables-folyamatok fürtkihasználtságának optimalizálása továbbfejlesztett automatikus skálázással.

A következő kód például konfigurál egy streamelési táblát a Kafkából származó adatok betöltéséhez:

import dlt

@dlt.table
def kafka_raw():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "<server:ip>")
      .option("subscribe", "topic1")
      .option("startingOffsets", "latest")
      .load()
  )

Az alsóbb rétegbeli műveleteket tiszta SQL-ben írhatja, hogy streamelési átalakításokat hajtson végre ezen az adatokon, ahogyan az alábbi példában is látható:

CREATE OR REFRESH STREAMING TABLE streaming_silver_table
AS SELECT
  *
FROM
  STREAM(LIVE.kafka_raw)
WHERE ...

Az Event Hubs használatára példa: Az Azure Event Hubs használata Delta Live Tables-adatforrásként.

Lásd: Streamelési adatforrások konfigurálása.

Adatok betöltése külső rendszerekből

A Delta Live Tables támogatja az adatok betöltését az Azure Databricks által támogatott adatforrásokból. Lásd az adatforrások Csatlakozás. Külső adatokat is betölthet a Lakehouse Federation használatával a támogatott adatforrásokhoz. Mivel a Lakehouse-összevonáshoz a Databricks Runtime 13.3 LTS-es vagy újabb verziójára van szükség, a Lakehouse Federation használatához a folyamatot konfigurálni kell az előnézeti csatorna használatára.

Egyes adatforrások nem rendelkeznek egyenértékű támogatással az SQL-ben. Ha nem tudja használni a Lakehouse Federationt ezen adatforrások egyikével, használhat egy különálló Python-jegyzetfüzetet a forrásból származó adatok betöltéséhez. Ez a jegyzetfüzet ezután hozzáadható forrástárként SQL-jegyzetfüzetekkel egy Delta Live Tables-folyamat létrehozásához. Az alábbi példa egy materializált nézetet deklarál egy távoli PostgreSQL-táblában lévő adatok aktuális állapotának eléréséhez:

import dlt

@dlt.table
def postgres_raw():
  return (
    spark.read
      .format("postgresql")
      .option("dbtable", table_name)
      .option("host", database_host_url)
      .option("port", 5432)
      .option("database", database_name)
      .option("user", username)
      .option("password", password)
      .load()
  )

Kis méretű vagy statikus adathalmazok betöltése a felhőbeli objektumtárolóból

Kis méretű vagy statikus adathalmazokat az Apache Spark betöltési szintaxisával tölthet be. A Delta Live Tables az Apache Spark által az Azure Databricksen támogatott összes fájlformátumot támogatja. A teljes listát az Adatformátum beállításai című témakörben találja.

Az alábbi példák bemutatják a JSON betöltését Delta Live Tables-táblák létrehozásához:

Python

@dlt.table
def clickstream_raw():
  return (spark.read.format("json").load("/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"))

SQL

CREATE OR REFRESH LIVE TABLE clickstream_raw
AS SELECT * FROM json.`/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json`;

Feljegyzés

Az SELECT * FROM format.`path`; SQL-szerkezet az Azure Databricks összes SQL-környezetében gyakori. Ez a delta élő táblákkal rendelkező SQL használatával történő közvetlen fájlhozzáférés ajánlott mintája.

Tároló hitelesítő adatainak biztonságos elérése titkos kulcsokkal egy folyamatban

Az Azure Databricks titkos kulcsokkal olyan hitelesítő adatokat tárolhat, mint a hozzáférési kulcsok vagy jelszavak. A folyamat titkos kódjának konfigurálásához használja a Spark tulajdonságot a folyamatbeállítások fürtkonfigurációjában. Lásd a számítási beállítások konfigurálását.

Az alábbi példa egy titkos kulcsot használ egy hozzáférési kulcs tárolásához, amely egy Azure Data Lake Storage Gen2 (ADLS Gen2) tárfiók bemeneti adatainak automatikus betöltő használatával történő beolvasásához szükséges. Ugyanezzel a módszerrel konfigurálhatja a folyamathoz szükséges titkos kulcsokat, például az AWS-kulcsokat az S3 eléréséhez, vagy egy Apache Hive-metaadattár jelszavát.

Az Azure Data Lake Storage Gen2 használatával kapcsolatos további információkért lásd az Azure Data Lake Storage Gen2 és Blob Storage Csatlakozás.

Feljegyzés

Hozzá kell adnia az spark.hadoop. előtagot a spark_conf titkos kulcs értékét meghatározó konfigurációs kulcshoz.

{
    "id": "43246596-a63f-11ec-b909-0242ac120002",
    "clusters": [
      {
        "spark_conf": {
          "spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
        },
        "autoscale": {
          "min_workers": 1,
          "max_workers": 5,
          "mode": "ENHANCED"
        }
      }
    ],
    "development": true,
    "continuous": false,
    "libraries": [
      {
        "notebook": {
          "path": "/Users/user@databricks.com/DLT Notebooks/Delta Live Tables quickstart"
        }
      }
    ],
    "name": "DLT quickstart using ADLS2"
}

Replace

  • <storage-account-name> az ADLS Gen2 tárfiók nevével.
  • <scope-name> az Azure Databricks titkos hatókörének nevével.
  • <secret-name> az Azure Storage-fiók hozzáférési kulcsát tartalmazó kulcs nevével.
import dlt

json_path = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path-to-input-dataset>"
@dlt.create_table(
  comment="Data ingested from an ADLS2 storage account."
)
def read_from_ADLS2():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load(json_path)
  )

Replace

  • <container-name> a bemeneti adatokat tároló Azure Storage-fióktároló nevével.
  • <storage-account-name> az ADLS Gen2 tárfiók nevével.
  • <path-to-input-dataset> a bemeneti adathalmaz elérési útjával.

Adatok betöltése az Azure Event Hubsból

Az Azure Event Hubs egy adatfolyam-szolgáltatás, amely Apache Kafka-kompatibilis felületet biztosít. A Delta Live Tables futtatókörnyezetében található strukturált streamelési Kafka-összekötő használatával betöltheti az Azure Event Hubsból érkező üzeneteket. Az Azure Event Hubsból érkező üzenetek betöltéséről és feldolgozásáról további információt az Azure Event Hubs használata Delta Live Tables-adatforrásként című témakörben talál.