Notatka
Dostęp do tej strony wymaga autoryzacji. Może spróbować zalogować się lub zmienić katalogi.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Na tej stronie wymieniono dostępne opcje danych wejściowych i wyjściowych dla interfejsów API platformy Spark, które odczytują i zapisują dane.
Opcje elementu DataFrameReader
Użyj tych opcji z DataFrameReader.option(), DataFrameReader.options(), read_files, COPY INTO i Auto Loader kontrolować, jak Azure Databricks odczytuje pliki danych.
Example
Poniższy przykład ustawia wartość multiLine na True do odczytywania plików JSON:
Python
df = spark.read.format("json").option("multiLine", True).load("/path/to/data")
Scala
val df = spark.read.format("json").option("multiLine", "true").load("/path/to/data")
SQL
SELECT * FROM read_files("/path/to/data", format => "json", multiLine => true)
Wspólne
Poniższe opcje mają zastosowanie do wszystkich formatów plików.
| Key | Domyślny | Description |
|---|---|---|
ignoreCorruptFiles |
false |
Czy ignorować uszkodzone pliki. Jeśli ma wartość true, zadania platformy Spark będą nadal działać po napotkaniu uszkodzonych plików, a zawartość, która została odczytowana, będzie nadal zwracana. W przypadku COPY INTOprogramu można obserwować pomijanie uszkodzonych plików, jak numSkippedCorruptFiles w operationMetrics kolumnie historii usługi Delta Lake. Dostępne w środowisku Databricks Runtime 11.3 LTS i nowszym. |
ignoreMissingFiles |
false dla automatycznego modułu ładującego dla trueCOPY INTO (starsza wersja) |
Czy ignorować brakujące pliki. Jeśli to prawda, zadania platformy Spark będą nadal uruchamiane po napotkaniu brakujących plików, a zawartość jest nadal zwracana. Dostępne w środowisku Databricks Runtime 11.3 LTS i nowszym. |
modifiedAfter |
None | Opcjonalny znacznik czasu jako filtr do pobierania tylko tych plików, które mają znacznik czasu modyfikacji po podanym znaczniku czasowym. |
modifiedBefore |
None | Opcjonalny znacznik czasu jako filtr do wczytywania tylko plików, które mają znacznik czasu modyfikacji wcześniejszy niż podany znacznik czasu. |
pathGlobFilter lub fileNamePattern |
None | Potencjalny wzorzec globu umożliwiający wybór plików.
PATTERN Odpowiednik w (COPY INTOstarsza wersja).
fileNamePattern można użyć w pliku read_files. |
recursiveFileLookup |
false |
Gdy trueta opcja wyszukuje katalogi zagnieżdżone, nawet jeśli ich nazwy nie są zgodne ze schematem nazewnictwa partycji, takim jak date=2019-07-01. |
Avro
| Key | Domyślny | Description |
|---|---|---|
avroSchema |
None | Opcjonalny schemat dostarczony przez użytkownika w formacie Avro. Podczas odczytywania avro tę opcję można ustawić na rozwinięty schemat, który jest zgodny, ale różni się od rzeczywistego schematu Avro. Schemat deserializacji jest zgodny ze schematem rozwiniętym. Jeśli na przykład ustawisz rozwinięty schemat zawierający jedną dodatkową kolumnę z wartością domyślną, wynik odczytu również zawiera nową kolumnę. |
avroSchemaEvolutionMode |
none |
Jak obsługiwać ewolucję schematu podczas korzystania z rejestru schematów. Prawidłowe wartości: none (ignoruj zmiany schematu i kontynuuj zadanie), restart (gdy zostaną wykryte zmiany schematu, zgłasza UnknownFieldException element i wymaga ponownego uruchomienia zadania). |
datetimeRebaseMode |
LEGACY |
Steruje przestawieniem wartości DATE i TIMESTAMP między kalendarzami juliańskim a proleptycznym gregoriańskim. Prawidłowe wartości: EXCEPTION, LEGACYi CORRECTED. |
enableStableIdentifiersForUnionType |
false |
Określa, czy używać stabilnych nazw pól dla typów unii Avro. Po włączeniu nazwy pól typu unii pochodzą z nazw typów w małych literach (na przykład member_int, member_string). Zgłasza wyjątek, jeśli dwie nazwy typów są identyczne po małych literach. |
mergeSchema |
false |
Czy należy wywnioskować schemat dla wielu plików i scalić schematy każdego pliku.
mergeSchema dla Avro nie jest możliwe złagodzenie typów danych. |
mode |
FAILFAST |
Tryb analizatora do obsługi uszkodzonych rekordów. Prawidłowe wartości: FAILFAST (zgłasza wyjątek), PERMISSIVE (ustawia źle sformułowane pola na wartość null) DROPMALFORMED (dyskretnie porzuca nieprawidłowe rekordy). |
readerCaseSensitive |
true |
Określa zachowanie wrażliwości na wielkość liter po włączeniu rescuedDataColumn. Jeśli to prawda, należy uratować kolumny danych, których nazwy różnią się wielkością liter od schematu. Gdy wartość false, odczytuje dane w sposób niewrażliwy na wielkość liter. |
recursiveFieldMaxDepth |
None | Maksymalna głębokość rekursji dla cyklicznych pól Avro. Ustaw wartość na 1 obcięcie wszystkich pól cyklicznych, 2 aby zezwolić na jeden poziom rekursji itd.15 W przypadku niezastawionych pól cyklicznych lub 0pola cyklicznego nie są dozwolone. Prawidłowe wartości: 0 do 15. |
rescuedDataColumn |
None | Czy zebrać wszystkie dane, których nie można przeanalizować z powodu niezgodności typu danych oraz niezgodności schematu (w tym wielkości liter kolumn), do osobnej kolumny. Ta kolumna jest domyślnie dołączana podczas korzystania z modułu automatycznego ładowania.COPY INTO (starsza wersja) nie obsługuje uratowanych kolumn danych, ponieważ nie można ręcznie ustawić schematu przy użyciu COPY INTO. Databricks zaleca używanie Auto Loader w przypadku większości scenariuszy przyjmowania danych.Aby uzyskać więcej informacji, zobacz Co to jest uratowana kolumna danych?. |
stableIdentifierPrefixForUnionType |
member_ |
Prefiks używany dla nazw pól typu stabilnego unii, gdy enableStableIdentifiersForUnionType=true. |
CSV
| Key | Domyślny | Description |
|---|---|---|
badRecordsPath |
None | Ścieżka do przechowywania plików do rejestrowania informacji o nieprawidłowych rekordach CSV. |
charToEscapeQuoteEscaping |
\0 |
Znak używany do unikania znaku używanego do unikania cudzysłowów. Na przykład dla następującego rekordu: : [ " a\\", b ]
|
columnNameOfCorruptRecord |
_corrupt_record |
Obsługiwane dla funkcji Auto Loader. Nie obsługiwane w przypadku COPY INTO (starsza wersja).Kolumna do przechowywania rekordów, które są źle sformułowane i nie można ich przeanalizować. mode Jeśli dla analizowania ustawiono wartość DROPMALFORMED, ta kolumna będzie pusta. |
comment |
\0 |
Definiuje znak reprezentujący komentarz liniowy znajdujący się na początku linii tekstu. Użyj polecenia '\0' , aby wyłączyć pomijanie komentarza. |
dateFormat |
yyyy-MM-dd |
Format analizowania ciągów dat. |
emptyValue |
Pusty ciąg | Wartość reprezentowana jako pusty ciąg. |
enableDateTimeParsingFallback |
false |
Czy powrócić do starszego zachowania analizy daty i znacznika czasu, gdy nie można przeanalizować wartości przy użyciu określonego formatu. Gdy falsebłędy analizowania zgłaszają błąd lub powodują wygenerowanie wartości null w zależności od modemetody . |
encoding lub charset |
UTF-8 |
Nazwa kodowania plików CSV. Zobacz java.nio.charset.Charset listę opcji.
UTF-16 i UTF-32 nie mogą być używane, gdy multiline to true. |
enforceSchema |
true |
Czy wymuszać stosowanie zadanego lub wnioskowanego schematu do plików CSV. Jeśli opcja jest włączona, nagłówki plików CSV są ignorowane. Ta opcja jest domyślnie ignorowana podczas używania automatycznego modułu ładującego do ratowania danych i zezwalania na ewolucję schematu. |
escape |
\ |
Znak ucieczki do użycia podczas analizowania danych. |
extension |
csv |
Oczekiwane rozszerzenie nazwy pliku. Pliki bez tego rozszerzenia są filtrowane podczas odczytu. |
failOnUnknownFields |
false |
Czy rekord CSV nie może występować w schemacie, czy rekord CSV zawiera kolumny, które nie są obecne. W przypadku false, nierozpoznane kolumny są dyskretnie porzucane lub ratowane w zależności od rescuedDataColumn. |
failOnWidenedFields |
false |
Czy nie można przeanalizować wartości pola jako zadeklarowanego typu schematu bez rozszerzania. Gdy falsewartości rozszerzone typu są dyskretnie ratowane w zależności od rescuedDataColumn. Ustawienie failOnUnknownFields=true może maskować efekty tej opcji. |
header |
false |
Określa, czy pliki CSV zawierają nagłówek. Automatyczny ładowacz zakłada, że pliki mają nagłówki podczas ustalania schematu. |
ignoreLeadingWhiteSpace |
false |
Czy ignorować początkowe spacje dla każdej parsowanej wartości. |
ignoreTrailingWhiteSpace |
false |
Czy należy pomijać zbędne spacje dla każdej analizowanej wartości. |
inferSchema |
false |
Czy wywnioskować typy danych analizowanych rekordów CSV, czy przyjąć, że wszystkie kolumny mają wartość StringType. Wymaga dodatkowego przetwarzania danych, jeśli ustawiono wartość true. Zamiast tego użyj cloudFiles.inferColumnTypes dla Auto Loader. |
inputBufferSize |
1048576 (1 MB) |
Rozmiar buforu w bajtach analizatora CSV. Przydatne do dostrajania użycia pamięci podczas analizowania dużych plików CSV. Prawidłowe wartości: dodatnie liczby całkowite. |
lineSep |
Brak, który obejmuje \r, \r\ni \n |
Ciąg pomiędzy dwoma następującymi po sobie rekordami CSV. |
locale |
US |
Identyfikator java.util.Locale. Wpływa na sposób domyślnego interpretowania dat, znaczników czasu oraz liczb dziesiętnych w pliku CSV. |
maxCharsPerColumn |
-1 |
Maksymalna liczba znaków oczekiwana od wartości do przeanalizowania. Może służyć do unikania błędów pamięci. Wartość domyślna to -1, co oznacza brak ograniczeń. Prawidłowe wartości: dodatnie liczby całkowite lub -1 nieograniczone. |
maxColumns |
20480 |
Stały limit liczby kolumn, które może mieć rekord. Prawidłowe wartości: dodatnie liczby całkowite. |
mergeSchema |
false |
Czy należy wywnioskować schemat dla wielu plików i scalić schematy każdego pliku. Domyślnie włączono funkcję automatycznego ładowania podczas wnioskowania schematu. |
mode |
PERMISSIVE |
Tryb analizatora dotyczący obsługi nieprawidłowo sformułowanych danych. Prawidłowe wartości: PERMISSIVE, , DROPMALFORMEDFAILFAST. |
multiLine |
false |
Określa, czy rekordy CSV obejmują wiele wierszy. |
nanValue |
NaN |
Reprezentacja ciągu znakowego wartości niebędącej liczbą podczas parsowania kolumn FloatType i DoubleType. |
negativeInf |
-Inf |
Tekstowa reprezentacja nieskończoności ujemnej podczas parsowania kolumn FloatType lub DoubleType. |
nullValue |
Pusty ciąg | Reprezentacja wartości null jako ciąg znaków. |
parserCaseSensitive (przestarzałe) |
false |
Czy podczas odczytywania plików należy wyrównywać kolumny zadeklarowane w nagłówku z uwzględnieniem wielkości liter w schemacie.
true To jest domyślne ustawienie dla Automatycznego Ładowania. Kolumny, które różnią się wielkością liter, zostaną zachowane w rescuedDataColumn, jeśli funkcja ta zostanie włączona. Ta opcja została uznana za przestarzałą na rzecz readerCaseSensitive. |
positiveInf |
Inf |
Postać tekstowa nieskończoności dodatniej podczas parsowania kolumn FloatType lub DoubleType. |
preferDate |
true |
Próbuje interpretować ciągi znaków jako daty, zamiast traktować je jako znaczniki czasu, gdy jest to możliwe. Należy również użyć wnioskowania schematu, włączając inferSchema lub używając cloudFiles.inferColumnTypes funkcji automatycznego modułu ładującego. |
quote |
" |
Znak używany do unikania wartości, w których ogranicznik pola jest częścią wartości. |
readerCaseSensitive |
true |
Określa zachowanie wrażliwości na wielkość liter po włączeniu rescuedDataColumn. Jeśli to prawda, należy uratować kolumny danych, których nazwy różnią się wielkością liter od schematu. Gdy wartość false, odczytuje dane w sposób niewrażliwy na wielkość liter. |
rescuedDataColumn |
None | Czy zebrać wszystkie dane, których nie można przeanalizować z powodu niezgodności typu danych oraz niezgodności schematu (w tym wielkości liter kolumn), do osobnej kolumny. Ta kolumna jest domyślnie dołączana podczas korzystania z modułu automatycznego ładowania. Aby uzyskać więcej informacji, zobacz Co to jest uratowana kolumna danych?.COPY INTO (starsza wersja) nie obsługuje uratowanych kolumn danych, ponieważ nie można ręcznie ustawić schematu przy użyciu COPY INTO. Databricks zaleca używanie Auto Loader w przypadku większości scenariuszy przyjmowania danych. |
sep lub delimiter |
, |
Ciąg separatorów między kolumnami. |
singleVariantColumn |
None | Po ustawieniu nazwy kolumny odczytuje cały rekord CSV w jedną VariantType kolumnę o tej nazwie zamiast analizować każde pole do własnej kolumny. Wymaga header=true. |
skipRows |
0 |
Liczba wierszy z początku pliku CSV, które powinny być ignorowane (w tym z komentarzami i pustymi wierszami). Jeśli header ma wartość true, nagłówek będzie pierwszym wierszem, który nie został pominięty ani skomentowany. Prawidłowe wartości: dodatnie liczby całkowite lub 0. |
timeFormat |
HH:mm:ss |
Format analizowania TimeType wartości kolumn. |
timestampFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] |
Format analizowania ciągów znacznika czasu. |
timestampNTZFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS] |
Format analizowania znacznika czasu bez ciągów strefy czasowej (TimestampNTZType). |
timeZone |
None | Element java.time.ZoneId do użycia podczas analizowania sygnatur czasowych i dat. |
unescapedQuoteHandling |
STOP_AT_DELIMITER |
Strategia obsługi niewyłuskanych cudzysłowów. Dozwolone opcje:
|
Excel
| Key | Domyślny | Description |
|---|---|---|
dataAddress |
None | Zakres komórek do odczytania w składni Excel. Jeśli pominięto, odczytuje wszystkie prawidłowe komórki z pierwszego arkusza. Użyj "SheetName!C5:H10" polecenia , aby odczytać zakres z nazwanego arkusza, "C5:H10" aby odczytać zakres z pierwszego arkusza lub "SheetName" odczytać wszystkie dane z określonego arkusza. |
headerRows |
0 |
Liczba początkowych wierszy do użycia jako nagłówki nazw kolumn. Gdy dataAddress zostanie określony, ma to zastosowanie w zakresie komórek. Gdy 0nazwy kolumn są generowane automatycznie jako _c1, _c2, _c3itp. Prawidłowe wartości: 0, 1. |
ignoreMissingSheet |
false |
Czy dyskretnie pominąć pliki, które nie zawierają arkusza określonego przez dataAddress. W przypadku falsewystąpienia błędu zgłaszany jest błąd, jeśli brakuje żądanego arkusza. Ma zastosowanie tylko wtedy, gdy nazwa arkusza jest określona w pliku dataAddress. Prawidłowe wartości: true, false. |
includePhoneticRuns |
false |
Czy dołączyć adnotacje fonetyczne (takie jak pinyin lub furigana) łączone z wartościami ciągów komórek podczas odczytywania plików XLSX. Prawidłowe wartości: true, false. |
operation |
readSheet |
Operacja do wykonania w skoroszycie Excel. Prawidłowe wartości: readSheet (odczytuje dane z arkusza) listSheets (zwraca strukturę z polami sheetIndex: long i sheetName: String dla każdego arkusza). |
timestampNTZFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS] |
Niestandardowy ciąg formatu dla wartości timestamp-without-timezone przechowywanych jako ciągi w Excel. Niestandardowe formaty dat są zgodne z formatami we wzorcach Datetime patterns. |
dateFormat |
yyyy-MM-dd |
Ciąg formatu niestandardowego dla wartości ciągów odczytanych jako Date. Niestandardowe formaty dat są zgodne z formatami we wzorcach Datetime patterns. |
JSON
| Key | Domyślny | Description |
|---|---|---|
allowBackslashEscapingAnyCharacter |
false |
Czy zezwolić na użycie ukośników odwrotnych do zasłaniania dowolnego znaku, który po nich następuje. Jeśli nie jest włączona, tylko znaki, które są jawnie wymienione przez specyfikację JSON, mogą zostać uniknione. |
allowComments |
false |
Czy zezwalać na używanie komentarzy w stylu Java, C i C++ ('/', '*', i '//' odmian) w analizowanej zawartości, czy nie. |
allowNonNumericNumbers |
true |
Czy zezwalać na zestaw tokenów nie będących liczbami (NaN) jako prawidłowe wartości liczb zmiennoprzecinkowych. |
allowNumericLeadingZeros |
false |
Czy pozwolić na rozpoczynanie liczb całkowitych od dodatkowych (ignorowanych) zer (na przykład 000001). |
allowSingleQuotes |
true |
Czy zezwalać na używanie pojedynczych cudzysłowów (apostrof, znak '\') do cytowania ciągów (nazw i wartości ciągu). |
allowUnquotedControlChars |
false |
Czy zezwolić ciągom JSON na zawieranie nieoznaczonych znaków kontrolnych (znaki ASCII o wartości mniejszej niż 32, w tym znaki tabulatora i przesunięcia wiersza), czy nie. |
allowUnquotedFieldNames |
false |
Czy zezwalać na używanie niekwotowanych nazw pól, które są dozwolone przez język JavaScript, ale nie przez specyfikację JSON. |
alternateVariantEncoding |
None | Kodowanie używane dla wartości wariantów w źródłowym formacie JSON. Ustaw wartość na wartość , aby Z85 zdekodować wartości wariantów, które zostały zakodowane w formacie Base85 zamiast przechowywanych jako wbudowany kod JSON. |
badRecordsPath |
None | Ścieżka do przechowywania plików do rejestrowania informacji o nieprawidłowych rekordach JSON.badRecordsPath Użycie opcji w źródle danych opartym na plikach ma następujące ograniczenia:
|
columnNameOfCorruptRecord |
_corrupt_record |
Kolumna do przechowywania rekordów, które są źle sformułowane i nie można ich przeanalizować.
mode Jeśli dla analizowania ustawiono wartość DROPMALFORMED, ta kolumna będzie pusta. |
dateFormat |
yyyy-MM-dd |
Format analizowania ciągów dat. |
dropFieldIfAllNull |
false |
Czy ignorować kolumny wszystkich wartości null, czy puste tablice i struktury podczas wnioskowania schematu. |
encoding lub charset |
UTF-8 |
Nazwa kodowania plików JSON. Zobacz java.nio.charset.Charset listę opcji. Nie można użyć polecenia UTF-16 i UTF-32 gdy multiline ma wartość true. |
inferTimestamp |
false |
Czy należy spróbować wywnioskować ciągi znacznika czasu jako TimestampType? W przypadku ustawienia trueopcji wnioskowanie schematu może trwać znacznie dłużej. Aby korzystać z Auto Loader, musisz włączyć cloudFiles.inferColumnTypes. |
lineSep |
Brak, który obejmuje \r, \r\ni \n |
Ciąg między dwoma następującymi po sobie rekordami JSON. |
locale |
US |
Identyfikator java.util.Locale. Wpływa na domyślną datę, znacznik czasu i analizowanie dziesiętne w formacie JSON. |
maxNestingDepth |
500 |
Maksymalna dozwolona głębokość zagnieżdżania dla obiektów i tablic JSON. Zwiększ tę wartość dla głęboko zagnieżdżonych dokumentów. Prawidłowe wartości: dodatnie liczby całkowite. |
maxNumLen |
1000 |
Maksymalna długość tokenów liczbowych w danych wejściowych JSON. Zwiększ tę wartość dla formatu JSON przy użyciu dużych literałów liczbowych. Prawidłowe wartości: dodatnie liczby całkowite. |
maxStringLen |
Nieograniczony | Maksymalna długość wartości ciągu w danych wejściowych JSON. Ustaw wartość , aby ograniczyć użycie pamięci podczas analizowania kodu JSON z dużymi ciągami. Prawidłowe wartości: dodatnie liczby całkowite. |
mode |
PERMISSIVE |
Tryb analizatora dotyczący obsługi nieprawidłowo sformułowanych danych. Prawidłowe wartości: PERMISSIVE, , DROPMALFORMEDFAILFAST. |
multiLine |
false |
Czy rekordy JSON rozciągają się na wiele wierszy? |
prefersDecimal |
false |
Próbuje interpretować ciągi znaków jako DecimalType zamiast typu float lub double, jeśli jest to możliwe. Należy również użyć wnioskowania schematu, włączając inferSchema lub używając cloudFiles.inferColumnTypes funkcji automatycznego modułu ładującego. |
primitivesAsString |
false |
Czy wywnioskować typy pierwotne, takie jak liczby i booleany, np. jako StringType. |
readerCaseSensitive |
true |
Określa zachowanie wrażliwości na wielkość liter po włączeniu rescuedDataColumn. Jeśli to prawda, należy uratować kolumny danych, których nazwy różnią się wielkością liter od schematu. Gdy wartość false, odczytuje dane w sposób niewrażliwy na wielkość liter. Dostępne w środowisku Databricks Runtime 13.3 lub nowszym. |
rescuedDataColumn |
None | Czy zebrać wszystkie dane, których nie można przeanalizować z powodu niezgodności typu danych lub niezgodności schematu (w tym wielkości liter kolumn) do oddzielnej kolumny. Ta kolumna jest domyślnie dołączana podczas korzystania z modułu automatycznego ładowania. Aby uzyskać więcej informacji, zobacz Co to jest uratowana kolumna danych?.COPY INTO (starsza wersja) nie obsługuje uratowanych kolumn danych, ponieważ nie można ręcznie ustawić schematu przy użyciu COPY INTO. Databricks zaleca używanie Auto Loader w przypadku większości scenariuszy przyjmowania danych. |
singleVariantColumn |
None | Czy pozyskać cały dokument JSON, przeanalizowany w jednej kolumnie Wariant z określonym ciągiem jako nazwą kolumny. Jeśli nie zostanie ustawiona, pola JSON są pozyskiwane do własnych kolumn. Prawidłowe wartości: dowolny ciąg. |
timestampFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] |
Format analizowania ciągów znacznika czasu. |
timestampNTZFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS] |
Format analizowania znacznika czasu bez ciągów strefy czasowej (TimestampNTZType). |
timeZone |
None | Element java.time.ZoneId do użycia podczas analizowania sygnatur czasowych i dat. |
upgradeExceptionAsBadRecord |
false |
Czy traktować wyjątki uaktualniania typu (na przykład jeśli nie można rozszerzyć wartości do zadeklarowanego typu kolumny) jako nieprawidłowe rekordy, a nie zgłaszać wyjątku. |
Kafka
Aby uzyskać pełną listę opcji czytnika platformy Kafka, zobacz Opcje dataStreamReader Kafka. Poniższe opcje dotyczą tylko operacji odczytu wsadowego przy użyciu polecenia spark.read.format("kafka").
| Key | Domyślny | Description |
|---|---|---|
endingOffsets |
latest |
Gdzie przestać czytać. Prawidłowe wartości: latestlub ciąg JSON przesunięcia dla każdej partycji, na przykład {"topicA":{"0":50,"1":-1}}.W ciągu -1 JSON jest najnowszym przesunięciem.
-2, który jest najwcześniejszym przesunięciem, nie jest dozwolony jako przesunięcie końcowe. |
endingOffsetsByTimestamp |
None | Przesunięcia końcowe dla partycji określone jako znaczniki czasu w milisekundach. Prawidłowe wartości: ciąg JSON sygnatur czasowych dla każdej partycji, taki jak {"topicA":{"0":2000,"1":3000}}. |
endingTimestamp |
None | Globalny znacznik czasu zakończenia w milisekundach zastosowany do wszystkich partycji. Prawidłowe wartości: liczby całkowite inne niż ujemne. |
ORC
| Key | Domyślny | Description |
|---|---|---|
mergeSchema |
false |
Czy należy wywnioskować schemat dla wielu plików i scalić schematy każdego pliku. |
Parkiet
| Key | Domyślny | Description |
|---|---|---|
datetimeRebaseMode |
LEGACY |
Steruje przestawieniem wartości DATE i TIMESTAMP między kalendarzami juliańskim a proleptycznym gregoriańskim. Prawidłowe wartości: EXCEPTION, LEGACYi CORRECTED. |
int96RebaseMode |
LEGACY |
Zarządza przebazowaniem wartości znacznika czasu INT96 między kalendarzami juliańskim a proleptycznym gregoriańskim. Prawidłowe wartości: EXCEPTION, LEGACYi CORRECTED. |
mergeSchema |
false |
Czy należy wywnioskować schemat dla wielu plików i scalić schematy każdego pliku. |
readerCaseSensitive |
true |
Określa zachowanie wrażliwości na wielkość liter po włączeniu rescuedDataColumn. Jeśli to prawda, należy uratować kolumny danych, których nazwy różnią się wielkością liter od schematu. Gdy wartość false, odczytuje dane w sposób niewrażliwy na wielkość liter. |
rescuedDataColumn |
None | Czy zebrać wszystkie dane, których nie można przeanalizować z powodu niezgodności typu danych oraz niezgodności schematu (w tym wielkości liter kolumn), do osobnej kolumny. Ta kolumna jest domyślnie dołączana podczas korzystania z modułu automatycznego ładowania. Aby uzyskać więcej informacji, zobacz Co to jest uratowana kolumna danych?.COPY INTO (starsza wersja) nie obsługuje uratowanych kolumn danych, ponieważ nie można ręcznie ustawić schematu przy użyciu COPY INTO. Databricks zaleca używanie Auto Loader w przypadku większości scenariuszy przyjmowania danych. |
Magazyn stanów
Użyj tych opcji z funkcją lub funkcją spark.read.format("statestore")read_statestore wartości tabeli, aby odczytać dane stanu przesyłania strumieniowego ze strukturą. Zobacz Jak odczytać informacje o stanie strukturalnego strumieniowania danych.
| Key | Domyślny | Description |
|---|---|---|
batchId |
Najnowszy identyfikator partii | Docelowa partia do odczytu. Użyj polecenia , aby wysłać zapytanie do wcześniejszego stanu zapytania. Partia musi zostać zatwierdzona, ale nie została jeszcze wyczyszczona. Prawidłowe wartości: liczby całkowite inne niż ujemne. |
operatorId |
0 |
Operator docelowy do odczytu. Użyj polecenia , gdy zapytanie ma wiele operatorów stanowych. Prawidłowe wartości: liczby całkowite inne niż ujemne. |
storeName |
DEFAULT |
Docelowa nazwa magazynu stanów do odczytania. Użyj, gdy operator stanowy ma wiele wystąpień magazynu stanów. Należy określić sprzężenia storeName strumienia lub joinSide dla sprzężenia strumienia, ale nie obu tych elementów. Prawidłowe wartości: dowolny ciąg. |
joinSide |
None | Strona docelowa do odczytu ze sprzężenia strumienia. Należy określić sprzężenia storeName strumienia lub joinSide dla sprzężenia strumienia, ale nie obu tych elementów. Prawidłowe wartości: left, right. |
snapshotStartBatchId |
None | Identyfikator partii migawki do użycia jako punkt początkowy podczas odczytywania stanu. Czytnik ponownie kompiluje stan przez ponowne odtworzenie zmian z tej migawki do .batchId Przydatne, gdy migawka jest uszkodzona. Należy określić razem z parametrem snapshotPartitionId. Nie można używać z readChangeFeedprogramem . Obsługuje magazyn stanów oparty na systemie plików HDFS i magazyn stanów bazy danych RocksDB z włączonym tworzeniem punktów kontrolnych dziennika zmian. Dostępne w środowisku Databricks Runtime 15.4 LTS lub nowszym. Prawidłowe wartości: liczby całkowite inne niż ujemne. |
snapshotPartitionId |
None | Jeśli zostanie określony, zapytanie odczytuje tylko tę partycję. Należy określić razem z parametrem snapshotStartBatchId. Nie można używać z readChangeFeedprogramem . Dostępne w środowisku Databricks Runtime 15.4 LTS lub nowszym. Prawidłowe wartości: liczby całkowite inne niż ujemne. |
readChangeFeed |
false |
Gdy truefunkcja zwraca zmiany stanu w określonym zakresie partii między changeStartBatchId i changeEndBatchId. Wymaga changeStartBatchId. Nie można używać z parametrami joinSide, batchId, snapshotStartBatchIdlub snapshotPartitionId. Dostępne w środowisku Databricks Runtime 16.4 LTS i nowszym. Prawidłowe wartości: true, false.Aby uzyskać szczegółowe informacje, zobacz Odczyt zmian stanu przesyłania strumieniowego ze strukturą. |
changeStartBatchId |
None | Identyfikator początkowej partii dla zakresu zestawienia zmian. Wymagane, gdy readChangeFeed ma wartość true. Ma zastosowanie tylko wtedy, gdy readChangeFeed ustawiono wartość true. Dostępne w środowisku Databricks Runtime 16.4 LTS i nowszym. Prawidłowe wartości: liczby całkowite inne niż ujemne. |
changeEndBatchId |
Najnowszy identyfikator partii | Końcowy identyfikator partii dla zakresu zestawienia zmian. Musi być większe lub równe changeStartBatchId. Ma zastosowanie tylko wtedy, gdy readChangeFeed ustawiono wartość true. Dostępne w środowisku Databricks Runtime 16.4 LTS i nowszym. Prawidłowe wartości: liczby całkowite inne niż ujemne. |
stateVarName |
None | Nazwa zmiennej stanu do odczytania. Nazwa zmiennej stanu jest unikatową nazwą każdej zmiennej w init ramach funkcji używanej StatefulProcessortransformWithState przez operator. Wymagane w przypadku korzystania z transformWithState operatora . Dostępne w środowisku Databricks Runtime 16.4 LTS i nowszym. Prawidłowe wartości: dowolny ciąg. |
readRegisteredTimers |
false |
Gdy trueparametr odczytuje zarejestrowane czasomierze używane przez transformWithState operatora. Dotyczy transformWithState tylko operatora . Dostępne w środowisku Databricks Runtime 16.4 LTS i nowszym. Prawidłowe wartości: true, false. |
flattenCollectionTypes |
true |
Gdy truefunkcja spłaszcza rekordy zwracane dla zmiennych stanu mapy i listy. Gdy falseparametr zwraca rekordy jako spark SQL Array lub Map. Dotyczy transformWithState tylko operatora . Dostępne w środowisku Databricks Runtime 16.4 LTS i nowszym. Prawidłowe wartości: true, false. |
Tekst
| Key | Domyślny | Description |
|---|---|---|
encoding |
UTF-8 |
Nazwa kodowania separatora wiersza pliku TEXT. Aby uzyskać listę opcji, zobacz java.nio.charset.Charset. Na zawartość pliku nie ma wpływu ta opcja i jest odczytywana as-is. |
lineSep |
Brak, który obejmuje \r, \r\n i \n |
Ciąg między dwoma następującymi po sobie rekordami TEXT. |
wholeText |
false |
Czy odczytywać plik jako pojedynczy rekord. |
XML
| Key | Domyślny | Description |
|---|---|---|
rowTag |
None | Tag wiersza określający, jak traktować pliki XML jako wiersz. W przykładowym pliku XML <books> <book><book>...<books>odpowiednia wartość to book. Ta opcja jest wymagana. |
samplingRatio |
1.0 |
Definiuje ułamek wierszy używanych do wnioskowania schematu. Wbudowane funkcje XML ignorują tę opcję. Prawidłowe wartości: 0.0 do 1.0. |
excludeAttribute |
false |
Czy wykluczać atrybuty w elementach. |
mode |
None | Tryb radzenia sobie z uszkodzonymi rekordami podczas analizowania.
PERMISSIVE: W przypadku uszkodzonych rekordów, źle sformułowany ciąg jest umieszczany w polu skonfigurowanym przez columnNameOfCorruptRecord, a źle sformułowane pola są ustawiane na null. Aby zachować uszkodzone rekordy, można ustawić string pole typu o nazwie columnNameOfCorruptRecord w schemacie zdefiniowanym przez użytkownika. Jeśli schemat nie ma pola, uszkodzone rekordy są porzucane podczas analizowania. Podczas wnioskowania schematu analizator niejawnie dodaje columnNameOfCorruptRecord pole w schemacie wyjściowym.
DROPMALFORMED: ignoruje uszkodzone rekordy. Ten tryb nie jest obsługiwany dla wbudowanych funkcji XML.
FAILFAST: zgłasza wyjątek, gdy analizator napotyka uszkodzone rekordy. |
inferSchema |
true |
Jeśli true, program próbuje wywnioskować odpowiedni typ dla każdej wynikowej kolumny DataFrame. Jeśli false, to wszystkie wynikowe kolumny są typu string. Wbudowane funkcje XML ignorują tę opcję. |
columnNameOfCorruptRecord |
spark.sql.columnNameOfCorruptRecord |
Umożliwia zmianę nazwy nowego pola zawierającego źle sformułowany ciąg utworzony przez PERMISSIVE tryb. |
attributePrefix |
None | Prefiks atrybutów do odróżnienia atrybutów od elementów. Będzie to prefiks nazw pól. Wartość domyślna to _. Może być pusty do odczytywania kodu XML, ale nie do zapisu. Dotyczy również opcji XML elementu DataFrameWriter. |
valueTag |
_VALUE |
Tag używany dla danych znakowych w elementach, które zawierają również atrybuty lub elementy podrzędne. Użytkownik może określić pole valueTag w schemacie, lub zostanie ono dodane automatycznie podczas wnioskowania schematu, gdy dane znakowe są obecne w elementach posiadających inne elementy lub atrybuty. Dotyczy również opcji XML elementu DataFrameWriter. |
encoding |
UTF-8 |
Do odczytu dekoduje pliki XML według danego typu kodowania. Na potrzeby pisania określa kodowanie (charset) zapisanych plików XML. Wbudowane funkcje XML ignorują tę opcję. Dotyczy również opcji XML elementu DataFrameWriter. |
ignoreSurroundingSpaces |
true |
Czy należy pominąć odstępy otaczające wartości. Dane zawierające wyłącznie znaki odstępu są ignorowane. |
rowValidationXSDPath |
None | Ścieżka do opcjonalnego pliku XSD używanego do sprawdzania poprawności kodu XML dla każdego wiersza osobno. Wiersze, które nie mogą sprawdzić poprawności, są traktowane jak błędy analizy. XSD nie ma w inny sposób wpływu na schemat, niezależnie od tego, czy podano, czy wywnioskowany. |
ignoreNamespace |
false |
Jeśli true, prefiksy przestrzeni nazw dla elementów i atrybutów XML są ignorowane. Tagi <abc:author> i <def:author>, na przykład są traktowane tak, jakby oba były tylko <author>. Przestrzenie nazw nie mogą być ignorowane w elemencie rowTag, tylko jego odczytywalne elementy podrzędne. Analizowanie kodu XML nie uwzględnia przestrzeni nazw, nawet jeśli false. |
timestampFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] |
Niestandardowy format ciągu znacznika czasu zgodny ze wzorcem daty/godziny. Dotyczy to typu timestamp. Dotyczy również opcji XML elementu DataFrameWriter. |
timestampNTZFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS] |
Łańcuch znaków formatu niestandardowego dla znacznika czasu bez strefy czasowej, zgodny ze wzorcem formatu daty-czasu. Dotyczy to typu TimestampNTZType. Dotyczy również opcji XML elementu DataFrameWriter. |
dateFormat |
yyyy-MM-dd |
Niestandardowy ciąg formatu daty zgodny ze wzorcem daty/godziny . Dotyczy to typu daty. Dotyczy również opcji XML elementu DataFrameWriter. |
locale |
en-US |
Ustawia lokalizację jako tag języka w formacie IETF BCP 47. Na przykład locale jest używany podczas analizowania dat i sygnatur czasowych. |
nullValue |
ciąg null |
Ustawia łańcuch znaków reprezentujący wartość null. Gdy wartość tego parametru to null, analizator nie zapisuje atrybutów i elementów dla pól. Dotyczy również opcji XML elementu DataFrameWriter. |
readerCaseSensitive |
true |
Określa zachowanie rozróżniania wielkości liter, gdy rescuedDataColumn jest włączone. Jeśli to prawda, należy uratować kolumny danych, których nazwy różnią się wielkością liter od schematu. Gdy wartość false, odczytuje dane w sposób niewrażliwy na wielkość liter. |
rescuedDataColumn |
None | Czy należy zebrać wszystkie dane, których nie można przeanalizować z powodu niezgodności typu danych i niezgodności schematu (w tym wielkości liter kolumn), w celu umieszczenia ich w oddzielnej kolumnie. Ta kolumna jest domyślnie dołączana podczas korzystania z modułu automatycznego ładowania. Aby uzyskać więcej informacji, zobacz Co to jest uratowana kolumna danych?.
COPY INTO (starsza wersja) nie obsługuje uratowanych kolumn danych, ponieważ nie można ręcznie ustawić schematu przy użyciu COPY INTO. Databricks zaleca używanie Auto Loader w przypadku większości scenariuszy przyjmowania danych. |
singleVariantColumn |
none |
Określa nazwę kolumny pojedynczego wariantu. Jeśli ta opcja jest określona do odczytu, przeanalizuj cały rekord XML w jedną kolumnę Wariant z daną wartością ciągu opcji jako nazwą kolumny. Jeśli ta opcja jest dostępna do zapisu, zapisz wartość pojedynczej kolumny Variant w plikach XML. Dotyczy również opcji XML elementu DataFrameWriter. |
useLegacyXMLParser |
true |
Czy używać starszego analizatora XML. Starszy analizator ma mniej rygorystyczną walidację dla źle sformułowanej zawartości, ale jest mniej wydajna w pamięci. Ustaw wartość , aby false wybrać bardziej rygorystyczny domyślny analizator. |
wildcardColName |
xs_any |
Nazwa kolumny używana do przechwytywania elementów XML pasujących do elementu schematu symbolu wieloznacznego (xs:any). Nie można używać razem z rescuedDataColumn. |
Opcje DataStreamReader
Te opcje umożliwiają DataStreamReader.option() skonfigurowanie odczytów przesyłanych strumieniowo z tabel usługi Delta Lake i innych źródeł opartych na plikach.
Aby uzyskać opcje formatowania plików (JSON, CSV, Parquet i inne), zobacz Opcje elementu DataFrameReader.
Aby zapoznać się z opcjami automatycznego modułu ładującego (), zobacz Auto Loader (cloudFiles.*Automatyczne ładowanie).
Example
W poniższym przykładzie ustawiono wartość maxFilesPerTrigger10 dla strumienia tabeli usługi Delta Lake:
Python
df = spark.readStream.format("delta").option("maxFilesPerTrigger", 10).load("/path/to/delta-table")
Scala
val df = spark.readStream.format("delta").option("maxFilesPerTrigger", "10").load("/path/to/delta-table")
Wspólne
Poniższe opcje dotyczą tabel usługi Delta Lake i innych źródeł przesyłania strumieniowego opartego na plikach.
| Key | Domyślny | Description |
|---|---|---|
cleanSource |
off |
Jak obsługiwać pliki źródłowe po ich przetworzeniu przez strumień. Prawidłowe wartości: off (bez akcji), delete (trwale usuń plik źródłowy), archive (przejdź do .sourceArchiveDir W przypadku ustawienia parametru należy również ustawić archivesourceArchiveDir wartość . Nie dotyczy przesyłania strumieniowego tabel usługi Delta Lake. |
fileNameOnly |
false |
Czy zidentyfikować już przetworzone pliki tylko według nazwy pliku, a nie przez pełną ścieżkę. Gdy truepliki w różnych ścieżkach o tej samej nazwie pliku są traktowane jako ten sam plik i nie są ponownie przetwarzane. Nie dotyczy przesyłania strumieniowego tabel usługi Delta Lake. |
latestFirst |
false |
Czy należy najpierw przetwarzać ostatnio zmodyfikowane pliki w ramach każdej mikrosadowej. Przydatne, gdy chcesz jak najszybciej przetworzyć najnowsze dane. Gdy true ustawienie i maxFilesPerTrigger lub maxBytesPerTrigger jest ustawione, maxFileAge jest ignorowane. Nie dotyczy przesyłania strumieniowego tabel usługi Delta Lake. |
maxBytesPerTrigger |
None | Miękka maksymalna ilość danych przetwarzanych dla każdej mikrosadowej partii. Partia może przetworzyć więcej niż limit, jeśli najmniejsza jednostka wejściowa przekroczy ją. W przypadku użycia razem z elementem maxFilesPerTriggermikrosadowe przetwarza dane do momentu osiągnięcia pierwszego limitu. Prawidłowe wartości: dodatnie liczby całkowite.Zamiast tego użyj cloudFiles.maxBytesPerTrigger dla Auto Loader. Zobacz Typowe. |
maxCachedFiles |
10000 |
Maksymalna liczba nieprzetworzonych plików do pamięci podręcznej dla kolejnych mikrosadów. Ustaw wartość , aby 0 wyłączyć buforowanie. Zwiększ tę wartość, gdy katalog źródłowy zawiera wiele nowych plików dla każdego wyzwalacza. Nie dotyczy przesyłania strumieniowego tabel usługi Delta Lake. Prawidłowe wartości: dodatnie liczby całkowite lub 0. |
maxFileAge |
7d |
Maksymalny wiek plików rozważanych do przetwarzania w stosunku do znacznika czasu ostatnio zmodyfikowanego pliku, a nie bieżącego czasu systemowego. Pliki starsze niż ten próg są ignorowane. Akceptuje ciągi czasu trwania, takie jak 7d lub 4h. Ignorowane, gdy latestFirst wartość jest true ustawiona lub maxFilesPerTriggermaxBytesPerTrigger jest ustawiona. Nie dotyczy przesyłania strumieniowego tabel usługi Delta Lake. |
maxFilesPerTrigger |
1000 dla usług Delta Lake i Auto Loader. Brak maksymalnej wartości dla innych źródeł opartych na plikach. |
Górna granica liczby nowych plików przetworzonych w każdej mikrosadowej partii. W przypadku użycia razem z elementem maxBytesPerTriggermikrosadowe przetwarza dane do momentu osiągnięcia pierwszego limitu. Prawidłowe wartości: dodatnie liczby całkowite.Zamiast tego użyj cloudFiles.maxFilesPerTrigger dla Auto Loader. Zobacz Typowe. |
sourceArchiveDir |
None | Ścieżka do katalogu archiwum, gdy cleanSource jest ustawiona na archivewartość . Pliki źródłowe są przenoszone do tej ścieżki po przetworzeniu, zachowując ich względną strukturę katalogów. Nie dotyczy przesyłania strumieniowego tabel usługi Delta Lake. |
Moduł automatycznego ładowania
Użyj tych opcji ze cloudFiles źródłem, aby skonfigurować moduł automatycznego ładowania na potrzeby pozyskiwania danych przesyłanych strumieniowo z magazynu w chmurze. Opcje specyficzne dla cloudFiles źródła są poprzedzone prefiksem cloudFiles , aby zachować je w oddzielnej przestrzeni nazw od innych opcji źródła przesyłania strumieniowego ze strukturą .
Wspólne
| Key | Domyślny | Description |
|---|---|---|
cloudFiles.allowOverwrites |
false |
Czy zezwolić na zmianę pliku katalogu wejściowego w celu zastąpienia istniejących danych. Aby uzyskać zastrzeżenia dotyczące konfiguracji, zobacz Czy funkcja automatycznego ładowania ponownie przetwarza plik, gdy plik zostanie dołączony lub zastąpiony?. |
cloudFiles.backfillInterval |
None | Auto Loader może wyzwalać asynchroniczne wypełnienia w określonym przedziale czasu. Na przykład 1 day do wypełniania kopii zapasowych codziennie lub 1 week do wypełniania kopii zapasowych co tydzień. Aby uzyskać więcej informacji, zobacz Wyzwalanie regularnych wypełniania przy użyciu pliku cloudFiles.backfillInterval.Nie używaj, gdy cloudFiles.useManagedFileEvents jest ustawione na true. |
cloudFiles.cleanSource |
OFF |
Czy automatycznie usuwać przetworzone pliki z katalogu wejściowego. W przypadku ustawienia wartości OFF (wartość domyślna) żadne pliki nie są usuwane.Gdy jest ustawiona wartość DELETE, Auto Loader automatycznie usuwa pliki 30 dni po ich przetworzeniu. W tym celu moduł automatycznego ładowania musi mieć uprawnienia do zapisu w katalogu źródłowym.Gdy jest ustawiona wartość MOVE, automatycznie moduł ładujący przenosi pliki do określonej lokalizacji w ciągu cloudFiles.cleanSource.moveDestination 30 dni po ich przetworzeniu. W tym celu Auto Loader musi mieć uprawnienia do zapisu w katalogu źródłowym, a także do lokalizacji docelowej.Plik jest traktowany jako przetwarzany, gdy ma wartość commit_time inną niż null w wyniku cloud_files_state funkcji wartości tabeli. Zobacz cloud_files_state funkcji wartości tabeli. 30-dniowe dodatkowe oczekiwanie po zakończeniu przetwarzania można skonfigurować przy użyciu polecenia cloudFiles.cleanSource.retentionDuration.Przed włączeniem cloudFiles.cleanSourcefunkcji zapoznaj się z następującymi zagadnieniami:
Dostępne w środowisku Databricks Runtime 16.4 lub nowszym. |
cloudFiles.cleanSource.retentionDuration |
30 days |
Czas oczekiwania, zanim przetworzone pliki staną się kandydatami do archiwizacji za pomocą polecenia cleanSource. Wartość musi być większa niż 7 dni dla elementu DELETE. Brak minimalnego ograniczenia dla elementu MOVE.Wartość jest ciągiem CalendarInterval . Na przykład , "14 days", "30 days", "2 weeks"lub "1 month".Dostępne w środowisku Databricks Runtime 16.4 lub nowszym. |
cloudFiles.cleanSource.moveDestination |
None | Ścieżka do archiwizacji przetworzonych plików, gdy cloudFiles.cleanSource jest ustawiony na MOVE. Może to być ścieżka magazynu w chmurze lub ścieżka woluminu wykazu aparatu Unity (na przykład /Volumes/my_catalog/my_schema/my_volume/archive/).Lokalizacja przenoszenia musi:
Automatyczny moduł ładujący musi mieć uprawnienia do zapisu w tym katalogu. Dostępne w środowisku Databricks Runtime 16.4 lub nowszym. |
cloudFiles.format |
Brak (wymagana opcja) |
Format pliku danych w ścieżce źródłowej. Prawidłowe wartości to:
|
cloudFiles.includeExistingFiles |
true |
Czy dołączyć istniejące pliki do ścieżki wejściowej przetwarzania strumienia, czy tylko przetworzyć nowe pliki przychodzące po wstępnej konfiguracji. Ta opcja jest oceniana tylko wtedy, gdy uruchamiasz strumień po raz pierwszy. Zmiana tej opcji po ponownym uruchomieniu strumienia nie ma żadnego wpływu. |
cloudFiles.inferColumnTypes |
false |
Czy wywnioskować dokładne typy kolumn podczas korzystania z wnioskowania schematu. Domyślnie kolumny są wnioskowane jako ciągi podczas wnioskowania zestawów danych JSON i CSV. Aby uzyskać więcej informacji, zobacz wnioskowanie schematu . |
cloudFiles.maxBytesPerTrigger |
None | Maksymalna liczba nowych bajtów do przetworzenia w każdym wyzwalaczu. Można określić ciąg bajtów, taki jak 10g, aby ograniczyć każdą mikropartię do 10 GB danych. Jest to łagodne maksimum. Jeśli masz pliki o rozmiarze 3 GB, usługa Azure Databricks przetwarza 12 GB w mikrobajtach. W przypadku użycia razem z usługą cloudFiles.maxFilesPerTrigger, Azure Databricks zużywa do niższego limitu cloudFiles.maxFilesPerTrigger lub cloudFiles.maxBytesPerTrigger, w zależności od tego, który pierwszy zostanie osiągnięty. Ta opcja nie ma żadnego wpływu, gdy jest używana z Trigger.Once() (Trigger.Once() jest przestarzałe).W środowisku Databricks Runtime 18.0 lub nowszym ta opcja jest konfigurowana dynamicznie i nie musi być ustawiana ręcznie. |
cloudFiles.maxFileAge |
None | Jak długo zdarzenie dotyczące pliku jest śledzone w celach deduplikacji. Usługa Databricks nie zaleca dostrajania tego parametru, chyba że przetwarzasz dane w ilości milionów plików na godzinę. Aby uzyskać więcej informacji, zobacz sekcję Dotyczącą śledzenia zdarzeń plików . Zbyt agresywne dostrajanie cloudFiles.maxFileAge może powodować problemy z jakością danych, takie jak powtórne pobieranie danych lub brakujące pliki. W związku z tym Databricks rekomenduje ustawienie konserwatywne dla cloudFiles.maxFileAge, takie jak 90 dni, co jest podobne do zaleceń porównywalnych rozwiązań do pozyskiwania danych. |
cloudFiles.maxFilesPerTrigger |
1000 |
Maksymalna liczba nowych plików do przetworzenia w każdym wyzwalaczu. W przypadku użycia razem z usługą cloudFiles.maxBytesPerTrigger, Azure Databricks zużywa do niższego limitu cloudFiles.maxFilesPerTrigger lub cloudFiles.maxBytesPerTrigger, w zależności od tego, który pierwszy zostanie osiągnięty. Ta opcja nie ma żadnego wpływu w przypadku użycia z opcją Trigger.Once() (przestarzałe).W środowisku Databricks Runtime 18.0 lub nowszym ta opcja jest konfigurowana dynamicznie i nie musi być ustawiana ręcznie. |
cloudFiles.partitionColumns |
None | Rozdzielona przecinkami lista kolumn partycji w stylu hive, które mają być wywnioskowane ze struktury katalogów plików. Kolumny partycji w stylu hive to pary klucz-wartość połączone znakiem równości, takim jak <base-path>/a=x/b=1/c=y/file.format. W tym przykładzie kolumny partycji to a, bi c. Domyślnie te kolumny są automatycznie dodawane do schematu, jeśli używasz wnioskowania schematu i udostępniasz element <base-path> do ładowania danych. Jeśli podasz schemat, moduł automatycznego ładowania oczekuje, że te kolumny zostaną uwzględnione w schemacie. Jeśli nie chcesz, aby te kolumny były częścią schematu, możesz określić "" , aby ignorować te kolumny. Ponadto możesz użyć tej opcji, jeśli chcesz, aby kolumny mogły być wywnioskowane ścieżką pliku w złożonych strukturach katalogów, podobnie jak w poniższym przykładzie:<base-path>/year=2022/week=1/file1.csv<base-path>/year=2022/month=2/day=3/file2.csv<base-path>/year=2022/month=2/day=4/file3.csvOkreślenie cloudFiles.partitionColumns jako year,month,day zwraca year=2022 dla file1.csv, ale kolumny month i day są null.month i day są poprawnie analizowane dla file2.csv i file3.csv. |
cloudFiles.schemaEvolutionMode |
addNewColumns gdy schemat nie jest podany, none w przeciwnym razie |
Tryb ewolucji schematu w miarę odnajdowania nowych kolumn w danych. Domyślnie kolumny są wnioskowane jako ciągi podczas wnioskowania zestawów danych JSON. Aby uzyskać więcej informacji, zobacz Ewolucja schematu . |
cloudFiles.schemaHints |
None | Informacje o schemacie, które podajesz do Auto Loader podczas wnioskowania schematu. Aby uzyskać więcej szczegółów, zobacz wskazówki dotyczące schematu . |
cloudFiles.schemaLocation |
Brak (wymagane do wywnioskowania schematu) | Lokalizacja do przechowywania wywnioskowanych schematów i kolejnych zmian. Aby uzyskać więcej informacji, zobacz wnioskowanie schematu . |
cloudFiles.useStrictGlobber |
false |
Czy używać restrykcyjnego wzorca globowania, który odpowiada domyślnemu zachowaniu globbingu innych źródeł plików w Apache Spark. Aby uzyskać więcej informacji, zobacz Typowe wzorce ładowania danych . Dostępne w środowisku Databricks Runtime 12.2 LTS lub nowszym. |
cloudFiles.validateOptions |
true |
Czy zweryfikować opcje "Auto Loader" i zwrócić błąd dla nieznanych lub niespójnych opcji. |
Lista katalogów
| Key | Domyślny | Description |
|---|---|---|
cloudFiles.useIncrementalListing (przestarzałe) |
auto w środowisku Databricks Runtime 17.2 lub nowszym w false środowisku Databricks Runtime 17.3 lub nowszym |
Ta funkcja jest przestarzała. Databricks zaleca użycie trybu powiadomień plików ze zdarzeniami plików zamiast cloudFiles.useIncrementalListing.Czy w trybie listowania katalogów używać listy przyrostowej zamiast pełnej listy. Domyślnie moduł automatycznego ładowania umożliwia automatyczne wykrywanie, czy dany katalog ma zastosowanie do listy przyrostowej. Można jawnie użyć listy przyrostowej lub użyć pełnej listy katalogów, ustawiając ją odpowiednio jako true lub false .Błędne włączenie listy przyrostowej w nieuporządkowanym leksykalnie katalogu uniemożliwia funkcjonowanie modułu Auto Loader w odnajdywaniu nowych plików. Współpracuje z usługą Azure Data Lake Storage ( abfss://), S3 (s3://) i GCS (gs://).Dostępne w środowisku Databricks Runtime 9.1 LTS lub nowszym. Dostępne wartości: auto, , truefalse |
Powiadomienie o pliku
Aby uzyskać informacje na temat konfigurowania trybu powiadomień plików, w tym wymaganych uprawnień do chmury, instrukcji konfiguracji i metod uwierzytelniania, zobacz Konfigurowanie strumieni automatycznego modułu ładującego w trybie powiadomień plików.
| Key | Domyślny | Description |
|---|---|---|
cloudFiles.fetchParallelism |
1 |
Liczba wątków używanych do pobierania komunikatów z usługi kolejkowania. Nie używaj, gdy cloudFiles.useManagedFileEvents jest ustawione na true. |
cloudFiles.pathRewrites |
None | Wymagane tylko w przypadku określenia queueUrl , który odbiera powiadomienia o plikach z wielu zasobników S3 i chcesz użyć punktów instalacji skonfigurowanych do uzyskiwania dostępu do danych w tych kontenerach. Użyj tej opcji, aby przepisać prefiks ścieżki bucket/key tak, aby odpowiadał punktowi montowania. Można przepisać tylko prefiksy. Na przykład w przypadku konfiguracji {"<databricks-mounted-bucket>/path": "dbfs:/mnt/data-warehouse"}ścieżka s3://<databricks-mounted-bucket>/path/2017/08/fileA.json zostanie przepisana na dbfs:/mnt/data-warehouse/2017/08/fileA.json.Nie używaj, gdy cloudFiles.useManagedFileEvents jest ustawione na true. |
cloudFiles.resourceTag |
None | Seria par tagów klucz-wartość, które ułatwiają kojarzenie i identyfikowanie powiązanych zasobów, na przykład:cloudFiles.option("cloudFiles.resourceTag.myFirstKey", "myFirstValue") .option("cloudFiles.resourceTag.mySecondKey", "mySecondValue")Aby uzyskać więcej informacji na temat AWS, zobacz Tagi alokacji kosztów Amazon SQS oraz Konfigurowanie tagów dla tematu Amazon SNS. (1) Aby uzyskać więcej informacji na temat platformy Azure, zobacz Nazewnictwo kolejek i metadanych oraz omówienie properties.labels w subskrypcjach zdarzeń. Moduł automatycznego ładowania przechowuje te pary tagów klucz-wartość w formacie JSON jako etykiety.
(1)Aby uzyskać więcej informacji na temat GCP, zobacz Raportowanie użycia z etykietami. (1) Nie używaj, gdy cloudFiles.useManagedFileEvents jest ustawione na true. Zamiast tego należy ustawić tagi zasobów przy użyciu konsoli dostawcy usług w chmurze. |
cloudFiles.useManagedFileEvents |
false |
Gdy jest ustawiona wartość true, funkcja automatycznego modułu ładującego używa usługi zdarzeń plików do odnajdywania plików w lokalizacji zewnętrznej. Tej opcji można użyć tylko wtedy, gdy ścieżka ładowania znajduje się w lokalizacji zewnętrznej z włączonymi zdarzeniami plików. Zobacz Use file notification mode with file events (Używanie trybu powiadomień plików ze zdarzeniami plików).Zdarzenia plików zapewniają wydajność na poziomie powiadomień podczas odnajdywania plików, ponieważ moduł automatycznego ładowania może odnaleźć nowe pliki po ostatnim uruchomieniu. W przeciwieństwie do listy katalogów ten proces nie musi zawierać listy wszystkich plików w katalogu. W niektórych sytuacjach funkcja automatycznego ładowania używa listy katalogów, mimo że opcja zdarzeń plików jest włączona:
Zobacz Kiedy program Auto Loader ze zdarzeniami plików używa listy katalogów? aby uzyskać pełną listę sytuacji, w których program Auto loader używa listy katalogów z tą opcją. Dostępne w środowisku Databricks Runtime 14.3 LTS lub nowszym. |
cloudFiles.listOnStart |
false |
Gdy jest ustawiona wartość true, moduł automatycznego ładowania wykonuje pełną listę katalogów po uruchomieniu strumienia, zamiast rozpoczynać się od tokenu kontynuacji w punkcie kontrolnym. Użyj tej opcji, aby odzyskać dane po błędach, takich jak CF_MANAGED_FILE_EVENTS_INVALID_CONTINUATION_TOKEN. Zobacz Jak mogę odzyskać odzyskiwanie po CF_MANAGED_FILE_EVENTS_INVALID_CONTINUATION_TOKEN błędzie?. |
cloudFiles.useNotifications |
false |
Czy używać trybu powiadomień plików do określenia, kiedy istnieją nowe pliki. Jeśli false, użyj trybu wylistowania katalogu. Zobacz Porównanie trybów wykrywania plików automatycznego modułu ładującego.Nie używaj, gdy cloudFiles.useManagedFileEvents jest ustawione na true. |
(1) Moduł automatycznego ładowania domyślnie dodaje następujące pary tagów klucz-wartość:
-
vendor:Databricks -
path: lokalizacja, z której są ładowane dane. Niedostępne w GCP z powodu ograniczeń etykietowania. -
checkpointLocation: lokalizacja punktu kontrolnego strumienia. Niedostępne w GCP z powodu ograniczeń etykietowania. -
streamId: globalnie unikatowy identyfikator strumienia.
Usługa Databricks rezerwuje te nazwy kluczy i nie można zastąpić ich wartości.
Specyficzne dla chmury
Moduł automatycznego ładowania udostępnia opcje konfigurowania infrastruktury chmury dla trybu powiadomień plików. Aby uzyskać wymagane uprawnienia do chmury i instrukcje dotyczące konfiguracji, zobacz Konfigurowanie strumieni automatycznego ładowania w trybie powiadomień plików.
AWS
Podaj następujące opcje tylko wtedy, gdy wybierzesz cloudFiles.useNotifications = true i chcesz, aby Auto Loader skonfigurował usługi powiadomień za Ciebie:
| Key | Domyślny | Description |
|---|---|---|
cloudFiles.region |
Region wystąpienia usługi EC2 | Region, w którym znajduje się źródłowy zasobnik S3 i gdzie chcesz utworzyć usługi AWS SNS i SQS. |
| Key | Domyślny | Description |
|---|---|---|
cloudFiles.restrictNotificationSetupToSameAWSAccountId |
false |
Zezwalaj tylko na powiadomienia o zdarzeniach z zasobników usługi AWS S3 na tym samym koncie co temat SNS. Jeśli to prawda, moduł automatycznego ładowania akceptuje tylko powiadomienia o zdarzeniach z zasobników usługi AWS S3 na tym samym koncie co temat SNS. Gdy false zasady dostępu nie ograniczają konfiguracji zasobników między kontami i tematyki SNS. Jest to przydatne, gdy temat usługi SNS i ścieżka do zasobnika są skojarzone z różnymi kontami.Dostępne w środowisku Databricks Runtime 17.2 lub nowszym. |
Podaj następującą opcję tylko wtedy, gdy wybierzesz cloudFiles.useNotifications = true i chcesz, aby Automatyczny Moduł Ładujący używał kolejki, którą już skonfigurowałeś:
| Key | Domyślny | Description |
|---|---|---|
cloudFiles.queueUrl |
None | Adres URL kolejki SQS. Jeśli jest podana, Auto Loader bezpośrednio pobiera zdarzenia z tej kolejki zamiast konfigurowania własnych usług AWS SNS i SQS. |
Opcje uwierzytelniania platformy AWS
Podaj następującą opcję uwierzytelniania, aby użyć poświadczeń usługi Databricks:
| Key | Domyślny | Description |
|---|---|---|
databricks.serviceCredential |
None | Nazwa poświadczenia dla usługi Databricks . Dostępne w środowisku Databricks Runtime 16.1 lub nowszym. |
Jeśli poświadczenia usługi Databricks lub role IAM nie są dostępne, możesz zamiast tego podać następujące opcje uwierzytelniania:
| Key | Domyślny | Description |
|---|---|---|
cloudFiles.awsAccessKey |
None | Identyfikator klucza dostępu platformy AWS dla użytkownika. Musi być dostarczany z cloudFiles.awsSecretKey. |
cloudFiles.awsSecretKey |
None | Tajny klucz dostępu AWS dla użytkownika. Musi być dostarczany z cloudFiles.awsAccessKey. |
cloudFiles.roleArn |
None | ARN roli IAM, którą należy przyjąć, w razie potrzeby. Rolę można przyjąć z profilu wystąpienia klastra lub podając poświadczenia za pomocą cloudFiles.awsAccessKey i cloudFiles.awsSecretKey. |
cloudFiles.roleExternalId |
None | Identyfikator do podania podczas przyjmowania roli przy użyciu cloudFiles.roleArn. |
cloudFiles.roleSessionName |
None | Opcjonalna nazwa sesji do użycia podczas zakładania roli przy użyciu polecenia cloudFiles.roleArn. |
cloudFiles.stsEndpoint |
None | Opcjonalny punkt końcowy, który umożliwia uzyskanie dostępu do usługi AWS STS podczas przyjmowania roli przy użyciu cloudFiles.roleArn. |
Azure
Musisz podać wartości dla wszystkich następujących opcji, jeśli określisz cloudFiles.useNotifications = true i chcesz, aby Auto Loader skonfigurował usługi powiadomień.
Jeśli poświadczenie usługi Databricks jest niedostępne, możesz zamiast tego podać następujące opcje uwierzytelniania:
| Key | Domyślny | Description |
|---|---|---|
cloudFiles.clientId |
None | Identyfikator klienta lub identyfikator aplikacji jednostki usługi. |
cloudFiles.clientSecret |
None | Tajny klucz klienta pryncypała usługi. |
cloudFiles.connectionString |
None | Łańcuch połączenia dla konta przechowywania, oparty na kluczu dostępu do konta lub sygnaturze dostępu współdzielonego (SAS). |
cloudFiles.tenantId |
None | Identyfikator dzierżawy Azure, w którym jest tworzona jednostka usługi. |
Podaj następującą opcję tylko w przypadku ustawienia cloudFiles.useNotifications = true i chcesz, aby moduł automatycznego ładowania używał istniejącej kolejki:
| Key | Domyślny | Description |
|---|---|---|
cloudFiles.queueName |
None | Nazwa kolejki platformy Azure. Jeśli zostanie to podane, źródło plików w chmurze bezpośrednio odbiera zdarzenia z tej kolejki, zamiast konfigurować własne rozwiązania Azure Event Grid i Queue Storage. W takim przypadku databricks.serviceCredential lub cloudFiles.connectionString wymagają tylko uprawnień do odczytu w kolejce. |
GCP
Auto Loader może automatycznie skonfigurować usługi powiadomień, wykorzystując poświadczenia serwisowe Databricks. Konto usługi utworzone przy użyciu poświadczeń usługi Databricks będzie wymagać uprawnień określonych w temacie Konfigurowanie strumieni automatycznego ładowania w trybie powiadomień plików.
Jeśli poświadczenie usługi Databricks jest niedostępne, możesz użyć kont usług Google bezpośrednio. Klaster można skonfigurować tak, aby zakładał konto usługi, postępując zgodnie z konfiguracją usługi Google lub bezpośrednio podać następujące opcje uwierzytelniania:
| Key | Domyślny | Description |
|---|---|---|
cloudFiles.client |
None | Identyfikator klienta konta usługi Google. |
cloudFiles.clientEmail |
None | Adres e-mail konta usługi Google. |
cloudFiles.privateKey |
None | Klucz prywatny wygenerowany dla konta usługi Google. |
cloudFiles.privateKeyId |
None | Identyfikator klucza prywatnego wygenerowanego dla konta usługi Google. |
Podaj następującą opcję tylko wtedy, gdy wybierzesz cloudFiles.useNotifications = true i chcesz, aby Automatyczny Moduł Ładujący używał kolejki, którą już skonfigurowałeś:
| Key | Domyślny | Description |
|---|---|---|
cloudFiles.subscription |
None | Nazwa subskrypcji Google Cloud Pub/Sub. Jeśli zostanie dostarczone, źródło plików w chmurze korzysta ze zdarzeń z tej kolejki zamiast konfigurowania własnych usług GCS Notification i Google Cloud Pub/Sub. |
Delta Lake
Poniższe opcje mają zastosowanie podczas odczytywania z tabeli usługi Delta Lake przy użyciu polecenia spark.readStream.
| Key | Domyślny | Description |
|---|---|---|
allowSourceColumnDrop |
None | Ustaw numer wersji tabeli delty lub "always" zezwalaj strumieniu na kontynuowanie po usunięciu kolumn ze schematu tabeli źródłowej. Po ustawieniu numeru wersji potwierdza, że wszystkie schematy zmieniają się do tej wersji. Wymaga schemaTrackingLocation. Zobacz Zmień nazwę i usuń kolumny za pomocą mapowania kolumn Delta Lake. |
allowSourceColumnRename |
None | Ustaw numer wersji tabeli delty lub "always" zezwalaj strumieniu na kontynuowanie po zmianie nazwy kolumn w tabeli źródłowej. Po ustawieniu numeru wersji potwierdza, że wszystkie schematy zmieniają się do tej wersji. Wymaga schemaTrackingLocation. Zobacz Zmień nazwę i usuń kolumny za pomocą mapowania kolumn Delta Lake. |
allowSourceColumnTypeChange |
None | Ustaw numer wersji tabeli delty lub "always" zezwalaj strumieniu na kontynuowanie po zmianie typów kolumn w tabeli źródłowej. Po ustawieniu numeru wersji potwierdza, że wszystkie schematy zmieniają się do tej wersji. Wymaga schemaTrackingLocation. Sprawdź Rozszerzanie typu. |
excludeRegex |
None | Wzorzec wyrażenia regularnego. Pliki, których ścieżki są zgodne ze wzorcem, są wykluczone z odczytu przesyłania strumieniowego. Przydatne do filtrowania plików, które nie są zgodne z oczekiwaną konwencją nazewnictwa. |
failOnDataLoss |
true |
Czy zapytanie przesyłania strumieniowego nie powiodło się, jeśli dane źródłowe zostały usunięte z powodu przechowywania dziennika (logRetentionDuration). Ustaw wartość , aby false pominąć brakujące dane i kontynuować przetwarzanie. Zobacz Konfigurowanie przechowywania danych dla zapytań dotyczących podróży w czasie. |
ignoreChanges (przestarzałe) |
false |
Dostępne w środowisku Databricks Runtime 11.3 LTS i niższym. Ponownie emituje ponownie pliki danych po operacjach modyfikacji, takich jak UPDATE, MERGE INTO, DELETElub OVERWRITE. Niezmienione wiersze mogą być emitowane obok nowych wierszy, więc odbiorcy podrzędni muszą obsługiwać duplikaty. Usunięcia nie są propagowane w dół. Zastąpiono elementem skipChangeCommits w środowisku Databricks Runtime 12.2 LTS i nowszym. |
ignoreDeletes (przestarzałe) |
false |
Ignoruje transakcje, które usuwają dane na granicach partycji (tylko pełna partycja spada). Nie obsługuje usuwania, aktualizacji ani innych modyfikacji niezwiązanych z partycjami. Użyj skipChangeCommits zamiast tego. |
readChangeFeed lub readChangeData |
false |
Czy włączyć odczytywanie zestawienia danych zmian dla zapytania przesyłania strumieniowego. Po włączeniu strumień emituje zmiany na poziomie wiersza (wstawia, aktualizuje i usuwa) z dodatkowymi kolumnami metadanych. Zobacz Korzystanie z przepływu danych zmian w Delta Lake na platformie Azure Databricks. |
schemaTrackingLocation |
None | Ścieżka do katalogu, w którym usługa Delta Lake śledzi zmiany schematu dla odczytu przesyłania strumieniowego. Wymagane w przypadku przesyłania strumieniowego z tabel z włączonym mapowaniem kolumn i używaniem allowSourceColumn* opcji do obsługi ewolucji schematu. Musi znajdować się w obrębie checkpointLocation zapytania przesyłania strumieniowego. Zobacz Zmień nazwę i usuń kolumny za pomocą mapowania kolumn Delta Lake. |
skipChangeCommits |
false |
Ignoruje transakcje, które usuwają lub modyfikują istniejące rekordy i procesy tylko dołączają. Usługa Databricks zaleca tę opcję w przypadku większości obciążeń, które nie używają zestawienia zmian danych. Dostępne w środowisku Databricks Runtime 12.2 LTS lub nowszym. Zobacz Pomijanie zatwierdzeń zmian nadrzędnych za pomocą polecenia skipChangeCommits. |
startingTimestamp |
Najnowsza dostępna | Sygnatura czasowa rozpoczęcia odczytywania od. Strumień odczytuje wszystkie zmiany tabeli zatwierdzone w godzinie lub po określonym znaczniku czasu. Jeśli znacznik czasu poprzedza wszystkie dostępne zatwierdzenia tabeli, strumień rozpoczyna się od najwcześniejszego dostępnego zatwierdzenia. Nie można używać razem z startingVersion. Ignorowane, jeśli punkt kontrolny przesyłania strumieniowego już istnieje.Prawidłowe wartości: ciąg znacznika czasu, taki jak lub ciąg daty, taki jak "2019-01-01T00:00:00.000Z""2019-01-01". |
startingVersion |
Najnowsza dostępna | Wersja tabeli delty do rozpoczęcia odczytywania. Strumień odczytuje wszystkie zmiany zatwierdzone w określonej wersji lub po tej wersji. Określ "latest" , aby rozpocząć od tylko najnowszych zmian. Nie można używać razem z startingTimestamp. Ignorowane, jeśli punkt kontrolny przesyłania strumieniowego już istnieje. Zobacz Praca z historią tabel. |
withEventTimeOrder |
false |
Dzieli początkową migawkę tabeli na przedziały czasu zdarzenia, aby zapobiec nieprawidłowemu oznaczeniu rekordów jako opóźnione zdarzenia i porzuceniu w zapytaniach stanowych z znakami wodnymi. Nie można zmienić po rozpoczęciu początkowego przetwarzania migawki bez usuwania punktu kontrolnego. Dostępne w środowisku Databricks Runtime 11.3 LTS i nowszym. Patrz Przetwarzanie początkowego zrzutu bez usuwania danych. |
Kafka
Użyj tych opcji z opcją spark.readStream.format("kafka") lub spark.read.format("kafka"):
| Key | Domyślny | Description |
|---|---|---|
assign |
None | Określone partycje do korzystania. Musisz określić dokładnie jedną z subscribeopcji , subscribePatternlub assign . Prawidłowe wartości: ciąg JSON, taki jak {"topicA":[0,1],"topicB":[2,4]}. |
failOnDataLoss |
true |
Czy zapytanie nie powiodło się, jeśli dane mogły zostać utracone, na przykład z powodu usuniętych tematów lub obcinania przesunięcia. Ustaw wartość , aby false pominąć brakujące dane i kontynuować. Prawidłowe wartości: true, false.Usługa Databricks szacuje konserwatywnie, czy dane mogły zostać utracone. Może to jednak spowodować fałszywe alarmy. |
fetchoffset.numretries |
3 |
Liczba ponownych prób podczas pobierania przesunięć platformy Kafka kończy się niepowodzeniem. Prawidłowe wartości: liczby całkowite inne niż ujemne. |
fetchoffset.retryintervalms |
1000 |
Interwał w milisekundach między ponawianiami pobierania przesunięcia. Prawidłowe wartości: liczby całkowite inne niż ujemne. |
groupIdPrefix |
spark-kafka-source (przesyłanie strumieniowe), spark-kafka-relation (partia) |
Dostosowany prefiks do użycia dla automatycznie generowanego identyfikatora grupy odbiorców platformy Kafka. Jeśli kafka.group.id jest jawnie ustawiona, łącznik ignoruje tę opcję. Prawidłowe wartości: dowolny ciąg. |
includeHeaders |
false |
Określa, czy nagłówki komunikatów platformy Kafka mają być dołączane jako kolumna w danych wyjściowych. Prawidłowe wartości: true, false. |
kafkaconsumer.polltimeoutms |
None | Limit czasu w milisekundach dla wywołania użytkownika poll() platformy Kafka. Prawidłowe wartości: dodatnie liczby całkowite. |
kafka.bootstrap.servers |
None | Rozdzielona przecinkami lista adresów host:portów dla brokerów platformy Kafka. Ustawia właściwość klienta platformy bootstrap.servers Kafka.Jeśli okaże się, że nie ma danych z platformy Kafka, sprawdź tę listę adresów brokera pod kątem nieprawidłowych adresów. Jeśli lista adresów brokera jest niepoprawna, może nie występować żadne błędy. Klienci platformy Kafka zakładają, że brokerzy będą w końcu dostępne i ponawiają próbę na zawsze, gdy wystąpią błędy sieci. |
maxRecordsPerPartition |
None | Maksymalna liczba rekordów dla każdej partycji platformy Spark. Po ustawieniu łącznik dzieli partycje platformy Kafka tak, aby każda partycja platformy Spark odczytywała co najwyżej te wiele rekordów. Prawidłowe wartości: dodatnie liczby całkowite. Możesz również użyć tej opcji z opcją minPartitions. Po ustawieniu obu opcji platforma Spark używa dowolnej opcji w wyniku większej liczby partycji. |
minPartitions |
None | Minimalna liczba partycji platformy Spark do odczytu z platformy Kafka. Po ustawieniu łącznik dzieli duże partycje platformy Kafka w celu zwiększenia równoległości. Jeśli nie jest ustawiona, platforma Spark tworzy jedną partycję dla każdej partycji tematu platformy Kafka. Przydatne do obsługi niesymetryczności danych lub szczytowych obciążeń. Prawidłowe wartości: dodatnie liczby całkowite. Ta opcja ponownie inicjuje odbiorców platformy Kafka dla każdego wyzwalacza, co może mieć wpływ na wydajność przy użyciu protokołu SSL. |
startingOffsets |
latest (przesyłanie strumieniowe), earliest (partia) |
Przesunięcie, od którego rozpoczyna się odczyt zapytania. Prawidłowe wartości: earliest, latestlub ciąg JSON przesunięcia dla każdej partycji, na przykład {"topicA":{"0":23,"1":-2}}. W ciągu -1 JSON jest najnowszym przesunięciem.
-2 jest najwcześniejszym przesunięciem.W przypadku zapytań przesyłanych strumieniowo ta opcja ma zastosowanie tylko wtedy, gdy zostanie uruchomione nowe zapytanie. Wznawiane zapytania zawsze używają punktu kontrolnego. Podczas zapytania nowe partycje zaczynają odczytywać najwcześniejsze przesunięcie. W przypadku zapytań wsadowych latest jest niedozwolone. |
startingOffsetsByTimestamp |
None | Lista przesunięć początkowych dla każdej partycji określona jako znaczniki czasu w milisekundach. Jeśli nie istnieje przesunięcie dla znacznika czasu, zachowanie zapytania jest określane przez startingOffsetsByTimestampStrategy. Prawidłowe wartości: ciąg JSON sygnatur czasowych dla każdej partycji, taki jak {"topicA":{"0":1000,"1":2000}}.W przypadku zapytań przesyłanych strumieniowo ta opcja ma zastosowanie tylko wtedy, gdy zostanie uruchomione nowe zapytanie. Wznawiane zapytania zawsze używają punktu kontrolnego. Podczas zapytania nowe partycje zaczynają odczytywać najwcześniejsze przesunięcie. |
startingOffsetsByTimestampStrategy |
error |
Strategia do użycia, gdy nie znaleziono przesunięcia dla znacznika czasu określonego w startingOffsetsByTimestamp lub startingTimestamp. Prawidłowe wartości: error (zgłasza wyjątek) latest (używa najnowszego dostępnego przesunięcia). |
startingTimestamp |
None | Globalny znacznik czasu rozpoczęcia w milisekundach, który ma zastosowanie do wszystkich partycji. Jeśli nie istnieje przesunięcie znacznika czasu, zachowanie jest kontrolowane przez startingOffsetsByTimestampStrategyelement . Prawidłowe wartości: liczby całkowite inne niż ujemne. |
subscribe |
None | Tematy do subskrybowania. Musisz określić dokładnie jedną z subscribeopcji , subscribePatternlub assign . Prawidłowe wartości: rozdzielona przecinkami lista nazw tematów. |
subscribePattern |
None | Wzorzec używany do subskrybowania tematów. Musisz określić dokładnie jedną z subscribeopcji , subscribePatternlub assign . Na przykład topic.*. Prawidłowe wartości: dowolny ciąg wyrażeń regularnych Java. |
Następujące opcje dotyczą tylko odczytów przesyłanych strumieniowo za pomocą polecenia spark.readStream.format("kafka"):
| Key | Domyślny | Description |
|---|---|---|
bytesEstimateWindowLength |
300s |
Przedział czasu służący do szacowania pozostałych bajtów dla estimatedTotalBytesBehindLatest metryki. Prawidłowe wartości: ciągi czasu trwania, takie jak 10m lub 600s. Zobacz Pobieranie metryk platformy Kafka. |
maxOffsetsPerTrigger |
None | Maksymalna liczba przesunięć przetwarzania na interwał wyzwalacza. Przesunięcia są dystrybuowane proporcjonalnie między partycjami tematu. Prawidłowe wartości: dodatnie liczby całkowite. |
maxTriggerDelay |
15m |
Maksymalny czas oczekiwania na minOffsetsPerTrigger gromadzenie się przed wyzwoleniem. Prawidłowe wartości: ciągi czasu trwania, takie jak 10m lub 600s. |
minOffsetsPerTrigger |
None | Minimalna liczba przesunięć do gromadzenia przed wyzwoleniem mikrosadowej partii. Gdy maxTriggerDelay zostanie osiągnięty, mikrosadowa jest uruchamiana niezależnie od tego. Prawidłowe wartości: dodatnie liczby całkowite. |
Aby uzyskać opcje przesunięcia, które mają zastosowanie tylko do operacji odczytu wsadowego za pomocą spark.read.format("kafka")polecenia , zobacz Opcje elementu DataFrameReader platformy Kafka.
Aby uzyskać informacje o klientach platformy Kafka (kafka.*) i opcjach uwierzytelniania, zobacz Opcje.
Opcje elementu DataFrameWriter
Użyj tych opcji z DataFrameWriter.option() i DataFrameWriterV2.option(), aby kontrolować, jak Azure Databricks zapisuje dane.
Example
W poniższym przykładzie ustawiono wartość mergeSchema na True potrzeby pisania tabeli usługi Delta Lake:
Python
df.write.format("delta").option("mergeSchema", True).saveAsTable("my_table")
Scala
df.write.format("delta").option("mergeSchema", "true").saveAsTable("my_table")
Avro
| Key | Domyślny | Description |
|---|---|---|
avroSchema |
None | Pełny schemat Avro jako ciąg JSON. Użyj tej opcji, aby przekonwertować typy Spark SQL na określone typy Avro. Dotyczy pliku Avro. |
avroSchemaUrl |
None | Adres URL wskazujący plik schematu Avro. Użyj zamiast avroSchema , gdy schemat jest przechowywany zewnętrznie. Wzajemnie wykluczające się z avroSchema. Dotyczy pliku Avro. |
compression |
snappy |
Koder koder-dekoder kompresji do użycia podczas pisania. Prawidłowe wartości: uncompressed, , deflatesnappy, bzip2, xz, zstandard. Dotyczy pliku Avro. |
recordName |
topLevelRecord |
Nazwa rekordu najwyższego poziomu w wyjściowym schemacie Avro. Dotyczy pliku Avro. |
positionalFieldMatching |
false |
Określa, czy dopasować kolumny między schematem platformy Spark a schematem Avro według pozycji pola zamiast według nazwy. Dotyczy pliku Avro. |
recordNamespace |
Pusty ciąg | Przestrzeń nazw rekordu najwyższego poziomu w wyjściowym schemacie Avro. Dotyczy pliku Avro. |
Delta Lake i Apache Iceberg
| Key | Domyślny | Description |
|---|---|---|
clusterByAuto |
false |
Czy włączyć automatyczne klastrowanie liquid, gdzie Azure Databricks wybiera kolumny klastrowania na podstawie wzorców zapytań. Tylko prawidłowe z mode("overwrite"). Nie można używać z trybem append . Dostępne w środowisku Databricks Runtime 16.4 lub nowszym. Dotyczy metody Use liquid clustering for tables (Używanie klastrowania płynnego dla tabel). |
mergeSchema |
None | Czy włączyć ewolucję schematu dla operacji zapisu. Nowe kolumny w źródłowej ramce danych są dodawane do schematu tabeli docelowej. Dotyczy dołączania wsadowego i przesyłania strumieniowego. Dotyczy schematu tabeli aktualizacji. |
overwriteSchema |
None | Czy zastąpić schemat tabeli i partycjonowanie podczas zastępowania. Wymaga mode("overwrite") bez replaceWhere. Nie można używać z partitionOverwriteMode. Dotyczy schematu tabeli aktualizacji. |
partitionOverwriteMode |
None | Tryb zastępowania partycji. Ustaw tę opcję na dynamic wartość , aby zastąpić tylko partycje zawierające nowe dane, pozostawiając wszystkie inne partycje bez zmian. Starszy tryb, nieobsługiwany w przypadku bezserwerowych zasobów obliczeniowych lub usługi Databricks SQL. Prawidłowe wartości: static, dynamic. Dotyczy selektywnego zastępowania danych za pomocą usługi Delta Lake. |
replaceOn |
None | Wyrażenie logiczne, które pasuje do wierszy w tabeli docelowej w celu zastąpienia wierszami z zapytania źródłowego. Może odwoływać się do kolumn zarówno z tabeli docelowej, jak i zapytania źródłowego. Wiersze w obiekcie docelowym, które pasują do wiersza źródłowego, są usuwane i zastępowane. Jeśli źródło jest puste, nie występują żadne usunięcia. Służy targetAlias do uściślania odwołań do kolumn. Dostępne w środowisku Databricks Runtime 17.1 lub nowszym. Dotyczy selektywnego zastępowania danych za pomocą usługi Delta Lake. |
replaceUsing |
None | Rozdzielona przecinkami lista nazw kolumn używanych do dopasowywania wierszy między tabelą docelową a zapytaniem źródłowym. Zarówno element docelowy, jak i źródło muszą zawierać wszystkie wymienione kolumny. Wiersze w obiekcie docelowym, które pasują do wiersza źródłowego w ramach porównania równości, są usuwane i zastępowane.
NULL wartości są traktowane jako nie równe i nie będą zgodne. Dostępne w środowisku Databricks Runtime 16.3 lub nowszym. Dotyczy selektywnego zastępowania danych za pomocą usługi Delta Lake. |
replaceWhere |
None | Wyrażenie predykatu. Niepodzielne zastępuje tylko rekordy zgodne z predykatem. Dotyczy selektywnego zastępowania danych za pomocą usługi Delta Lake. |
targetAlias |
None | Alias ciągu dla tabeli docelowej. Użyj polecenia z replaceOn lub replaceWhere , aby uściślić odwołania do kolumn, gdy warunek odwołuje się do kolumn zarówno z tabeli docelowej, jak i zapytania źródłowego. Dotyczy selektywnego zastępowania danych za pomocą usługi Delta Lake. |
txnAppId |
None | Unikatowy ciąg identyfikujący aplikację dla idempotentnych zapisów w foreachBatch operacjach. Użyj polecenia z , txnVersion aby zapewnić dokładnie jednokrotne zapisy w wielu tabelach usługi Delta Lake. Dotyczy polecenia Use for idempotent table writes (Użyj foreachBatch dla operacji zapisu w tabeli idempotentnych). |
txnVersion |
None | Monotonicznie rosnąca liczba używana jako wersja transakcji dla idempotentnych zapisów w foreachBatch operacjach. Użyj polecenia z , txnAppId aby zapewnić dokładnie jednokrotne zapisy w wielu tabelach usługi Delta Lake. Dotyczy polecenia Use for idempotent table writes (Użyj foreachBatch dla operacji zapisu w tabeli idempotentnych). |
optimizeWrite |
None | Czy włączyć automatyczne optymalizowanie zapisu dla tej operacji zapisu. Zastępuje konfigurację spark.databricks.delta.optimizeWrite.enabled . Dotyczy Jak jest usługa Delta Lake w Azure Databricks?. |
userMetadata |
None | Ciąg zdefiniowany przez użytkownika dołączony do metadanych zatwierdzenia dla operacji zapisu. Widoczne w danych wyjściowych elementu DESCRIBE HISTORY. Dotyczy wzbogacenia tabel z niestandardowymi metadanymi. |
CSV
| Key | Domyślny | Description |
|---|---|---|
charToEscapeQuoteEscaping |
\0 (nie włączono) |
Znak używany do ucieczki znaku ucieczki, gdy różni się od znaku cudzysłowu. Dotyczy pliku csv (DataFrameWriter). |
compression |
none |
Koder koder-dekoder kompresji do użycia podczas pisania. Prawidłowe wartości: none, , bzip2gzip, lz4snappy, , deflate, zstd. Dotyczy pliku csv (DataFrameWriter). |
dateFormat |
yyyy-MM-dd |
Formatuj ciąg dla wartości kolumn daty. Dotyczy pliku csv (DataFrameWriter). |
emptyValue |
Pusty ciąg | Ciąg zapisany dla pustych wartości (bez wartości null). Dotyczy pliku csv (DataFrameWriter). |
encoding |
UTF-8 |
Kodowanie znaków dla plików wyjściowych. Dotyczy pliku csv (DataFrameWriter). |
escape |
\ |
Znak używany do ucieczki wartości cytowanych. Dotyczy pliku csv (DataFrameWriter). |
escapeQuotes |
true |
Czy należy unikać znaków cudzysłowu wewnątrz wartości pól cytowanych. Dotyczy pliku csv (DataFrameWriter). |
header |
false |
Określa, czy należy pisać nazwy kolumn jako pierwszy wiersz danych wyjściowych. Dotyczy pliku csv (DataFrameWriter). |
ignoreLeadingWhiteSpace |
false |
Czy przycinać wiodące białe znaki z wartości podczas pisania. Dotyczy pliku csv (DataFrameWriter). |
ignoreTrailingWhiteSpace |
false |
Czy przycinać końcowe odstępy od wartości podczas pisania. Dotyczy pliku csv (DataFrameWriter). |
lineSep |
\n |
Ciąg separatora wiersza używany między rekordami. Dotyczy pliku csv (DataFrameWriter). |
locale |
en-US |
Identyfikator java.util.Locale. Wpływa na formatowanie wartości daty i znacznika czasu podczas pisania. |
nullValue |
Pusty ciąg | Ciąg napisany dla wartości null. Dotyczy pliku csv (DataFrameWriter). |
quote |
" |
Znak używany do cudzysłowu wartości pól, które zawierają separator. Dotyczy pliku csv (DataFrameWriter). |
quoteAll |
false |
Czy ująć wszystkie wartości pól w cudzysłowie niezależnie od zawartości. Dotyczy pliku csv (DataFrameWriter). |
sep |
, |
Znak ogranicznika pola. Dotyczy pliku csv (DataFrameWriter). |
timestampFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] |
Ciąg formatu dla wartości kolumn sygnatury czasowej. Dotyczy pliku csv (DataFrameWriter). |
timestampNTZFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS] |
Formatuj ciąg dla znacznika czasu bez wartości kolumn strefy czasowej (TimestampNTZType). |
Excel
| Key | Domyślny | Description |
|---|---|---|
dataAddress |
None | Nazwa arkusza lub komórka początkowa dla zapisu. Jeśli pominięto, zapisuje w arkuszu o nazwie Sheet1 rozpoczynającej się od komórki A1. Akceptuje nazwę arkusza ("SheetName") lub pojedyncze odwołanie do komórki ("SheetName!A1"). Zakresy komórek nie są obsługiwane w przypadku zapisów. |
dateFormatInWrite |
yyyy-mm-dd |
Excel ciąg formatu komórki zastosowany do kolumn Date. Używa składni formatu Excel. |
headerRows |
0 |
Określa, czy należy pisać nazwy kolumn jako pierwszy wiersz. Prawidłowe wartości: 0, 1. |
timestampNTZFormat |
yyyy-mm-dd hh:mm:ss |
Excel ciąg formatu komórki zastosowany do kolumn TimestampNTZ i Timestamp. Używa składni formatu Excel. |
version |
xlsx |
Wersja formatu pliku Excel do zapisu. Prawidłowe wartości: xlsx, xls. |
JSON
| Key | Domyślny | Description |
|---|---|---|
compression |
none |
Koder koder-dekoder kompresji do użycia podczas pisania. Prawidłowe wartości: none, , bzip2gzip, lz4snappy, , deflate, zstd. Dotyczy formatu json (DataFrameWriter). |
dateFormat |
yyyy-MM-dd |
Formatuj ciąg dla wartości kolumn daty. Dotyczy formatu json (DataFrameWriter). |
encoding |
UTF-8 |
Kodowanie znaków dla plików wyjściowych. Dotyczy formatu json (DataFrameWriter). |
ignoreNullFields |
wartość spark.sql.jsonGenerator.ignoreNullFields |
Czy pominąć pola z wartościami null z danych wyjściowych JSON. Dotyczy formatu json (DataFrameWriter). |
lineSep |
\n |
Ciąg separatora wiersza używany między rekordami. Dotyczy formatu json (DataFrameWriter). |
locale |
en-US |
Identyfikator java.util.Locale. Wpływa na formatowanie wartości daty i znacznika czasu podczas pisania. |
pretty |
false |
Czy włączyć dość (wcięcie, wielowierszowe) dane wyjściowe JSON. |
sortKeys |
false |
Czy posortować klucze obiektów JSON alfabetycznie w danych wyjściowych. Przydatne do tworzenia danych wyjściowych deterministycznych. |
timestampFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] |
Ciąg formatu dla wartości kolumn sygnatury czasowej. Dotyczy formatu json (DataFrameWriter). |
timestampNTZFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS] |
Formatuj ciąg dla znacznika czasu bez wartości kolumn strefy czasowej (TimestampNTZType). |
writeNonAsciiCharacterAsCodePoint |
false |
Czy kodować znaki inne niż ASCII jako \uXXXX sekwencje ucieczki Unicode zamiast literału UTF-8 znaków w danych wyjściowych. |
ORC
| Key | Domyślny | Description |
|---|---|---|
compression |
zstd |
Koder koder-dekoder kompresji do użycia podczas pisania. Prawidłowe wartości: none, , uncompressedsnappy, zliblzozstd, lz4, . brotli Dotyczy orc (DataFrameWriter). |
Parkiet
| Key | Domyślny | Description |
|---|---|---|
compression |
snappy |
Koder koder-dekoder kompresji do użycia podczas pisania. Prawidłowe wartości: none, uncompressedbrotlisnappygziplzo, lz4, lz4_raw, . zstd Dotyczy parquet (DataFrameWriter). |
spark.sql.parquet.outputTimestampType |
INT96 |
Typ fizyczny używany do kodowania kolumn sygnatury czasowej. Prawidłowe wartości: INT96, , TIMESTAMP_MICROSTIMESTAMP_MILLIS. Służy INT96 do zapewniania zgodności ze starszymi czytnikami Parquet, które nie obsługują standardowych typów sygnatur czasowych. |
Tekst
| Key | Domyślny | Description |
|---|---|---|
compression |
none |
Koder koder-dekoder kompresji do użycia podczas pisania. Prawidłowe wartości: none, , bzip2gzip, lz4snappy, , deflate, zstd. Dotyczy tekstu (DataFrameWriter). |
encoding |
UTF-8 |
Kodowanie znaków dla plików wyjściowych. |
lineSep |
\n |
Ciąg separatora wiersza używany między rekordami. Dotyczy tekstu (DataFrameWriter). |
XML
| Key | Domyślny | Description |
|---|---|---|
arrayElementName |
item |
Nazwa elementu dla elementów tablicy, które nie mają jawnej nazwy. Dotyczy xml (DataFrameWriter). |
attributePrefix |
_ |
Prefiks poprzedzony nazwami pól, które odpowiadają atrybutom XML. Dotyczy xml (DataFrameWriter). |
compression |
none |
Koder koder-dekoder kompresji do użycia podczas pisania. Prawidłowe wartości: none, , bzip2gzip, lz4snappy, , deflate, zstd. Dotyczy xml (DataFrameWriter). |
dateFormat |
yyyy-MM-dd |
Formatuj ciąg dla wartości kolumn daty. Dotyczy xml (DataFrameWriter). |
declaration |
version="1.0" encoding="UTF-8" standalone="yes" |
Ciąg deklaracji XML zapisany w górnej części każdego pliku wyjściowego. Ustaw wartość na pusty ciąg, aby pominąć deklarację. Dotyczy xml (DataFrameWriter). |
encoding |
UTF-8 |
Kodowanie znaków dla plików wyjściowych. Dotyczy xml (DataFrameWriter). |
indent |
4 spacje | Ciąg używany do wcięcia elementów podrzędnych w danych wyjściowych. Ustaw na pusty ciąg, aby wyłączyć wcięcie i zapisać każdy wiersz w jednym wierszu. |
locale |
en-US |
Identyfikator java.util.Locale. Wpływa na formatowanie wartości daty i znacznika czasu podczas pisania. |
nullValue |
null |
Ciąg napisany dla wartości null. Po ustawieniu nullwartości na atrybuty i elementy podrzędne dla pól o wartości null zostaną pominięte. Dotyczy xml (DataFrameWriter). |
rootTag |
ROWS |
Tag elementu głównego, który opakowuje wszystkie elementy wiersza w danych wyjściowych. Dotyczy xml (DataFrameWriter). |
rowTag |
ROW |
Tag elementu reprezentujący wiersz w danych wyjściowych. Dotyczy xml (DataFrameWriter). |
singleVariantColumn |
None | Nazwa pojedynczej kolumny Wariant do zapisu w plikach XML. Dotyczy xml (DataFrameWriter). |
timestampFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] |
Ciąg formatu dla wartości kolumn sygnatury czasowej. Dotyczy xml (DataFrameWriter). |
timestampNTZFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS] |
Formatuj ciąg dla znacznika czasu bez wartości kolumn strefy czasowej. Dotyczy xml (DataFrameWriter). |
validateName |
true |
Czy zgłosić wyjątek, jeśli nazwa kolumny nie jest prawidłowym identyfikatorem elementu XML. Dotyczy xml (DataFrameWriter). |
valueTag |
_VALUE |
Nazwa pola używana dla danych znaków w elementach XML, które mają również atrybuty lub elementy podrzędne. Dotyczy xml (DataFrameWriter). |
Opcje DataStreamWriter
Te opcje umożliwiają DataStreamWriter.option() skonfigurowanie zapisów przesyłanych strumieniowo.
Example
W poniższym przykładzie ustawiono lokalizację punktu kontrolnego dla strumienia:
Python
(df.writeStream
.format("delta")
.option("checkpointLocation", "/path/to/checkpoint")
.start("/path/to/table"))
Scala
df.writeStream
.format("delta")
.option("checkpointLocation", "/path/to/checkpoint")
.start("/path/to/table")
Wspólne
| Key | Domyślny | Description |
|---|---|---|
checkpointLocation |
Brak (wymagane) | Ścieżka do katalogu punktu kontrolnego dla zapytania przesyłania strumieniowego. Wymagana do zapewnienia odporności na uszkodzenia i dokładnie jednokrotnych gwarancji przetwarzania. Każde zapytanie przesyłane strumieniowo musi używać unikatowej lokalizacji punktu kontrolnego. Usługa Databricks zaleca przechowywanie punktów kontrolnych w woluminie wykazu aparatu Unity lub ścieżce magazynu w chmurze. Zobacz Ustrukturyzowane punkty kontrolne przesyłania strumieniowego. |
path |
None | Ścieżka wyjściowa dla ujściów przesyłania strumieniowego opartego na plikach, takich jak Parquet. Dotyczy tylko formatów opartych na plikach. |
Ujście konsoli
| Key | Domyślny | Description |
|---|---|---|
numRows |
20 |
Liczba wierszy do wyświetlenia dla każdej mikrosadowej podczas zapisywania w ujściu konsoli. |
truncate |
true |
Czy obcinać długie ciągi podczas wyświetlania wierszy. Ustaw wartość na , aby false wyświetlić pełne wartości ciągu. |
Delta Lake
Poniższe opcje mają zastosowanie podczas zapisywania strumienia do tabeli usługi Delta Lake przy użyciu polecenia format("delta"). Opcje tylko zastępowania, takie jak overwriteSchema, replaceWherei partitionOverwriteMode nie są obsługiwane w przypadku zapisów przesyłanych strumieniowo.
| Key | Domyślny | Description |
|---|---|---|
mergeSchema |
false |
Czy rozwijać schemat tabeli usługi Delta Lake, gdy ramka danych przesyłania strumieniowego zawiera nowe kolumny. Dotyczy tylko trybu dołączania danych wyjściowych. Dotyczy schematu tabeli aktualizacji. |
userMetadata |
None | Ciąg zdefiniowany przez użytkownika dołączony do metadanych zatwierdzenia dla operacji zapisu. Widoczne w danych wyjściowych elementu DESCRIBE HISTORY. Dotyczy wzbogacenia tabel z niestandardowymi metadanymi. |
Ujście pliku
Poniższa opcja ma zastosowanie podczas zapisywania strumienia w formatach opartych na plikach (Parquet, JSON, CSV, ORC, text). Aby uzyskać informacje o opcjach specyficznych dla formatu, zobacz Opcje elementu DataFrameWriter.
| Key | Domyślny | Description |
|---|---|---|
retention |
None | Jak długo zachować pliki metadanych ujścia używane do odporności na uszkodzenia i kompaktowania. Akceptuje ciąg czasu, taki jak 7 days lub 24 hours. Jeśli nie jest ustawiona, pliki metadanych są zachowywane przez czas nieokreślony. |
Ujście platformy Kafka
Aby uzyskać pełną listę opcji pisania strumieni na platformie Kafka, zobacz Opcje.
| Key | Domyślny | Description |
|---|---|---|
kafka.bootstrap.servers |
None | Required. Rozdzielona przecinkami lista adresów brokera platformy host:port Kafka. |
topic |
None | Docelowy temat platformy Kafka dla wszystkich wierszy. Wymagane, jeśli ramka danych nie zawiera kolumny topic . |
kafka.* |
None | Dowolna konfiguracja producenta platformy Kafka poprzedzona prefiksem kafka.. Na przykład kafka.compression.type. |
Ujście pamięci
| Key | Domyślny | Description |
|---|---|---|
queryName |
Brak (wymagane) | Nazwa tabeli w pamięci, do którego zapisuje zapytanie. Wymagane do ujścia pamięci. Można również konfigurować za pomocą polecenia .queryName(). |
mode |
exactlyonce |
Gwarancja dostarczenia dla ujścia pamięci.
exactlyonce używa trybu mikrosadowego z semantykami dokładnie jednokrotnymi.
atleastonce używa trybu ciągłego z semantykami co najmniej raz. Prawidłowe wartości: exactlyonce, atleastonce. |
Opcje funkcji platformy Spark
Niektóre wbudowane funkcje platformy Spark SQL akceptują options mapę, która steruje zachowaniem analizowania lub serializacji. Przekaż opcje jako Python dict lub Scala Map[String, String].
Example
Poniższy przykład analizuje kolumnę JSON podczas upuszczania nieprawidłowo sformułowanych rekordów:
Python
from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StructField, StringType
schema = StructType([StructField("name", StringType())])
df = df.withColumn("parsed", from_json("json_col", schema, {"mode": "DROPMALFORMED"}))
Scala
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._
val schema = StructType(Seq(StructField("name", StringType)))
val df = df.withColumn("parsed", from_json(col("json_col"), schema, Map("mode" -> "DROPMALFORMED")))
Avro
Funkcje Avro akceptują te same opcje co odpowiednie opcje ramki danych:
-
from_avroischema_of_avroużyj opcji DataFrameReader Avro. -
to_avroużywa opcji DataFrameWriter Avro.
Example
Poniższy przykład dekoduje kolumnę Avro z włączoną ewolucją schematu:
Python
from pyspark.sql.functions import from_avro
df = df.withColumn("decoded", from_avro("avro_col", json_schema, {"avroSchemaEvolutionMode": "restart"}))
Scala
import org.apache.spark.sql.avro.functions.from_avro
val df = df.withColumn("decoded", from_avro(col("avro_col"), jsonSchema, Map("avroSchemaEvolutionMode" -> "restart")))
Ponadto warianty rejestru schematów from_avro i to_avro akceptują następujące opcje:
| Key | Domyślny | Description |
|---|---|---|
schemaId |
None | Identyfikator schematu z rejestru schematów Confluent do użycia podczas dekodowania danych Avro zakodowanych przy użyciu schematu niezgodnego z jsonFormatSchema.
from_avro Dotyczy tylko. |
confluent.schema.registry.* |
None | Właściwości konfiguracji klienta rejestru schematów confluent. Przekaż dowolną właściwość klienta sr platformy Confluent przy użyciu tego prefiksu, na przykład confluent.schema.registry.basic.auth.user.info w przypadku poświadczeń uwierzytelniania podstawowego. Wymagane dla wariantów rejestru schematów i from_avroto_avro. |
CSV
Funkcje CSV akceptują te same opcje co odpowiednie opcje ramki danych:
-
from_csvischema_of_csvużyj opcji CSV Elementu DataFrameReader. -
to_csvużywa opcji CSV elementu DataFrameWriter.
Example
Poniższy przykład odczytuje plik CSV z separatorem niestandardowym i NULL wartością:
Python
from pyspark.sql.functions import from_csv
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
schema = StructType([StructField("id", IntegerType()), StructField("name", StringType())])
df = df.withColumn("parsed", from_csv("csv_col", schema, {"sep": "|", "nullValue": "N/A"}))
Scala
import org.apache.spark.sql.functions.from_csv
import org.apache.spark.sql.types._
val schema = StructType(Seq(StructField("id", IntegerType), StructField("name", StringType)))
val df = df.withColumn("parsed", from_csv(col("csv_col"), schema, Map("sep" -> "|", "nullValue" -> "N/A")))
JSON
Funkcje JSON akceptują te same opcje co odpowiednie opcje ramki danych:
-
from_jsonischema_of_jsonużyj opcji DataFrameReader JSON. -
to_jsonużywa opcji JSON elementu DataFrameWriter.
Example
W poniższym przykładzie jest zapisywany kod JSON z polami NULL ignorowanymi i włączonym dość formatowaniem:
Python
from pyspark.sql.functions import to_json
df = df.withColumn("json_str", to_json("struct_col", {"pretty": "true", "ignoreNullFields": "true"}))
Scala
import org.apache.spark.sql.functions.to_json
val df = df.withColumn("json_str", to_json(col("struct_col"), Map("pretty" -> "true", "ignoreNullFields" -> "true")))
Protobuf
from_protobuf nie to_protobuf należy używać źródła danych opartego na plikach. Dane Protobuf są zawsze odczytywane i zapisywane jako kolumny binarne przy użyciu tych funkcji. Opcje są przekazywane jako i Map[String, String] są uwzględniane wielkość liter.
Example
Poniższy przykład dekoduje kolumnę Protobuf przy użyciu trybu PERMISSIVE:
Python
from pyspark.sql.functions import from_protobuf
df = df.withColumn("decoded", from_protobuf("proto_col", "MyMessage", "/path/to/descriptor.desc",
{"mode": "PERMISSIVE", "enums.as.ints": "true"}))
Scala
import org.apache.spark.sql.protobuf.functions.from_protobuf
val df = df.withColumn("decoded", from_protobuf(col("proto_col"), "MyMessage", "/path/to/descriptor.desc",
Map("mode" -> "PERMISSIVE", "enums.as.ints" -> "true")))
Funkcje Protobuf korzystają z następujących opcji:
| Key | Domyślny | Description |
|---|---|---|
mode |
FAILFAST |
Jak obsługiwać uszkodzone rekordy. Funkcja FAILFAST zgłasza wyjątek.
PERMISSIVE Ustawia źle sformułowane pola na null. Prawidłowe wartości: FAILFAST, PERMISSIVE. Dotyczy .from_protobuf |
recursive.fields.max.depth |
-1 (wyłączone) |
Maksymalna głębokość rekursji dla cyklicznych pól Protobuf. Ustaw wartość , aby 0 wyłączyć obsługę pól cyklicznych. Prawidłowe wartości: 0 do 10. Dotyczy .from_protobuf |
convert.any.fields.to.json |
false |
Czy przekonwertować pola Protobuf Any na ciąg JSON zamiast STRUCT. Dotyczy .from_protobuf |
emit.default.values |
false |
Czy emitować pola z wartościami zerowymi lub domyślnymi (semantyka proto3). Gdy falsepola z wartościami domyślnymi zostaną pominięte z danych wyjściowych. Dotyczy .from_protobuf |
enums.as.ints |
false |
Czy renderować pola wyliczenia jako wartości całkowite zamiast ciągów. Dotyczy .from_protobuf |
upcast.unsigned.ints |
false |
Czy przesłać uint32 do Long i uint64 , aby Decimal(20,0) zapobiec przepełnieniu liczby całkowitej. Dotyczy .from_protobuf |
unwrap.primitive.wrapper.types |
false |
Określa, Int32Value czy należy odpakowywać google.protobuf typy otoki (na przykład i StringValue) do odpowiadających im typów pierwotnych platformy Spark. Dotyczy .from_protobuf |
retain.empty.message.types |
false |
Czy zachować puste typy komunikatów Protobuf w schemacie danych wyjściowych przez wstawienie fikcyjnej kolumny. Dotyczy .from_protobuf |
schema.registry.subject |
None | Nazwa podmiotu rejestru schematów. Wymagane w przypadku używania wariantów rejestru schematów from_protobuf i to_protobuf. |
schema.registry.address |
None | Adres rejestru schematów (host i port). Wymagane w przypadku używania wariantów rejestru schematów from_protobuf i to_protobuf. |
schema.registry.protobuf.name |
None | Określa, który komunikat Protobuf ma być używany, gdy temat rejestru schematów zawiera wiele komunikatów. Optional. |
XML
Funkcje XML akceptują te same opcje co odpowiednie opcje ramki danych:
-
from_xmlischema_of_xmlużyj opcji XML elementu DataFrameReader. -
to_xmlużywa opcji XML elementu DataFrameWriter.
Example
Poniższy przykład zapisuje kod XML z niestandardowymi tagami katalogu głównego i wiersza:
Python
from pyspark.sql.functions import to_xml
df = df.withColumn("xml_str", to_xml("struct_col", {"rootTag": "records", "rowTag": "record"}))
Scala
import org.apache.spark.sql.functions.to_xml
val df = df.withColumn("xml_str", to_xml(col("struct_col"), Map("rootTag" -> "records", "rowTag" -> "record")))