Delen via


Gegevens incrementeel laden en verwerken met Delta Live Tables-stromen

In dit artikel wordt uitgelegd wat stromen zijn en hoe u stromen in Delta Live Tables-pijplijnen kunt gebruiken om gegevens van een bron naar een doelstreamingtabel incrementeel te verwerken. In Delta Live Tables worden stromen op twee manieren gedefinieerd:

  1. Een stroom wordt automatisch gedefinieerd wanneer u een query maakt waarmee een streamingtabel wordt bijgewerkt.
  2. Delta Live Tables biedt ook functionaliteit om stromen expliciet te definiëren voor complexere verwerking, zoals toevoegen aan een streamingtabel vanuit meerdere streamingbronnen.

In dit artikel worden de impliciete stromen besproken die worden gemaakt wanneer u een query definieert voor het bijwerken van een streamingtabel en vindt u vervolgens details over de syntaxis om complexere stromen te definiëren.

Wat is een stroom?

In Delta Live Tables is een stroom een streamingquery die brongegevens incrementeel verwerkt om een doelstreamingtabel bij te werken. De meeste Delta Live Tables-gegevenssets die u in een pijplijn maakt, definiëren de stroom als onderdeel van de query en hoeven de stroom niet expliciet te definiëren. U maakt bijvoorbeeld een streamingtabel in Delta Live Tables in één DDL-opdracht in plaats van afzonderlijke tabel- en stroominstructies te gebruiken om de streamingtabel te maken:

Notitie

Dit CREATE FLOW voorbeeld is alleen bedoeld voor illustratieve doeleinden en bevat trefwoorden die geen geldige syntaxis van Delta Live Tables zijn.

CREATE STREAMING TABLE raw_data
AS SELECT * FROM source_data("/path/to/source/data")

-- The above query is equivalent to the following statements:
CREATE STREAMING TABLE raw_data;

CREATE FLOW raw_data
AS INSERT INTO raw_data BY NAME
SELECT * FROM source_data("/path/to/source/data");

Naast de standaardstroom die is gedefinieerd door een query, bieden de Python- en SQL-interfaces van Delta Live Tables python en SQL toevoegstroomfunctionaliteit . Toevoegstroom ondersteunt verwerking waarvoor het lezen van gegevens uit meerdere streamingbronnen vereist is om één streamingtabel bij te werken. U kunt bijvoorbeeld toevoegstroomfunctionaliteit gebruiken wanneer u een bestaande streamingtabel en stroom hebt en een nieuwe streamingbron wilt toevoegen die naar deze bestaande streamingtabel schrijft.

Toevoegstroom gebruiken om vanuit meerdere bronstreams naar een streamingtabel te schrijven

Notitie

Als u de verwerking van toevoegstromen wilt gebruiken, moet uw pijplijn zijn geconfigureerd voor het gebruik van het preview-kanaal.

Gebruik de @append_flow decorator in de Python-interface of de CREATE FLOW component in de SQL-interface om vanuit meerdere streamingbronnen naar een streamingtabel te schrijven. Gebruik toevoegstroom voor het verwerken van taken, zoals de volgende:

  • Voeg streamingbronnen toe die gegevens toevoegen aan een bestaande streamingtabel zonder dat u een volledige vernieuwing nodig hebt. U hebt bijvoorbeeld een tabel waarin regionale gegevens worden gecombineerd van elke regio waarin u werkt. Wanneer er nieuwe regio's worden geïmplementeerd, kunt u de nieuwe regiogegevens toevoegen aan de tabel zonder dat u een volledige vernieuwing hoeft uit te voeren. Zie voorbeeld: Schrijven naar een streamingtabel vanuit meerdere Kafka-onderwerpen.
  • Werk een streamingtabel bij door ontbrekende historische gegevens toe te voegen (backfilling). U hebt bijvoorbeeld een bestaande streamingtabel waarnaar wordt geschreven door een Apache Kafka-onderwerp. U hebt ook historische gegevens opgeslagen in een tabel die u precies eenmaal in de streamingtabel hebt ingevoegd en u kunt de gegevens niet streamen omdat uw verwerking een complexe aggregatie omvat voordat u de gegevens invoegt. Zie voorbeeld: Een eenmalige gegevensbackfill uitvoeren.
  • Combineer gegevens uit meerdere bronnen en schrijf naar één streamingtabel in plaats van de UNION component in een query te gebruiken. Als u toevoegstroomverwerking gebruikt in plaats van UNION dat u de doeltabel incrementeel kunt bijwerken zonder een volledige vernieuwingsupdate uit te voeren. Zie voorbeeld: De verwerking van toevoegstromen gebruiken in plaats van UNION.

Het doel voor de recorduitvoer door de verwerking van de toevoegstroom kan een bestaande tabel of een nieuwe tabel zijn. Gebruik voor Python-query's de functie create_streaming_table() om een doeltabel te maken.

Belangrijk

  • Als u beperkingen voor gegevenskwaliteit met verwachtingen wilt definiëren, definieert u de verwachtingen in de doeltabel als onderdeel van de create_streaming_table() functie of in een bestaande tabeldefinitie. U kunt geen verwachtingen definiëren in de @append_flow definitie.
  • Stromen worden geïdentificeerd door een stroomnaam en deze naam wordt gebruikt om streamingcontrolepunten te identificeren. Het gebruik van de stroomnaam om het controlepunt te identificeren betekent het volgende:
    • Als de naam van een bestaande stroom in een pijplijn wordt gewijzigd, wordt het controlepunt niet overgedragen en is de hernoemde stroom een volledig nieuwe stroom.
    • U kunt een stroomnaam niet opnieuw gebruiken in een pijplijn, omdat het bestaande controlepunt niet overeenkomt met de nieuwe stroomdefinitie.

Hier volgt de syntaxis voor @append_flow:

Python

import dlt

dlt.create_streaming_table("<target-table-name>") # Required only if the target table doesn't exist.

@dlt.append_flow(
  target = "<target-table-name>",
  name = "<flow-name>", # optional, defaults to function name
  spark_conf = {"<key>" : "<value", "<key" : "<value>"}, # optional
  comment = "<comment>") # optional
def <function-name>():
  return (<streaming query>)

SQL

CREATE OR REFRESH STREAMING TABLE append_target; -- Required only if the target table doesn't exist.

CREATE FLOW
  flow_name
AS INSERT INTO
  target_table BY NAME
SELECT * FROM
  source;

Voorbeeld: Schrijven naar een streamingtabel vanuit meerdere Kafka-onderwerpen

In de volgende voorbeelden wordt een streamingtabel gemaakt met de naam kafka_target en schrijfbewerkingen naar die streamingtabel vanuit twee Kafka-onderwerpen:

Python

import dlt

dlt.create_streaming_table("kafka_target")

# Kafka stream from multiple topics
@dlt.append_flow(target = "kafka_target")
def topic1():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,...")
      .option("subscribe", "topic1")
      .load()
  )

@dlt.append_flow(target = "kafka_target")
def topic2():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,...")
      .option("subscribe", "topic2")
      .load()
  )

SQL

CREATE OR REFRESH STREAMING TABLE kafka_target;

CREATE FLOW
  topic1
AS INSERT INTO
  kafka_target BY NAME
SELECT * FROM
  read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic1');

CREATE FLOW
  topic2
AS INSERT INTO
  kafka_target BY NAME
SELECT * FROM
  read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic2');

Zie read_kafka in de sql-taalverwijzing voor meer informatie over de read_kafka() tabelwaardefunctie die wordt gebruikt in de SQL-query's.

Voorbeeld: Een eenmalige gegevensbackfill uitvoeren

In de volgende voorbeelden wordt een query uitgevoerd om historische gegevens toe te voegen aan een streamingtabel:

Notitie

Als u een echte eenmalige backfill wilt garanderen wanneer de backfillquery deel uitmaakt van een pijplijn die op geplande basis of continu wordt uitgevoerd, verwijdert u de query nadat de pijplijn eenmaal is uitgevoerd. Als u nieuwe gegevens wilt toevoegen als deze binnenkomt in de map backfill, laat u de query op zijn plaats.

Python

import dlt

@dlt.table()
def csv_target():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format","csv")
    .load("path/to/sourceDir")

@dlt.append_flow(target = "csv_target")
def backfill():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format","csv")
    .load("path/to/backfill/data/dir")

SQL

CREATE OR REFRESH STREAMING TABLE csv_target
AS SELECT * FROM
  cloud_files(
    "path/to/sourceDir",
    "csv"
  );

CREATE FLOW
  backfill
AS INSERT INTO
  csv_target BY NAME
SELECT * FROM
  cloud_files(
    "path/to/backfill/data/dir",
    "csv"
  );

Voorbeeld: Toevoegstroomverwerking gebruiken in plaats van UNION

In plaats van een query met een UNION component te gebruiken, kunt u toevoegstroomquery's gebruiken om meerdere bronnen te combineren en naar één streamingtabel te schrijven. Als u toevoegstroomquery's gebruikt in plaats van UNION dat u vanuit meerdere bronnen aan een streamingtabel kunt toevoegen zonder een volledige vernieuwing uit te voeren.

Het volgende Python-voorbeeld bevat een query die meerdere gegevensbronnen combineert met een UNION component:

@dlt.create_table(name="raw_orders")
def unioned_raw_orders():
  raw_orders_us =
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/path/to/orders/us")

  raw_orders_eu =
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/path/to/orders/eu")

  return raw_orders_us.union(raw_orders_eu)

De volgende voorbeelden vervangen de UNION query door toevoegstroomquery's:

Python

dlt.create_streaming_table("raw_orders")

@dlt.append_flow(target="raw_orders")
def raw_oders_us():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/us")

@dlt.append_flow(target="raw_orders")
def raw_orders_eu():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/eu")

# Additional flows can be added without the full refresh that a UNION query would require:
@dlt.append_flow(target="raw_orders")
def raw_orders_apac():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/apac")

SQL

CREATE OR REFRESH STREAMING TABLE raw_orders;

CREATE FLOW
  raw_orders_us
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  cloud_files(
    "/path/to/orders/us",
    "csv"
  );

CREATE FLOW
  raw_orders_eu
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  cloud_files(
    "/path/to/orders/eu",
    "csv"
  );

-- Additional flows can be added without the full refresh that a UNION query would require:
CREATE FLOW
  raw_orders_apac
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  cloud_files(
    "/path/to/orders/apac",
    "csv"
  );