Läsa in data med Delta Live Tables

Du kan läsa in data från alla datakällor som stöds av Apache Spark på Azure Databricks med delta-livetabeller. Du kan definiera datauppsättningar (tabeller och vyer) i Delta Live Tables mot alla frågor som returnerar en Spark DataFrame, inklusive strömmande DataFrames och Pandas för Spark DataFrames. För datainmatningsuppgifter rekommenderar Databricks att du använder strömningstabeller för de flesta användningsfall. Strömningstabeller är bra för att mata in data från molnobjektlagring med hjälp av Auto Loader eller från meddelandebussar som Kafka. Exemplen nedan visar några vanliga mönster.

Viktigt!

Alla datakällor har inte SQL-stöd. Du kan blanda SQL- och Python-notebook-filer i en Delta Live Tables-pipeline för att använda SQL för alla åtgärder utöver inmatning.

Mer information om hur du arbetar med bibliotek som inte är paketerade i Delta Live Tables som standard finns i Hantera Python-beroenden för Delta Live Tables-pipelines.

Läsa in filer från molnobjektlagring

Databricks rekommenderar att du använder Auto Loader med Delta Live Tables för de flesta datainmatningsuppgifter från molnobjektlagring. Auto Loader och Delta Live Tables är utformade för att stegvis och idempotent läsa in ständigt växande data när de kommer till molnlagringen. I följande exempel används Auto Loader för att skapa datauppsättningar från CSV- och JSON-filer:

Kommentar

För att ladda filer med Auto Loader i en Unity Catalog-aktiverad pipeline måste du använda externa platser. Mer information om hur du använder Unity Catalog med Delta Live Tables finns i Använda Unity Catalog med dina Delta Live Tables-pipelines.

Python

@dlt.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/databricks-datasets/retail-org/customers/")
  )

@dlt.table
def sales_orders_raw():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("/databricks-datasets/retail-org/sales_orders/")
  )

SQL

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv")

CREATE OR REFRESH STREAMING TABLE sales_orders_raw
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/sales_orders/", "json")

Se Vad är SQL-syntax för automatisk inläsning? och automatisk inläsning.

Varning

Om du använder Auto Loader med filaviseringar och kör en fullständig uppdatering för din pipeline eller strömningstabell måste du rensa dina resurser manuellt. Du kan använda CloudFilesResourceManager i en notebook-fil för att utföra rensning.

Läsa in data från en meddelandebuss

Du kan konfigurera Delta Live Tables-pipelines för att mata in data från meddelandebussar med strömmande tabeller. Databricks rekommenderar att du kombinerar strömmande tabeller med kontinuerlig körning och förbättrad autoskalning för att ge den mest effektiva inmatningen för inläsning med låg latens från meddelandebussar. Se Optimera klusteranvändningen av Delta Live Tables-pipelines med förbättrad autoskalning.

Följande kod konfigurerar till exempel en strömmande tabell för att mata in data från Kafka:

import dlt

@dlt.table
def kafka_raw():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "<server:ip>")
      .option("subscribe", "topic1")
      .option("startingOffsets", "latest")
      .load()
  )

Du kan skriva underordnade åtgärder i ren SQL för att utföra direktuppspelningstransformeringar på dessa data, som i följande exempel:

CREATE OR REFRESH STREAMING TABLE streaming_silver_table
AS SELECT
  *
FROM
  STREAM(LIVE.kafka_raw)
WHERE ...

Ett exempel på hur du arbetar med Event Hubs finns i Använda Azure Event Hubs som en Delta Live Tables-datakälla.

Se Konfigurera strömmande datakällor.

Läsa in data från externa system

Delta Live Tables stöder inläsning av data från alla datakällor som stöds av Azure Databricks. Se Anslut till datakällor. Du kan också läsa in externa data med Lakehouse Federation för datakällor som stöds. Eftersom Lakehouse Federation kräver Databricks Runtime 13.3 LTS eller senare, måste din pipeline konfigureras för att använda förhandsgranskningskanalen för att kunna använda Lakehouse Federation.

Vissa datakällor har inte motsvarande stöd i SQL. Om du inte kan använda Lakehouse Federation med någon av dessa datakällor kan du använda en fristående Python-notebook-fil för att mata in data från källan. Den här notebook-filen kan sedan läggas till som ett källbibliotek med SQL-notebook-filer för att skapa en Delta Live Tables-pipeline. I följande exempel deklareras en materialiserad vy för att få åtkomst till det aktuella tillståndet för data i en fjärransluten PostgreSQL-tabell:

import dlt

@dlt.table
def postgres_raw():
  return (
    spark.read
      .format("postgresql")
      .option("dbtable", table_name)
      .option("host", database_host_url)
      .option("port", 5432)
      .option("database", database_name)
      .option("user", username)
      .option("password", password)
      .load()
  )

Läsa in små eller statiska datauppsättningar från molnobjektlagring

Du kan läsa in små eller statiska datauppsättningar med apache Spark-inläsningssyntax. Delta Live Tables stöder alla filformat som stöds av Apache Spark på Azure Databricks. En fullständig lista finns i Alternativ för dataformat.

Följande exempel visar hur JSON läses in för att skapa Delta Live Tables-tabeller:

Python

@dlt.table
def clickstream_raw():
  return (spark.read.format("json").load("/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"))

SQL

CREATE OR REFRESH LIVE TABLE clickstream_raw
AS SELECT * FROM json.`/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json`;

Kommentar

SQL-konstruktionen SELECT * FROM format.`path`; är gemensam för alla SQL-miljöer i Azure Databricks. Det är det rekommenderade mönstret för direkt filåtkomst med SQL med Delta Live Tables.

Åtkomst till autentiseringsuppgifter för lagring på ett säkert sätt med hemligheter i en pipeline

Du kan använda Azure Databricks-hemligheter för att lagra autentiseringsuppgifter som åtkomstnycklar eller lösenord. Om du vill konfigurera hemligheten i din pipeline använder du en Spark-egenskap i klusterkonfigurationen för pipelineinställningar. Se Konfigurera dina beräkningsinställningar.

I följande exempel används en hemlighet för att lagra en åtkomstnyckel som krävs för att läsa indata från ett Azure Data Lake Storage Gen2-lagringskonto (ADLS Gen2) med hjälp av Auto Loader. Du kan använda samma metod för att konfigurera alla hemligheter som krävs av din pipeline, till exempel AWS-nycklar för att komma åt S3 eller lösenordet till ett Apache Hive-metaarkiv.

Mer information om hur du arbetar med Azure Data Lake Storage Gen2 finns i Anslut till Azure Data Lake Storage Gen2 och Blob Storage.

Kommentar

Du måste lägga till prefixet spark.hadoop. i konfigurationsnyckeln spark_conf som anger det hemliga värdet.

{
    "id": "43246596-a63f-11ec-b909-0242ac120002",
    "clusters": [
      {
        "spark_conf": {
          "spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
        },
        "autoscale": {
          "min_workers": 1,
          "max_workers": 5,
          "mode": "ENHANCED"
        }
      }
    ],
    "development": true,
    "continuous": false,
    "libraries": [
      {
        "notebook": {
          "path": "/Users/user@databricks.com/DLT Notebooks/Delta Live Tables quickstart"
        }
      }
    ],
    "name": "DLT quickstart using ADLS2"
}

Replace

  • <storage-account-name> med namnet på ADLS Gen2-lagringskontot.
  • <scope-name> med namnet på Azure Databricks-hemlighetsomfånget.
  • <secret-name> med namnet på nyckeln som innehåller åtkomstnyckeln för Azure-lagringskontot.
import dlt

json_path = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path-to-input-dataset>"
@dlt.create_table(
  comment="Data ingested from an ADLS2 storage account."
)
def read_from_ADLS2():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load(json_path)
  )

Replace

  • <container-name> med namnet på containern för Azure-lagringskontot som lagrar indata.
  • <storage-account-name> med namnet på ADLS Gen2-lagringskontot.
  • <path-to-input-dataset> med sökvägen till indatauppsättningen.