Funkcja delta table przesyła strumieniowo odczyty i zapisy
Usługa Delta Lake jest głęboko zintegrowana z przesyłaniem strumieniowym ze strukturą platformy Spark za pośrednictwem usług readStream
i writeStream
. Usługa Delta Lake pokonuje wiele ograniczeń zwykle związanych z systemami przesyłania strumieniowego i plikami, w tym:
- Łączenie małych plików generowanych przez pozyskiwanie małych opóźnień.
- Obsługa przetwarzania "dokładnie raz" przy użyciu więcej niż jednego strumienia (lub współbieżnych zadań wsadowych).
- Efektywne odnajdywanie nowych plików podczas korzystania z plików jako źródła strumienia.
Uwaga
W tym artykule opisano używanie tabel usługi Delta Lake jako źródeł przesyłania strumieniowego i ujść. Aby dowiedzieć się, jak ładować dane przy użyciu tabel przesyłania strumieniowego w usłudze Databricks SQL, zobacz Ładowanie danych przy użyciu tabel przesyłania strumieniowego w usłudze Databricks SQL.
Aby uzyskać informacje na temat sprzężeń statycznych strumieniowych za pomocą usługi Delta Lake, zobacz Stream-static joins (Sprzężenia statyczne strumienia).
Tabela delty jako źródło
Przesyłanie strumieniowe ze strukturą przyrostowo odczytuje tabele różnicowe. Podczas gdy zapytanie przesyłane strumieniowo jest aktywne względem tabeli delty, nowe rekordy są przetwarzane idempotentnie, ponieważ nowe wersje tabeli zatwierdzają tabelę źródłową.
W poniższych przykładach kodu pokazano konfigurowanie odczytu przesyłania strumieniowego przy użyciu nazwy tabeli lub ścieżki pliku.
Python
spark.readStream.table("table_name")
spark.readStream.load("/path/to/table")
Scala
spark.readStream.table("table_name")
spark.readStream.load("/path/to/table")
Ważne
Jeśli schemat tabeli usługi Delta zmieni się po rozpoczęciu odczytu przesyłania strumieniowego względem tabeli, zapytanie zakończy się niepowodzeniem. W przypadku większości zmian schematu można ponownie uruchomić strumień, aby rozwiązać niezgodność schematu i kontynuować przetwarzanie.
W środowisku Databricks Runtime 12.2 LTS i poniżej nie można przesyłać strumieniowo z tabeli delty z włączonym mapowaniem kolumn, które przeszły ewolucję schematu nie addytywnego, na przykład zmiany nazwy lub upuszczania kolumn. Aby uzyskać szczegółowe informacje, zobacz Przesyłanie strumieniowe za pomocą mapowania kolumn i zmian schematu.
Ogranicz szybkość wprowadzania
Dostępne są następujące opcje do kontrolowania mikrosadów:
maxFilesPerTrigger
: ile nowych plików należy wziąć pod uwagę w każdej mikrosadowej partii. Wartość domyślna to 1000.maxBytesPerTrigger
: ile danych jest przetwarzanych w każdej mikrosadowej partii. Ta opcja ustawia wartość "nietrwałą maksymalną", co oznacza, że partia przetwarza w przybliżeniu tę ilość danych i może przetwarzać więcej niż limit, aby zapytanie przesyłane strumieniowo przechodziło do przodu w przypadkach, gdy najmniejsza jednostka wejściowa jest większa niż ten limit. Ta opcja nie jest domyślnie ustawiona.
Jeśli używasz maxBytesPerTrigger
w połączeniu z usługą maxFilesPerTrigger
, mikrosadowa przetwarza dane do momentu osiągnięcia limitu maxFilesPerTrigger
lub maxBytesPerTrigger
.
Uwaga
W przypadkach, gdy transakcje tabeli źródłowej są czyszczone z powodu logRetentionDuration
konfiguracji , a zapytanie przesyłane strumieniowo próbuje przetworzyć te wersje, domyślnie zapytanie nie może uniknąć utraty danych. Możesz ustawić false
opcję failOnDataLoss
ignorowania utraconych danych i kontynuowania przetwarzania.
Przesyłanie strumieniowe źródła danych zmian usługi Delta Lake (CDC)
Usługa Delta Lake zmienia rekordy źródła danych zmiany w tabeli delty, w tym aktualizacje i usuwanie. Po włączeniu można przesyłać strumieniowo ze źródła danych zmian i zapisywać logikę do przetwarzania wstawień, aktualizacji i usuwania do tabel podrzędnych. Mimo że dane wyjściowe zestawienia danych zmian różnią się nieco od opisanej tabeli delty, zapewnia to rozwiązanie do propagowania zmian przyrostowych do tabel podrzędnych w architekturze medalonu.
Ważne
W środowisku Databricks Runtime 12.2 LTS i poniżej nie można przesyłać strumieniowo ze źródła danych zmian dla tabeli delty z włączonym mapowaniem kolumn, które przeszły ewolucję schematu nie addytywnego, na przykład zmiany nazw lub upuszczania kolumn. Zobacz Przesyłanie strumieniowe za pomocą mapowania kolumn i zmian schematu.
Ignorowanie aktualizacji i usuwania
Przesyłanie strumieniowe ze strukturą nie obsługuje danych wejściowych, które nie są dołączane i zgłasza wyjątek, jeśli jakiekolwiek modyfikacje występują w tabeli używanej jako źródło. Istnieją dwie główne strategie radzenia sobie ze zmianami, których nie można automatycznie propagować podrzędnie:
- Możesz usunąć dane wyjściowe i punkt kontrolny oraz ponownie uruchomić strumień od początku.
- Możesz ustawić jedną z tych dwóch opcji:
ignoreDeletes
: ignoruje transakcje, które usuwają dane na granicach partycji.skipChangeCommits
: ignoruje transakcje, które usuwają lub modyfikują istniejące rekordy.skipChangeCommits
obejmujeignoreDeletes
.
Uwaga
W środowisku Databricks Runtime 12.2 LTS i nowszym skipChangeCommits
oznacza wycofanie poprzedniego ustawienia ignoreChanges
. W środowisku Databricks Runtime 11.3 LTS i niższym jest ignoreChanges
jedyną obsługiwaną opcją.
Semantyka dla ignoreChanges
elementu różni się znacznie od skipChangeCommits
. Z włączoną opcją ignoreChanges
ponownie zapisane pliki danych w tabeli źródłowej są ponownie emitowane po operacji zmiany danych, takiej jak UPDATE
, , MERGE INTO
DELETE
(w ramach partycji) lub OVERWRITE
. Niezmienione wiersze są często emitowane obok nowych wierszy, więc odbiorcy podrzędni muszą mieć możliwość obsługi duplikatów. Usunięcia nie są propagowane w dół. ignoreChanges
obejmuje ignoreDeletes
.
skipChangeCommits
całkowicie ignoruje operacje zmiany plików. Pliki danych, które zostały przepisane w tabeli źródłowej z powodu operacji zmiany danych, takiej jak UPDATE
, MERGE INTO
, DELETE
i OVERWRITE
, są całkowicie ignorowane. Aby odzwierciedlić zmiany w nadrzędnych tabelach źródłowych, należy zaimplementować oddzielną logikę, aby propagować te zmiany.
Obciążenia skonfigurowane z dalszym działaniem ignoreChanges
przy użyciu znanych semantyki, ale usługa Databricks zaleca korzystanie ze skipChangeCommits
wszystkich nowych obciążeń. Migrowanie obciążeń przy użyciu polecenia ignoreChanges
w celu skipChangeCommits
wymaga logiki refaktoryzacji.
Przykład
Załóżmy na przykład, że masz tabelę user_events
z kolumnami date
, user_email
i action
podzielonymi na partycje według date
. Przesyłasz strumieniowo z user_events
tabeli i musisz usunąć z niej dane ze względu na RODO.
Po usunięciu na granicach partycji (czyli w kolumnie partycji) pliki są już podzielone według wartości, WHERE
więc usunięcie po prostu usuwa te pliki z metadanych. Po usunięciu całej partycji danych można użyć następujących elementów:
spark.readStream
.option("ignoreDeletes", "true")
.table("user_events")
Jeśli usuniesz dane w wielu partycjach (w tym przykładzie filtrowanie według user_email
), użyj następującej składni:
spark.readStream
.option("skipChangeCommits", "true")
.table("user_events")
Jeśli zaktualizujesz element user_email
za pomocą instrukcji UPDATE
, plik zawierający user_email
pytanie zostanie przepisany. Użyj polecenia skipChangeCommits
, aby zignorować zmienione pliki danych.
Określanie pozycji początkowej
Poniższe opcje umożliwiają określenie punktu początkowego źródła przesyłania strumieniowego usługi Delta Lake bez przetwarzania całej tabeli.
startingVersion
: wersja usługi Delta Lake do uruchomienia od. Usługa Databricks zaleca pominięcie tej opcji w przypadku większości obciążeń. Jeśli nie jest ustawiona, strumień rozpoczyna się od najnowszej dostępnej wersji, w tym pełnej migawki tabeli w tej chwili.Jeśli zostanie określony, strumień odczytuje wszystkie zmiany w tabeli delta, począwszy od określonej wersji (włącznie). Jeśli określona wersja nie jest już dostępna, uruchomienie strumienia nie powiedzie się. Wersje zatwierdzenia można uzyskać z
version
kolumny danych wyjściowych polecenia DESCRIBE HISTORY .Aby zwrócić tylko najnowsze zmiany, określ wartość
latest
.startingTimestamp
: znacznik czasu do rozpoczęcia od. Wszystkie zmiany tabeli zatwierdzone w znaczniku czasu (włącznie) są odczytywane przez czytnik przesyłania strumieniowego. Jeśli podany znacznik czasu poprzedza wszystkie zatwierdzenia tabeli, odczyt przesyłania strumieniowego rozpoczyna się od najwcześniejszego dostępnego znacznika czasu. Jeden z:- Ciąg znacznika czasu. Na przykład
"2019-01-01T00:00:00.000Z"
. - Ciąg daty. Na przykład
"2019-01-01"
.
- Ciąg znacznika czasu. Na przykład
Nie można jednocześnie ustawić obu opcji. Zaczynają obowiązywać tylko podczas uruchamiania nowego zapytania przesyłania strumieniowego. Jeśli zapytanie przesyłania strumieniowego zostało uruchomione i postęp został zarejestrowany w punkcie kontrolnym, te opcje są ignorowane.
Ważne
Chociaż źródło przesyłania strumieniowego można uruchomić z określonej wersji lub znacznika czasu, schemat źródła przesyłania strumieniowego jest zawsze najnowszym schematem tabeli delty. Musisz upewnić się, że nie ma niezgodnej zmiany schematu w tabeli delta po określonej wersji lub znaczniku czasu. W przeciwnym razie źródło przesyłania strumieniowego może zwracać nieprawidłowe wyniki podczas odczytywania danych z nieprawidłowym schematem.
Przykład
Załóżmy na przykład, że masz tabelę user_events
. Jeśli chcesz odczytać zmiany od wersji 5, użyj:
spark.readStream
.option("startingVersion", "5")
.table("user_events")
Jeśli chcesz odczytać zmiany od 2018-10-18, użyj:
spark.readStream
.option("startingTimestamp", "2018-10-18")
.table("user_events")
Przetwarzanie początkowej migawki bez porzucania danych
Uwaga
Ta funkcja jest dostępna w środowisku Databricks Runtime 11.3 LTS lub nowszym. Ta funkcja jest dostępna w publicznej wersji zapoznawczej.
W przypadku używania tabeli delty jako źródła strumienia zapytanie najpierw przetwarza wszystkie dane obecne w tabeli. Tabela delta w tej wersji jest nazywana początkową migawką. Domyślnie pliki danych tabeli delty są przetwarzane na podstawie tego, który plik został ostatnio zmodyfikowany. Jednak czas ostatniej modyfikacji nie musi reprezentować kolejności czasu zdarzenia rekordu.
W stanowym zapytaniu przesyłanym strumieniowym ze zdefiniowanym znakiem wodnym przetwarzanie plików przez czas modyfikacji może spowodować przetworzenie rekordów w niewłaściwej kolejności. Może to prowadzić do upuszczania rekordów jako opóźnionych zdarzeń przez znak wodny.
Możesz uniknąć problemu z usuwaniem danych, włączając następującą opcję:
- withEventTimeOrder: czy początkowa migawka powinna być przetwarzana z kolejnością czasu zdarzenia.
Po włączeniu kolejności czasu zdarzenia zakres czasu początkowych danych migawki jest podzielony na przedziały czasu. Każda mikrosadowa partia przetwarza zasobnik, filtrując dane w zakresie czasu. Opcje konfiguracji maxFilesPerTrigger i maxBytesPerTrigger nadal mają zastosowanie do kontrolowania rozmiaru mikrobajta, ale tylko w przybliżony sposób ze względu na charakter przetwarzania.
Na poniższej ilustracji przedstawiono ten proces:
Istotne informacje o tej funkcji:
- Problem z usuwaniem danych występuje tylko wtedy, gdy początkowa migawka delty zapytania przesyłania strumieniowego stanowego jest przetwarzana w domyślnej kolejności.
- Nie można zmienić
withEventTimeOrder
po uruchomieniu zapytania strumienia podczas przetwarzania początkowej migawki. Aby ponownie uruchomić polecenie zewithEventTimeOrder
zmianą, należy usunąć punkt kontrolny. - Jeśli uruchamiasz zapytanie strumienia z włączoną funkcjąEventTimeOrder, nie można obniżyć jej do wersji DBR, która nie obsługuje tej funkcji do momentu ukończenia początkowego przetwarzania migawki. Jeśli musisz obniżyć dół, możesz poczekać na zakończenie początkowej migawki lub usunąć punkt kontrolny i ponownie uruchomić zapytanie.
- Ta funkcja nie jest obsługiwana w następujących nietypowych scenariuszach:
- Kolumna czasu zdarzenia jest kolumną wygenerowaną i istnieją przekształcenia niezwiązane z projekcją między źródłem delta i znakiem wodnym.
- Istnieje znak wodny zawierający więcej niż jedno źródło różnicowe w zapytaniu strumienia.
- Po włączeniu kolejności czasu zdarzenia wydajność początkowego przetwarzania migawek różnicowych może być niższa.
- Każda mikrosadowa skanuje początkową migawkę w celu filtrowania danych w odpowiednim zakresie czasu zdarzenia. Aby przyspieszyć akcję filtrowania, zaleca się użycie kolumny źródłowej delty jako czasu zdarzenia, aby można było zastosować pomijanie danych (sprawdź pomijanie danych dla usługi Delta Lake , jeśli ma to zastosowanie). Ponadto partycjonowanie tabeli wzdłuż kolumny czasu zdarzenia może przyspieszyć przetwarzanie. Możesz sprawdzić interfejs użytkownika platformy Spark, aby zobaczyć, ile plików różnicowych jest skanowanych pod kątem określonej mikrosadowej partii.
Przykład
Załóżmy, że masz tabelę user_events
z kolumną event_time
. Zapytanie przesyłane strumieniowo jest zapytaniem agregacji. Jeśli chcesz mieć pewność, że podczas początkowego przetwarzania migawek nie ma żadnych danych, możesz użyć następujących funkcji:
spark.readStream
.option("withEventTimeOrder", "true")
.table("user_events")
.withWatermark("event_time", "10 seconds")
Uwaga
Można również włączyć tę funkcję za pomocą konfiguracji platformy Spark w klastrze, która będzie stosowana do wszystkich zapytań przesyłania strumieniowego: spark.databricks.delta.withEventTimeOrder.enabled true
Tabela delty jako ujście
Dane można również zapisywać w tabeli delty przy użyciu przesyłania strumieniowego ze strukturą. Dziennik transakcji umożliwia usłudze Delta Lake zagwarantowanie dokładnie jednokrotnego przetwarzania, nawet jeśli istnieją inne strumienie lub zapytania wsadowe uruchomione współbieżnie względem tabeli.
Uwaga
Funkcja Delta Lake VACUUM
usuwa wszystkie pliki, które nie są zarządzane przez usługę Delta Lake, ale pomija wszystkie katalogi rozpoczynające się od _
. Punkty kontrolne można bezpiecznie przechowywać wraz z innymi danymi i metadanymi dla tabeli delty przy użyciu struktury katalogów, takiej jak <table-name>/_checkpoints
.
Metryki
Liczbę bajtów i liczbę plików, które nie są jeszcze przetwarzane, można znaleźć w procesie zapytań przesyłanych strumieniowo jako numBytesOutstanding
metryki i numFilesOutstanding
. Dodatkowe metryki obejmują:
numNewListedFiles
: liczba plików usługi Delta Lake wymienionych w celu obliczenia listy prac dla tej partii.backlogEndOffset
: wersja tabeli używana do obliczania listy prac.
Jeśli używasz strumienia w notesie, możesz zobaczyć te metryki na karcie Nieprzetworzone dane na pulpicie nawigacyjnym postępu zapytania przesyłania strumieniowego:
{
"sources" : [
{
"description" : "DeltaSource[file:/path/to/source]",
"metrics" : {
"numBytesOutstanding" : "3456",
"numFilesOutstanding" : "8"
},
}
]
}
Tryb dołączania
Domyślnie strumienie są uruchamiane w trybie dołączania, który dodaje nowe rekordy do tabeli.
toTable
Użyj metody podczas przesyłania strumieniowego do tabel, jak w poniższym przykładzie:
Python
(events.writeStream
.outputMode("append")
.option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
.toTable("events")
)
Scala
events.writeStream
.outputMode("append")
.option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
.toTable("events")
Tryb ukończenia
Możesz również użyć przesyłania strumieniowego ze strukturą, aby zastąpić całą tabelę każdą partią. Przykładem przypadku użycia jest obliczenie podsumowania przy użyciu agregacji:
Python
(spark.readStream
.table("events")
.groupBy("customerId")
.count()
.writeStream
.outputMode("complete")
.option("checkpointLocation", "/tmp/delta/eventsByCustomer/_checkpoints/")
.toTable("events_by_customer")
)
Scala
spark.readStream
.table("events")
.groupBy("customerId")
.count()
.writeStream
.outputMode("complete")
.option("checkpointLocation", "/tmp/delta/eventsByCustomer/_checkpoints/")
.toTable("events_by_customer")
Powyższy przykład stale aktualizuje tabelę zawierającą zagregowaną liczbę zdarzeń według klienta.
W przypadku aplikacji z bardziej łagodnymi wymaganiami dotyczącymi opóźnienia można zaoszczędzić zasoby obliczeniowe za pomocą wyzwalaczy jednorazowych. Służą one do aktualizowania tabel agregacji podsumowania w danym harmonogramie, przetwarzania tylko nowych danych, które dotarły od ostatniej aktualizacji.
Upsert z zapytań przesyłanych strumieniowo przy użyciu polecenia foreachBatch
Możesz użyć kombinacji elementów i foreachBatch
do zapisania złożonych merge
operacji upsert z zapytania przesyłania strumieniowego do tabeli delty. Zobacz Używanie polecenia foreachBatch do zapisywania do losowego ujściach danych.
Ten wzorzec zawiera wiele aplikacji, w tym następujące:
- Zapisuj agregacje przesyłania strumieniowego w trybie aktualizacji: jest to znacznie bardziej wydajne niż tryb ukończenia.
- Zapis strumienia zmian bazy danych w tabeli delty: kwerenda scalania do zapisywania danych zmian może służyć
foreachBatch
do ciągłego stosowania strumienia zmian do tabeli delty. - Zapisywanie strumienia danych w tabeli delty z deduplikacją: zapytanie scalania tylko do wstawiania dla deduplikacji może służyć
foreachBatch
do ciągłego zapisywania danych (z duplikatami) do tabeli delty z automatyczną deduplikacją.
Uwaga
- Upewnij się, że instrukcja
merge
wewnątrzforeachBatch
jest idempotentna, ponieważ ponowne uruchomienia zapytania przesyłania strumieniowego mogą wielokrotnie stosować operację na tej samej partii danych. - W
merge
przypadku użycia wforeachBatch
programie szybkość danych wejściowych zapytania przesyłania strumieniowego (zgłoszonego iStreamingQueryProgress
widocznego na wykresie szybkości notesu) może być zgłaszana jako wielokrotność rzeczywistej szybkości generowania danych w źródle. Dzieje się tak, ponieważmerge
odczytuje dane wejściowe wiele razy, co powoduje pomnożenie metryk wejściowych. Jeśli jest to wąskie gardło, możesz buforować partię DataFrame przedmerge
, a następnie cofnąć jej buforowanie pomerge
.
W poniższym przykładzie pokazano, jak można użyć programu SQL w programie foreachBatch
, aby wykonać to zadanie:
Scala
// Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
// Set the dataframe to view name
microBatchOutputDF.createOrReplaceTempView("updates")
// Use the view name to apply MERGE
// NOTE: You have to use the SparkSession that has been used to define the `updates` dataframe
microBatchOutputDF.sparkSession.sql(s"""
MERGE INTO aggregates t
USING updates s
ON s.key = t.key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
}
// Write the output of a streaming aggregation query into Delta table
streamingAggregatesDF.writeStream
.foreachBatch(upsertToDelta _)
.outputMode("update")
.start()
Python
# Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF, batchId):
# Set the dataframe to view name
microBatchOutputDF.createOrReplaceTempView("updates")
# Use the view name to apply MERGE
# NOTE: You have to use the SparkSession that has been used to define the `updates` dataframe
# In Databricks Runtime 10.5 and below, you must use the following:
# microBatchOutputDF._jdf.sparkSession().sql("""
microBatchOutputDF.sparkSession.sql("""
MERGE INTO aggregates t
USING updates s
ON s.key = t.key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
# Write the output of a streaming aggregation query into Delta table
(streamingAggregatesDF.writeStream
.foreachBatch(upsertToDelta)
.outputMode("update")
.start()
)
Możesz również użyć interfejsów API usługi Delta Lake do wykonywania operacji upsert przesyłania strumieniowego, jak w poniższym przykładzie:
Scala
import io.delta.tables.*
val deltaTable = DeltaTable.forName(spark, "table_name")
// Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
deltaTable.as("t")
.merge(
microBatchOutputDF.as("s"),
"s.key = t.key")
.whenMatched().updateAll()
.whenNotMatched().insertAll()
.execute()
}
// Write the output of a streaming aggregation query into Delta table
streamingAggregatesDF.writeStream
.foreachBatch(upsertToDelta _)
.outputMode("update")
.start()
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "table_name")
# Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF, batchId):
(deltaTable.alias("t").merge(
microBatchOutputDF.alias("s"),
"s.key = t.key")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute()
)
# Write the output of a streaming aggregation query into Delta table
(streamingAggregatesDF.writeStream
.foreachBatch(upsertToDelta)
.outputMode("update")
.start()
)
Idempotentne zapisy tabeli w foreachBatch
Uwaga
Usługa Databricks zaleca skonfigurowanie oddzielnego zapisu strumieniowego dla każdego ujścia, który chcesz zaktualizować. Używanie foreachBatch
funkcji do zapisywania w wielu tabelach serializuje zapisy, co zmniejsza równoległość i zwiększa ogólne opóźnienie.
Tabele różnicowe obsługują następujące DataFrameWriter
opcje tworzenia zapisów w wielu tabelach w foreachBatch
ramach idempotentności:
txnAppId
: unikatowy ciąg, który można przekazać do każdego zapisu ramki danych. Na przykład możesz użyć identyfikatora StreamingQuery jakotxnAppId
.txnVersion
: monotonicznie rosnąca liczba, która działa jako wersja transakcji.
Usługa Delta Lake używa kombinacji elementów i txnVersion
do identyfikowania txnAppId
zduplikowanych zapisów i ich ignorowania.
Jeśli zapis wsadowy zostanie przerwany z powodu awarii, ponowne uruchomienie partii używa tej samej aplikacji i identyfikatora partii, aby ułatwić środowisku uruchomieniowemu poprawne zidentyfikowanie zduplikowanych zapisów i zignorowanie ich. Identyfikator aplikacji (txnAppId
) może być dowolnym unikatowym ciągiem generowanym przez użytkownika i nie musi być powiązany z identyfikatorem strumienia. Zobacz Używanie polecenia foreachBatch do zapisywania do losowego ujściach danych.
Ostrzeżenie
Jeśli usuniesz punkt kontrolny przesyłania strumieniowego i uruchomisz ponownie zapytanie przy użyciu nowego punktu kontrolnego, musisz podać inny txnAppId
element . Nowe punkty kontrolne zaczynają się od identyfikatora partii .0
Usługa Delta Lake używa identyfikatora partii i txnAppId
jako unikatowego klucza i pomija partie z już widocznymi wartościami.
W poniższym przykładzie kodu pokazano ten wzorzec:
Python
app_id = ... # A unique string that is used as an application ID.
def writeToDeltaLakeTableIdempotent(batch_df, batch_id):
batch_df.write.format(...).option("txnVersion", batch_id).option("txnAppId", app_id).save(...) # location 1
batch_df.write.format(...).option("txnVersion", batch_id).option("txnAppId", app_id).save(...) # location 2
streamingDF.writeStream.foreachBatch(writeToDeltaLakeTableIdempotent).start()
Scala
val appId = ... // A unique string that is used as an application ID.
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
batchDF.write.format(...).option("txnVersion", batchId).option("txnAppId", appId).save(...) // location 1
batchDF.write.format(...).option("txnVersion", batchId).option("txnAppId", appId).save(...) // location 2
}