Funkcja delta table przesyła strumieniowo odczyty i zapisy

Usługa Delta Lake jest głęboko zintegrowana z przesyłaniem strumieniowym ze strukturą platformy Spark za pośrednictwem usług readStream i writeStream. Usługa Delta Lake pokonuje wiele ograniczeń zwykle związanych z systemami przesyłania strumieniowego i plikami, w tym:

  • Łączenie małych plików generowanych przez pozyskiwanie małych opóźnień.
  • Obsługa przetwarzania "dokładnie raz" przy użyciu więcej niż jednego strumienia (lub współbieżnych zadań wsadowych).
  • Efektywne odnajdywanie nowych plików podczas korzystania z plików jako źródła strumienia.

Uwaga

W tym artykule opisano używanie tabel usługi Delta Lake jako źródeł przesyłania strumieniowego i ujść. Aby dowiedzieć się, jak ładować dane przy użyciu tabel przesyłania strumieniowego w usłudze Databricks SQL, zobacz Ładowanie danych przy użyciu tabel przesyłania strumieniowego w usłudze Databricks SQL.

Tabela delty jako źródło

Przesyłanie strumieniowe ze strukturą przyrostowo odczytuje tabele różnicowe. Podczas gdy zapytanie przesyłane strumieniowo jest aktywne względem tabeli delty, nowe rekordy są przetwarzane idempotentnie, ponieważ nowe wersje tabeli zatwierdzają tabelę źródłową.

W poniższych przykładach kodu pokazano konfigurowanie odczytu przesyłania strumieniowego przy użyciu nazwy tabeli lub ścieżki pliku.

Python

spark.readStream.table("table_name")

spark.readStream.load("/path/to/table")

Scala

spark.readStream.table("table_name")

spark.readStream.load("/path/to/table")

Ważne

Jeśli schemat tabeli usługi Delta zmieni się po rozpoczęciu odczytu przesyłania strumieniowego względem tabeli, zapytanie zakończy się niepowodzeniem. W przypadku większości zmian schematu można ponownie uruchomić strumień, aby rozwiązać niezgodność schematu i kontynuować przetwarzanie.

W środowisku Databricks Runtime 12.2 LTS i poniżej nie można przesyłać strumieniowo z tabeli delty z włączonym mapowaniem kolumn, które przeszły ewolucję schematu nie addytywnego, na przykład zmiany nazwy lub upuszczania kolumn. Aby uzyskać szczegółowe informacje, zobacz Przesyłanie strumieniowe za pomocą mapowania kolumn i zmian schematu.

Ogranicz szybkość wprowadzania

Dostępne są następujące opcje do kontrolowania mikrosadów:

  • maxFilesPerTrigger: ile nowych plików należy wziąć pod uwagę w każdej mikrosadowej partii. Wartość domyślna to 1000.
  • maxBytesPerTrigger: ile danych jest przetwarzanych w każdej mikrosadowej partii. Ta opcja ustawia wartość "nietrwałą maksymalną", co oznacza, że partia przetwarza w przybliżeniu tę ilość danych i może przetwarzać więcej niż limit, aby zapytanie przesyłane strumieniowo przechodziło do przodu w przypadkach, gdy najmniejsza jednostka wejściowa jest większa niż ten limit. Ta opcja nie jest domyślnie ustawiona.

Jeśli używasz maxBytesPerTrigger w połączeniu z usługą maxFilesPerTrigger, mikrosadowa przetwarza dane do momentu osiągnięcia limitu maxFilesPerTrigger lub maxBytesPerTrigger .

Uwaga

W przypadkach, gdy transakcje tabeli źródłowej są czyszczone z powodu logRetentionDurationkonfiguracji , a zapytanie przesyłane strumieniowo próbuje przetworzyć te wersje, domyślnie zapytanie nie może uniknąć utraty danych. Możesz ustawić false opcję failOnDataLoss ignorowania utraconych danych i kontynuowania przetwarzania.

Przesyłanie strumieniowe źródła danych zmian usługi Delta Lake (CDC)

Usługa Delta Lake zmienia rekordy źródła danych zmiany w tabeli delty, w tym aktualizacje i usuwanie. Po włączeniu można przesyłać strumieniowo ze źródła danych zmian i zapisywać logikę do przetwarzania wstawień, aktualizacji i usuwania do tabel podrzędnych. Mimo że dane wyjściowe zestawienia danych zmian różnią się nieco od opisanej tabeli delty, zapewnia to rozwiązanie do propagowania zmian przyrostowych do tabel podrzędnych w architekturze medalonu.

Ważne

W środowisku Databricks Runtime 12.2 LTS i poniżej nie można przesyłać strumieniowo ze źródła danych zmian dla tabeli delty z włączonym mapowaniem kolumn, które przeszły ewolucję schematu nie addytywnego, na przykład zmiany nazw lub upuszczania kolumn. Zobacz Przesyłanie strumieniowe za pomocą mapowania kolumn i zmian schematu.

Ignorowanie aktualizacji i usuwania

Przesyłanie strumieniowe ze strukturą nie obsługuje danych wejściowych, które nie są dołączane i zgłasza wyjątek, jeśli jakiekolwiek modyfikacje występują w tabeli używanej jako źródło. Istnieją dwie główne strategie radzenia sobie ze zmianami, których nie można automatycznie propagować podrzędnie:

  • Możesz usunąć dane wyjściowe i punkt kontrolny oraz ponownie uruchomić strumień od początku.
  • Możesz ustawić jedną z tych dwóch opcji:
    • ignoreDeletes: ignoruje transakcje, które usuwają dane na granicach partycji.
    • skipChangeCommits: ignoruje transakcje, które usuwają lub modyfikują istniejące rekordy. skipChangeCommits obejmuje ignoreDeletes.

Uwaga

W środowisku Databricks Runtime 12.2 LTS i nowszym skipChangeCommits oznacza wycofanie poprzedniego ustawienia ignoreChanges. W środowisku Databricks Runtime 11.3 LTS i niższym jest ignoreChanges jedyną obsługiwaną opcją.

Semantyka dla ignoreChanges elementu różni się znacznie od skipChangeCommits. Z włączoną opcją ignoreChanges ponownie zapisane pliki danych w tabeli źródłowej są ponownie emitowane po operacji zmiany danych, takiej jak UPDATE, , MERGE INTODELETE(w ramach partycji) lub OVERWRITE. Niezmienione wiersze są często emitowane obok nowych wierszy, więc odbiorcy podrzędni muszą mieć możliwość obsługi duplikatów. Usunięcia nie są propagowane w dół. ignoreChanges obejmuje ignoreDeletes.

skipChangeCommits całkowicie ignoruje operacje zmiany plików. Pliki danych, które zostały przepisane w tabeli źródłowej z powodu operacji zmiany danych, takiej jak UPDATE, MERGE INTO, DELETEi OVERWRITE , są całkowicie ignorowane. Aby odzwierciedlić zmiany w nadrzędnych tabelach źródłowych, należy zaimplementować oddzielną logikę, aby propagować te zmiany.

Obciążenia skonfigurowane z dalszym działaniem ignoreChanges przy użyciu znanych semantyki, ale usługa Databricks zaleca korzystanie ze skipChangeCommits wszystkich nowych obciążeń. Migrowanie obciążeń przy użyciu polecenia ignoreChanges w celu skipChangeCommits wymaga logiki refaktoryzacji.

Przykład

Załóżmy na przykład, że masz tabelę user_events z kolumnami date, user_emaili action podzielonymi na partycje według date. Przesyłasz strumieniowo z user_events tabeli i musisz usunąć z niej dane ze względu na RODO.

Po usunięciu na granicach partycji (czyli w kolumnie partycji) pliki są już podzielone według wartości, WHERE więc usunięcie po prostu usuwa te pliki z metadanych. Po usunięciu całej partycji danych można użyć następujących elementów:

spark.readStream.format("delta")
  .option("ignoreDeletes", "true")
  .load("/tmp/delta/user_events")

Jeśli usuniesz dane w wielu partycjach (w tym przykładzie filtrowanie według user_email), użyj następującej składni:

spark.readStream.format("delta")
  .option("skipChangeCommits", "true")
  .load("/tmp/delta/user_events")

Jeśli zaktualizujesz element user_email za pomocą instrukcji UPDATE , plik zawierający user_email pytanie zostanie przepisany. Użyj polecenia skipChangeCommits , aby zignorować zmienione pliki danych.

Określanie pozycji początkowej

Poniższe opcje umożliwiają określenie punktu początkowego źródła przesyłania strumieniowego usługi Delta Lake bez przetwarzania całej tabeli.

  • startingVersion: wersja usługi Delta Lake do uruchomienia od. Usługa Databricks zaleca pominięcie tej opcji w przypadku większości obciążeń. Jeśli nie jest ustawiona, strumień rozpoczyna się od najnowszej dostępnej wersji, w tym pełnej migawki tabeli w tej chwili.

    Jeśli zostanie określony, strumień odczytuje wszystkie zmiany w tabeli delta, począwszy od określonej wersji (włącznie). Jeśli określona wersja nie jest już dostępna, uruchomienie strumienia nie powiedzie się. Wersje zatwierdzenia można uzyskać z version kolumny danych wyjściowych polecenia DESCRIBE HISTORY .

    Aby zwrócić tylko najnowsze zmiany, określ wartość latest.

  • startingTimestamp: znacznik czasu do rozpoczęcia od. Wszystkie zmiany tabeli zatwierdzone w znaczniku czasu (włącznie) są odczytywane przez czytnik przesyłania strumieniowego. Jeśli podany znacznik czasu poprzedza wszystkie zatwierdzenia tabeli, odczyt przesyłania strumieniowego rozpoczyna się od najwcześniejszego dostępnego znacznika czasu. Jeden z:

    • Ciąg znacznika czasu. Na przykład "2019-01-01T00:00:00.000Z".
    • Ciąg daty. Na przykład "2019-01-01".

Nie można jednocześnie ustawić obu opcji. Zaczynają obowiązywać tylko podczas uruchamiania nowego zapytania przesyłania strumieniowego. Jeśli zapytanie przesyłania strumieniowego zostało uruchomione i postęp został zarejestrowany w punkcie kontrolnym, te opcje są ignorowane.

Ważne

Chociaż źródło przesyłania strumieniowego można uruchomić z określonej wersji lub znacznika czasu, schemat źródła przesyłania strumieniowego jest zawsze najnowszym schematem tabeli delty. Musisz upewnić się, że nie ma niezgodnej zmiany schematu w tabeli delta po określonej wersji lub znaczniku czasu. W przeciwnym razie źródło przesyłania strumieniowego może zwracać nieprawidłowe wyniki podczas odczytywania danych z nieprawidłowym schematem.

Przykład

Załóżmy na przykład, że masz tabelę user_events. Jeśli chcesz odczytać zmiany od wersji 5, użyj:

spark.readStream.format("delta")
  .option("startingVersion", "5")
  .load("/tmp/delta/user_events")

Jeśli chcesz odczytać zmiany od 2018-10-18, użyj:

spark.readStream.format("delta")
  .option("startingTimestamp", "2018-10-18")
  .load("/tmp/delta/user_events")

Przetwarzanie początkowej migawki bez porzucania danych

Uwaga

Ta funkcja jest dostępna w środowisku Databricks Runtime 11.3 LTS lub nowszym. Ta funkcja jest dostępna w publicznej wersji zapoznawczej.

W przypadku używania tabeli delty jako źródła strumienia zapytanie najpierw przetwarza wszystkie dane obecne w tabeli. Tabela delta w tej wersji jest nazywana początkową migawką. Domyślnie pliki danych tabeli delty są przetwarzane na podstawie tego, który plik został ostatnio zmodyfikowany. Jednak czas ostatniej modyfikacji nie musi reprezentować kolejności czasu zdarzenia rekordu.

W stanowym zapytaniu przesyłanym strumieniowym ze zdefiniowanym znakiem wodnym przetwarzanie plików przez czas modyfikacji może spowodować przetworzenie rekordów w niewłaściwej kolejności. Może to prowadzić do upuszczania rekordów jako opóźnionych zdarzeń przez znak wodny.

Możesz uniknąć problemu z usuwaniem danych, włączając następującą opcję:

  • withEventTimeOrder: czy początkowa migawka powinna być przetwarzana z kolejnością czasu zdarzenia.

Po włączeniu kolejności czasu zdarzenia zakres czasu początkowych danych migawki jest podzielony na przedziały czasu. Każda mikrosadowa partia przetwarza zasobnik, filtrując dane w zakresie czasu. Opcje konfiguracji maxFilesPerTrigger i maxBytesPerTrigger nadal mają zastosowanie do kontrolowania rozmiaru mikrobajta, ale tylko w przybliżony sposób ze względu na charakter przetwarzania.

Na poniższej ilustracji przedstawiono ten proces:

Migawka początkowa

Istotne informacje o tej funkcji:

  • Problem z usuwaniem danych występuje tylko wtedy, gdy początkowa migawka delty zapytania przesyłania strumieniowego stanowego jest przetwarzana w domyślnej kolejności.
  • Nie można zmienić withEventTimeOrder po uruchomieniu zapytania strumienia podczas przetwarzania początkowej migawki. Aby ponownie uruchomić polecenie ze withEventTimeOrder zmianą, należy usunąć punkt kontrolny.
  • Jeśli uruchamiasz zapytanie strumienia z włączoną funkcjąEventTimeOrder, nie można obniżyć jej do wersji DBR, która nie obsługuje tej funkcji do momentu ukończenia początkowego przetwarzania migawki. Jeśli musisz obniżyć dół, możesz poczekać na zakończenie początkowej migawki lub usunąć punkt kontrolny i ponownie uruchomić zapytanie.
  • Ta funkcja nie jest obsługiwana w następujących nietypowych scenariuszach:
    • Kolumna czasu zdarzenia jest kolumną wygenerowaną i istnieją przekształcenia niezwiązane z projekcją między źródłem delta i znakiem wodnym.
    • Istnieje znak wodny zawierający więcej niż jedno źródło różnicowe w zapytaniu strumienia.
  • Po włączeniu kolejności czasu zdarzenia wydajność początkowego przetwarzania migawek różnicowych może być niższa.
  • Każda mikrosadowa skanuje początkową migawkę w celu filtrowania danych w odpowiednim zakresie czasu zdarzenia. Aby przyspieszyć akcję filtrowania, zaleca się użycie kolumny źródłowej delty jako czasu zdarzenia, aby można było zastosować pomijanie danych (sprawdź pomijanie danych dla usługi Delta Lake , jeśli ma to zastosowanie). Ponadto partycjonowanie tabeli wzdłuż kolumny czasu zdarzenia może przyspieszyć przetwarzanie. Możesz sprawdzić interfejs użytkownika platformy Spark, aby zobaczyć, ile plików różnicowych jest skanowanych pod kątem określonej mikrosadowej partii.

Przykład

Załóżmy, że masz tabelę user_events z kolumną event_time . Zapytanie przesyłane strumieniowo jest zapytaniem agregacji. Jeśli chcesz mieć pewność, że podczas początkowego przetwarzania migawek nie ma żadnych danych, możesz użyć następujących funkcji:

spark.readStream.format("delta")
  .option("withEventTimeOrder", "true")
  .load("/tmp/delta/user_events")
  .withWatermark("event_time", "10 seconds")

Uwaga

Można również włączyć tę funkcję za pomocą konfiguracji platformy Spark w klastrze, która będzie stosowana do wszystkich zapytań przesyłania strumieniowego: spark.databricks.delta.withEventTimeOrder.enabled true

Tabela delty jako ujście

Dane można również zapisywać w tabeli delty przy użyciu przesyłania strumieniowego ze strukturą. Dziennik transakcji umożliwia usłudze Delta Lake zagwarantowanie dokładnie jednokrotnego przetwarzania, nawet jeśli istnieją inne strumienie lub zapytania wsadowe uruchomione współbieżnie względem tabeli.

Uwaga

Funkcja Delta Lake VACUUM usuwa wszystkie pliki, które nie są zarządzane przez usługę Delta Lake, ale pomija wszystkie katalogi rozpoczynające się od _. Punkty kontrolne można bezpiecznie przechowywać wraz z innymi danymi i metadanymi dla tabeli delty przy użyciu struktury katalogów, takiej jak <table-name>/_checkpoints.

Metryki

Liczbę bajtów i liczbę plików, które nie są jeszcze przetwarzane, można znaleźć w procesie zapytań przesyłanych strumieniowo jako numBytesOutstanding metryki i numFilesOutstanding . Dodatkowe metryki obejmują:

  • numNewListedFiles: liczba plików usługi Delta Lake wymienionych w celu obliczenia listy prac dla tej partii.
    • backlogEndOffset: wersja tabeli używana do obliczania listy prac.

Jeśli używasz strumienia w notesie, możesz zobaczyć te metryki na karcie Nieprzetworzone dane na pulpicie nawigacyjnym postępu zapytania przesyłania strumieniowego:

{
  "sources" : [
    {
      "description" : "DeltaSource[file:/path/to/source]",
      "metrics" : {
        "numBytesOutstanding" : "3456",
        "numFilesOutstanding" : "8"
      },
    }
  ]
}

Tryb dołączania

Domyślnie strumienie są uruchamiane w trybie dołączania, który dodaje nowe rekordy do tabeli.

Możesz użyć metody path:

Python

(events.writeStream
   .format("delta")
   .outputMode("append")
   .option("checkpointLocation", "/tmp/delta/_checkpoints/")
   .start("/delta/events")
)

Scala

events.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
  .start("/tmp/delta/events")

lub metodę toTable w następujący sposób:

Python

(events.writeStream
   .format("delta")
   .outputMode("append")
   .option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
   .toTable("events")
)

Scala

events.writeStream
  .outputMode("append")
  .option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
  .toTable("events")

Tryb ukończenia

Możesz również użyć przesyłania strumieniowego ze strukturą, aby zastąpić całą tabelę każdą partią. Przykładem przypadku użycia jest obliczenie podsumowania przy użyciu agregacji:

Python

(spark.readStream
  .format("delta")
  .load("/tmp/delta/events")
  .groupBy("customerId")
  .count()
  .writeStream
  .format("delta")
  .outputMode("complete")
  .option("checkpointLocation", "/tmp/delta/eventsByCustomer/_checkpoints/")
  .start("/tmp/delta/eventsByCustomer")
)

Scala

spark.readStream
  .format("delta")
  .load("/tmp/delta/events")
  .groupBy("customerId")
  .count()
  .writeStream
  .format("delta")
  .outputMode("complete")
  .option("checkpointLocation", "/tmp/delta/eventsByCustomer/_checkpoints/")
  .start("/tmp/delta/eventsByCustomer")

Powyższy przykład stale aktualizuje tabelę zawierającą zagregowaną liczbę zdarzeń według klienta.

W przypadku aplikacji z bardziej łagodnymi wymaganiami dotyczącymi opóźnienia można zaoszczędzić zasoby obliczeniowe za pomocą wyzwalaczy jednorazowych. Służą one do aktualizowania tabel agregacji podsumowania w danym harmonogramie, przetwarzania tylko nowych danych, które dotarły od ostatniej aktualizacji.

Wykonywanie sprzężeń statycznych strumieniowo

Możesz polegać na transakcyjnych gwarancjach i protokole przechowywania wersji usługi Delta Lake, aby wykonywać sprzężenia statyczne strumienia. Sprzężenie statyczne strumieniowo łączy najnowszą prawidłową wersję tabeli delta (danych statycznych) do strumienia danych przy użyciu sprzężenia bezstanowego.

Gdy usługa Azure Databricks przetwarza mikrosadową danych w sprzężeniu statycznym strumieniowo, najnowsza prawidłowa wersja danych ze statycznej tabeli delty łączy się z rekordami obecnymi w bieżącej mikrosadowej partii. Ponieważ sprzężenie jest bezstanowe, nie trzeba konfigurować znakowania wodnego i może przetwarzać wyniki z małym opóźnieniem. Dane w statycznej tabeli delty używanej w sprzężeniu powinny być powoli zmieniane.

streamingDF = spark.readStream.table("orders")
staticDF = spark.read.table("customers")

query = (streamingDF
  .join(staticDF, streamingDF.customer_id==staticDF.id, "inner")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .table("orders_with_customer_info")
)

Upsert z zapytań przesyłanych strumieniowo przy użyciu polecenia foreachBatch

Możesz użyć kombinacji elementów i foreachBatch do zapisania złożonych merge operacji upsert z zapytania przesyłania strumieniowego do tabeli delty. Zobacz Używanie polecenia foreachBatch do zapisywania do losowego ujściach danych.

Ten wzorzec zawiera wiele aplikacji, w tym następujące:

  • Zapisuj agregacje przesyłania strumieniowego w trybie aktualizacji: jest to znacznie bardziej wydajne niż tryb ukończenia.
  • Zapis strumienia zmian bazy danych w tabeli delty: kwerenda scalania do zapisywania danych zmian może służyć foreachBatch do ciągłego stosowania strumienia zmian do tabeli delty.
  • Zapisywanie strumienia danych w tabeli delty z deduplikacją: zapytanie scalania tylko do wstawiania dla deduplikacji może służyć foreachBatch do ciągłego zapisywania danych (z duplikatami) do tabeli delty z automatyczną deduplikacją.

Uwaga

  • Upewnij się, że instrukcja merge wewnątrz foreachBatch jest idempotentna, ponieważ ponowne uruchomienia zapytania przesyłania strumieniowego mogą wielokrotnie stosować operację na tej samej partii danych.
  • W merge przypadku użycia w foreachBatchprogramie szybkość danych wejściowych zapytania przesyłania strumieniowego (zgłoszonego i StreamingQueryProgress widocznego na wykresie szybkości notesu) może być zgłaszana jako wielokrotność rzeczywistej szybkości generowania danych w źródle. Dzieje się tak, ponieważ merge odczytuje dane wejściowe wiele razy, co powoduje pomnożenie metryk wejściowych. Jeśli jest to wąskie gardło, możesz buforować partię DataFrame przed merge , a następnie cofnąć jej buforowanie po merge.

W poniższym przykładzie pokazano, jak można użyć programu SQL w programie foreachBatch , aby wykonać to zadanie:

Scala

// Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
  // Set the dataframe to view name
  microBatchOutputDF.createOrReplaceTempView("updates")

  // Use the view name to apply MERGE
  // NOTE: You have to use the SparkSession that has been used to define the `updates` dataframe
  microBatchOutputDF.sparkSession.sql(s"""
    MERGE INTO aggregates t
    USING updates s
    ON s.key = t.key
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
  """)
}

// Write the output of a streaming aggregation query into Delta table
streamingAggregatesDF.writeStream
  .format("delta")
  .foreachBatch(upsertToDelta _)
  .outputMode("update")
  .start()

Python

# Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF, batchId):
  # Set the dataframe to view name
  microBatchOutputDF.createOrReplaceTempView("updates")

  # Use the view name to apply MERGE
  # NOTE: You have to use the SparkSession that has been used to define the `updates` dataframe

  # In Databricks Runtime 10.5 and below, you must use the following:
  # microBatchOutputDF._jdf.sparkSession().sql("""
  microBatchOutputDF.sparkSession.sql("""
    MERGE INTO aggregates t
    USING updates s
    ON s.key = t.key
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
  """)

# Write the output of a streaming aggregation query into Delta table
(streamingAggregatesDF.writeStream
  .format("delta")
  .foreachBatch(upsertToDelta)
  .outputMode("update")
  .start()
)

Możesz również użyć interfejsów API usługi Delta Lake do wykonywania operacji upsert przesyłania strumieniowego, jak w poniższym przykładzie:

Scala

import io.delta.tables.*

val deltaTable = DeltaTable.forPath(spark, "/data/aggregates")

// Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
  deltaTable.as("t")
    .merge(
      microBatchOutputDF.as("s"),
      "s.key = t.key")
    .whenMatched().updateAll()
    .whenNotMatched().insertAll()
    .execute()
}

// Write the output of a streaming aggregation query into Delta table
streamingAggregatesDF.writeStream
  .format("delta")
  .foreachBatch(upsertToDelta _)
  .outputMode("update")
  .start()

Python

from delta.tables import *

deltaTable = DeltaTable.forPath(spark, "/data/aggregates")

# Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF, batchId):
  (deltaTable.alias("t").merge(
      microBatchOutputDF.alias("s"),
      "s.key = t.key")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
  )

# Write the output of a streaming aggregation query into Delta table
(streamingAggregatesDF.writeStream
  .format("delta")
  .foreachBatch(upsertToDelta)
  .outputMode("update")
  .start()
)

Idempotentne zapisy tabeli w foreachBatch

Uwaga

Usługa Databricks zaleca skonfigurowanie oddzielnego zapisu strumieniowego dla każdego ujścia, który chcesz zaktualizować. Używanie foreachBatch funkcji do zapisywania w wielu tabelach serializuje zapisy, co zmniejsza równoległość i zwiększa ogólne opóźnienie.

Tabele różnicowe obsługują następujące DataFrameWriter opcje tworzenia zapisów w wielu tabelach w foreachBatch ramach idempotentności:

  • txnAppId: unikatowy ciąg, który można przekazać do każdego zapisu ramki danych. Na przykład możesz użyć identyfikatora StreamingQuery jako txnAppId.
  • txnVersion: monotonicznie rosnąca liczba, która działa jako wersja transakcji.

Usługa Delta Lake używa kombinacji elementów i txnVersion do identyfikowania txnAppId zduplikowanych zapisów i ich ignorowania.

Jeśli zapis wsadowy zostanie przerwany z powodu awarii, ponowne uruchomienie partii używa tej samej aplikacji i identyfikatora partii, aby ułatwić środowisku uruchomieniowemu poprawne zidentyfikowanie zduplikowanych zapisów i zignorowanie ich. Identyfikator aplikacji (txnAppId) może być dowolnym unikatowym ciągiem generowanym przez użytkownika i nie musi być powiązany z identyfikatorem strumienia. Zobacz Używanie polecenia foreachBatch do zapisywania do losowego ujściach danych.

Ostrzeżenie

Jeśli usuniesz punkt kontrolny przesyłania strumieniowego i uruchomisz ponownie zapytanie przy użyciu nowego punktu kontrolnego, musisz podać inny txnAppIdelement . Nowe punkty kontrolne zaczynają się od identyfikatora partii .0 Usługa Delta Lake używa identyfikatora partii i txnAppId jako unikatowego klucza i pomija partie z już widocznymi wartościami.

W poniższym przykładzie kodu pokazano ten wzorzec:

Python

app_id = ... # A unique string that is used as an application ID.

def writeToDeltaLakeTableIdempotent(batch_df, batch_id):
  batch_df.write.format(...).option("txnVersion", batch_id).option("txnAppId", app_id).save(...) # location 1
  batch_df.write.format(...).option("txnVersion", batch_id).option("txnAppId", app_id).save(...) # location 2

streamingDF.writeStream.foreachBatch(writeToDeltaLakeTableIdempotent).start()

Scala

val appId = ... // A unique string that is used as an application ID.
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  batchDF.write.format(...).option("txnVersion", batchId).option("txnAppId", appId).save(...)  // location 1
  batchDF.write.format(...).option("txnVersion", batchId).option("txnAppId", appId).save(...)  // location 2
}