Nuta
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować się zalogować lub zmienić katalog.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Punkty kontrolne i dzienniki zapisu uprzedniego współpracują ze sobą, aby zapewnić gwarancje przetwarzania danych w strumieniowym przetwarzaniu danych strukturalnych. Punkt kontrolny śledzi informacje identyfikujące zapytanie, w tym informacje o stanie i przetworzone rekordy. Po usunięciu plików w katalogu punktu kontrolnego lub zmianie na nową lokalizację punktu kontrolnego następny przebieg zapytania rozpoczyna się od nowa.
Każde zapytanie musi mieć inną lokalizację punktu kontrolnego. Wiele zapytań nigdy nie powinno współdzielić tej samej lokalizacji.
Włączanie tworzenia punktów kontrolnych dla zapytań przesyłania strumieniowego ze strukturą
Należy określić opcję checkpointLocation zanim uruchomisz zapytanie strumieniowe, jak w poniższym przykładzie:
Python
(df.writeStream
.option("checkpointLocation", "/Volumes/catalog/schema/volume/path")
.toTable("catalog.schema.table")
)
Scala
df.writeStream
.option("checkpointLocation", "/Volumes/catalog/schema/volume/path")
.toTable("catalog.schema.table")
Uwaga
Niektóre ujścia, takie jak dane wyjściowe display() w notesach i ujście memory, automatycznie generują tymczasową lokalizację punktu kontrolnego, jeśli pominięto tę opcję. Te tymczasowe lokalizacje punktów kontrolnych nie zapewniają żadnej odporności na uszkodzenia ani gwarancji spójności danych i mogą nie zostać prawidłowo wyczyszczone. Usługa Databricks zaleca zawsze określanie lokalizacji punktu kontrolnego dla tych ujść.
Odzyskiwanie po zmianach w zapytaniu Structured Streaming
Istnieją ograniczenia dotyczące zmian w zapytaniu przesyłanym strumieniowo między ponownymi uruchomieniami z tej samej lokalizacji punktu kontrolnego. Poniżej przedstawiono kilka zmian, które nie są dozwolone lub efekt zmiany nie jest dobrze zdefiniowany. Dla wszystkich z nich:
- Termin dozwolony oznacza, że można wykonać określoną zmianę, ale czy semantyka jej efektu jest dobrze zdefiniowana, zależy od zapytania i zmiany.
- Termin niedozwolony oznacza, że nie należy wykonywać określonej zmiany, ponieważ ponownie uruchomione zapytanie prawdopodobnie nie powiedzie się z nieprzewidywalnymi błędami.
-
sdfreprezentuje strumieniowe przetwarzanie DataFrame/Dataset wygenerowane za pomocąsparkSession.readStream.
Typy zmian w zapytaniach Structured Streaming
Zmiany w liczbie lub typie (czyli innym źródle) źródeł wejściowych: nie jest to dozwolone.
Zmiany parametrów źródeł wejściowych: czy jest to dozwolone i czy semantyka zmiany jest dobrze zdefiniowana, zależy od źródła i zapytania, w tym kontroli dostępu, takich jak
maxFilesPerTriggerlubmaxOffsetsPerTrigger. Oto kilka przykładów:Dodawanie, usuwanie i modyfikowanie limitów szybkości jest dozwolone:
spark.readStream.format("kafka").option("subscribe", "article")do
spark.readStream.format("kafka").option("subscribe", "article").option("maxOffsetsPerTrigger", ...)Zmiany w subskrybowanych artykułach i plikach są zwykle niedozwolone, ponieważ wyniki są nieprzewidywalne:
spark.readStream.format("kafka").option("subscribe", "article")spark.readStream.format("kafka").option("subscribe", "newarticle")
Zmiany w interwale wyzwalania: Możesz zmieniać wyzwalacze między partiami przyrostowymi a interwałami czasu. Zobacz Zmiana interwałów wyzwalacza między uruchomieniami.
Zmiany typu ujścia danych wyjściowych: zmiany między kilkoma konkretnymi kombinacjami ujścia są dozwolone. Należy to zweryfikować indywidualnie dla każdego przypadku. Oto kilka przykładów:
- Ujście pliku do ujścia platformy Kafka jest dozwolone. Platforma Kafka będzie widzieć tylko nowe dane.
- Ujście platformy Kafka do ujścia plików nie jest dozwolone.
- Zmiana ujścia danych w platformie Kafka na instrukcję foreach lub odwrotnie jest dozwolona.
Zmiany parametrów ujścia wyjściowego: czy jest to dozwolone i czy semantyka zmiany jest dobrze zdefiniowana, zależy od ujścia i zapytania. Oto kilka przykładów:
- Zmiany katalogu wyjściowego odbiornika pliku nie są dozwolone:
sdf.writeStream.format("parquet").option("path", "/somePath")dosdf.writeStream.format("parquet").option("path", "/anotherPath") - Zmiany w temacie wyjściowym są dozwolone:
sdf.writeStream.format("kafka").option("topic", "topic1")sdf.writeStream.format("kafka").option("topic", "topic2") - Zmiana ujścia "foreach" zdefiniowanego przez użytkownika (to jest
ForeachWriterkod) jest dozwolona, ale jej semantyka zależy od kodu.
- Zmiany katalogu wyjściowego odbiornika pliku nie są dozwolone:
Zmiany w operacjach typu projekcja/filtr/mapowanie: niektóre przypadki są dozwolone. Przykład:
- Dodawanie/usuwanie filtrów jest dozwolone:
sdf.selectExpr("a")dosdf.where(...).selectExpr("a").filter(...). - Zmiany w projekcjach z takim samym schematem wyjściowym są dozwolone:
sdf.selectExpr("stringColumn AS json").writeStreamdosdf.select(to_json(...).as("json")).writeStream. - Zmiany w projekcjach z innym schematem wyjściowym są warunkowo dozwolone: przejście z
sdf.selectExpr("a").writeStreamdosdf.selectExpr("b").writeStreamjest dozwolone tylko wtedy, gdy ujście danych wyjściowych zezwala na zmianę schematu z"a"na"b".
- Dodawanie/usuwanie filtrów jest dozwolone:
Zmiany w operacjach stanowych: niektóre operacje w zapytaniach przesyłanych strumieniowo muszą obsługiwać dane stanu w celu ciągłego aktualizowania wyniku. Strukturalne przesyłanie strumieniowe automatycznie zapisuje dane stanu do magazynu odpornym na błędy (na przykład DBFS, Azure Blob Storage) i przywraca je po ponownym uruchomieniu. Zakłada się jednak, że schemat danych stanu pozostaje taki sam w przypadku ponownych uruchomień. Oznacza to, że wszelkie zmiany (czyli dodawanie, usuwanie lub modyfikacje schematu) w stanowych operacjach zapytania przesyłania strumieniowego są niedozwolone między ponownymi uruchomieniami. Oto lista operacji stanowych, których schemat nie powinien być zmieniany między ponownymi uruchomieniami w celu zapewnienia odzyskiwania stanu:
-
Agregacja strumieniowa: na przykład
sdf.groupBy("a").agg(...). Wszelkie zmiany w liczbie lub typie kluczy grupowania lub agregacji nie są dozwolone. -
Deduplikacja przesyłania strumieniowego: na przykład
sdf.dropDuplicates("a"). Wszelkie zmiany w liczbie lub typie kluczy grupowania lub agregacji nie są dozwolone. -
łącze strumieni: na przykład
sdf1.join(sdf2, ...)(tj. oba wejścia są generowane przy użyciusparkSession.readStream). Zmiany w schemacie lub kolumnach łączących równoważnych są niedozwolone. Zmiany typu sprzężenia (zewnętrzne lub wewnętrzne) są niedozwolone. Inne zmiany w warunku sprzężenia są źle zdefiniowane. -
Dowolna operacja z zachowaniem stanu: na przykład
sdf.groupByKey(...).mapGroupsWithState(...)lubsdf.groupByKey(...).flatMapGroupsWithState(...). Wszelkie zmiany schematu stanu zdefiniowanego przez użytkownika i typ limitu czasu nie są dozwolone. Każda zmiana w funkcji mapowania stanu zdefiniowanego przez użytkownika jest dozwolona, ale semantyczny wpływ zmiany zależy od logiki zdefiniowanej przez użytkownika. Jeśli naprawdę chcesz obsługiwać zmiany schematu stanu, możesz jawnie kodować/dekodować złożone struktury danych stanu do bajtów przy użyciu schematu kodowania/dekodowania, który obsługuje migrację schematu. Jeśli na przykład zapiszesz stan jako bajty zakodowane w formacie Avro, możesz zmienić schemat Avro-state-między ponownym uruchomieniem zapytania, ponieważ spowoduje to przywrócenie stanu binarnego.
-
Agregacja strumieniowa: na przykład
Ważne
Operatory dropDuplicates() stanowe i dropDuplicatesWithinWatermark() mogą nie zostać uruchomione ponownie z powodu sprawdzania zgodności schematu stanu podczas zmiany między trybami dostępu obliczeniowego.
Zmiana między trybami dostępu dedykowanego i bez izolacji jest dozwolona. Zmiana między standardowymi i bezserwerowym trybem dostępu jest dozwolona. Nie należy podejmować próby zmiany między innymi kombinacjami trybu dostępu.
Aby uniknąć tego błędu, nie zmieniaj trybu dostępu obliczeniowego dla zapytań przesyłanych strumieniowo zawierających te operatory.