Kör din första arbetsbelastning för strukturerad direktuppspelning

Den här artikeln innehåller kodexempel och förklaring av grundläggande begrepp som krävs för att köra dina första strukturerade direktuppspelningsfrågor på Azure Databricks. Du kan använda Structured Streaming för arbetsbelastningar för nästan realtidsbearbetning och inkrementell bearbetning.

Strukturerad direktuppspelning är en av flera tekniker som driver strömningstabeller i Delta Live Tables. Databricks rekommenderar att du använder Delta Live Tables för alla nya ETL-, inmatnings- och strukturerade strömningsarbetsbelastningar. Se Vad är Delta Live Tables?.

Kommentar

Delta Live Tables har en något ändrad syntax för att deklarera strömmande tabeller, men den allmänna syntaxen för att konfigurera läsningar och transformeringar för direktuppspelning gäller för alla användningsfall för strömning i Azure Databricks. Delta Live Tables förenklar också strömning genom att hantera tillståndsinformation, metadata och många konfigurationer.

Läsa från en dataström

Du kan använda Structured Streaming för att inkrementellt mata in data från datakällor som stöds. Några av de vanligaste datakällorna som används i Azure Databricks Structured Streaming-arbetsbelastningar är följande:

  • Datafiler i molnobjektlagring
  • Meddelandebussar och köer
  • Data Lake

Databricks rekommenderar att du använder Automatisk inläsning för strömmande inmatning från molnobjektlagring. Automatisk inläsning stöder de flesta filformat som stöds av strukturerad direktuppspelning. Se Vad är automatisk inläsare?.

Varje datakälla innehåller ett antal alternativ för att ange hur du läser in batchar med data. Under läsarkonfigurationen kan de viktigaste alternativen som du kan behöva ange ingå i följande kategorier:

  • Alternativ som anger datakällan eller formatet (till exempel filtyp, avgränsare och schema).
  • Alternativ som konfigurerar åtkomst till källsystem (till exempel portinställningar och autentiseringsuppgifter).
  • Alternativ som anger var du ska börja i en ström (till exempel Kafka förskjuter eller läser alla befintliga filer).
  • Alternativ som styr hur mycket data som bearbetas i varje batch (till exempel maximala förskjutningar, filer eller byte per batch).

Använda Automatisk inläsning för att läsa strömmande data från objektlagring

I följande exempel visas hur du läser in JSON-data med Auto Loader, som används cloudFiles för att ange format och alternativ. Alternativet schemaLocation aktiverar schemainferens och utveckling. Klistra in följande kod i en Databricks Notebook-cell och kör cellen för att skapa en strömmande DataFrame med namnet raw_df:

file_path = "/databricks-datasets/structured-streaming/events"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"

raw_df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", checkpoint_path)
    .load(file_path)
)

Precis som andra läsåtgärder i Azure Databricks läser konfigurationen av en strömmande läsning inte in data. Du måste utlösa en åtgärd på data innan strömmen börjar.

Kommentar

Att anropa display() på en strömmande DataFrame startar ett direktuppspelningsjobb. För de flesta användningsfall för strukturerad direktuppspelning bör åtgärden som utlöser en ström skriva data till en mottagare. Se Förbereda din kod för strukturerad direktuppspelning för produktion.

Utföra en direktuppspelningstransformering

Strukturerad direktuppspelning stöder de flesta transformeringar som är tillgängliga i Azure Databricks och Spark SQL. Du kan till och med läsa in MLflow-modeller som UDF:er och göra förutsägelser för strömning som en transformering.

I följande kodexempel slutförs en enkel transformering för att utöka inmatade JSON-data med ytterligare information med hjälp av Spark SQL-funktioner:

from pyspark.sql.functions import col, current_timestamp

transformed_df = (raw_df.select(
    "*",
    col("_metadata.file_path").alias("source_file"),
    current_timestamp().alias("processing_time")
    )
)

Resultatet transformed_df innehåller frågeinstruktioner för att läsa in och transformera varje post när den kommer till datakällan.

Kommentar

Strukturerad direktuppspelning behandlar datakällor som obundna eller oändliga datamängder. Därför stöds inte vissa transformeringar i arbetsbelastningar för strukturerad direktuppspelning eftersom de skulle kräva sortering av ett oändligt antal objekt.

De flesta sammansättningar och många kopplingar kräver hantering av tillståndsinformation med vattenstämplar, fönster och utdataläge. Se Tillämpa vattenstämplar för att kontrollera tröskelvärden för databehandling.

Skriva till en datamottagare

En datamottagare är målet för en direktuppspelningsskrivningsåtgärd. Vanliga mottagare som används i strömningsarbetsbelastningar i Azure Databricks är följande:

  • Data Lake
  • Meddelandebussar och köer
  • Nyckelvärdesdatabaser

Precis som med datakällor ger de flesta datamottagare ett antal alternativ för att styra hur data skrivs till målsystemet. Under skrivarkonfigurationen kan de viktigaste alternativen som du kan behöva ange ingå i följande kategorier:

  • Utdataläge (lägg till som standard).
  • En kontrollpunktsplats (krävs för varje skrivare).
  • Utlösarintervall; se Konfigurera utlösarintervall för strukturerad direktuppspelning.
  • Alternativ som anger datamottagaren eller formatet (till exempel filtyp, avgränsare och schema).
  • Alternativ som konfigurerar åtkomst till målsystem (till exempel portinställningar och autentiseringsuppgifter).

Utföra en inkrementell batchskrivning till Delta Lake

I följande exempel skrivs till Delta Lake med en angiven filsökväg och kontrollpunkt.

Viktigt!

Se alltid till att du anger en unik kontrollpunktsplats för varje strömmande skrivare som du konfigurerar. Kontrollpunkten innehåller den unika identiteten för din dataström och spårar alla poster som bearbetas och tillståndsinformation som är associerad med din strömmande fråga.

Inställningen availableNow för utlösaren instruerar Structured Streaming att bearbeta alla tidigare obearbetade poster från källdatauppsättningen och sedan stänga av, så att du på ett säkert sätt kan köra följande kod utan att behöva oroa dig för att lämna en ström igång:

target_path = "/tmp/ss-tutorial/"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"

transformed_df.writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", checkpoint_path)
    .option("path", target_path)
    .start()

I det här exemplet kommer inga nya poster till vår datakälla, så upprepad körning av den här koden matar inte in nya poster.

Varning

Strukturerad strömningskörning kan förhindra automatisk avslutning från att stänga av beräkningsresurser. För att undvika oväntade kostnader måste du avsluta direktuppspelningsfrågor.

Förbereda din structured streaming-kod för produktion

Databricks rekommenderar att du använder Delta Live Tables för de flesta strukturerade strömningsarbetsbelastningar. Följande rekommendationer ger en startpunkt för att förbereda strukturerade strömningsarbetsbelastningar för produktion:

  • Ta bort onödig kod från notebook-filer som returnerar resultat, till exempel display och count.
  • Kör inte structured streaming-arbetsbelastningar på interaktiva kluster. schemalägg alltid strömmar som jobb.
  • Konfigurera jobb med oändliga återförsök för att hjälpa strömmande jobb att återställas automatiskt.
  • Använd inte automatisk skalning för arbetsbelastningar med strukturerad direktuppspelning.

Fler rekommendationer finns i Produktionsöverväganden för strukturerad direktuppspelning.

Läsa data från Delta Lake, transformera och skriva till Delta Lake

Delta Lake har omfattande stöd för att arbeta med Structured Streaming som både källa och mottagare. Se Delta table streaming reads and writes (Delta table streaming reads and writes).

I följande exempel visas exempelsyntax för inkrementell inläsning av alla nya poster från en Delta-tabell, koppla dem till en ögonblicksbild av en annan Delta-tabell och skriva dem till en Delta-tabell:

(spark.readStream
    .table("<table-name1>")
    .join(spark.read.table("<table-name2>"), on="<id>", how="left")
    .writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", "<checkpoint-path>")
    .toTable("<table-name3>")
)

Du måste ha rätt behörigheter konfigurerade för att läsa källtabeller och skriva till måltabeller och den angivna kontrollpunktsplatsen. Fyll i alla parametrar som anges med vinkelparenteser (<>) med relevanta värden för dina datakällor och mottagare.

Kommentar

Delta Live Tables ger en helt deklarativ syntax för att skapa Delta Lake-pipelines och hanterar egenskaper som utlösare och kontrollpunkter automatiskt. Se Vad är Delta Live Tables?.

Läsa data från Kafka, transformera och skriva till Kafka

Apache Kafka och andra meddelandebussar ger några av de lägsta svarstiderna som är tillgängliga för stora datamängder. Du kan använda Azure Databricks för att tillämpa transformeringar på data som matas in från Kafka och sedan skriva data tillbaka till Kafka.

Kommentar

Om du skriver data till molnobjektlagring läggs ytterligare svarstid till. Om du vill lagra data från en meddelandebuss i Delta Lake men kräver lägsta möjliga svarstid för strömmande arbetsbelastningar rekommenderar Databricks att du konfigurerar separata direktuppspelningsjobb för att mata in data till lakehouse och tillämpa transformeringar i nära realtid för nedströms meddelandebussmottagare.

I följande kodexempel visas ett enkelt mönster för att berika data från Kafka genom att koppla dem till data i en Delta-tabell och sedan skriva tillbaka till Kafka:

(spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<server:ip>")
    .option("subscribe", "<topic>")
    .option("startingOffsets", "latest")
    .load()
    .join(spark.read.table("<table-name>"), on="<id>", how="left")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<server:ip>")
    .option("topic", "<topic>")
    .option("checkpointLocation", "<checkpoint-path>")
    .start()
)

Du måste ha rätt behörigheter konfigurerade för åtkomst till Kafka-tjänsten. Fyll i alla parametrar som anges med vinkelparenteser (<>) med relevanta värden för dina datakällor och mottagare. Se Stream-bearbetning med Apache Kafka och Azure Databricks.