Dokumentacja opcji interfejsu API platformy Spark

Na tej stronie wymieniono dostępne opcje danych wejściowych i wyjściowych dla interfejsów API platformy Spark, które odczytują i zapisują dane.

Opcje elementu DataFrameReader

Użyj tych opcji z DataFrameReader.option(), DataFrameReader.options(), read_files, COPY INTO i Auto Loader kontrolować, jak Azure Databricks odczytuje pliki danych.

Example

Poniższy przykład ustawia wartość multiLine na True do odczytywania plików JSON:

Python
df = spark.read.format("json").option("multiLine", True).load("/path/to/data")
Scala
val df = spark.read.format("json").option("multiLine", "true").load("/path/to/data")
SQL
SELECT * FROM read_files("/path/to/data", format => "json", multiLine => true)

Wspólne

Poniższe opcje mają zastosowanie do wszystkich formatów plików.

Key Domyślny Description
ignoreCorruptFiles false Czy ignorować uszkodzone pliki. Jeśli ma wartość true, zadania platformy Spark będą nadal działać po napotkaniu uszkodzonych plików, a zawartość, która została odczytowana, będzie nadal zwracana. W przypadku COPY INTOprogramu można obserwować pomijanie uszkodzonych plików, jak numSkippedCorruptFiles w operationMetrics kolumnie historii usługi Delta Lake. Dostępne w środowisku Databricks Runtime 11.3 LTS i nowszym.
ignoreMissingFiles false dla automatycznego modułu ładującego dla trueCOPY INTO (starsza wersja) Czy ignorować brakujące pliki. Jeśli to prawda, zadania platformy Spark będą nadal uruchamiane po napotkaniu brakujących plików, a zawartość jest nadal zwracana. Dostępne w środowisku Databricks Runtime 11.3 LTS i nowszym.
modifiedAfter None Opcjonalny znacznik czasu jako filtr do pobierania tylko tych plików, które mają znacznik czasu modyfikacji po podanym znaczniku czasowym.
modifiedBefore None Opcjonalny znacznik czasu jako filtr do wczytywania tylko plików, które mają znacznik czasu modyfikacji wcześniejszy niż podany znacznik czasu.
pathGlobFilter lub fileNamePattern None Potencjalny wzorzec globu umożliwiający wybór plików. PATTERN Odpowiednik w (COPY INTOstarsza wersja). fileNamePattern można użyć w pliku read_files.
recursiveFileLookup false Gdy trueta opcja wyszukuje katalogi zagnieżdżone, nawet jeśli ich nazwy nie są zgodne ze schematem nazewnictwa partycji, takim jak date=2019-07-01.

Avro

Key Domyślny Description
avroSchema None Opcjonalny schemat dostarczony przez użytkownika w formacie Avro. Podczas odczytywania avro tę opcję można ustawić na rozwinięty schemat, który jest zgodny, ale różni się od rzeczywistego schematu Avro. Schemat deserializacji jest zgodny ze schematem rozwiniętym. Jeśli na przykład ustawisz rozwinięty schemat zawierający jedną dodatkową kolumnę z wartością domyślną, wynik odczytu również zawiera nową kolumnę.
avroSchemaEvolutionMode none Jak obsługiwać ewolucję schematu podczas korzystania z rejestru schematów. Prawidłowe wartości: none (ignoruj zmiany schematu i kontynuuj zadanie), restart (gdy zostaną wykryte zmiany schematu, zgłasza UnknownFieldException element i wymaga ponownego uruchomienia zadania).
datetimeRebaseMode LEGACY Steruje przestawieniem wartości DATE i TIMESTAMP między kalendarzami juliańskim a proleptycznym gregoriańskim. Prawidłowe wartości: EXCEPTION, LEGACYi CORRECTED.
enableStableIdentifiersForUnionType false Określa, czy używać stabilnych nazw pól dla typów unii Avro. Po włączeniu nazwy pól typu unii pochodzą z nazw typów w małych literach (na przykład member_int, member_string). Zgłasza wyjątek, jeśli dwie nazwy typów są identyczne po małych literach.
mergeSchema false Czy należy wywnioskować schemat dla wielu plików i scalić schematy każdego pliku. mergeSchema dla Avro nie jest możliwe złagodzenie typów danych.
mode FAILFAST Tryb analizatora do obsługi uszkodzonych rekordów. Prawidłowe wartości: FAILFAST (zgłasza wyjątek), PERMISSIVE (ustawia źle sformułowane pola na wartość null) DROPMALFORMED (dyskretnie porzuca nieprawidłowe rekordy).
readerCaseSensitive true Określa zachowanie wrażliwości na wielkość liter po włączeniu rescuedDataColumn. Jeśli to prawda, należy uratować kolumny danych, których nazwy różnią się wielkością liter od schematu. Gdy wartość false, odczytuje dane w sposób niewrażliwy na wielkość liter.
recursiveFieldMaxDepth None Maksymalna głębokość rekursji dla cyklicznych pól Avro. Ustaw wartość na 1 obcięcie wszystkich pól cyklicznych, 2 aby zezwolić na jeden poziom rekursji itd.15 W przypadku niezastawionych pól cyklicznych lub 0pola cyklicznego nie są dozwolone. Prawidłowe wartości: 0 do 15.
rescuedDataColumn None Czy zebrać wszystkie dane, których nie można przeanalizować z powodu niezgodności typu danych oraz niezgodności schematu (w tym wielkości liter kolumn), do osobnej kolumny. Ta kolumna jest domyślnie dołączana podczas korzystania z modułu automatycznego ładowania.
COPY INTO (starsza wersja) nie obsługuje uratowanych kolumn danych, ponieważ nie można ręcznie ustawić schematu przy użyciu COPY INTO. Databricks zaleca używanie Auto Loader w przypadku większości scenariuszy przyjmowania danych.
Aby uzyskać więcej informacji, zobacz Co to jest uratowana kolumna danych?.
stableIdentifierPrefixForUnionType member_ Prefiks używany dla nazw pól typu stabilnego unii, gdy enableStableIdentifiersForUnionType=true.

CSV

Key Domyślny Description
badRecordsPath None Ścieżka do przechowywania plików do rejestrowania informacji o nieprawidłowych rekordach CSV.
charToEscapeQuoteEscaping \0 Znak używany do unikania znaku używanego do unikania cudzysłowów. Na przykład dla następującego rekordu: : [ " a\\", b ]
  • Jeśli znak używany do ucieczki '\' jest niezdefiniowany, rekord nie zostanie przetworzony. Analizator odczytuje znaki: [a],[\],["],[,],[ ],[b] i zgłasza błąd, ponieważ nie może znaleźć cudzysłowu zamykającego.
  • Jeśli znak ucieczki '\' jest zdefiniowany jako '\', rekord zostanie odczytany z dwoma wartościami: [a\] i [b].
columnNameOfCorruptRecord _corrupt_record Obsługiwane dla funkcji Auto Loader. Nie obsługiwane w przypadku COPY INTO (starsza wersja).
Kolumna do przechowywania rekordów, które są źle sformułowane i nie można ich przeanalizować. mode Jeśli dla analizowania ustawiono wartość DROPMALFORMED, ta kolumna będzie pusta.
comment \0 Definiuje znak reprezentujący komentarz liniowy znajdujący się na początku linii tekstu. Użyj polecenia '\0' , aby wyłączyć pomijanie komentarza.
dateFormat yyyy-MM-dd Format analizowania ciągów dat.
emptyValue Pusty ciąg Wartość reprezentowana jako pusty ciąg.
enableDateTimeParsingFallback false Czy powrócić do starszego zachowania analizy daty i znacznika czasu, gdy nie można przeanalizować wartości przy użyciu określonego formatu. Gdy falsebłędy analizowania zgłaszają błąd lub powodują wygenerowanie wartości null w zależności od modemetody .
encoding lub charset UTF-8 Nazwa kodowania plików CSV. Zobacz java.nio.charset.Charset listę opcji. UTF-16 i UTF-32 nie mogą być używane, gdy multiline to true.
enforceSchema true Czy wymuszać stosowanie zadanego lub wnioskowanego schematu do plików CSV. Jeśli opcja jest włączona, nagłówki plików CSV są ignorowane. Ta opcja jest domyślnie ignorowana podczas używania automatycznego modułu ładującego do ratowania danych i zezwalania na ewolucję schematu.
escape \ Znak ucieczki do użycia podczas analizowania danych.
extension csv Oczekiwane rozszerzenie nazwy pliku. Pliki bez tego rozszerzenia są filtrowane podczas odczytu.
failOnUnknownFields false Czy rekord CSV nie może występować w schemacie, czy rekord CSV zawiera kolumny, które nie są obecne. W przypadku false, nierozpoznane kolumny są dyskretnie porzucane lub ratowane w zależności od rescuedDataColumn.
failOnWidenedFields false Czy nie można przeanalizować wartości pola jako zadeklarowanego typu schematu bez rozszerzania. Gdy falsewartości rozszerzone typu są dyskretnie ratowane w zależności od rescuedDataColumn. Ustawienie failOnUnknownFields=true może maskować efekty tej opcji.
header false Określa, czy pliki CSV zawierają nagłówek. Automatyczny ładowacz zakłada, że pliki mają nagłówki podczas ustalania schematu.
ignoreLeadingWhiteSpace false Czy ignorować początkowe spacje dla każdej parsowanej wartości.
ignoreTrailingWhiteSpace false Czy należy pomijać zbędne spacje dla każdej analizowanej wartości.
inferSchema false Czy wywnioskować typy danych analizowanych rekordów CSV, czy przyjąć, że wszystkie kolumny mają wartość StringType. Wymaga dodatkowego przetwarzania danych, jeśli ustawiono wartość true. Zamiast tego użyj cloudFiles.inferColumnTypes dla Auto Loader.
inputBufferSize 1048576 (1 MB) Rozmiar buforu w bajtach analizatora CSV. Przydatne do dostrajania użycia pamięci podczas analizowania dużych plików CSV. Prawidłowe wartości: dodatnie liczby całkowite.
lineSep Brak, który obejmuje \r, \r\ni \n Ciąg pomiędzy dwoma następującymi po sobie rekordami CSV.
locale US Identyfikator java.util.Locale. Wpływa na sposób domyślnego interpretowania dat, znaczników czasu oraz liczb dziesiętnych w pliku CSV.
maxCharsPerColumn -1 Maksymalna liczba znaków oczekiwana od wartości do przeanalizowania. Może służyć do unikania błędów pamięci. Wartość domyślna to -1, co oznacza brak ograniczeń. Prawidłowe wartości: dodatnie liczby całkowite lub -1 nieograniczone.
maxColumns 20480 Stały limit liczby kolumn, które może mieć rekord. Prawidłowe wartości: dodatnie liczby całkowite.
mergeSchema false Czy należy wywnioskować schemat dla wielu plików i scalić schematy każdego pliku. Domyślnie włączono funkcję automatycznego ładowania podczas wnioskowania schematu.
mode PERMISSIVE Tryb analizatora dotyczący obsługi nieprawidłowo sformułowanych danych. Prawidłowe wartości: PERMISSIVE, , DROPMALFORMEDFAILFAST.
multiLine false Określa, czy rekordy CSV obejmują wiele wierszy.
nanValue NaN Reprezentacja ciągu znakowego wartości niebędącej liczbą podczas parsowania kolumn FloatType i DoubleType.
negativeInf -Inf Tekstowa reprezentacja nieskończoności ujemnej podczas parsowania kolumn FloatType lub DoubleType.
nullValue Pusty ciąg Reprezentacja wartości null jako ciąg znaków.
parserCaseSensitive (przestarzałe) false Czy podczas odczytywania plików należy wyrównywać kolumny zadeklarowane w nagłówku z uwzględnieniem wielkości liter w schemacie. true To jest domyślne ustawienie dla Automatycznego Ładowania. Kolumny, które różnią się wielkością liter, zostaną zachowane w rescuedDataColumn, jeśli funkcja ta zostanie włączona. Ta opcja została uznana za przestarzałą na rzecz readerCaseSensitive.
positiveInf Inf Postać tekstowa nieskończoności dodatniej podczas parsowania kolumn FloatType lub DoubleType.
preferDate true Próbuje interpretować ciągi znaków jako daty, zamiast traktować je jako znaczniki czasu, gdy jest to możliwe. Należy również użyć wnioskowania schematu, włączając inferSchema lub używając cloudFiles.inferColumnTypes funkcji automatycznego modułu ładującego.
quote " Znak używany do unikania wartości, w których ogranicznik pola jest częścią wartości.
readerCaseSensitive true Określa zachowanie wrażliwości na wielkość liter po włączeniu rescuedDataColumn. Jeśli to prawda, należy uratować kolumny danych, których nazwy różnią się wielkością liter od schematu. Gdy wartość false, odczytuje dane w sposób niewrażliwy na wielkość liter.
rescuedDataColumn None Czy zebrać wszystkie dane, których nie można przeanalizować z powodu niezgodności typu danych oraz niezgodności schematu (w tym wielkości liter kolumn), do osobnej kolumny. Ta kolumna jest domyślnie dołączana podczas korzystania z modułu automatycznego ładowania. Aby uzyskać więcej informacji, zobacz Co to jest uratowana kolumna danych?.
COPY INTO (starsza wersja) nie obsługuje uratowanych kolumn danych, ponieważ nie można ręcznie ustawić schematu przy użyciu COPY INTO. Databricks zaleca używanie Auto Loader w przypadku większości scenariuszy przyjmowania danych.
sep lub delimiter , Ciąg separatorów między kolumnami.
singleVariantColumn None Po ustawieniu nazwy kolumny odczytuje cały rekord CSV w jedną VariantType kolumnę o tej nazwie zamiast analizować każde pole do własnej kolumny. Wymaga header=true.
skipRows 0 Liczba wierszy z początku pliku CSV, które powinny być ignorowane (w tym z komentarzami i pustymi wierszami). Jeśli header ma wartość true, nagłówek będzie pierwszym wierszem, który nie został pominięty ani skomentowany. Prawidłowe wartości: dodatnie liczby całkowite lub 0.
timeFormat HH:mm:ss Format analizowania TimeType wartości kolumn.
timestampFormat yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] Format analizowania ciągów znacznika czasu.
timestampNTZFormat yyyy-MM-dd'T'HH:mm:ss[.SSS] Format analizowania znacznika czasu bez ciągów strefy czasowej (TimestampNTZType).
timeZone None Element java.time.ZoneId do użycia podczas analizowania sygnatur czasowych i dat.
unescapedQuoteHandling STOP_AT_DELIMITER Strategia obsługi niewyłuskanych cudzysłowów. Dozwolone opcje:
  • STOP_AT_CLOSING_QUOTE: Jeśli w danych wejściowych znajdują się niepoprawnie wyrażone cudzysłowy, zakumuluj znak cudzysłowu i kontynuuj analizowanie wartości jako wartości w cudzysłowie, aż zostanie znaleziony cudzysłów zamykający.
  • BACK_TO_DELIMITER: Jeśli w danych wejściowych znajdują się nieosłonięte cudzysłowy, należy uznać wartość za wartość bez cudzysłowu. Spowoduje to, że analizator zgromadzi wszystkie znaki bieżącej wartości analizowanej do momentu znalezienia ogranicznika zdefiniowanego przez sep element . Jeśli w wartości nie znaleziono ogranicznika, analizator będzie nadal gromadzić znaki z danych wejściowych do momentu znalezienia ogranicznika lub zakończenia wiersza.
  • STOP_AT_DELIMITER: Jeśli w danych wejściowych znajdują się nieosłonięte cudzysłowy, należy uznać wartość za wartość bez cudzysłowu. Spowoduje to, że analizator zgromadzi wszystkie znaki do momentu znalezienia ogranicznika zdefiniowanego przez sep, lub zakończenia wiersza w danych wejściowych.
  • SKIP_VALUE: Jeśli w danych wejściowych znajdzie się niezakodowane cudzysłowy, zawartość przeanalizowana dla danej wartości zostanie pominięta (aż do znalezienia następnego ogranicznika), a wartość ustawiona w nullValue zostanie wygenerowana.
  • RAISE_ERROR: Jeśli w danych wejściowych znajdują się niewyobrażalne cudzysłowy, TextParsingException zostanie zgłoszony element .

Excel

Key Domyślny Description
dataAddress None Zakres komórek do odczytania w składni Excel. Jeśli pominięto, odczytuje wszystkie prawidłowe komórki z pierwszego arkusza. Użyj "SheetName!C5:H10" polecenia , aby odczytać zakres z nazwanego arkusza, "C5:H10" aby odczytać zakres z pierwszego arkusza lub "SheetName" odczytać wszystkie dane z określonego arkusza.
headerRows 0 Liczba początkowych wierszy do użycia jako nagłówki nazw kolumn. Gdy dataAddress zostanie określony, ma to zastosowanie w zakresie komórek. Gdy 0nazwy kolumn są generowane automatycznie jako _c1, _c2, _c3itp. Prawidłowe wartości: 0, 1.
ignoreMissingSheet false Czy dyskretnie pominąć pliki, które nie zawierają arkusza określonego przez dataAddress. W przypadku falsewystąpienia błędu zgłaszany jest błąd, jeśli brakuje żądanego arkusza. Ma zastosowanie tylko wtedy, gdy nazwa arkusza jest określona w pliku dataAddress. Prawidłowe wartości: true, false.
includePhoneticRuns false Czy dołączyć adnotacje fonetyczne (takie jak pinyin lub furigana) łączone z wartościami ciągów komórek podczas odczytywania plików XLSX. Prawidłowe wartości: true, false.
operation readSheet Operacja do wykonania w skoroszycie Excel. Prawidłowe wartości: readSheet (odczytuje dane z arkusza) listSheets (zwraca strukturę z polami sheetIndex: long i sheetName: String dla każdego arkusza).
timestampNTZFormat yyyy-MM-dd'T'HH:mm:ss[.SSS] Niestandardowy ciąg formatu dla wartości timestamp-without-timezone przechowywanych jako ciągi w Excel. Niestandardowe formaty dat są zgodne z formatami we wzorcach Datetime patterns.
dateFormat yyyy-MM-dd Ciąg formatu niestandardowego dla wartości ciągów odczytanych jako Date. Niestandardowe formaty dat są zgodne z formatami we wzorcach Datetime patterns.

JSON

Key Domyślny Description
allowBackslashEscapingAnyCharacter false Czy zezwolić na użycie ukośników odwrotnych do zasłaniania dowolnego znaku, który po nich następuje. Jeśli nie jest włączona, tylko znaki, które są jawnie wymienione przez specyfikację JSON, mogą zostać uniknione.
allowComments false Czy zezwalać na używanie komentarzy w stylu Java, C i C++ ('/', '*', i '//' odmian) w analizowanej zawartości, czy nie.
allowNonNumericNumbers true Czy zezwalać na zestaw tokenów nie będących liczbami (NaN) jako prawidłowe wartości liczb zmiennoprzecinkowych.
allowNumericLeadingZeros false Czy pozwolić na rozpoczynanie liczb całkowitych od dodatkowych (ignorowanych) zer (na przykład 000001).
allowSingleQuotes true Czy zezwalać na używanie pojedynczych cudzysłowów (apostrof, znak '\') do cytowania ciągów (nazw i wartości ciągu).
allowUnquotedControlChars false Czy zezwolić ciągom JSON na zawieranie nieoznaczonych znaków kontrolnych (znaki ASCII o wartości mniejszej niż 32, w tym znaki tabulatora i przesunięcia wiersza), czy nie.
allowUnquotedFieldNames false Czy zezwalać na używanie niekwotowanych nazw pól, które są dozwolone przez język JavaScript, ale nie przez specyfikację JSON.
alternateVariantEncoding None Kodowanie używane dla wartości wariantów w źródłowym formacie JSON. Ustaw wartość na wartość , aby Z85 zdekodować wartości wariantów, które zostały zakodowane w formacie Base85 zamiast przechowywanych jako wbudowany kod JSON.
badRecordsPath None Ścieżka do przechowywania plików do rejestrowania informacji o nieprawidłowych rekordach JSON.
badRecordsPath Użycie opcji w źródle danych opartym na plikach ma następujące ograniczenia:
  • Nie jest to transakcyjne i może prowadzić do niespójnych wyników.
  • Błędy przejściowe są traktowane jako błędy.
columnNameOfCorruptRecord _corrupt_record Kolumna do przechowywania rekordów, które są źle sformułowane i nie można ich przeanalizować. mode Jeśli dla analizowania ustawiono wartość DROPMALFORMED, ta kolumna będzie pusta.
dateFormat yyyy-MM-dd Format analizowania ciągów dat.
dropFieldIfAllNull false Czy ignorować kolumny wszystkich wartości null, czy puste tablice i struktury podczas wnioskowania schematu.
encoding lub charset UTF-8 Nazwa kodowania plików JSON. Zobacz java.nio.charset.Charset listę opcji. Nie można użyć polecenia UTF-16 i UTF-32 gdy multiline ma wartość true.
inferTimestamp false Czy należy spróbować wywnioskować ciągi znacznika czasu jako TimestampType? W przypadku ustawienia trueopcji wnioskowanie schematu może trwać znacznie dłużej. Aby korzystać z Auto Loader, musisz włączyć cloudFiles.inferColumnTypes.
lineSep Brak, który obejmuje \r, \r\ni \n Ciąg między dwoma następującymi po sobie rekordami JSON.
locale US Identyfikator java.util.Locale. Wpływa na domyślną datę, znacznik czasu i analizowanie dziesiętne w formacie JSON.
maxNestingDepth 500 Maksymalna dozwolona głębokość zagnieżdżania dla obiektów i tablic JSON. Zwiększ tę wartość dla głęboko zagnieżdżonych dokumentów. Prawidłowe wartości: dodatnie liczby całkowite.
maxNumLen 1000 Maksymalna długość tokenów liczbowych w danych wejściowych JSON. Zwiększ tę wartość dla formatu JSON przy użyciu dużych literałów liczbowych. Prawidłowe wartości: dodatnie liczby całkowite.
maxStringLen Nieograniczony Maksymalna długość wartości ciągu w danych wejściowych JSON. Ustaw wartość , aby ograniczyć użycie pamięci podczas analizowania kodu JSON z dużymi ciągami. Prawidłowe wartości: dodatnie liczby całkowite.
mode PERMISSIVE Tryb analizatora dotyczący obsługi nieprawidłowo sformułowanych danych. Prawidłowe wartości: PERMISSIVE, , DROPMALFORMEDFAILFAST.
multiLine false Czy rekordy JSON rozciągają się na wiele wierszy?
prefersDecimal false Próbuje interpretować ciągi znaków jako DecimalType zamiast typu float lub double, jeśli jest to możliwe. Należy również użyć wnioskowania schematu, włączając inferSchema lub używając cloudFiles.inferColumnTypes funkcji automatycznego modułu ładującego.
primitivesAsString false Czy wywnioskować typy pierwotne, takie jak liczby i booleany, np. jako StringType.
readerCaseSensitive true Określa zachowanie wrażliwości na wielkość liter po włączeniu rescuedDataColumn. Jeśli to prawda, należy uratować kolumny danych, których nazwy różnią się wielkością liter od schematu. Gdy wartość false, odczytuje dane w sposób niewrażliwy na wielkość liter. Dostępne w środowisku Databricks Runtime 13.3 lub nowszym.
rescuedDataColumn None Czy zebrać wszystkie dane, których nie można przeanalizować z powodu niezgodności typu danych lub niezgodności schematu (w tym wielkości liter kolumn) do oddzielnej kolumny. Ta kolumna jest domyślnie dołączana podczas korzystania z modułu automatycznego ładowania. Aby uzyskać więcej informacji, zobacz Co to jest uratowana kolumna danych?.
COPY INTO (starsza wersja) nie obsługuje uratowanych kolumn danych, ponieważ nie można ręcznie ustawić schematu przy użyciu COPY INTO. Databricks zaleca używanie Auto Loader w przypadku większości scenariuszy przyjmowania danych.
singleVariantColumn None Czy pozyskać cały dokument JSON, przeanalizowany w jednej kolumnie Wariant z określonym ciągiem jako nazwą kolumny. Jeśli nie zostanie ustawiona, pola JSON są pozyskiwane do własnych kolumn. Prawidłowe wartości: dowolny ciąg.
timestampFormat yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] Format analizowania ciągów znacznika czasu.
timestampNTZFormat yyyy-MM-dd'T'HH:mm:ss[.SSS] Format analizowania znacznika czasu bez ciągów strefy czasowej (TimestampNTZType).
timeZone None Element java.time.ZoneId do użycia podczas analizowania sygnatur czasowych i dat.
upgradeExceptionAsBadRecord false Czy traktować wyjątki uaktualniania typu (na przykład jeśli nie można rozszerzyć wartości do zadeklarowanego typu kolumny) jako nieprawidłowe rekordy, a nie zgłaszać wyjątku.

Kafka

Aby uzyskać pełną listę opcji czytnika platformy Kafka, zobacz Opcje dataStreamReader Kafka. Poniższe opcje dotyczą tylko operacji odczytu wsadowego przy użyciu polecenia spark.read.format("kafka").

Key Domyślny Description
endingOffsets latest Gdzie przestać czytać. Prawidłowe wartości: latestlub ciąg JSON przesunięcia dla każdej partycji, na przykład {"topicA":{"0":50,"1":-1}}.
W ciągu -1 JSON jest najnowszym przesunięciem. -2, który jest najwcześniejszym przesunięciem, nie jest dozwolony jako przesunięcie końcowe.
endingOffsetsByTimestamp None Przesunięcia końcowe dla partycji określone jako znaczniki czasu w milisekundach. Prawidłowe wartości: ciąg JSON sygnatur czasowych dla każdej partycji, taki jak {"topicA":{"0":2000,"1":3000}}.
endingTimestamp None Globalny znacznik czasu zakończenia w milisekundach zastosowany do wszystkich partycji. Prawidłowe wartości: liczby całkowite inne niż ujemne.

ORC

Key Domyślny Description
mergeSchema false Czy należy wywnioskować schemat dla wielu plików i scalić schematy każdego pliku.

Parkiet

Key Domyślny Description
datetimeRebaseMode LEGACY Steruje przestawieniem wartości DATE i TIMESTAMP między kalendarzami juliańskim a proleptycznym gregoriańskim. Prawidłowe wartości: EXCEPTION, LEGACYi CORRECTED.
int96RebaseMode LEGACY Zarządza przebazowaniem wartości znacznika czasu INT96 między kalendarzami juliańskim a proleptycznym gregoriańskim. Prawidłowe wartości: EXCEPTION, LEGACYi CORRECTED.
mergeSchema false Czy należy wywnioskować schemat dla wielu plików i scalić schematy każdego pliku.
readerCaseSensitive true Określa zachowanie wrażliwości na wielkość liter po włączeniu rescuedDataColumn. Jeśli to prawda, należy uratować kolumny danych, których nazwy różnią się wielkością liter od schematu. Gdy wartość false, odczytuje dane w sposób niewrażliwy na wielkość liter.
rescuedDataColumn None Czy zebrać wszystkie dane, których nie można przeanalizować z powodu niezgodności typu danych oraz niezgodności schematu (w tym wielkości liter kolumn), do osobnej kolumny. Ta kolumna jest domyślnie dołączana podczas korzystania z modułu automatycznego ładowania. Aby uzyskać więcej informacji, zobacz Co to jest uratowana kolumna danych?.
COPY INTO (starsza wersja) nie obsługuje uratowanych kolumn danych, ponieważ nie można ręcznie ustawić schematu przy użyciu COPY INTO. Databricks zaleca używanie Auto Loader w przypadku większości scenariuszy przyjmowania danych.

Magazyn stanów

Użyj tych opcji z funkcją lub funkcją spark.read.format("statestore")read_statestore wartości tabeli, aby odczytać dane stanu przesyłania strumieniowego ze strukturą. Zobacz Jak odczytać informacje o stanie strukturalnego strumieniowania danych.

Key Domyślny Description
batchId Najnowszy identyfikator partii Docelowa partia do odczytu. Użyj polecenia , aby wysłać zapytanie do wcześniejszego stanu zapytania. Partia musi zostać zatwierdzona, ale nie została jeszcze wyczyszczona. Prawidłowe wartości: liczby całkowite inne niż ujemne.
operatorId 0 Operator docelowy do odczytu. Użyj polecenia , gdy zapytanie ma wiele operatorów stanowych. Prawidłowe wartości: liczby całkowite inne niż ujemne.
storeName DEFAULT Docelowa nazwa magazynu stanów do odczytania. Użyj, gdy operator stanowy ma wiele wystąpień magazynu stanów. Należy określić sprzężenia storeName strumienia lub joinSide dla sprzężenia strumienia, ale nie obu tych elementów. Prawidłowe wartości: dowolny ciąg.
joinSide None Strona docelowa do odczytu ze sprzężenia strumienia. Należy określić sprzężenia storeName strumienia lub joinSide dla sprzężenia strumienia, ale nie obu tych elementów. Prawidłowe wartości: left, right.
snapshotStartBatchId None Identyfikator partii migawki do użycia jako punkt początkowy podczas odczytywania stanu. Czytnik ponownie kompiluje stan przez ponowne odtworzenie zmian z tej migawki do .batchId Przydatne, gdy migawka jest uszkodzona. Należy określić razem z parametrem snapshotPartitionId. Nie można używać z readChangeFeedprogramem . Obsługuje magazyn stanów oparty na systemie plików HDFS i magazyn stanów bazy danych RocksDB z włączonym tworzeniem punktów kontrolnych dziennika zmian. Dostępne w środowisku Databricks Runtime 15.4 LTS lub nowszym. Prawidłowe wartości: liczby całkowite inne niż ujemne.
snapshotPartitionId None Jeśli zostanie określony, zapytanie odczytuje tylko tę partycję. Należy określić razem z parametrem snapshotStartBatchId. Nie można używać z readChangeFeedprogramem . Dostępne w środowisku Databricks Runtime 15.4 LTS lub nowszym. Prawidłowe wartości: liczby całkowite inne niż ujemne.
readChangeFeed false Gdy truefunkcja zwraca zmiany stanu w określonym zakresie partii między changeStartBatchId i changeEndBatchId. Wymaga changeStartBatchId. Nie można używać z parametrami joinSide, batchId, snapshotStartBatchIdlub snapshotPartitionId. Dostępne w środowisku Databricks Runtime 16.4 LTS i nowszym. Prawidłowe wartości: true, false.
Aby uzyskać szczegółowe informacje, zobacz Odczyt zmian stanu przesyłania strumieniowego ze strukturą.
changeStartBatchId None Identyfikator początkowej partii dla zakresu zestawienia zmian. Wymagane, gdy readChangeFeed ma wartość true. Ma zastosowanie tylko wtedy, gdy readChangeFeed ustawiono wartość true. Dostępne w środowisku Databricks Runtime 16.4 LTS i nowszym. Prawidłowe wartości: liczby całkowite inne niż ujemne.
changeEndBatchId Najnowszy identyfikator partii Końcowy identyfikator partii dla zakresu zestawienia zmian. Musi być większe lub równe changeStartBatchId. Ma zastosowanie tylko wtedy, gdy readChangeFeed ustawiono wartość true. Dostępne w środowisku Databricks Runtime 16.4 LTS i nowszym. Prawidłowe wartości: liczby całkowite inne niż ujemne.
stateVarName None Nazwa zmiennej stanu do odczytania. Nazwa zmiennej stanu jest unikatową nazwą każdej zmiennej w init ramach funkcji używanej StatefulProcessortransformWithState przez operator. Wymagane w przypadku korzystania z transformWithState operatora . Dostępne w środowisku Databricks Runtime 16.4 LTS i nowszym. Prawidłowe wartości: dowolny ciąg.
readRegisteredTimers false Gdy trueparametr odczytuje zarejestrowane czasomierze używane przez transformWithState operatora. Dotyczy transformWithState tylko operatora . Dostępne w środowisku Databricks Runtime 16.4 LTS i nowszym. Prawidłowe wartości: true, false.
flattenCollectionTypes true Gdy truefunkcja spłaszcza rekordy zwracane dla zmiennych stanu mapy i listy. Gdy falseparametr zwraca rekordy jako spark SQL Array lub Map. Dotyczy transformWithState tylko operatora . Dostępne w środowisku Databricks Runtime 16.4 LTS i nowszym. Prawidłowe wartości: true, false.

Tekst

Key Domyślny Description
encoding UTF-8 Nazwa kodowania separatora wiersza pliku TEXT. Aby uzyskać listę opcji, zobacz java.nio.charset.Charset. Na zawartość pliku nie ma wpływu ta opcja i jest odczytywana as-is.
lineSep Brak, który obejmuje \r, \r\n i \n Ciąg między dwoma następującymi po sobie rekordami TEXT.
wholeText false Czy odczytywać plik jako pojedynczy rekord.

XML

Key Domyślny Description
rowTag None Tag wiersza określający, jak traktować pliki XML jako wiersz. W przykładowym pliku XML <books> <book><book>...<books>odpowiednia wartość to book. Ta opcja jest wymagana.
samplingRatio 1.0 Definiuje ułamek wierszy używanych do wnioskowania schematu. Wbudowane funkcje XML ignorują tę opcję. Prawidłowe wartości: 0.0 do 1.0.
excludeAttribute false Czy wykluczać atrybuty w elementach.
mode None Tryb radzenia sobie z uszkodzonymi rekordami podczas analizowania. PERMISSIVE: W przypadku uszkodzonych rekordów, źle sformułowany ciąg jest umieszczany w polu skonfigurowanym przez columnNameOfCorruptRecord, a źle sformułowane pola są ustawiane na null. Aby zachować uszkodzone rekordy, można ustawić string pole typu o nazwie columnNameOfCorruptRecord w schemacie zdefiniowanym przez użytkownika. Jeśli schemat nie ma pola, uszkodzone rekordy są porzucane podczas analizowania. Podczas wnioskowania schematu analizator niejawnie dodaje columnNameOfCorruptRecord pole w schemacie wyjściowym. DROPMALFORMED: ignoruje uszkodzone rekordy. Ten tryb nie jest obsługiwany dla wbudowanych funkcji XML. FAILFAST: zgłasza wyjątek, gdy analizator napotyka uszkodzone rekordy.
inferSchema true Jeśli true, program próbuje wywnioskować odpowiedni typ dla każdej wynikowej kolumny DataFrame. Jeśli false, to wszystkie wynikowe kolumny są typu string. Wbudowane funkcje XML ignorują tę opcję.
columnNameOfCorruptRecord spark.sql.columnNameOfCorruptRecord Umożliwia zmianę nazwy nowego pola zawierającego źle sformułowany ciąg utworzony przez PERMISSIVE tryb.
attributePrefix None Prefiks atrybutów do odróżnienia atrybutów od elementów. Będzie to prefiks nazw pól. Wartość domyślna to _. Może być pusty do odczytywania kodu XML, ale nie do zapisu. Dotyczy również opcji XML elementu DataFrameWriter.
valueTag _VALUE Tag używany dla danych znakowych w elementach, które zawierają również atrybuty lub elementy podrzędne. Użytkownik może określić pole valueTag w schemacie, lub zostanie ono dodane automatycznie podczas wnioskowania schematu, gdy dane znakowe są obecne w elementach posiadających inne elementy lub atrybuty. Dotyczy również opcji XML elementu DataFrameWriter.
encoding UTF-8 Do odczytu dekoduje pliki XML według danego typu kodowania. Na potrzeby pisania określa kodowanie (charset) zapisanych plików XML. Wbudowane funkcje XML ignorują tę opcję. Dotyczy również opcji XML elementu DataFrameWriter.
ignoreSurroundingSpaces true Czy należy pominąć odstępy otaczające wartości. Dane zawierające wyłącznie znaki odstępu są ignorowane.
rowValidationXSDPath None Ścieżka do opcjonalnego pliku XSD używanego do sprawdzania poprawności kodu XML dla każdego wiersza osobno. Wiersze, które nie mogą sprawdzić poprawności, są traktowane jak błędy analizy. XSD nie ma w inny sposób wpływu na schemat, niezależnie od tego, czy podano, czy wywnioskowany.
ignoreNamespace false Jeśli true, prefiksy przestrzeni nazw dla elementów i atrybutów XML są ignorowane. Tagi <abc:author> i <def:author>, na przykład są traktowane tak, jakby oba były tylko <author>. Przestrzenie nazw nie mogą być ignorowane w elemencie rowTag, tylko jego odczytywalne elementy podrzędne. Analizowanie kodu XML nie uwzględnia przestrzeni nazw, nawet jeśli false.
timestampFormat yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] Niestandardowy format ciągu znacznika czasu zgodny ze wzorcem daty/godziny. Dotyczy to typu timestamp. Dotyczy również opcji XML elementu DataFrameWriter.
timestampNTZFormat yyyy-MM-dd'T'HH:mm:ss[.SSS] Łańcuch znaków formatu niestandardowego dla znacznika czasu bez strefy czasowej, zgodny ze wzorcem formatu daty-czasu. Dotyczy to typu TimestampNTZType. Dotyczy również opcji XML elementu DataFrameWriter.
dateFormat yyyy-MM-dd Niestandardowy ciąg formatu daty zgodny ze wzorcem daty/godziny . Dotyczy to typu daty. Dotyczy również opcji XML elementu DataFrameWriter.
locale en-US Ustawia lokalizację jako tag języka w formacie IETF BCP 47. Na przykład locale jest używany podczas analizowania dat i sygnatur czasowych.
nullValue ciąg null Ustawia łańcuch znaków reprezentujący wartość null. Gdy wartość tego parametru to null, analizator nie zapisuje atrybutów i elementów dla pól. Dotyczy również opcji XML elementu DataFrameWriter.
readerCaseSensitive true Określa zachowanie rozróżniania wielkości liter, gdy rescuedDataColumn jest włączone. Jeśli to prawda, należy uratować kolumny danych, których nazwy różnią się wielkością liter od schematu. Gdy wartość false, odczytuje dane w sposób niewrażliwy na wielkość liter.
rescuedDataColumn None Czy należy zebrać wszystkie dane, których nie można przeanalizować z powodu niezgodności typu danych i niezgodności schematu (w tym wielkości liter kolumn), w celu umieszczenia ich w oddzielnej kolumnie. Ta kolumna jest domyślnie dołączana podczas korzystania z modułu automatycznego ładowania. Aby uzyskać więcej informacji, zobacz Co to jest uratowana kolumna danych?. COPY INTO (starsza wersja) nie obsługuje uratowanych kolumn danych, ponieważ nie można ręcznie ustawić schematu przy użyciu COPY INTO. Databricks zaleca używanie Auto Loader w przypadku większości scenariuszy przyjmowania danych.
singleVariantColumn none Określa nazwę kolumny pojedynczego wariantu. Jeśli ta opcja jest określona do odczytu, przeanalizuj cały rekord XML w jedną kolumnę Wariant z daną wartością ciągu opcji jako nazwą kolumny. Jeśli ta opcja jest dostępna do zapisu, zapisz wartość pojedynczej kolumny Variant w plikach XML. Dotyczy również opcji XML elementu DataFrameWriter.
useLegacyXMLParser true Czy używać starszego analizatora XML. Starszy analizator ma mniej rygorystyczną walidację dla źle sformułowanej zawartości, ale jest mniej wydajna w pamięci. Ustaw wartość , aby false wybrać bardziej rygorystyczny domyślny analizator.
wildcardColName xs_any Nazwa kolumny używana do przechwytywania elementów XML pasujących do elementu schematu symbolu wieloznacznego (xs:any). Nie można używać razem z rescuedDataColumn.

Opcje DataStreamReader

Te opcje umożliwiają DataStreamReader.option() skonfigurowanie odczytów przesyłanych strumieniowo z tabel usługi Delta Lake i innych źródeł opartych na plikach.

Aby uzyskać opcje formatowania plików (JSON, CSV, Parquet i inne), zobacz Opcje elementu DataFrameReader.

Aby zapoznać się z opcjami automatycznego modułu ładującego (), zobacz Auto Loader (cloudFiles.*Automatyczne ładowanie).

Example

W poniższym przykładzie ustawiono wartość maxFilesPerTrigger10 dla strumienia tabeli usługi Delta Lake:

Python
df = spark.readStream.format("delta").option("maxFilesPerTrigger", 10).load("/path/to/delta-table")
Scala
val df = spark.readStream.format("delta").option("maxFilesPerTrigger", "10").load("/path/to/delta-table")

Wspólne

Poniższe opcje dotyczą tabel usługi Delta Lake i innych źródeł przesyłania strumieniowego opartego na plikach.

Key Domyślny Description
cleanSource off Jak obsługiwać pliki źródłowe po ich przetworzeniu przez strumień. Prawidłowe wartości: off (bez akcji), delete (trwale usuń plik źródłowy), archive (przejdź do .sourceArchiveDir W przypadku ustawienia parametru należy również ustawić archivesourceArchiveDir wartość . Nie dotyczy przesyłania strumieniowego tabel usługi Delta Lake.
fileNameOnly false Czy zidentyfikować już przetworzone pliki tylko według nazwy pliku, a nie przez pełną ścieżkę. Gdy truepliki w różnych ścieżkach o tej samej nazwie pliku są traktowane jako ten sam plik i nie są ponownie przetwarzane. Nie dotyczy przesyłania strumieniowego tabel usługi Delta Lake.
latestFirst false Czy należy najpierw przetwarzać ostatnio zmodyfikowane pliki w ramach każdej mikrosadowej. Przydatne, gdy chcesz jak najszybciej przetworzyć najnowsze dane. Gdy true ustawienie i maxFilesPerTrigger lub maxBytesPerTrigger jest ustawione, maxFileAge jest ignorowane. Nie dotyczy przesyłania strumieniowego tabel usługi Delta Lake.
maxBytesPerTrigger None Miękka maksymalna ilość danych przetwarzanych dla każdej mikrosadowej partii. Partia może przetworzyć więcej niż limit, jeśli najmniejsza jednostka wejściowa przekroczy ją. W przypadku użycia razem z elementem maxFilesPerTriggermikrosadowe przetwarza dane do momentu osiągnięcia pierwszego limitu. Prawidłowe wartości: dodatnie liczby całkowite.
Zamiast tego użyj cloudFiles.maxBytesPerTrigger dla Auto Loader. Zobacz Typowe.
maxCachedFiles 10000 Maksymalna liczba nieprzetworzonych plików do pamięci podręcznej dla kolejnych mikrosadów. Ustaw wartość , aby 0 wyłączyć buforowanie. Zwiększ tę wartość, gdy katalog źródłowy zawiera wiele nowych plików dla każdego wyzwalacza. Nie dotyczy przesyłania strumieniowego tabel usługi Delta Lake. Prawidłowe wartości: dodatnie liczby całkowite lub 0.
maxFileAge 7d Maksymalny wiek plików rozważanych do przetwarzania w stosunku do znacznika czasu ostatnio zmodyfikowanego pliku, a nie bieżącego czasu systemowego. Pliki starsze niż ten próg są ignorowane. Akceptuje ciągi czasu trwania, takie jak 7d lub 4h. Ignorowane, gdy latestFirst wartość jest true ustawiona lub maxFilesPerTriggermaxBytesPerTrigger jest ustawiona. Nie dotyczy przesyłania strumieniowego tabel usługi Delta Lake.
maxFilesPerTrigger 1000 dla usług Delta Lake i Auto Loader. Brak maksymalnej wartości dla innych źródeł opartych na plikach. Górna granica liczby nowych plików przetworzonych w każdej mikrosadowej partii. W przypadku użycia razem z elementem maxBytesPerTriggermikrosadowe przetwarza dane do momentu osiągnięcia pierwszego limitu. Prawidłowe wartości: dodatnie liczby całkowite.
Zamiast tego użyj cloudFiles.maxFilesPerTrigger dla Auto Loader. Zobacz Typowe.
sourceArchiveDir None Ścieżka do katalogu archiwum, gdy cleanSource jest ustawiona na archivewartość . Pliki źródłowe są przenoszone do tej ścieżki po przetworzeniu, zachowując ich względną strukturę katalogów. Nie dotyczy przesyłania strumieniowego tabel usługi Delta Lake.

Moduł automatycznego ładowania

Użyj tych opcji ze cloudFiles źródłem, aby skonfigurować moduł automatycznego ładowania na potrzeby pozyskiwania danych przesyłanych strumieniowo z magazynu w chmurze. Opcje specyficzne dla cloudFiles źródła są poprzedzone prefiksem cloudFiles , aby zachować je w oddzielnej przestrzeni nazw od innych opcji źródła przesyłania strumieniowego ze strukturą .

Wspólne

Key Domyślny Description
cloudFiles.allowOverwrites false Czy zezwolić na zmianę pliku katalogu wejściowego w celu zastąpienia istniejących danych.
Aby uzyskać zastrzeżenia dotyczące konfiguracji, zobacz Czy funkcja automatycznego ładowania ponownie przetwarza plik, gdy plik zostanie dołączony lub zastąpiony?.
cloudFiles.backfillInterval None Auto Loader może wyzwalać asynchroniczne wypełnienia w określonym przedziale czasu. Na przykład 1 day do wypełniania kopii zapasowych codziennie lub 1 week do wypełniania kopii zapasowych co tydzień. Aby uzyskać więcej informacji, zobacz Wyzwalanie regularnych wypełniania przy użyciu pliku cloudFiles.backfillInterval.
Nie używaj, gdy cloudFiles.useManagedFileEvents jest ustawione na true.
cloudFiles.cleanSource OFF Czy automatycznie usuwać przetworzone pliki z katalogu wejściowego. W przypadku ustawienia wartości OFF (wartość domyślna) żadne pliki nie są usuwane.
Gdy jest ustawiona wartość DELETE, Auto Loader automatycznie usuwa pliki 30 dni po ich przetworzeniu. W tym celu moduł automatycznego ładowania musi mieć uprawnienia do zapisu w katalogu źródłowym.
Gdy jest ustawiona wartość MOVE, automatycznie moduł ładujący przenosi pliki do określonej lokalizacji w ciągu cloudFiles.cleanSource.moveDestination 30 dni po ich przetworzeniu. W tym celu Auto Loader musi mieć uprawnienia do zapisu w katalogu źródłowym, a także do lokalizacji docelowej.
Plik jest traktowany jako przetwarzany, gdy ma wartość commit_time inną niż null w wyniku cloud_files_state funkcji wartości tabeli. Zobacz cloud_files_state funkcji wartości tabeli. 30-dniowe dodatkowe oczekiwanie po zakończeniu przetwarzania można skonfigurować przy użyciu polecenia cloudFiles.cleanSource.retentionDuration.
Przed włączeniem cloudFiles.cleanSourcefunkcji zapoznaj się z następującymi zagadnieniami:
  • Azure Databricks nie zaleca korzystania z tej opcji, jeśli istnieje wiele strumieni zużywających dane z lokalizacji źródłowej, ponieważ najszybszy użytkownik usunie pliki i nie będzie pozyskiwany w wolniejszych źródłach.
  • Włączenie tej funkcji wymaga automatycznego modułu ładującego do zachowania dodatkowego stanu w punkcie kontrolnym, co powoduje narzut na wydajność, ale zapewnia lepszą możliwość obserwowania za pośrednictwem cloud_files_state funkcji wartości tabeli. Zobacz cloud_files_state funkcji wartości tabeli.
  • cleanSource używa bieżącego ustawienia, aby zdecydować, czy ma to być MOVEDELETE dany plik. Załóżmy na przykład, że ustawienie było MOVE wtedy, gdy plik został pierwotnie przetworzony, ale został zmieniony na DELETE , gdy plik stał się kandydatem do oczyszczenia 30 dni później. W takim przypadku cleanSource usunie plik.
  • Nie ma gwarancji, że pliki zostaną wyczyszczone zaraz po retentionDuration wygaśnięciu. Aby utrzymać niskie koszty, moduł ładujący automatycznie usuwa pliki jednocześnie z przetwarzaniem strumienia i kończy się natychmiast po zakończeniu przetwarzania strumienia lub zakończeniu. Pliki, które były kandydatami do oczyszczenia, ale nie można ich wyczyścić podczas przetwarzania strumienia zostaną pobrane przy następnym uruchomieniu automatycznego modułu ładującego.

Dostępne w środowisku Databricks Runtime 16.4 lub nowszym.
cloudFiles.cleanSource.retentionDuration 30 days Czas oczekiwania, zanim przetworzone pliki staną się kandydatami do archiwizacji za pomocą polecenia cleanSource. Wartość musi być większa niż 7 dni dla elementu DELETE. Brak minimalnego ograniczenia dla elementu MOVE.
Wartość jest ciągiem CalendarInterval . Na przykład , "14 days", "30 days", "2 weeks"lub "1 month".
Dostępne w środowisku Databricks Runtime 16.4 lub nowszym.
cloudFiles.cleanSource.moveDestination None Ścieżka do archiwizacji przetworzonych plików, gdy cloudFiles.cleanSource jest ustawiony na MOVE. Może to być ścieżka magazynu w chmurze lub ścieżka woluminu wykazu aparatu Unity (na przykład /Volumes/my_catalog/my_schema/my_volume/archive/).
Lokalizacja przenoszenia musi:
  • Nie jest elementem podrzędnym katalogu źródłowego. Jeśli umieścisz miejsce docelowe przenoszenia wewnątrz katalogu źródłowego, zarchiwizowane pliki zostaną ponownie pozyskane.
  • Bądź w tej samej lokalizacji zewnętrznej, woluminie lub instalacji systemu plików DBFS co źródło. Przenoszenie między zasobnikami i między kontenerami nie jest obsługiwane i powoduje wystąpienie błędu.

Automatyczny moduł ładujący musi mieć uprawnienia do zapisu w tym katalogu.
Dostępne w środowisku Databricks Runtime 16.4 lub nowszym.
cloudFiles.format Brak (wymagana opcja) Format pliku danych w ścieżce źródłowej. Prawidłowe wartości to:
cloudFiles.includeExistingFiles true Czy dołączyć istniejące pliki do ścieżki wejściowej przetwarzania strumienia, czy tylko przetworzyć nowe pliki przychodzące po wstępnej konfiguracji. Ta opcja jest oceniana tylko wtedy, gdy uruchamiasz strumień po raz pierwszy. Zmiana tej opcji po ponownym uruchomieniu strumienia nie ma żadnego wpływu.
cloudFiles.inferColumnTypes false Czy wywnioskować dokładne typy kolumn podczas korzystania z wnioskowania schematu. Domyślnie kolumny są wnioskowane jako ciągi podczas wnioskowania zestawów danych JSON i CSV. Aby uzyskać więcej informacji, zobacz wnioskowanie schematu .
cloudFiles.maxBytesPerTrigger None Maksymalna liczba nowych bajtów do przetworzenia w każdym wyzwalaczu. Można określić ciąg bajtów, taki jak 10g, aby ograniczyć każdą mikropartię do 10 GB danych. Jest to łagodne maksimum. Jeśli masz pliki o rozmiarze 3 GB, usługa Azure Databricks przetwarza 12 GB w mikrobajtach. W przypadku użycia razem z usługą cloudFiles.maxFilesPerTrigger, Azure Databricks zużywa do niższego limitu cloudFiles.maxFilesPerTrigger lub cloudFiles.maxBytesPerTrigger, w zależności od tego, który pierwszy zostanie osiągnięty. Ta opcja nie ma żadnego wpływu, gdy jest używana z Trigger.Once() (Trigger.Once() jest przestarzałe).
W środowisku Databricks Runtime 18.0 lub nowszym ta opcja jest konfigurowana dynamicznie i nie musi być ustawiana ręcznie.
cloudFiles.maxFileAge None Jak długo zdarzenie dotyczące pliku jest śledzone w celach deduplikacji. Usługa Databricks nie zaleca dostrajania tego parametru, chyba że przetwarzasz dane w ilości milionów plików na godzinę. Aby uzyskać więcej informacji, zobacz sekcję Dotyczącą śledzenia zdarzeń plików .
Zbyt agresywne dostrajanie cloudFiles.maxFileAge może powodować problemy z jakością danych, takie jak powtórne pobieranie danych lub brakujące pliki. W związku z tym Databricks rekomenduje ustawienie konserwatywne dla cloudFiles.maxFileAge, takie jak 90 dni, co jest podobne do zaleceń porównywalnych rozwiązań do pozyskiwania danych.
cloudFiles.maxFilesPerTrigger 1000 Maksymalna liczba nowych plików do przetworzenia w każdym wyzwalaczu. W przypadku użycia razem z usługą cloudFiles.maxBytesPerTrigger, Azure Databricks zużywa do niższego limitu cloudFiles.maxFilesPerTrigger lub cloudFiles.maxBytesPerTrigger, w zależności od tego, który pierwszy zostanie osiągnięty. Ta opcja nie ma żadnego wpływu w przypadku użycia z opcją Trigger.Once() (przestarzałe).
W środowisku Databricks Runtime 18.0 lub nowszym ta opcja jest konfigurowana dynamicznie i nie musi być ustawiana ręcznie.
cloudFiles.partitionColumns None Rozdzielona przecinkami lista kolumn partycji w stylu hive, które mają być wywnioskowane ze struktury katalogów plików. Kolumny partycji w stylu hive to pary klucz-wartość połączone znakiem równości, takim jak <base-path>/a=x/b=1/c=y/file.format. W tym przykładzie kolumny partycji to a, bi c. Domyślnie te kolumny są automatycznie dodawane do schematu, jeśli używasz wnioskowania schematu i udostępniasz element <base-path> do ładowania danych. Jeśli podasz schemat, moduł automatycznego ładowania oczekuje, że te kolumny zostaną uwzględnione w schemacie. Jeśli nie chcesz, aby te kolumny były częścią schematu, możesz określić "" , aby ignorować te kolumny. Ponadto możesz użyć tej opcji, jeśli chcesz, aby kolumny mogły być wywnioskowane ścieżką pliku w złożonych strukturach katalogów, podobnie jak w poniższym przykładzie:
<base-path>/year=2022/week=1/file1.csv
<base-path>/year=2022/month=2/day=3/file2.csv
<base-path>/year=2022/month=2/day=4/file3.csv
Określenie cloudFiles.partitionColumns jako year,month,day zwraca year=2022 dla file1.csv, ale kolumny month i daynull.
month i day są poprawnie analizowane dla file2.csv i file3.csv.
cloudFiles.schemaEvolutionMode addNewColumns gdy schemat nie jest podany, none w przeciwnym razie Tryb ewolucji schematu w miarę odnajdowania nowych kolumn w danych. Domyślnie kolumny są wnioskowane jako ciągi podczas wnioskowania zestawów danych JSON. Aby uzyskać więcej informacji, zobacz Ewolucja schematu .
cloudFiles.schemaHints None Informacje o schemacie, które podajesz do Auto Loader podczas wnioskowania schematu. Aby uzyskać więcej szczegółów, zobacz wskazówki dotyczące schematu .
cloudFiles.schemaLocation Brak (wymagane do wywnioskowania schematu) Lokalizacja do przechowywania wywnioskowanych schematów i kolejnych zmian. Aby uzyskać więcej informacji, zobacz wnioskowanie schematu .
cloudFiles.useStrictGlobber false Czy używać restrykcyjnego wzorca globowania, który odpowiada domyślnemu zachowaniu globbingu innych źródeł plików w Apache Spark. Aby uzyskać więcej informacji, zobacz Typowe wzorce ładowania danych . Dostępne w środowisku Databricks Runtime 12.2 LTS lub nowszym.
cloudFiles.validateOptions true Czy zweryfikować opcje "Auto Loader" i zwrócić błąd dla nieznanych lub niespójnych opcji.

Lista katalogów

Key Domyślny Description
cloudFiles.useIncrementalListing (przestarzałe) auto w środowisku Databricks Runtime 17.2 lub nowszym w false środowisku Databricks Runtime 17.3 lub nowszym Ta funkcja jest przestarzała. Databricks zaleca użycie trybu powiadomień plików ze zdarzeniami plików zamiast cloudFiles.useIncrementalListing.
Czy w trybie listowania katalogów używać listy przyrostowej zamiast pełnej listy. Domyślnie moduł automatycznego ładowania umożliwia automatyczne wykrywanie, czy dany katalog ma zastosowanie do listy przyrostowej. Można jawnie użyć listy przyrostowej lub użyć pełnej listy katalogów, ustawiając ją odpowiednio jako true lub false .
Błędne włączenie listy przyrostowej w nieuporządkowanym leksykalnie katalogu uniemożliwia funkcjonowanie modułu Auto Loader w odnajdywaniu nowych plików.
Współpracuje z usługą Azure Data Lake Storage (abfss://), S3 (s3://) i GCS (gs://).
Dostępne w środowisku Databricks Runtime 9.1 LTS lub nowszym.
Dostępne wartości: auto, , truefalse

Powiadomienie o pliku

Aby uzyskać informacje na temat konfigurowania trybu powiadomień plików, w tym wymaganych uprawnień do chmury, instrukcji konfiguracji i metod uwierzytelniania, zobacz Konfigurowanie strumieni automatycznego modułu ładującego w trybie powiadomień plików.

Key Domyślny Description
cloudFiles.fetchParallelism 1 Liczba wątków używanych do pobierania komunikatów z usługi kolejkowania.
Nie używaj, gdy cloudFiles.useManagedFileEvents jest ustawione na true.
cloudFiles.pathRewrites None Wymagane tylko w przypadku określenia queueUrl , który odbiera powiadomienia o plikach z wielu zasobników S3 i chcesz użyć punktów instalacji skonfigurowanych do uzyskiwania dostępu do danych w tych kontenerach. Użyj tej opcji, aby przepisać prefiks ścieżki bucket/key tak, aby odpowiadał punktowi montowania. Można przepisać tylko prefiksy. Na przykład w przypadku konfiguracji {"<databricks-mounted-bucket>/path": "dbfs:/mnt/data-warehouse"}ścieżka s3://<databricks-mounted-bucket>/path/2017/08/fileA.json zostanie przepisana na dbfs:/mnt/data-warehouse/2017/08/fileA.json.
Nie używaj, gdy cloudFiles.useManagedFileEvents jest ustawione na true.
cloudFiles.resourceTag None Seria par tagów klucz-wartość, które ułatwiają kojarzenie i identyfikowanie powiązanych zasobów, na przykład:
cloudFiles.option("cloudFiles.resourceTag.myFirstKey", "myFirstValue")
.option("cloudFiles.resourceTag.mySecondKey", "mySecondValue")
Aby uzyskać więcej informacji na temat AWS, zobacz Tagi alokacji kosztów Amazon SQS oraz Konfigurowanie tagów dla tematu Amazon SNS. (1)
Aby uzyskać więcej informacji na temat platformy Azure, zobacz Nazewnictwo kolejek i metadanych oraz omówienie properties.labels w subskrypcjach zdarzeń. Moduł automatycznego ładowania przechowuje te pary tagów klucz-wartość w formacie JSON jako etykiety. (1)
Aby uzyskać więcej informacji na temat GCP, zobacz Raportowanie użycia z etykietami. (1)
Nie używaj, gdy cloudFiles.useManagedFileEvents jest ustawione na true. Zamiast tego należy ustawić tagi zasobów przy użyciu konsoli dostawcy usług w chmurze.
cloudFiles.useManagedFileEvents false Gdy jest ustawiona wartość true, funkcja automatycznego modułu ładującego używa usługi zdarzeń plików do odnajdywania plików w lokalizacji zewnętrznej. Tej opcji można użyć tylko wtedy, gdy ścieżka ładowania znajduje się w lokalizacji zewnętrznej z włączonymi zdarzeniami plików. Zobacz Use file notification mode with file events (Używanie trybu powiadomień plików ze zdarzeniami plików).
Zdarzenia plików zapewniają wydajność na poziomie powiadomień podczas odnajdywania plików, ponieważ moduł automatycznego ładowania może odnaleźć nowe pliki po ostatnim uruchomieniu. W przeciwieństwie do listy katalogów ten proces nie musi zawierać listy wszystkich plików w katalogu.
W niektórych sytuacjach funkcja automatycznego ładowania używa listy katalogów, mimo że opcja zdarzeń plików jest włączona:
  • Podczas początkowego ładowania, gdy includeExistingFiles jest ustawiony na true, następuje pełne przeszukanie katalogu w celu odnalezienia wszystkich plików, które były obecne w katalogu przed uruchomieniem Auto Loader.
  • Usługa zdarzeń plików optymalizuje odnajdywanie plików przez buforowanie ostatnio utworzonych plików. Jeśli automatyczne ładowanie jest uruchamiane rzadko, ta pamięć podręczna może wygasnąć, a moduł automatycznego ładowania powróci do listy katalogów w celu odnalezienia plików i zaktualizowania pamięci podręcznej. Aby uniknąć tego scenariusza, należy wywołać moduł automatycznego ładowania co najmniej raz na siedem dni.

Zobacz Kiedy program Auto Loader ze zdarzeniami plików używa listy katalogów? aby uzyskać pełną listę sytuacji, w których program Auto loader używa listy katalogów z tą opcją.
Dostępne w środowisku Databricks Runtime 14.3 LTS lub nowszym.
cloudFiles.listOnStart false Gdy jest ustawiona wartość true, moduł automatycznego ładowania wykonuje pełną listę katalogów po uruchomieniu strumienia, zamiast rozpoczynać się od tokenu kontynuacji w punkcie kontrolnym. Użyj tej opcji, aby odzyskać dane po błędach, takich jak CF_MANAGED_FILE_EVENTS_INVALID_CONTINUATION_TOKEN. Zobacz Jak mogę odzyskać odzyskiwanie po CF_MANAGED_FILE_EVENTS_INVALID_CONTINUATION_TOKEN błędzie?.
cloudFiles.useNotifications false Czy używać trybu powiadomień plików do określenia, kiedy istnieją nowe pliki. Jeśli false, użyj trybu wylistowania katalogu. Zobacz Porównanie trybów wykrywania plików automatycznego modułu ładującego.
Nie używaj, gdy cloudFiles.useManagedFileEvents jest ustawione na true.

(1) Moduł automatycznego ładowania domyślnie dodaje następujące pary tagów klucz-wartość:

  • vendor: Databricks
  • path: lokalizacja, z której są ładowane dane. Niedostępne w GCP z powodu ograniczeń etykietowania.
  • checkpointLocation: lokalizacja punktu kontrolnego strumienia. Niedostępne w GCP z powodu ograniczeń etykietowania.
  • streamId: globalnie unikatowy identyfikator strumienia.

Usługa Databricks rezerwuje te nazwy kluczy i nie można zastąpić ich wartości.

Specyficzne dla chmury

Moduł automatycznego ładowania udostępnia opcje konfigurowania infrastruktury chmury dla trybu powiadomień plików. Aby uzyskać wymagane uprawnienia do chmury i instrukcje dotyczące konfiguracji, zobacz Konfigurowanie strumieni automatycznego ładowania w trybie powiadomień plików.

AWS

Podaj następujące opcje tylko wtedy, gdy wybierzesz cloudFiles.useNotifications = true i chcesz, aby Auto Loader skonfigurował usługi powiadomień za Ciebie:

Key Domyślny Description
cloudFiles.region Region wystąpienia usługi EC2 Region, w którym znajduje się źródłowy zasobnik S3 i gdzie chcesz utworzyć usługi AWS SNS i SQS.
Key Domyślny Description
cloudFiles.restrictNotificationSetupToSameAWSAccountId false Zezwalaj tylko na powiadomienia o zdarzeniach z zasobników usługi AWS S3 na tym samym koncie co temat SNS. Jeśli to prawda, moduł automatycznego ładowania akceptuje tylko powiadomienia o zdarzeniach z zasobników usługi AWS S3 na tym samym koncie co temat SNS.
Gdy false zasady dostępu nie ograniczają konfiguracji zasobników między kontami i tematyki SNS. Jest to przydatne, gdy temat usługi SNS i ścieżka do zasobnika są skojarzone z różnymi kontami.
Dostępne w środowisku Databricks Runtime 17.2 lub nowszym.

Podaj następującą opcję tylko wtedy, gdy wybierzesz cloudFiles.useNotifications = true i chcesz, aby Automatyczny Moduł Ładujący używał kolejki, którą już skonfigurowałeś:

Key Domyślny Description
cloudFiles.queueUrl None Adres URL kolejki SQS. Jeśli jest podana, Auto Loader bezpośrednio pobiera zdarzenia z tej kolejki zamiast konfigurowania własnych usług AWS SNS i SQS.

Opcje uwierzytelniania platformy AWS

Podaj następującą opcję uwierzytelniania, aby użyć poświadczeń usługi Databricks:

Key Domyślny Description
databricks.serviceCredential None Nazwa poświadczenia dla usługi Databricks . Dostępne w środowisku Databricks Runtime 16.1 lub nowszym.

Jeśli poświadczenia usługi Databricks lub role IAM nie są dostępne, możesz zamiast tego podać następujące opcje uwierzytelniania:

Key Domyślny Description
cloudFiles.awsAccessKey None Identyfikator klucza dostępu platformy AWS dla użytkownika. Musi być dostarczany z cloudFiles.awsSecretKey.
cloudFiles.awsSecretKey None Tajny klucz dostępu AWS dla użytkownika. Musi być dostarczany z cloudFiles.awsAccessKey.
cloudFiles.roleArn None ARN roli IAM, którą należy przyjąć, w razie potrzeby. Rolę można przyjąć z profilu wystąpienia klastra lub podając poświadczenia za pomocą cloudFiles.awsAccessKey i cloudFiles.awsSecretKey.
cloudFiles.roleExternalId None Identyfikator do podania podczas przyjmowania roli przy użyciu cloudFiles.roleArn.
cloudFiles.roleSessionName None Opcjonalna nazwa sesji do użycia podczas zakładania roli przy użyciu polecenia cloudFiles.roleArn.
cloudFiles.stsEndpoint None Opcjonalny punkt końcowy, który umożliwia uzyskanie dostępu do usługi AWS STS podczas przyjmowania roli przy użyciu cloudFiles.roleArn.
Azure

Musisz podać wartości dla wszystkich następujących opcji, jeśli określisz cloudFiles.useNotifications = true i chcesz, aby Auto Loader skonfigurował usługi powiadomień.

Key Domyślny Description
cloudFiles.resourceGroup None Grupa zasobów Azure, w której jest tworzone konto magazynu.
cloudFiles.subscriptionId None Identyfikator subskrypcji Azure, w którym jest tworzona grupa zasobów.
databricks.serviceCredential None Nazwa poświadczenia dla usługi Databricks . Dostępne w środowisku Databricks Runtime 16.1 lub nowszym.

Jeśli poświadczenie usługi Databricks jest niedostępne, możesz zamiast tego podać następujące opcje uwierzytelniania:

Key Domyślny Description
cloudFiles.clientId None Identyfikator klienta lub identyfikator aplikacji jednostki usługi.
cloudFiles.clientSecret None Tajny klucz klienta pryncypała usługi.
cloudFiles.connectionString None Łańcuch połączenia dla konta przechowywania, oparty na kluczu dostępu do konta lub sygnaturze dostępu współdzielonego (SAS).
cloudFiles.tenantId None Identyfikator dzierżawy Azure, w którym jest tworzona jednostka usługi.

Podaj następującą opcję tylko w przypadku ustawienia cloudFiles.useNotifications = true i chcesz, aby moduł automatycznego ładowania używał istniejącej kolejki:

Key Domyślny Description
cloudFiles.queueName None Nazwa kolejki platformy Azure. Jeśli zostanie to podane, źródło plików w chmurze bezpośrednio odbiera zdarzenia z tej kolejki, zamiast konfigurować własne rozwiązania Azure Event Grid i Queue Storage. W takim przypadku databricks.serviceCredential lub cloudFiles.connectionString wymagają tylko uprawnień do odczytu w kolejce.
GCP

Auto Loader może automatycznie skonfigurować usługi powiadomień, wykorzystując poświadczenia serwisowe Databricks. Konto usługi utworzone przy użyciu poświadczeń usługi Databricks będzie wymagać uprawnień określonych w temacie Konfigurowanie strumieni automatycznego ładowania w trybie powiadomień plików.

Key Domyślny Description
cloudFiles.projectId None Identyfikator projektu, w którego znajduje się zasobnik GCS. Subskrypcja Google Cloud Pub/Sub jest również tworzona w ramach tego projektu.
databricks.serviceCredential None Nazwa poświadczenia dla usługi Databricks . Dostępne w środowisku Databricks Runtime 16.1 lub nowszym.

Jeśli poświadczenie usługi Databricks jest niedostępne, możesz użyć kont usług Google bezpośrednio. Klaster można skonfigurować tak, aby zakładał konto usługi, postępując zgodnie z konfiguracją usługi Google lub bezpośrednio podać następujące opcje uwierzytelniania:

Key Domyślny Description
cloudFiles.client None Identyfikator klienta konta usługi Google.
cloudFiles.clientEmail None Adres e-mail konta usługi Google.
cloudFiles.privateKey None Klucz prywatny wygenerowany dla konta usługi Google.
cloudFiles.privateKeyId None Identyfikator klucza prywatnego wygenerowanego dla konta usługi Google.

Podaj następującą opcję tylko wtedy, gdy wybierzesz cloudFiles.useNotifications = true i chcesz, aby Automatyczny Moduł Ładujący używał kolejki, którą już skonfigurowałeś:

Key Domyślny Description
cloudFiles.subscription None Nazwa subskrypcji Google Cloud Pub/Sub. Jeśli zostanie dostarczone, źródło plików w chmurze korzysta ze zdarzeń z tej kolejki zamiast konfigurowania własnych usług GCS Notification i Google Cloud Pub/Sub.

Delta Lake

Poniższe opcje mają zastosowanie podczas odczytywania z tabeli usługi Delta Lake przy użyciu polecenia spark.readStream.

Key Domyślny Description
allowSourceColumnDrop None Ustaw numer wersji tabeli delty lub "always" zezwalaj strumieniu na kontynuowanie po usunięciu kolumn ze schematu tabeli źródłowej. Po ustawieniu numeru wersji potwierdza, że wszystkie schematy zmieniają się do tej wersji. Wymaga schemaTrackingLocation. Zobacz Zmień nazwę i usuń kolumny za pomocą mapowania kolumn Delta Lake.
allowSourceColumnRename None Ustaw numer wersji tabeli delty lub "always" zezwalaj strumieniu na kontynuowanie po zmianie nazwy kolumn w tabeli źródłowej. Po ustawieniu numeru wersji potwierdza, że wszystkie schematy zmieniają się do tej wersji. Wymaga schemaTrackingLocation. Zobacz Zmień nazwę i usuń kolumny za pomocą mapowania kolumn Delta Lake.
allowSourceColumnTypeChange None Ustaw numer wersji tabeli delty lub "always" zezwalaj strumieniu na kontynuowanie po zmianie typów kolumn w tabeli źródłowej. Po ustawieniu numeru wersji potwierdza, że wszystkie schematy zmieniają się do tej wersji. Wymaga schemaTrackingLocation. Sprawdź Rozszerzanie typu.
excludeRegex None Wzorzec wyrażenia regularnego. Pliki, których ścieżki są zgodne ze wzorcem, są wykluczone z odczytu przesyłania strumieniowego. Przydatne do filtrowania plików, które nie są zgodne z oczekiwaną konwencją nazewnictwa.
failOnDataLoss true Czy zapytanie przesyłania strumieniowego nie powiodło się, jeśli dane źródłowe zostały usunięte z powodu przechowywania dziennika (logRetentionDuration). Ustaw wartość , aby false pominąć brakujące dane i kontynuować przetwarzanie. Zobacz Konfigurowanie przechowywania danych dla zapytań dotyczących podróży w czasie.
ignoreChanges (przestarzałe) false Dostępne w środowisku Databricks Runtime 11.3 LTS i niższym. Ponownie emituje ponownie pliki danych po operacjach modyfikacji, takich jak UPDATE, MERGE INTO, DELETElub OVERWRITE. Niezmienione wiersze mogą być emitowane obok nowych wierszy, więc odbiorcy podrzędni muszą obsługiwać duplikaty. Usunięcia nie są propagowane w dół. Zastąpiono elementem skipChangeCommits w środowisku Databricks Runtime 12.2 LTS i nowszym.
ignoreDeletes (przestarzałe) false Ignoruje transakcje, które usuwają dane na granicach partycji (tylko pełna partycja spada). Nie obsługuje usuwania, aktualizacji ani innych modyfikacji niezwiązanych z partycjami. Użyj skipChangeCommits zamiast tego.
readChangeFeed lub readChangeData false Czy włączyć odczytywanie zestawienia danych zmian dla zapytania przesyłania strumieniowego. Po włączeniu strumień emituje zmiany na poziomie wiersza (wstawia, aktualizuje i usuwa) z dodatkowymi kolumnami metadanych. Zobacz Korzystanie z przepływu danych zmian w Delta Lake na platformie Azure Databricks.
schemaTrackingLocation None Ścieżka do katalogu, w którym usługa Delta Lake śledzi zmiany schematu dla odczytu przesyłania strumieniowego. Wymagane w przypadku przesyłania strumieniowego z tabel z włączonym mapowaniem kolumn i używaniem allowSourceColumn* opcji do obsługi ewolucji schematu. Musi znajdować się w obrębie checkpointLocation zapytania przesyłania strumieniowego. Zobacz Zmień nazwę i usuń kolumny za pomocą mapowania kolumn Delta Lake.
skipChangeCommits false Ignoruje transakcje, które usuwają lub modyfikują istniejące rekordy i procesy tylko dołączają. Usługa Databricks zaleca tę opcję w przypadku większości obciążeń, które nie używają zestawienia zmian danych. Dostępne w środowisku Databricks Runtime 12.2 LTS lub nowszym. Zobacz Pomijanie zatwierdzeń zmian nadrzędnych za pomocą polecenia skipChangeCommits.
startingTimestamp Najnowsza dostępna Sygnatura czasowa rozpoczęcia odczytywania od. Strumień odczytuje wszystkie zmiany tabeli zatwierdzone w godzinie lub po określonym znaczniku czasu. Jeśli znacznik czasu poprzedza wszystkie dostępne zatwierdzenia tabeli, strumień rozpoczyna się od najwcześniejszego dostępnego zatwierdzenia. Nie można używać razem z startingVersion. Ignorowane, jeśli punkt kontrolny przesyłania strumieniowego już istnieje.
Prawidłowe wartości: ciąg znacznika czasu, taki jak lub ciąg daty, taki jak "2019-01-01T00:00:00.000Z""2019-01-01".
startingVersion Najnowsza dostępna Wersja tabeli delty do rozpoczęcia odczytywania. Strumień odczytuje wszystkie zmiany zatwierdzone w określonej wersji lub po tej wersji. Określ "latest" , aby rozpocząć od tylko najnowszych zmian. Nie można używać razem z startingTimestamp. Ignorowane, jeśli punkt kontrolny przesyłania strumieniowego już istnieje. Zobacz Praca z historią tabel.
withEventTimeOrder false Dzieli początkową migawkę tabeli na przedziały czasu zdarzenia, aby zapobiec nieprawidłowemu oznaczeniu rekordów jako opóźnione zdarzenia i porzuceniu w zapytaniach stanowych z znakami wodnymi. Nie można zmienić po rozpoczęciu początkowego przetwarzania migawki bez usuwania punktu kontrolnego. Dostępne w środowisku Databricks Runtime 11.3 LTS i nowszym. Patrz Przetwarzanie początkowego zrzutu bez usuwania danych.

Kafka

Użyj tych opcji z opcją spark.readStream.format("kafka") lub spark.read.format("kafka"):

Key Domyślny Description
assign None Określone partycje do korzystania. Musisz określić dokładnie jedną z subscribeopcji , subscribePatternlub assign . Prawidłowe wartości: ciąg JSON, taki jak {"topicA":[0,1],"topicB":[2,4]}.
failOnDataLoss true Czy zapytanie nie powiodło się, jeśli dane mogły zostać utracone, na przykład z powodu usuniętych tematów lub obcinania przesunięcia. Ustaw wartość , aby false pominąć brakujące dane i kontynuować. Prawidłowe wartości: true, false.
Usługa Databricks szacuje konserwatywnie, czy dane mogły zostać utracone. Może to jednak spowodować fałszywe alarmy.
fetchoffset.numretries 3 Liczba ponownych prób podczas pobierania przesunięć platformy Kafka kończy się niepowodzeniem. Prawidłowe wartości: liczby całkowite inne niż ujemne.
fetchoffset.retryintervalms 1000 Interwał w milisekundach między ponawianiami pobierania przesunięcia. Prawidłowe wartości: liczby całkowite inne niż ujemne.
groupIdPrefix spark-kafka-source (przesyłanie strumieniowe), spark-kafka-relation (partia) Dostosowany prefiks do użycia dla automatycznie generowanego identyfikatora grupy odbiorców platformy Kafka. Jeśli kafka.group.id jest jawnie ustawiona, łącznik ignoruje tę opcję. Prawidłowe wartości: dowolny ciąg.
includeHeaders false Określa, czy nagłówki komunikatów platformy Kafka mają być dołączane jako kolumna w danych wyjściowych. Prawidłowe wartości: true, false.
kafkaconsumer.polltimeoutms None Limit czasu w milisekundach dla wywołania użytkownika poll() platformy Kafka. Prawidłowe wartości: dodatnie liczby całkowite.
kafka.bootstrap.servers None Rozdzielona przecinkami lista adresów host:portów dla brokerów platformy Kafka. Ustawia właściwość klienta platformy bootstrap.servers Kafka.
Jeśli okaże się, że nie ma danych z platformy Kafka, sprawdź tę listę adresów brokera pod kątem nieprawidłowych adresów. Jeśli lista adresów brokera jest niepoprawna, może nie występować żadne błędy. Klienci platformy Kafka zakładają, że brokerzy będą w końcu dostępne i ponawiają próbę na zawsze, gdy wystąpią błędy sieci.
maxRecordsPerPartition None Maksymalna liczba rekordów dla każdej partycji platformy Spark. Po ustawieniu łącznik dzieli partycje platformy Kafka tak, aby każda partycja platformy Spark odczytywała co najwyżej te wiele rekordów. Prawidłowe wartości: dodatnie liczby całkowite.
Możesz również użyć tej opcji z opcją minPartitions. Po ustawieniu obu opcji platforma Spark używa dowolnej opcji w wyniku większej liczby partycji.
minPartitions None Minimalna liczba partycji platformy Spark do odczytu z platformy Kafka. Po ustawieniu łącznik dzieli duże partycje platformy Kafka w celu zwiększenia równoległości. Jeśli nie jest ustawiona, platforma Spark tworzy jedną partycję dla każdej partycji tematu platformy Kafka. Przydatne do obsługi niesymetryczności danych lub szczytowych obciążeń. Prawidłowe wartości: dodatnie liczby całkowite.
Ta opcja ponownie inicjuje odbiorców platformy Kafka dla każdego wyzwalacza, co może mieć wpływ na wydajność przy użyciu protokołu SSL.
startingOffsets latest (przesyłanie strumieniowe), earliest (partia) Przesunięcie, od którego rozpoczyna się odczyt zapytania. Prawidłowe wartości: earliest, latestlub ciąg JSON przesunięcia dla każdej partycji, na przykład {"topicA":{"0":23,"1":-2}}. W ciągu -1 JSON jest najnowszym przesunięciem. -2 jest najwcześniejszym przesunięciem.
W przypadku zapytań przesyłanych strumieniowo ta opcja ma zastosowanie tylko wtedy, gdy zostanie uruchomione nowe zapytanie. Wznawiane zapytania zawsze używają punktu kontrolnego. Podczas zapytania nowe partycje zaczynają odczytywać najwcześniejsze przesunięcie.
W przypadku zapytań wsadowych latest jest niedozwolone.
startingOffsetsByTimestamp None Lista przesunięć początkowych dla każdej partycji określona jako znaczniki czasu w milisekundach. Jeśli nie istnieje przesunięcie dla znacznika czasu, zachowanie zapytania jest określane przez startingOffsetsByTimestampStrategy. Prawidłowe wartości: ciąg JSON sygnatur czasowych dla każdej partycji, taki jak {"topicA":{"0":1000,"1":2000}}.
W przypadku zapytań przesyłanych strumieniowo ta opcja ma zastosowanie tylko wtedy, gdy zostanie uruchomione nowe zapytanie. Wznawiane zapytania zawsze używają punktu kontrolnego. Podczas zapytania nowe partycje zaczynają odczytywać najwcześniejsze przesunięcie.
startingOffsetsByTimestampStrategy error Strategia do użycia, gdy nie znaleziono przesunięcia dla znacznika czasu określonego w startingOffsetsByTimestamp lub startingTimestamp. Prawidłowe wartości: error (zgłasza wyjątek) latest (używa najnowszego dostępnego przesunięcia).
startingTimestamp None Globalny znacznik czasu rozpoczęcia w milisekundach, który ma zastosowanie do wszystkich partycji. Jeśli nie istnieje przesunięcie znacznika czasu, zachowanie jest kontrolowane przez startingOffsetsByTimestampStrategyelement . Prawidłowe wartości: liczby całkowite inne niż ujemne.
subscribe None Tematy do subskrybowania. Musisz określić dokładnie jedną z subscribeopcji , subscribePatternlub assign . Prawidłowe wartości: rozdzielona przecinkami lista nazw tematów.
subscribePattern None Wzorzec używany do subskrybowania tematów. Musisz określić dokładnie jedną z subscribeopcji , subscribePatternlub assign . Na przykład topic.*. Prawidłowe wartości: dowolny ciąg wyrażeń regularnych Java.

Następujące opcje dotyczą tylko odczytów przesyłanych strumieniowo za pomocą polecenia spark.readStream.format("kafka"):

Key Domyślny Description
bytesEstimateWindowLength 300s Przedział czasu służący do szacowania pozostałych bajtów dla estimatedTotalBytesBehindLatest metryki. Prawidłowe wartości: ciągi czasu trwania, takie jak 10m lub 600s. Zobacz Pobieranie metryk platformy Kafka.
maxOffsetsPerTrigger None Maksymalna liczba przesunięć przetwarzania na interwał wyzwalacza. Przesunięcia są dystrybuowane proporcjonalnie między partycjami tematu. Prawidłowe wartości: dodatnie liczby całkowite.
maxTriggerDelay 15m Maksymalny czas oczekiwania na minOffsetsPerTrigger gromadzenie się przed wyzwoleniem. Prawidłowe wartości: ciągi czasu trwania, takie jak 10m lub 600s.
minOffsetsPerTrigger None Minimalna liczba przesunięć do gromadzenia przed wyzwoleniem mikrosadowej partii. Gdy maxTriggerDelay zostanie osiągnięty, mikrosadowa jest uruchamiana niezależnie od tego. Prawidłowe wartości: dodatnie liczby całkowite.

Aby uzyskać opcje przesunięcia, które mają zastosowanie tylko do operacji odczytu wsadowego za pomocą spark.read.format("kafka")polecenia , zobacz Opcje elementu DataFrameReader platformy Kafka.

Aby uzyskać informacje o klientach platformy Kafka (kafka.*) i opcjach uwierzytelniania, zobacz Opcje.

Opcje elementu DataFrameWriter

Użyj tych opcji z DataFrameWriter.option() i DataFrameWriterV2.option(), aby kontrolować, jak Azure Databricks zapisuje dane.

Example

W poniższym przykładzie ustawiono wartość mergeSchema na True potrzeby pisania tabeli usługi Delta Lake:

Python
df.write.format("delta").option("mergeSchema", True).saveAsTable("my_table")
Scala
df.write.format("delta").option("mergeSchema", "true").saveAsTable("my_table")

Avro

Key Domyślny Description
avroSchema None Pełny schemat Avro jako ciąg JSON. Użyj tej opcji, aby przekonwertować typy Spark SQL na określone typy Avro. Dotyczy pliku Avro.
avroSchemaUrl None Adres URL wskazujący plik schematu Avro. Użyj zamiast avroSchema , gdy schemat jest przechowywany zewnętrznie. Wzajemnie wykluczające się z avroSchema. Dotyczy pliku Avro.
compression snappy Koder koder-dekoder kompresji do użycia podczas pisania. Prawidłowe wartości: uncompressed, , deflatesnappy, bzip2, xz, zstandard. Dotyczy pliku Avro.
recordName topLevelRecord Nazwa rekordu najwyższego poziomu w wyjściowym schemacie Avro. Dotyczy pliku Avro.
positionalFieldMatching false Określa, czy dopasować kolumny między schematem platformy Spark a schematem Avro według pozycji pola zamiast według nazwy. Dotyczy pliku Avro.
recordNamespace Pusty ciąg Przestrzeń nazw rekordu najwyższego poziomu w wyjściowym schemacie Avro. Dotyczy pliku Avro.

Delta Lake i Apache Iceberg

Key Domyślny Description
clusterByAuto false Czy włączyć automatyczne klastrowanie liquid, gdzie Azure Databricks wybiera kolumny klastrowania na podstawie wzorców zapytań. Tylko prawidłowe z mode("overwrite"). Nie można używać z trybem append . Dostępne w środowisku Databricks Runtime 16.4 lub nowszym. Dotyczy metody Use liquid clustering for tables (Używanie klastrowania płynnego dla tabel).
mergeSchema None Czy włączyć ewolucję schematu dla operacji zapisu. Nowe kolumny w źródłowej ramce danych są dodawane do schematu tabeli docelowej. Dotyczy dołączania wsadowego i przesyłania strumieniowego. Dotyczy schematu tabeli aktualizacji.
overwriteSchema None Czy zastąpić schemat tabeli i partycjonowanie podczas zastępowania. Wymaga mode("overwrite") bez replaceWhere. Nie można używać z partitionOverwriteMode. Dotyczy schematu tabeli aktualizacji.
partitionOverwriteMode None Tryb zastępowania partycji. Ustaw tę opcję na dynamic wartość , aby zastąpić tylko partycje zawierające nowe dane, pozostawiając wszystkie inne partycje bez zmian. Starszy tryb, nieobsługiwany w przypadku bezserwerowych zasobów obliczeniowych lub usługi Databricks SQL. Prawidłowe wartości: static, dynamic. Dotyczy selektywnego zastępowania danych za pomocą usługi Delta Lake.
replaceOn None Wyrażenie logiczne, które pasuje do wierszy w tabeli docelowej w celu zastąpienia wierszami z zapytania źródłowego. Może odwoływać się do kolumn zarówno z tabeli docelowej, jak i zapytania źródłowego. Wiersze w obiekcie docelowym, które pasują do wiersza źródłowego, są usuwane i zastępowane. Jeśli źródło jest puste, nie występują żadne usunięcia. Służy targetAlias do uściślania odwołań do kolumn. Dostępne w środowisku Databricks Runtime 17.1 lub nowszym. Dotyczy selektywnego zastępowania danych za pomocą usługi Delta Lake.
replaceUsing None Rozdzielona przecinkami lista nazw kolumn używanych do dopasowywania wierszy między tabelą docelową a zapytaniem źródłowym. Zarówno element docelowy, jak i źródło muszą zawierać wszystkie wymienione kolumny. Wiersze w obiekcie docelowym, które pasują do wiersza źródłowego w ramach porównania równości, są usuwane i zastępowane. NULL wartości są traktowane jako nie równe i nie będą zgodne. Dostępne w środowisku Databricks Runtime 16.3 lub nowszym. Dotyczy selektywnego zastępowania danych za pomocą usługi Delta Lake.
replaceWhere None Wyrażenie predykatu. Niepodzielne zastępuje tylko rekordy zgodne z predykatem. Dotyczy selektywnego zastępowania danych za pomocą usługi Delta Lake.
targetAlias None Alias ciągu dla tabeli docelowej. Użyj polecenia z replaceOn lub replaceWhere , aby uściślić odwołania do kolumn, gdy warunek odwołuje się do kolumn zarówno z tabeli docelowej, jak i zapytania źródłowego. Dotyczy selektywnego zastępowania danych za pomocą usługi Delta Lake.
txnAppId None Unikatowy ciąg identyfikujący aplikację dla idempotentnych zapisów w foreachBatch operacjach. Użyj polecenia z , txnVersion aby zapewnić dokładnie jednokrotne zapisy w wielu tabelach usługi Delta Lake. Dotyczy polecenia Use for idempotent table writes (Użyj foreachBatch dla operacji zapisu w tabeli idempotentnych).
txnVersion None Monotonicznie rosnąca liczba używana jako wersja transakcji dla idempotentnych zapisów w foreachBatch operacjach. Użyj polecenia z , txnAppId aby zapewnić dokładnie jednokrotne zapisy w wielu tabelach usługi Delta Lake. Dotyczy polecenia Use for idempotent table writes (Użyj foreachBatch dla operacji zapisu w tabeli idempotentnych).
optimizeWrite None Czy włączyć automatyczne optymalizowanie zapisu dla tej operacji zapisu. Zastępuje konfigurację spark.databricks.delta.optimizeWrite.enabled . Dotyczy Jak jest usługa Delta Lake w Azure Databricks?.
userMetadata None Ciąg zdefiniowany przez użytkownika dołączony do metadanych zatwierdzenia dla operacji zapisu. Widoczne w danych wyjściowych elementu DESCRIBE HISTORY. Dotyczy wzbogacenia tabel z niestandardowymi metadanymi.

CSV

Key Domyślny Description
charToEscapeQuoteEscaping \0 (nie włączono) Znak używany do ucieczki znaku ucieczki, gdy różni się od znaku cudzysłowu. Dotyczy pliku csv (DataFrameWriter).
compression none Koder koder-dekoder kompresji do użycia podczas pisania. Prawidłowe wartości: none, , bzip2gzip, lz4snappy, , deflate, zstd. Dotyczy pliku csv (DataFrameWriter).
dateFormat yyyy-MM-dd Formatuj ciąg dla wartości kolumn daty. Dotyczy pliku csv (DataFrameWriter).
emptyValue Pusty ciąg Ciąg zapisany dla pustych wartości (bez wartości null). Dotyczy pliku csv (DataFrameWriter).
encoding UTF-8 Kodowanie znaków dla plików wyjściowych. Dotyczy pliku csv (DataFrameWriter).
escape \ Znak używany do ucieczki wartości cytowanych. Dotyczy pliku csv (DataFrameWriter).
escapeQuotes true Czy należy unikać znaków cudzysłowu wewnątrz wartości pól cytowanych. Dotyczy pliku csv (DataFrameWriter).
header false Określa, czy należy pisać nazwy kolumn jako pierwszy wiersz danych wyjściowych. Dotyczy pliku csv (DataFrameWriter).
ignoreLeadingWhiteSpace false Czy przycinać wiodące białe znaki z wartości podczas pisania. Dotyczy pliku csv (DataFrameWriter).
ignoreTrailingWhiteSpace false Czy przycinać końcowe odstępy od wartości podczas pisania. Dotyczy pliku csv (DataFrameWriter).
lineSep \n Ciąg separatora wiersza używany między rekordami. Dotyczy pliku csv (DataFrameWriter).
locale en-US Identyfikator java.util.Locale. Wpływa na formatowanie wartości daty i znacznika czasu podczas pisania.
nullValue Pusty ciąg Ciąg napisany dla wartości null. Dotyczy pliku csv (DataFrameWriter).
quote " Znak używany do cudzysłowu wartości pól, które zawierają separator. Dotyczy pliku csv (DataFrameWriter).
quoteAll false Czy ująć wszystkie wartości pól w cudzysłowie niezależnie od zawartości. Dotyczy pliku csv (DataFrameWriter).
sep , Znak ogranicznika pola. Dotyczy pliku csv (DataFrameWriter).
timestampFormat yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] Ciąg formatu dla wartości kolumn sygnatury czasowej. Dotyczy pliku csv (DataFrameWriter).
timestampNTZFormat yyyy-MM-dd'T'HH:mm:ss[.SSS] Formatuj ciąg dla znacznika czasu bez wartości kolumn strefy czasowej (TimestampNTZType).

Excel

Key Domyślny Description
dataAddress None Nazwa arkusza lub komórka początkowa dla zapisu. Jeśli pominięto, zapisuje w arkuszu o nazwie Sheet1 rozpoczynającej się od komórki A1. Akceptuje nazwę arkusza ("SheetName") lub pojedyncze odwołanie do komórki ("SheetName!A1"). Zakresy komórek nie są obsługiwane w przypadku zapisów.
dateFormatInWrite yyyy-mm-dd Excel ciąg formatu komórki zastosowany do kolumn Date. Używa składni formatu Excel.
headerRows 0 Określa, czy należy pisać nazwy kolumn jako pierwszy wiersz. Prawidłowe wartości: 0, 1.
timestampNTZFormat yyyy-mm-dd hh:mm:ss Excel ciąg formatu komórki zastosowany do kolumn TimestampNTZ i Timestamp. Używa składni formatu Excel.
version xlsx Wersja formatu pliku Excel do zapisu. Prawidłowe wartości: xlsx, xls.

JSON

Key Domyślny Description
compression none Koder koder-dekoder kompresji do użycia podczas pisania. Prawidłowe wartości: none, , bzip2gzip, lz4snappy, , deflate, zstd. Dotyczy formatu json (DataFrameWriter).
dateFormat yyyy-MM-dd Formatuj ciąg dla wartości kolumn daty. Dotyczy formatu json (DataFrameWriter).
encoding UTF-8 Kodowanie znaków dla plików wyjściowych. Dotyczy formatu json (DataFrameWriter).
ignoreNullFields wartość spark.sql.jsonGenerator.ignoreNullFields Czy pominąć pola z wartościami null z danych wyjściowych JSON. Dotyczy formatu json (DataFrameWriter).
lineSep \n Ciąg separatora wiersza używany między rekordami. Dotyczy formatu json (DataFrameWriter).
locale en-US Identyfikator java.util.Locale. Wpływa na formatowanie wartości daty i znacznika czasu podczas pisania.
pretty false Czy włączyć dość (wcięcie, wielowierszowe) dane wyjściowe JSON.
sortKeys false Czy posortować klucze obiektów JSON alfabetycznie w danych wyjściowych. Przydatne do tworzenia danych wyjściowych deterministycznych.
timestampFormat yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] Ciąg formatu dla wartości kolumn sygnatury czasowej. Dotyczy formatu json (DataFrameWriter).
timestampNTZFormat yyyy-MM-dd'T'HH:mm:ss[.SSS] Formatuj ciąg dla znacznika czasu bez wartości kolumn strefy czasowej (TimestampNTZType).
writeNonAsciiCharacterAsCodePoint false Czy kodować znaki inne niż ASCII jako \uXXXX sekwencje ucieczki Unicode zamiast literału UTF-8 znaków w danych wyjściowych.

ORC

Key Domyślny Description
compression zstd Koder koder-dekoder kompresji do użycia podczas pisania. Prawidłowe wartości: none, , uncompressedsnappy, zliblzozstd, lz4, . brotli Dotyczy orc (DataFrameWriter).

Parkiet

Key Domyślny Description
compression snappy Koder koder-dekoder kompresji do użycia podczas pisania. Prawidłowe wartości: none, uncompressedbrotlisnappygziplzo, lz4, lz4_raw, . zstd Dotyczy parquet (DataFrameWriter).
spark.sql.parquet.outputTimestampType INT96 Typ fizyczny używany do kodowania kolumn sygnatury czasowej. Prawidłowe wartości: INT96, , TIMESTAMP_MICROSTIMESTAMP_MILLIS. Służy INT96 do zapewniania zgodności ze starszymi czytnikami Parquet, które nie obsługują standardowych typów sygnatur czasowych.

Tekst

Key Domyślny Description
compression none Koder koder-dekoder kompresji do użycia podczas pisania. Prawidłowe wartości: none, , bzip2gzip, lz4snappy, , deflate, zstd. Dotyczy tekstu (DataFrameWriter).
encoding UTF-8 Kodowanie znaków dla plików wyjściowych.
lineSep \n Ciąg separatora wiersza używany między rekordami. Dotyczy tekstu (DataFrameWriter).

XML

Key Domyślny Description
arrayElementName item Nazwa elementu dla elementów tablicy, które nie mają jawnej nazwy. Dotyczy xml (DataFrameWriter).
attributePrefix _ Prefiks poprzedzony nazwami pól, które odpowiadają atrybutom XML. Dotyczy xml (DataFrameWriter).
compression none Koder koder-dekoder kompresji do użycia podczas pisania. Prawidłowe wartości: none, , bzip2gzip, lz4snappy, , deflate, zstd. Dotyczy xml (DataFrameWriter).
dateFormat yyyy-MM-dd Formatuj ciąg dla wartości kolumn daty. Dotyczy xml (DataFrameWriter).
declaration version="1.0" encoding="UTF-8" standalone="yes" Ciąg deklaracji XML zapisany w górnej części każdego pliku wyjściowego. Ustaw wartość na pusty ciąg, aby pominąć deklarację. Dotyczy xml (DataFrameWriter).
encoding UTF-8 Kodowanie znaków dla plików wyjściowych. Dotyczy xml (DataFrameWriter).
indent 4 spacje Ciąg używany do wcięcia elementów podrzędnych w danych wyjściowych. Ustaw na pusty ciąg, aby wyłączyć wcięcie i zapisać każdy wiersz w jednym wierszu.
locale en-US Identyfikator java.util.Locale. Wpływa na formatowanie wartości daty i znacznika czasu podczas pisania.
nullValue null Ciąg napisany dla wartości null. Po ustawieniu nullwartości na atrybuty i elementy podrzędne dla pól o wartości null zostaną pominięte. Dotyczy xml (DataFrameWriter).
rootTag ROWS Tag elementu głównego, który opakowuje wszystkie elementy wiersza w danych wyjściowych. Dotyczy xml (DataFrameWriter).
rowTag ROW Tag elementu reprezentujący wiersz w danych wyjściowych. Dotyczy xml (DataFrameWriter).
singleVariantColumn None Nazwa pojedynczej kolumny Wariant do zapisu w plikach XML. Dotyczy xml (DataFrameWriter).
timestampFormat yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] Ciąg formatu dla wartości kolumn sygnatury czasowej. Dotyczy xml (DataFrameWriter).
timestampNTZFormat yyyy-MM-dd'T'HH:mm:ss[.SSS] Formatuj ciąg dla znacznika czasu bez wartości kolumn strefy czasowej. Dotyczy xml (DataFrameWriter).
validateName true Czy zgłosić wyjątek, jeśli nazwa kolumny nie jest prawidłowym identyfikatorem elementu XML. Dotyczy xml (DataFrameWriter).
valueTag _VALUE Nazwa pola używana dla danych znaków w elementach XML, które mają również atrybuty lub elementy podrzędne. Dotyczy xml (DataFrameWriter).

Opcje DataStreamWriter

Te opcje umożliwiają DataStreamWriter.option() skonfigurowanie zapisów przesyłanych strumieniowo.

Example

W poniższym przykładzie ustawiono lokalizację punktu kontrolnego dla strumienia:

Python
(df.writeStream
  .format("delta")
  .option("checkpointLocation", "/path/to/checkpoint")
  .start("/path/to/table"))
Scala
df.writeStream
  .format("delta")
  .option("checkpointLocation", "/path/to/checkpoint")
  .start("/path/to/table")

Wspólne

Key Domyślny Description
checkpointLocation Brak (wymagane) Ścieżka do katalogu punktu kontrolnego dla zapytania przesyłania strumieniowego. Wymagana do zapewnienia odporności na uszkodzenia i dokładnie jednokrotnych gwarancji przetwarzania. Każde zapytanie przesyłane strumieniowo musi używać unikatowej lokalizacji punktu kontrolnego. Usługa Databricks zaleca przechowywanie punktów kontrolnych w woluminie wykazu aparatu Unity lub ścieżce magazynu w chmurze. Zobacz Ustrukturyzowane punkty kontrolne przesyłania strumieniowego.
path None Ścieżka wyjściowa dla ujściów przesyłania strumieniowego opartego na plikach, takich jak Parquet. Dotyczy tylko formatów opartych na plikach.

Ujście konsoli

Key Domyślny Description
numRows 20 Liczba wierszy do wyświetlenia dla każdej mikrosadowej podczas zapisywania w ujściu konsoli.
truncate true Czy obcinać długie ciągi podczas wyświetlania wierszy. Ustaw wartość na , aby false wyświetlić pełne wartości ciągu.

Delta Lake

Poniższe opcje mają zastosowanie podczas zapisywania strumienia do tabeli usługi Delta Lake przy użyciu polecenia format("delta"). Opcje tylko zastępowania, takie jak overwriteSchema, replaceWherei partitionOverwriteMode nie są obsługiwane w przypadku zapisów przesyłanych strumieniowo.

Key Domyślny Description
mergeSchema false Czy rozwijać schemat tabeli usługi Delta Lake, gdy ramka danych przesyłania strumieniowego zawiera nowe kolumny. Dotyczy tylko trybu dołączania danych wyjściowych. Dotyczy schematu tabeli aktualizacji.
userMetadata None Ciąg zdefiniowany przez użytkownika dołączony do metadanych zatwierdzenia dla operacji zapisu. Widoczne w danych wyjściowych elementu DESCRIBE HISTORY. Dotyczy wzbogacenia tabel z niestandardowymi metadanymi.

Ujście pliku

Poniższa opcja ma zastosowanie podczas zapisywania strumienia w formatach opartych na plikach (Parquet, JSON, CSV, ORC, text). Aby uzyskać informacje o opcjach specyficznych dla formatu, zobacz Opcje elementu DataFrameWriter.

Key Domyślny Description
retention None Jak długo zachować pliki metadanych ujścia używane do odporności na uszkodzenia i kompaktowania. Akceptuje ciąg czasu, taki jak 7 days lub 24 hours. Jeśli nie jest ustawiona, pliki metadanych są zachowywane przez czas nieokreślony.

Ujście platformy Kafka

Aby uzyskać pełną listę opcji pisania strumieni na platformie Kafka, zobacz Opcje.

Key Domyślny Description
kafka.bootstrap.servers None Required. Rozdzielona przecinkami lista adresów brokera platformy host:port Kafka.
topic None Docelowy temat platformy Kafka dla wszystkich wierszy. Wymagane, jeśli ramka danych nie zawiera kolumny topic .
kafka.* None Dowolna konfiguracja producenta platformy Kafka poprzedzona prefiksem kafka.. Na przykład kafka.compression.type.

Ujście pamięci

Key Domyślny Description
queryName Brak (wymagane) Nazwa tabeli w pamięci, do którego zapisuje zapytanie. Wymagane do ujścia pamięci. Można również konfigurować za pomocą polecenia .queryName().
mode exactlyonce Gwarancja dostarczenia dla ujścia pamięci. exactlyonce używa trybu mikrosadowego z semantykami dokładnie jednokrotnymi. atleastonce używa trybu ciągłego z semantykami co najmniej raz. Prawidłowe wartości: exactlyonce, atleastonce.

Opcje funkcji platformy Spark

Niektóre wbudowane funkcje platformy Spark SQL akceptują options mapę, która steruje zachowaniem analizowania lub serializacji. Przekaż opcje jako Python dict lub Scala Map[String, String].

Example

Poniższy przykład analizuje kolumnę JSON podczas upuszczania nieprawidłowo sformułowanych rekordów:

Python
from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([StructField("name", StringType())])
df = df.withColumn("parsed", from_json("json_col", schema, {"mode": "DROPMALFORMED"}))
Scala
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._

val schema = StructType(Seq(StructField("name", StringType)))
val df = df.withColumn("parsed", from_json(col("json_col"), schema, Map("mode" -> "DROPMALFORMED")))

Avro

Funkcje Avro akceptują te same opcje co odpowiednie opcje ramki danych:

Example

Poniższy przykład dekoduje kolumnę Avro z włączoną ewolucją schematu:

Python
from pyspark.sql.functions import from_avro

df = df.withColumn("decoded", from_avro("avro_col", json_schema, {"avroSchemaEvolutionMode": "restart"}))
Scala
import org.apache.spark.sql.avro.functions.from_avro

val df = df.withColumn("decoded", from_avro(col("avro_col"), jsonSchema, Map("avroSchemaEvolutionMode" -> "restart")))

Ponadto warianty rejestru schematów from_avro i to_avro akceptują następujące opcje:

Key Domyślny Description
schemaId None Identyfikator schematu z rejestru schematów Confluent do użycia podczas dekodowania danych Avro zakodowanych przy użyciu schematu niezgodnego z jsonFormatSchema. from_avro Dotyczy tylko.
confluent.schema.registry.* None Właściwości konfiguracji klienta rejestru schematów confluent. Przekaż dowolną właściwość klienta sr platformy Confluent przy użyciu tego prefiksu, na przykład confluent.schema.registry.basic.auth.user.info w przypadku poświadczeń uwierzytelniania podstawowego. Wymagane dla wariantów rejestru schematów i from_avroto_avro.

CSV

Funkcje CSV akceptują te same opcje co odpowiednie opcje ramki danych:

Example

Poniższy przykład odczytuje plik CSV z separatorem niestandardowym i NULL wartością:

Python
from pyspark.sql.functions import from_csv
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

schema = StructType([StructField("id", IntegerType()), StructField("name", StringType())])
df = df.withColumn("parsed", from_csv("csv_col", schema, {"sep": "|", "nullValue": "N/A"}))
Scala
import org.apache.spark.sql.functions.from_csv
import org.apache.spark.sql.types._

val schema = StructType(Seq(StructField("id", IntegerType), StructField("name", StringType)))
val df = df.withColumn("parsed", from_csv(col("csv_col"), schema, Map("sep" -> "|", "nullValue" -> "N/A")))

JSON

Funkcje JSON akceptują te same opcje co odpowiednie opcje ramki danych:

Example

W poniższym przykładzie jest zapisywany kod JSON z polami NULL ignorowanymi i włączonym dość formatowaniem:

Python
from pyspark.sql.functions import to_json

df = df.withColumn("json_str", to_json("struct_col", {"pretty": "true", "ignoreNullFields": "true"}))
Scala
import org.apache.spark.sql.functions.to_json

val df = df.withColumn("json_str", to_json(col("struct_col"), Map("pretty" -> "true", "ignoreNullFields" -> "true")))

Protobuf

from_protobuf nie to_protobuf należy używać źródła danych opartego na plikach. Dane Protobuf są zawsze odczytywane i zapisywane jako kolumny binarne przy użyciu tych funkcji. Opcje są przekazywane jako i Map[String, String] są uwzględniane wielkość liter.

Example

Poniższy przykład dekoduje kolumnę Protobuf przy użyciu trybu PERMISSIVE:

Python
from pyspark.sql.functions import from_protobuf

df = df.withColumn("decoded", from_protobuf("proto_col", "MyMessage", "/path/to/descriptor.desc",
    {"mode": "PERMISSIVE", "enums.as.ints": "true"}))
Scala
import org.apache.spark.sql.protobuf.functions.from_protobuf

val df = df.withColumn("decoded", from_protobuf(col("proto_col"), "MyMessage", "/path/to/descriptor.desc",
    Map("mode" -> "PERMISSIVE", "enums.as.ints" -> "true")))

Funkcje Protobuf korzystają z następujących opcji:

Key Domyślny Description
mode FAILFAST Jak obsługiwać uszkodzone rekordy. Funkcja FAILFAST zgłasza wyjątek. PERMISSIVE Ustawia źle sformułowane pola na null. Prawidłowe wartości: FAILFAST, PERMISSIVE. Dotyczy .from_protobuf
recursive.fields.max.depth -1 (wyłączone) Maksymalna głębokość rekursji dla cyklicznych pól Protobuf. Ustaw wartość , aby 0 wyłączyć obsługę pól cyklicznych. Prawidłowe wartości: 0 do 10. Dotyczy .from_protobuf
convert.any.fields.to.json false Czy przekonwertować pola Protobuf Any na ciąg JSON zamiast STRUCT. Dotyczy .from_protobuf
emit.default.values false Czy emitować pola z wartościami zerowymi lub domyślnymi (semantyka proto3). Gdy falsepola z wartościami domyślnymi zostaną pominięte z danych wyjściowych. Dotyczy .from_protobuf
enums.as.ints false Czy renderować pola wyliczenia jako wartości całkowite zamiast ciągów. Dotyczy .from_protobuf
upcast.unsigned.ints false Czy przesłać uint32 do Long i uint64 , aby Decimal(20,0) zapobiec przepełnieniu liczby całkowitej. Dotyczy .from_protobuf
unwrap.primitive.wrapper.types false Określa, Int32Value czy należy odpakowywać google.protobuf typy otoki (na przykład i StringValue) do odpowiadających im typów pierwotnych platformy Spark. Dotyczy .from_protobuf
retain.empty.message.types false Czy zachować puste typy komunikatów Protobuf w schemacie danych wyjściowych przez wstawienie fikcyjnej kolumny. Dotyczy .from_protobuf
schema.registry.subject None Nazwa podmiotu rejestru schematów. Wymagane w przypadku używania wariantów rejestru schematów from_protobuf i to_protobuf.
schema.registry.address None Adres rejestru schematów (host i port). Wymagane w przypadku używania wariantów rejestru schematów from_protobuf i to_protobuf.
schema.registry.protobuf.name None Określa, który komunikat Protobuf ma być używany, gdy temat rejestru schematów zawiera wiele komunikatów. Optional.

XML

Funkcje XML akceptują te same opcje co odpowiednie opcje ramki danych:

Example

Poniższy przykład zapisuje kod XML z niestandardowymi tagami katalogu głównego i wiersza:

Python
from pyspark.sql.functions import to_xml

df = df.withColumn("xml_str", to_xml("struct_col", {"rootTag": "records", "rowTag": "record"}))
Scala
import org.apache.spark.sql.functions.to_xml

val df = df.withColumn("xml_str", to_xml(col("struct_col"), Map("rootTag" -> "records", "rowTag" -> "record")))