이 페이지에는 데이터를 읽고 쓰는 Spark API에 사용할 수 있는 입력 및 출력 옵션이 나열되어 있습니다.
DataFrameReader 옵션
이러한 옵션은 DataFrameReader.option(), DataFrameReader.options(), read_files, COPY INTO 및 Auto Loader 사용하여 Azure Databricks 데이터 파일을 읽는 방법을 제어합니다.
Example
다음 예제에서는 JSON 파일을 읽도록 True 설정합니다multiLine.
Python
df = spark.read.format("json").option("multiLine", True).load("/path/to/data")
스칼라
val df = spark.read.format("json").option("multiLine", "true").load("/path/to/data")
SQL
SELECT * FROM read_files("/path/to/data", format => "json", multiLine => true)
일반적인
다음 옵션은 모든 파일 형식에 적용됩니다.
| Key | 기본값 | Description |
|---|---|---|
ignoreCorruptFiles |
false |
손상된 파일을 무시할지 여부를 나타냅니다. true인 경우 손상된 파일이 발견될 때 Spark 작업이 계속 실행되고 읽은 내용이 계속 반환됩니다. 따라서 COPY INTODelta Lake 기록의 열에서 numSkippedCorruptFiles 와 같이 operationMetrics 건너뛴 손상된 파일을 관찰할 수 있습니다. Databricks Runtime 11.3 LTS 이상에서 지원됩니다. |
ignoreMissingFiles |
false자동 로더의 경우 COPY INTO ( true 레거시) |
누락된 파일을 무시할지 여부를 나타냅니다. true이면 누락된 파일이 발견되면 Spark 작업이 계속 실행되고 콘텐츠가 계속 반환됩니다. Databricks Runtime 11.3 LTS 이상에서 지원됩니다. |
modifiedAfter |
None | 제공된 타임스탬프 다음에 수정 타임스탬프가 있는 파일만 수집하는 필터로서의 선택적 타임스탬프입니다. |
modifiedBefore |
None | 선택적 타임스탬프는 제공된 타임스탬프보다 앞서 수정된 타임스탬프를 가진 파일만 수집하기 위한 필터로 사용됩니다. |
pathGlobFilter 또는 fileNamePattern |
None | 파일 선택을 위해 사용할 수 있는 glob 패턴입니다.
PATTERN in COPY INTO (레거시)과 동일합니다.
fileNamePattern은 read_files에서 사용할 수 있습니다. |
recursiveFileLookup |
false |
이 true옵션은 이름이 다음과 같은 date=2019-07-01파티션 명명 체계를 따르지 않더라도 중첩된 디렉터리를 검색합니다. |
Avro
| Key | 기본값 | Description |
|---|---|---|
avroSchema |
None | 사용자가 Avro 형식으로 제공하는 선택적 스키마입니다. Avro를 읽을 때 이 옵션은 호환되지만 실제 Avro 스키마와 다른 진화된 스키마로 설정할 수 있습니다. 역직렬화 스키마는 진화된 스키마와 일치합니다. 예를 들어 기본값을 사용하여 하나의 추가 열을 포함하는 진화된 스키마를 설정하는 경우 읽기 결과에도 새 열이 포함됩니다. |
avroSchemaEvolutionMode |
none |
스키마 레지스트리를 사용할 때 스키마 진화를 처리하는 방법입니다. 유효한 값: none (스키마 변경 내용을 무시하고 작업을 계속합니다.) restart 스키마 변경 내용이 검색되면 UnknownFieldException 작업을 다시 시작해야 합니다. |
datetimeRebaseMode |
LEGACY |
율리우스력과 프로렙틱 그레고리력 사이에서 날짜 및 타임스탬프 값의 재조정을 제어합니다. 유효한 값은 EXCEPTION, LEGACY 및 CORRECTED입니다. |
enableStableIdentifiersForUnionType |
false |
Avro Union 형식에 대해 안정적인 필드 이름을 사용할지 여부입니다. 사용하도록 설정하면 공용 구조체 형식 필드 이름은 소문자(예: 예member_string: member_int)의 형식 이름에서 파생됩니다. 소문자 후 두 형식 이름이 동일한 경우 예외를 throw합니다. |
mergeSchema |
false |
여러 파일에서 스키마를 유추하고 각 파일의 스키마를 병합할지 여부입니다. Avro에 대한 mergeSchema는 데이터 형식을 완화하지 않습니다. |
mode |
FAILFAST |
손상된 레코드를 처리하기 위한 파서 모드입니다. 유효한 값: FAILFAST (예외를 throw), PERMISSIVE (잘못된 형식의 필드를 null로 설정), DROPMALFORMED (잘못된 레코드를 자동으로 삭제). |
readerCaseSensitive |
true |
rescuedDataColumn이 활성화된 경우 대/소문자 구분 동작을 지정합니다. true이면 이름이 대/소문자를 구분하여 스키마와 다른 데이터 열을 구합니다. false이면 대/소문자를 구분하지 않는 방식으로 데이터를 읽습니다. |
recursiveFieldMaxDepth |
None | 재귀 Avro 필드의 최대 재귀 깊이입니다.
1 모든 재귀 필드를 잘라내고, 2 한 수준의 재귀를 허용하는 등으로 15설정합니다. 설정 0되지 않거나 재귀 필드가 허용되지 않는 경우 유효한 값: 0 to .15 |
rescuedDataColumn |
None | 데이터 형식 불일치 및 스키마 불일치(열 대소문자 포함)로 인해 파싱할 수 없는 모든 데이터를 별도의 열에 수집할지 여부입니다. 이 열은 자동 로더를 사용할 때 기본적으로 포함됩니다.COPY INTO을 사용하여 스키마를 수동으로 설정할 수 없기 때문에 COPY INTO(레거시)에서는 구조 복구된 데이터 열을 지원하지 않습니다. Databricks는 대부분의 수집 시나리오에 자동 로더를 사용하는 것이 좋습니다.자세한 내용은 복구된 데이터 열이란?을 참조하세요. |
stableIdentifierPrefixForUnionType |
member_ |
안정적인 공용 구조체 형식 필드 이름 enableStableIdentifiersForUnionType=true에 사용할 접두사입니다. |
Csv
| Key | 기본값 | Description |
|---|---|---|
badRecordsPath |
None | 잘못된 CSV 레코드에 대한 정보를 기록하기 위한 파일을 저장하는 경로입니다. |
charToEscapeQuoteEscaping |
\0 |
따옴표 문자를 탈출시킬 때 사용하는 탈출 문자입니다. 예를 들어 [ " a\\", b ] 레코드의 경우 다음과 같습니다.
|
columnNameOfCorruptRecord |
_corrupt_record |
자동 로더를 지원합니다.
COPY INTO(레거시)은 지원되지 않습니다.형식이 잘못되어 구문을 분석할 수 없는 레코드를 저장하기 위한 열입니다. 구문 분석에 대한 mode가 DROPMALFORMED로 설정되면 이 열은 비어 있습니다. |
comment |
\0 |
텍스트 줄의 시작 부분에 있는 줄 주석을 나타내는 문자를 정의합니다. 주석 건너뛰기를 사용하지 않도록 설정하려면 '\0'을 사용합니다. |
dateFormat |
yyyy-MM-dd |
날짜 문자열을 해석하기 위한 형식입니다. |
emptyValue |
빈 문자열 | 빈 값의 문자열 표현입니다. |
enableDateTimeParsingFallback |
false |
값을 지정된 형식으로 구문 분석할 수 없는 경우 레거시 날짜 및 타임스탬프 구문 분석 동작으로 대체할지 여부입니다.
false구문 분석 실패 시 오류가 발생하거나 에 따라 null을 생성합니다mode. |
encoding 또는 charset |
UTF-8 |
CSV 파일 인코딩 이름입니다. 옵션 목록은 java.nio.charset.Charset을 참조하세요.
UTF-16이 UTF-32이면 multiline 및 true를 사용할 수 없습니다. |
enforceSchema |
true |
지정된 스키마 또는 유추된 스키마를 CSV 파일에 강제로 적용할지 여부입니다. 옵션을 사용하도록 설정하면 CSV 파일의 머리글이 무시됩니다. 자동 로더를 사용하여 데이터를 복구하고 스키마 진화를 허용하는 경우 이 옵션은 기본적으로 무시됩니다. |
escape |
\ |
데이터를 구문 분석할 때 사용할 이스케이프 문자입니다. |
extension |
csv |
예상된 파일 이름 확장명입니다. 이 확장명 없는 파일은 읽기 중에 필터링됩니다. |
failOnUnknownFields |
false |
CSV 레코드에 스키마에 없는 열이 포함되어 있을 때 실패할지 여부입니다. 인식 false할 수 없는 열이 자동으로 삭제되거나 복구 rescuedDataColumn되는 경우 . |
failOnWidenedFields |
false |
필드 값을 확장하지 않고 선언된 스키마 형식으로 구문 분석할 수 없는 경우 실패할지 여부입니다. 경우 false형식 확장 값은 에 따라 rescuedDataColumn자동으로 복구됩니다. 설정 failOnUnknownFields=true 은 이 옵션의 효과를 마스킹할 수 있습니다. |
header |
false |
CSV 파일에 머리글이 포함되어 있는지 여부입니다. 자동 로더는 스키마를 유추할 때 파일에 머리글이 있다고 가정합니다. |
ignoreLeadingWhiteSpace |
false |
파싱된 각 값의 선행 공백을 무시할지 여부입니다. |
ignoreTrailingWhiteSpace |
false |
각 값의 파싱 시 후행 공백을 무시할지 여부입니다. |
inferSchema |
false |
구문 분석된 CSV 레코드의 데이터 형식을 유추할지, 또는 모든 열을 StringType로 가정할지를 결정하는 것입니다.
true로 설정되면 데이터를 추가로 전달해야 합니다. 자동 로더의 경우 cloudFiles.inferColumnTypes를 대신 사용합니다. |
inputBufferSize |
1048576 (1MB) |
CSV 파서의 버퍼 크기(바이트)입니다. 큰 CSV 파일을 구문 분석할 때 메모리 사용량을 조정하는 데 유용합니다. 유효한 값: 양의 정수입니다. |
lineSep |
None, 포함 \r, \r\n및 \n |
연속된 두 CSV 레코드 사이의 문자열입니다. |
locale |
US |
식별자 java.util.Locale입니다. CSV 내에서 기본 날짜, 타임스탬프, 10진수 구문 분석에 영향을 미칩니다. |
maxCharsPerColumn |
-1 |
값을 구문 분석할 때 기대되는 최대 문자 수입니다. 메모리 오류를 방지하는 데 사용할 수 있습니다. 기본값은 -1이며 무제한을 의미합니다. 유효한 값: 양의 정수 또는 -1 무제한입니다. |
maxColumns |
20480 |
레코드에 사용할 수 있는 열 수에 대한 최대 한도입니다. 유효한 값: 양의 정수입니다. |
mergeSchema |
false |
여러 파일에서 스키마를 유추하고 각 파일의 스키마를 병합할지 여부입니다. 스키마를 유추할 때 자동 로더에 대해 기본적으로 사용하도록 설정됩니다. |
mode |
PERMISSIVE |
잘못된 형식의 레코드를 처리하기 위한 파서 모드입니다. 유효한 값: PERMISSIVE, DROPMALFORMED. FAILFAST |
multiLine |
false |
CSV 레코드가 여러 줄에 걸쳐 있는지 여부입니다. |
nanValue |
NaN |
FloatType 및 DoubleType 열을 구문 분석할 때 숫자가 아닌 값에 대한 문자열 표현입니다. |
negativeInf |
-Inf |
FloatType 또는 DoubleType 열을 구문 분석할 때 음의 무한대를 문자열로 표현한 것입니다. |
nullValue |
빈 문자열 | Null 값의 문자열 표현입니다. |
parserCaseSensitive(더 이상 사용되지 않음) |
false |
파일을 읽는 동안, 헤더에 선언된 열을 스키마별로 대소문자를 구분하여 정렬할지 여부를 결정합니다. 자동 로더의 경우 기본적으로 true입니다. 기능이 활성화되면 대/소문자가 다른 열이 rescuedDataColumn에서 복구됩니다.
readerCaseSensitive를 위하여 이 옵션은 더 이상 사용되지 않습니다. |
positiveInf |
Inf |
FloatType 또는 DoubleType 열을 구문 분석할 때 양의 무한대를 나타내는 문자열입니다. |
preferDate |
true |
가능한 경우 문자열을 타임스탬프 대신 날짜로 유추하려고 시도합니다. 또한 자동 로더를 사용하거나 사용하여 스키마 유추를 inferSchema 사용해야 cloudFiles.inferColumnTypes 합니다. |
quote |
" |
값에 필드 구분 기호가 포함된 경우 이 값을 이스케이프하는 데 사용되는 문자입니다. |
readerCaseSensitive |
true |
rescuedDataColumn이 활성화된 경우 대/소문자 구분 동작을 지정합니다. true이면 이름이 대/소문자를 구분하여 스키마와 다른 데이터 열을 구합니다. false이면 대/소문자를 구분하지 않는 방식으로 데이터를 읽습니다. |
rescuedDataColumn |
None | 데이터 형식 불일치 및 스키마 불일치(열 대소문자 포함)로 인해 파싱할 수 없는 모든 데이터를 별도의 열에 수집할지 여부입니다. 이 열은 자동 로더를 사용할 때 기본적으로 포함됩니다. 자세한 내용은 복구된 데이터 열이란?을 참조하세요.COPY INTO을 사용하여 스키마를 수동으로 설정할 수 없기 때문에 COPY INTO(레거시)에서는 구조 복구된 데이터 열을 지원하지 않습니다. Databricks는 대부분의 수집 시나리오에 자동 로더를 사용하는 것이 좋습니다. |
sep 또는 delimiter |
, |
열을 구분하는 문자열입니다. |
singleVariantColumn |
None | 열 이름으로 설정하면 각 필드를 자체 열로 구문 분석하는 대신 전체 CSV 레코드를 해당 이름의 단일 VariantType 열로 읽습니다.
header=true가 필요합니다. |
skipRows |
0 |
무시해야 하는 CSV 파일 시작 부분의 행 수입니다(주석이 추가된 행과 빈 행 포함).
header가 true이면 헤더는 건너뛰지 않고 주석 처리되지 않은 첫 번째 행이 됩니다. 유효한 값: 양의 정수 또는 0입니다. |
timeFormat |
HH:mm:ss |
열 값을 구문 분석하기 TimeType 위한 형식입니다. |
timestampFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] |
타임스탬프 문자열 분석 형식입니다. |
timestampNTZFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS] |
표준 시간대(TimestampNTZType) 문자열이 없는 타임스탬프를 구문 분석하는 형식입니다. |
timeZone |
None |
java.time.ZoneId은 타임스탬프와 날짜를 구문 분석할 때 사용하는 것입니다. |
unescapedQuoteHandling |
STOP_AT_DELIMITER |
이스케이프되지 않은 따옴표를 처리하기 위한 전략입니다. 허용되는 옵션은 다음과 같습니다.
|
Excel
| Key | 기본값 | Description |
|---|---|---|
dataAddress |
None | Excel 구문에서 읽을 셀 범위입니다. 생략하면 첫 번째 시트에서 유효한 셀을 모두 읽습니다. 명명된 시트에서 범위를 읽거나, "C5:H10" 첫 번째 시트에서 범위를 읽거나"SheetName", 특정 시트에서 모든 데이터를 읽는 데 사용합니다"SheetName!C5:H10". |
headerRows |
0 |
열 이름 머리글로 사용할 초기 행 수입니다.
dataAddress 지정되면 셀 범위 내에서 적용됩니다. 이면 0열 이름이 ,, _c3_c2등으로 _c1자동 생성됩니다. 유효한 값: 0, 1. |
ignoreMissingSheet |
false |
에서 지정 dataAddress한 시트를 포함하지 않는 파일을 자동으로 건너뛸지 여부입니다. 요청 false된 시트가 파일에 없으면 오류가 throw됩니다. 에 시트 이름을 지정 dataAddress하는 경우에만 적용됩니다. 유효한 값: true, . false |
includePhoneticRuns |
false |
XLSX 파일을 읽을 때 셀 문자열 값에 연결된 윗주 주석(예: pinyin 또는 furigana)을 포함할지 여부입니다. 유효한 값: true, . false |
operation |
readSheet |
Excel 통합 문서에서 수행할 작업입니다. 유효한 값: readSheet (시트에서 데이터를 읽음), listSheets (필드 sheetIndex: long 가 있는 구조체 및 sheetName: String 각 시트에 대해 반환). |
timestampNTZFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS] |
Excel 문자열로 저장된 타임스탬프 없는 표준 시간대 값에 대한 사용자 지정 형식 문자열입니다. 사용자 지정 날짜 형식은 날짜/시간 패턴의 형식을 따릅니다. |
dateFormat |
yyyy-MM-dd |
로 읽은 Date문자열 값에 대한 사용자 지정 형식 문자열입니다. 사용자 지정 날짜 형식은 날짜/시간 패턴의 형식을 따릅니다. |
Json
| Key | 기본값 | Description |
|---|---|---|
allowBackslashEscapingAnyCharacter |
false |
백슬래시가 뒤따르는 모든 문자를 이스케이프할 수 있도록 허용할지 여부. 활성화되지 않으면 JSON 사양에 명시적으로 나열된 문자만 이스케이프 기능을 사용할 수 있습니다. |
allowComments |
false |
구문 분석된 콘텐츠 내에서 Java, C, C++ 스타일 주석('/', '*' 및 '//' 변형)을 사용하도록 허용할지 여부입니다. |
allowNonNumericNumbers |
true |
숫자가 아닌(NaN) 토큰 집합을 유효한 부동 숫자 값으로 허용할지 여부입니다. |
allowNumericLeadingZeros |
false |
정수가 추가적인(무시 가능한) 0으로 시작할 수 있는지 여부(예: 000001)입니다. |
allowSingleQuotes |
true |
문자열(이름 및 문자열 값)을 인용할 때 작은따옴표(아포스트로피, 문자 '\') 사용을 허용할지 여부입니다. |
allowUnquotedControlChars |
false |
JSON 문자열에 캡슐화되지 않은 컨트롤 문자(탭 및 줄 바꿈 문자를 포함하여 값이 32보다 작은 ASCII 문자)를 포함하도록 허용할지 여부입니다. |
allowUnquotedFieldNames |
false |
JSON 사양이 아닌 JavaScript에서 허용하는 따옴표 없는 필드 이름의 사용을 허용할지 여부입니다. |
alternateVariantEncoding |
None | 원본 JSON의 Variant 값에 사용되는 인코딩입니다. 인라인 JSON으로 저장되지 않고 Base85로 인코딩된 Variant 값을 디코딩하도록 Z85 설정합니다. |
badRecordsPath |
None | 잘못된 JSON 레코드에 대한 정보를 기록하기 위한 파일을 저장하는 경로입니다.badRecordsPath 파일 기반 데이터 원본에서 옵션을 사용하면 다음과 같은 제한 사항이 있습니다.
|
columnNameOfCorruptRecord |
_corrupt_record |
형식이 잘못되어 구문을 분석할 수 없는 레코드를 저장하기 위한 열입니다. 구문 분석에 대한 mode가 DROPMALFORMED로 설정되면 이 열은 비어 있습니다. |
dateFormat |
yyyy-MM-dd |
날짜 문자열을 해석하기 위한 형식입니다. |
dropFieldIfAllNull |
false |
스키마 유추 중에 모든 null 값의 열을 무시할지, 아니면 빈 배열과 구조체를 무시할지 여부입니다. |
encoding 또는 charset |
UTF-8 |
JSON 파일 인코딩의 이름입니다. 옵션 목록은 java.nio.charset.Charset을 참조하세요.
UTF-16이 UTF-32이면 multiline 및 true를 사용할 수 없습니다. |
inferTimestamp |
false |
타임스탬프 문자열을 TimestampType으로 유추할지 여부입니다. 로 설정 true하면 스키마 유추가 눈에 띄게 오래 걸릴 수 있습니다. 자동 로더와 함께 사용하려면 cloudFiles.inferColumnTypes를 설정해야 합니다. |
lineSep |
None, 포함 \r, \r\n및 \n |
연속된 두 JSON 레코드 사이의 문자열입니다. |
locale |
US |
식별자 java.util.Locale입니다. JSON 내의 기본 날짜, 타임스탬프, 10진수 구문 분석에 영향을 줍니다. |
maxNestingDepth |
500 |
JSON 개체 및 배열에 허용되는 최대 중첩 깊이입니다. 깊이 중첩된 문서에 대해 이 값을 늘입니다. 유효한 값: 양의 정수입니다. |
maxNumLen |
1000 |
JSON 입력의 최대 수 토큰 길이입니다. 숫자 리터럴이 큰 JSON의 경우 이 값을 늘입니다. 유효한 값: 양의 정수입니다. |
maxStringLen |
제한 없음 | JSON 입력에 있는 문자열 값의 최대 길이입니다. 큰 문자열로 JSON을 구문 분석할 때 메모리 사용량을 제한하도록 설정합니다. 유효한 값: 양의 정수입니다. |
mode |
PERMISSIVE |
잘못된 형식의 레코드를 처리하기 위한 파서 모드입니다. 유효한 값: PERMISSIVE, DROPMALFORMED. FAILFAST |
multiLine |
false |
JSON 레코드가 여러 줄에 걸쳐 있는지 여부입니다. |
prefersDecimal |
false |
가능하면 문자열 DecimalType 을 float 또는 double 형식 대신 유추하려고 시도합니다. 또한 자동 로더를 사용하거나 사용하여 스키마 유추를 inferSchema 사용해야 cloudFiles.inferColumnTypes 합니다. |
primitivesAsString |
false |
숫자 및 부울과 같은 기본 형식을 StringType으로 유추할지 여부입니다. |
readerCaseSensitive |
true |
rescuedDataColumn이 활성화된 경우 대/소문자 구분 동작을 지정합니다. true이면 이름이 대/소문자를 구분하여 스키마와 다른 데이터 열을 구합니다. false이면 대/소문자를 구분하지 않는 방식으로 데이터를 읽습니다. Databricks Runtime 13.3 이상에서 사용할 수 있습니다. |
rescuedDataColumn |
None | 데이터 형식 불일치 또는 스키마 불일치(열 대/소문자 포함)로 인해 구문 분석할 수 없는 모든 데이터를 별도의 열에 수집할지 여부입니다. 이 열은 자동 로더를 사용할 때 기본적으로 포함됩니다. 자세한 내용은 복구된 데이터 열이란?을 참조하세요.COPY INTO을 사용하여 스키마를 수동으로 설정할 수 없기 때문에 COPY INTO(레거시)에서는 구조 복구된 데이터 열을 지원하지 않습니다. Databricks는 대부분의 수집 시나리오에 자동 로더를 사용하는 것이 좋습니다. |
singleVariantColumn |
None | 전체 JSON 문서를 수집할지 여부를 지정한 문자열을 열 이름으로 사용하여 단일 Variant 열로 구문 분석합니다. 설정하지 않으면 JSON 필드가 자체 열로 수집됩니다. 유효한 값: 모든 문자열입니다. |
timestampFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] |
타임스탬프 문자열 분석 형식입니다. |
timestampNTZFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS] |
표준 시간대(TimestampNTZType) 문자열이 없는 타임스탬프를 구문 분석하는 형식입니다. |
timeZone |
None |
java.time.ZoneId은 타임스탬프와 날짜를 구문 분석할 때 사용하는 것입니다. |
upgradeExceptionAsBadRecord |
false |
형식 업그레이드 예외(예: 값이 선언된 열 형식으로 확장될 수 없는 경우)를 예외를 throw하는 대신 잘못된 레코드로 처리할지 여부입니다. |
Kafka
Kafka 판독기 옵션의 전체 목록은 DataStreamReader Kafka 옵션을 참조하세요. 다음 옵션은 .를 사용하는 spark.read.format("kafka")일괄 처리 읽기에만 적용합니다.
| Key | 기본값 | Description |
|---|---|---|
endingOffsets |
latest |
읽기를 중지할 위치입니다. 유효한 값: latest또는 각 파티션에 대한 오프셋의 JSON 문자열(예: {"topicA":{"0":50,"1":-1}}.JSON 문자열 -1 에서 최신 오프셋입니다.
-2가장 빠른 오프셋인 경우 끝 오프셋으로 사용할 수 없습니다. |
endingOffsetsByTimestamp |
None | 타임스탬프로 지정된 파티션당 종료 오프셋(밀리초)입니다. 유효한 값: 각 파티션에 대한 타임스탬프의 JSON 문자열(예: {"topicA":{"0":2000,"1":3000}}. |
endingTimestamp |
None | 모든 파티션에 적용되는 전역 종료 타임스탬프(밀리초)입니다. 유효한 값: 음수가 아닌 정수입니다. |
오크
| Key | 기본값 | Description |
|---|---|---|
mergeSchema |
false |
여러 파일에서 스키마를 유추하고 각 파일의 스키마를 병합할지 여부입니다. |
쪽모이 세공 마루
| Key | 기본값 | Description |
|---|---|---|
datetimeRebaseMode |
LEGACY |
율리우스력과 프로렙틱 그레고리력 사이에서 날짜 및 타임스탬프 값의 재조정을 제어합니다. 유효한 값은 EXCEPTION, LEGACY 및 CORRECTED입니다. |
int96RebaseMode |
LEGACY |
율리우스력과 프로렙틱 그레고리력 사이에서 INT96 타임스탬프 값의 기준을 다시 지정하는 것을 제어합니다. 유효한 값은 EXCEPTION, LEGACY 및 CORRECTED입니다. |
mergeSchema |
false |
여러 파일에서 스키마를 유추하고 각 파일의 스키마를 병합할지 여부입니다. |
readerCaseSensitive |
true |
rescuedDataColumn이 활성화된 경우 대/소문자 구분 동작을 지정합니다. true이면 이름이 대/소문자를 구분하여 스키마와 다른 데이터 열을 구합니다. false이면 대/소문자를 구분하지 않는 방식으로 데이터를 읽습니다. |
rescuedDataColumn |
None | 데이터 형식 불일치 및 스키마 불일치(열 대소문자 포함)로 인해 파싱할 수 없는 모든 데이터를 별도의 열에 수집할지 여부입니다. 이 열은 자동 로더를 사용할 때 기본적으로 포함됩니다. 자세한 내용은 복구된 데이터 열이란?을 참조하세요.COPY INTO을 사용하여 스키마를 수동으로 설정할 수 없기 때문에 COPY INTO(레거시)에서는 구조 복구된 데이터 열을 지원하지 않습니다. Databricks는 대부분의 수집 시나리오에 자동 로더를 사용하는 것이 좋습니다. |
상태 저장소
이러한 옵션과 테이블 spark.read.format("statestore") 반환 함수를 read_statestore 사용하여 구조적 스트리밍 상태 데이터를 읽습니다.
구조적 스트리밍 상태 정보 읽기를 참조하세요.
| Key | 기본값 | Description |
|---|---|---|
batchId |
최신 일괄 처리 ID | 읽을 대상 일괄 처리입니다. 쿼리의 이전 상태를 쿼리하는 데 사용합니다. 배치는 커밋되어야 하지만 아직 정리되지 않았습니다. 유효한 값: 음수가 아닌 정수입니다. |
operatorId |
0 |
읽을 대상 연산자입니다. 쿼리에 여러 상태 저장 연산자가 있는 경우에 사용합니다. 유효한 값: 음수가 아닌 정수입니다. |
storeName |
DEFAULT |
읽을 대상 상태 저장소 이름입니다. 상태 저장 연산자에 여러 상태 저장소 인스턴스가 있는 경우에 사용합니다. 스트림 스트림 조인을 storeNamejoinSide 지정해야 하지만 둘 다 지정하지는 않습니다. 유효한 값: 모든 문자열입니다. |
joinSide |
None | 스트림 스트림 조인에 대해 읽을 대상 쪽입니다. 스트림 스트림 조인을 storeNamejoinSide 지정해야 하지만 둘 다 지정하지는 않습니다. 유효한 값: left, . right |
snapshotStartBatchId |
None | 상태를 읽을 때 시작점으로 사용할 스냅샷의 일괄 처리 ID입니다. 판독기는 이 스냅샷에서 다음까지 batchId변경 내용을 재생하여 상태를 다시 작성합니다. 스냅샷이 손상된 경우에 유용합니다. 와 함께 snapshotPartitionId지정해야 합니다. 와 함께 readChangeFeed사용할 수 없습니다. 변경 로그 검사점을 사용하도록 설정된 HDFS 지원 상태 저장소 및 RocksDB 상태 저장소를 지원합니다. Databricks Runtime 15.4 LTS 이상에서 사용할 수 있습니다. 유효한 값: 음수가 아닌 정수입니다. |
snapshotPartitionId |
None | 지정된 경우 쿼리는 이 파티션만 읽습니다. 와 함께 snapshotStartBatchId지정해야 합니다. 와 함께 readChangeFeed사용할 수 없습니다. Databricks Runtime 15.4 LTS 이상에서 사용할 수 있습니다. 유효한 값: 음수가 아닌 정수입니다. |
readChangeFeed |
false |
이면 true지정된 일괄 처리 범위에서의 상태 변경 내용을 반환 changeStartBatchId 합니다 changeEndBatchId.
changeStartBatchId가 필요합니다. , batchId또는 snapshotStartBatchIdsnapshotPartitionId.와 함께 joinSide사용할 수 없습니다. Databricks Runtime 16.4 LTS 이상에서 사용할 수 있습니다. 유효한 값: true, . false자세한 내용은 구조적 스트리밍 상태 변경 내용 읽기를 참조하세요. |
changeStartBatchId |
None | 변경 피드 범위에 대한 시작 일괄 처리 ID입니다.
readChangeFeed이 true인 경우 필요합니다. 로 readChangeFeed 설정된 true경우에만 적용됩니다. Databricks Runtime 16.4 LTS 이상에서 사용할 수 있습니다. 유효한 값: 음수가 아닌 정수입니다. |
changeEndBatchId |
최신 일괄 처리 ID | 변경 피드 범위의 끝 일괄 처리 ID입니다. 보다 크거나 같아야 합니다 changeStartBatchId. 로 readChangeFeed 설정된 true경우에만 적용됩니다. Databricks Runtime 16.4 LTS 이상에서 사용할 수 있습니다. 유효한 값: 음수가 아닌 정수입니다. |
stateVarName |
None | 읽을 상태 변수 이름입니다. 상태 변수 이름은 연산자가 사용하는 함수 내에서 init 각 변수의 고유 이름입니다StatefulProcessor.transformWithState 연산자를 transformWithState 사용할 때 필요합니다. Databricks Runtime 16.4 LTS 이상에서 사용할 수 있습니다. 유효한 값: 모든 문자열입니다. |
readRegisteredTimers |
false |
이 경우 true연산자가 사용하는 등록된 타이머를 transformWithState 읽습니다. 연산자에 transformWithState 만 적용됩니다. Databricks Runtime 16.4 LTS 이상에서 사용할 수 있습니다. 유효한 값: true, . false |
flattenCollectionTypes |
true |
이면 true맵 및 목록 상태 변수에 대해 반환된 레코드를 평면화합니다. 이면 false레코드를 Spark SQL Array 또는 Map.로 반환합니다. 연산자에 transformWithState 만 적용됩니다. Databricks Runtime 16.4 LTS 이상에서 사용할 수 있습니다. 유효한 값: true, . false |
텍스트
| Key | 기본값 | Description |
|---|---|---|
encoding |
UTF-8 |
TEXT 파일 줄 구분 기호의 인코딩 이름입니다. 옵션 목록은 다음을 참조하세요 java.nio.charset.Charset. 파일의 콘텐츠는 이 옵션의 영향을 받지 않으며 as-is로 읽힙니다. |
lineSep |
없음( 포함 \r) \r\n 및 \n |
연속된 두 텍스트 레코드 사이의 문자열입니다. |
wholeText |
false |
파일을 단일 레코드로 읽을지 여부입니다. |
Xml
| Key | 기본값 | Description |
|---|---|---|
rowTag |
None | 행으로 처리할 XML 파일의 행 태그입니다. 예를 들어, XML이 <books> <book><book>...<books> 형태라면, 적절한 값은 book입니다. 필수 옵션입니다. |
samplingRatio |
1.0 |
스키마 유추에 사용되는 행의 비율을 정의합니다. XML 기본 제공 함수는 이 옵션을 무시합니다. 유효한 값: 0.0 to .1.0 |
excludeAttribute |
false |
요소의 특성을 제외할지 여부입니다. |
mode |
None | 구문 분석 중 손상된 레코드를 처리하는 모드.
PERMISSIVE: 손상된 레코드의 경우 잘못된 문자열을 columnNameOfCorruptRecord로 설정된 필드에 넣고 잘못된 필드들을 null로 설정합니다. 손상된 레코드를 유지하려면 사용자 정의 스키마에서 string 타입의 columnNameOfCorruptRecord라는 필드를 설정할 수 있습니다. 스키마에 필드가 없으면 구문 분석 중에 손상된 레코드가 삭제됩니다. 스키마를 유추할 때 파서가 출력 스키마에 columnNameOfCorruptRecord 필드를 암시적으로 추가합니다.
DROPMALFORMED: 손상된 레코드를 무시합니다. 이 모드는 XML 기본 제공 함수에 대해 지원되지 않습니다.
FAILFAST: 파서가 손상된 레코드를 만나면 예외를 발생시킵니다. |
inferSchema |
true |
true인 경우 각 결과 DataFrame 열에 대해 적절한 유형을 유추하려고 시도합니다.
false인 경우 모든 결과 열은 string 유형입니다. XML 기본 제공 함수는 이 옵션을 무시합니다. |
columnNameOfCorruptRecord |
spark.sql.columnNameOfCorruptRecord |
모드에서 만든 PERMISSIVE 형식이 잘못된 문자열을 포함하는 새 필드의 이름을 바꾸도록 허용합니다. |
attributePrefix |
None | 특성과 요소를 구별하기 위한 특성의 접두사. 이는 필드 이름의 접두사입니다. 기본값은 _입니다. XML을 읽는 경우 비워 둘 수 있지만 쓰기에는 비워 둘 수 없습니다.
DataFrameWriter XML 옵션에도 적용됩니다. |
valueTag |
_VALUE |
요소 내에서 속성이나 자식 요소가 있는 문자 데이터에 사용되는 태그입니다. 사용자는 스키마에서 valueTag 필드를 지정할 수 있습니다. 또는 문자 데이터가 다른 요소 또는 특성이 있는 요소에 있을 때 스키마 유추 중에 자동으로 추가됩니다.
DataFrameWriter XML 옵션에도 적용됩니다. |
encoding |
UTF-8 |
읽기를 위해 지정된 인코딩 형식으로 XML 파일을 디코드합니다. 쓰기의 경우 저장된 XML 파일의 인코딩(charset)을 지정합니다. XML 기본 제공 함수는 이 옵션을 무시합니다. DataFrameWriter XML 옵션에도 적용됩니다. |
ignoreSurroundingSpaces |
true |
값을 둘러싼 공백을 건너뛰어야 하는지 여부입니다. 공백 전용 문자 데이터는 무시됩니다. |
rowValidationXSDPath |
None | 각 행을 개별적으로 검증하기 위해 사용되는 선택적 XSD 파일의 경로입니다. 유효성 검사에 실패한 행은 구문 분석 오류처럼 처리됩니다. XSD는 제공되거나 유추되었는지 여부에 관계없이 스키마에 영향을 주지 않습니다. |
ignoreNamespace |
false |
이 경우 trueXML 요소 및 특성에 대한 네임스페이스의 접두사는 무시됩니다. 예를 들어 <abc:author> 태그와 <def:author> 태그는 둘 다 <author>로 처리됩니다. 네임스페이스는 rowTag 요소 자체에서 무시할 수 없으며, 읽기 권한 자식 요소에서만 무시 가능합니다. XML 파싱은 false여도 네임스페이스를 인식하지 않습니다. |
timestampFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] |
날짜/시간 형식 패턴을 따르는 사용자 지정 타임스탬프 형식 문자열입니다. 이는 timestamp 형식에 적용됩니다.
DataFrameWriter XML 옵션에도 적용됩니다. |
timestampNTZFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS] |
시간대 없이 타임스탬프를 나타내는 사용자 지정 형식 문자열로 날짜/시간 형식 패턴을 따릅니다. 이는 TimestampNTZType 유형에 적용됩니다. DataFrameWriter XML 옵션에도 적용됩니다. |
dateFormat |
yyyy-MM-dd |
날짜/시간 형식 패턴을 따르는 사용자 지정 날짜 형식 문자열입니다. 이는 날짜 형식에 적용됩니다. DataFrameWriter XML 옵션에도 적용됩니다. |
locale |
en-US |
로캘을 IETF BCP 47 형식의 언어 태그로 설정합니다. 예를 들어 날짜 및 타임스탬프를 파싱할 때 locale을 사용합니다. |
nullValue |
문자열 null |
Null 값의 문자열 표현을 설정합니다. 이 값이 null인 경우 파서는 필드의 속성과 요소를 기록하지 않습니다.
DataFrameWriter XML 옵션에도 적용됩니다. |
readerCaseSensitive |
true |
rescuedDataColumn이 활성화된 경우 대소문자 구분 동작을 지정합니다. true이면 이름이 대/소문자를 구분하여 스키마와 다른 데이터 열을 구합니다. false이면 대/소문자를 구분하지 않는 방식으로 데이터를 읽습니다. |
rescuedDataColumn |
None | 데이터 형식 불일치 및 스키마 불일치(열 대소문자 구분 포함)로 인해 구문 분석할 수 없는 모든 데이터를 별도의 열에 수집할지 여부입니다. 이 열은 자동 로더를 사용할 때 기본적으로 포함됩니다. 자세한 내용은 복구된 데이터 열이란 무엇인가요?를 참조하세요.
COPY INTO을 사용하여 스키마를 수동으로 설정할 수 없기 때문에 COPY INTO(레거시)에서는 구조 복구된 데이터 열을 지원하지 않습니다. Databricks는 대부분의 수집 시나리오에 자동 로더를 사용하는 것이 좋습니다. |
singleVariantColumn |
none |
단일 변형 열의 이름을 지정합니다. 이 옵션을 읽기 위해 지정한 경우 전체 XML 레코드를 지정된 옵션 문자열 값을 열 이름으로 사용하여 단일 Variant 열로 구문 분석합니다. 이 옵션을 작성하기 위해 제공된 경우 단일 Variant 열의 값을 XML 파일에 씁니다. DataFrameWriter XML 옵션에도 적용됩니다. |
useLegacyXMLParser |
true |
레거시 XML 파서 사용 여부입니다. 레거시 파서는 잘못된 형식의 콘텐츠에 대해 덜 엄격한 유효성 검사를 수행하지만 메모리 효율은 낮습니다. 더 엄격한 기본 파서에 옵트인하도록 false 설정합니다. |
wildcardColName |
xs_any |
와일드카드(xs:any) 스키마 요소와 일치하는 XML 요소를 캡처하는 데 사용되는 열 이름입니다. 와 함께 rescuedDataColumn사용할 수 없습니다. |
DataStreamReader 옵션
이러한 옵션을 DataStreamReader.option() 사용하여 Delta Lake 테이블 및 기타 파일 기반 원본에서 스트리밍 읽기를 구성합니다.
파일 형식 옵션(JSON, CSV, Parquet 등)은 DataFrameReader 옵션을 참조하세요.
자동 로더(cloudFiles.*) 옵션은 자동 로더를 참조하세요.
Example
다음 예제에서는 Delta Lake 테이블 스트림에 대해 설정합니다 maxFilesPerTrigger10 .
Python
df = spark.readStream.format("delta").option("maxFilesPerTrigger", 10).load("/path/to/delta-table")
스칼라
val df = spark.readStream.format("delta").option("maxFilesPerTrigger", "10").load("/path/to/delta-table")
일반적인
다음 옵션은 Delta Lake 테이블 및 기타 파일 기반 스트리밍 원본에 적용됩니다.
| Key | 기본값 | Description |
|---|---|---|
cleanSource |
off |
스트림에서 처리한 후 원본 파일을 처리하는 방법입니다. 유효한 값: off (작업 없음), delete (원본 파일을 영구적으로 삭제), archive (이동 sourceArchiveDir). 로 archive설정하면 설정 sourceArchiveDir 해야 합니다. Delta Lake 테이블 스트리밍에는 적용되지 않습니다. |
fileNameOnly |
false |
전체 경로가 아닌 파일 이름으로만 이미 처리된 파일을 식별할지 여부입니다. 경우 true동일한 파일 이름을 가진 다른 경로에 있는 파일은 동일한 파일로 처리되고 다시 처리되지 않습니다. Delta Lake 테이블 스트리밍에는 적용되지 않습니다. |
latestFirst |
false |
각 마이크로 일괄 처리 내에서 가장 최근에 수정된 파일을 먼저 처리할지 여부입니다. 최신 데이터를 최대한 빨리 처리하려는 경우에 유용합니다.
maxFilesPerTrigger 설정 true 되거나 maxBytesPerTrigger 설정 maxFileAge 되면 무시됩니다. Delta Lake 테이블 스트리밍에는 적용되지 않습니다. |
maxBytesPerTrigger |
None | 각 마이크로 일괄 처리에 대해 처리되는 데이터의 양에 대한 소프트 최대값입니다. 가장 작은 입력 단위가 초과하면 일괄 처리가 제한보다 더 많이 처리할 수 있습니다. 마이크로 일괄 처리와 함께 maxFilesPerTrigger사용하면 한도에 먼저 도달할 때까지 데이터를 처리합니다. 유효한 값: 양의 정수입니다.자동 로더의 경우 cloudFiles.maxBytesPerTrigger를 대신 사용합니다. 일반을 참조 하세요. |
maxCachedFiles |
10000 |
후속 마이크로 일괄 처리를 위해 캐시할 처리되지 않은 파일의 최대 수입니다. 캐싱을 끄도록 0 설정합니다. 원본 디렉터리에 각 트리거에 대한 많은 새 파일이 포함된 경우 이 값을 늘입니다. Delta Lake 테이블 스트리밍에는 적용되지 않습니다. 유효한 값: 양의 정수 또는 0. |
maxFileAge |
7d |
현재 시스템 시간이 아닌 가장 최근에 수정된 파일의 타임스탬프를 기준으로 처리할 것으로 간주되는 파일의 최대 기간입니다. 이 임계값보다 오래된 파일은 무시됩니다. 기간 문자열(예: 7d 또는 4h.)을 허용합니다. 설정 truemaxFilesPerTriggermaxBytesPerTrigger 되거나 설정된 경우 latestFirst 무시됩니다. Delta Lake 테이블 스트리밍에는 적용되지 않습니다. |
maxFilesPerTrigger |
1000 Delta Lake 및 Auto Loader의 경우 다른 파일 기반 원본의 최대값은 없습니다. |
각 마이크로 일괄 처리에서 처리되는 새 파일 수에 대한 상한입니다. 마이크로 일괄 처리와 함께 maxBytesPerTrigger사용하면 한도에 먼저 도달할 때까지 데이터를 처리합니다. 유효한 값: 양의 정수입니다.자동 로더의 경우 cloudFiles.maxFilesPerTrigger를 대신 사용합니다. 일반을 참조 하세요. |
sourceArchiveDir |
None | 로 설정된 archive경우 보관 디렉터리의 cleanSource 경로입니다. 원본 파일은 처리 후 이 경로로 이동하여 상대 디렉터리 구조를 유지합니다. Delta Lake 테이블 스트리밍에는 적용되지 않습니다. |
자동 로더
원본에서 cloudFiles 이러한 옵션을 사용하여 클라우드 스토리지에서 스트리밍 수집을 위한 자동 로더 를 구성합니다. 원본과 cloudFiles 관련된 옵션은 다른 구조적 스트리밍 원본 옵션과 cloudFiles 별도의 네임스페이스에 유지하기 위해 접두사로 사용됩니다.
일반적인
| Key | 기본값 | 설명 |
|---|---|---|
cloudFiles.allowOverwrites |
false |
입력 디렉터리 파일 변경에서 기존 데이터를 덮어쓰도록 허용할지 여부입니다. 구성 주의 사항은 파일이 추가되거나 덮어쓰여질 때 자동 로더가 파일을 다시 처리하나요?를 참조하세요. |
cloudFiles.backfillInterval |
None | 자동 로더는 지정된 간격으로 비동기 백필을 트리거할 수 있습니다. 예를 들어 1 day 매일 백필하거나 1 week 매주 백필합니다. 자세한 내용은 cloudFiles.backfillInterval을 사용하여 일반 백필 트리거를 참조하세요.로 설정된 cloudFiles.useManagedFileEvents경우 true 사용하지 마세요. |
cloudFiles.cleanSource |
OFF |
입력 디렉터리에서 처리된 파일을 자동으로 삭제할지 여부입니다. (기본값)으로 OFF 설정하면 파일이 삭제되지 않습니다.설정 DELETE하면 자동 로더는 파일이 처리된 후 30일 후에 자동으로 삭제됩니다. 이렇게 하려면 자동 로더에 원본 디렉터리에 대한 쓰기 권한이 있어야 합니다.설정 MOVE하면 자동 로더는 파일이 처리된 후 30일 이내에 cloudFiles.cleanSource.moveDestination 자동으로 지정된 위치로 이동합니다. 이렇게 하려면 자동 로더에 원본 디렉터리 및 이동 위치에 대한 쓰기 권한이 있어야 합니다.파일은 테이블 반환 함수의 commit_time 결과로 null이 아닌 값 cloud_files_state 이 있는 경우 처리된 것으로 간주됩니다. 테이블 값 함수 cloud_files_state을 참조하세요. 을 사용하여 cloudFiles.cleanSource.retentionDuration처리 후 30일 추가 대기를 구성할 수 있습니다.사용하도록 설정하기 전에 다음 고려 사항을 검토합니다 cloudFiles.cleanSource.
Databricks Runtime 16.4 이상에서 사용할 수 있습니다. |
cloudFiles.cleanSource.retentionDuration |
30 days |
처리된 파일이 다음을 사용하여 보관 cleanSource할 후보가 되기 전에 대기할 시간입니다. 의 경우 DELETE7일보다 커야 합니다. 에 대한 MOVE최소 제한 없음값은 CalendarInterval 문자열입니다. 예: "14 days", "30 days", "2 weeks" 또는 "1 month".Databricks Runtime 16.4 이상에서 사용할 수 있습니다. |
cloudFiles.cleanSource.moveDestination |
None |
cloudFiles.cleanSource가 MOVE로 설정될 때 처리된 파일을 보관할 경로입니다. 클라우드 스토리지 경로 또는 Unity 카탈로그 볼륨 경로(예: /Volumes/my_catalog/my_schema/my_volume/archive/)일 수 있습니다.이동 위치는 다음을 수행해야 합니다.
자동 로더에는 이 디렉터리에 대한 쓰기 권한이 있어야 합니다. Databricks Runtime 16.4 이상에서 사용할 수 있습니다. |
cloudFiles.format |
없음(필수 옵션) | 원본 경로의 데이터 파일 형식 입니다. 유효한 값은 다음과 같습니다. |
cloudFiles.includeExistingFiles |
true |
스트림 처리 입력 경로에 기존 파일을 포함할지 아니면 초기 설정 후에 도착하는 새 파일만 처리할지 여부입니다. 이 옵션은 스트림을 처음 시작할 때만 평가됩니다. 스트림을 다시 시작한 후 이 옵션을 변경해도 효과가 없습니다. |
cloudFiles.inferColumnTypes |
false |
스키마 유추를 활용할 때 정확한 열 형식을 유추할지 여부입니다. 기본적으로 열은 JSON 및 CSV 데이터 세트를 유추할 때 문자열로 유추됩니다. 자세한 내용은 스키마 유추 를 참조하세요. |
cloudFiles.maxBytesPerTrigger |
None | 모든 트리거에서 처리할 새 바이트의 최대 수입니다.
10g와 같은 바이트 문자열을 지정하여 각 마이크로 일괄 처리를 10GB의 데이터로 제한할 수 있습니다. 이는 유연한 최대값입니다. 각각 3GB인 파일이 있는 경우 Azure Databricks는 마이크로 일괄 처리에서 12GB를 처리합니다.
cloudFiles.maxFilesPerTrigger와 함께 사용하면 Azure Databricks는 cloudFiles.maxFilesPerTrigger 또는 cloudFiles.maxBytesPerTrigger 중 먼저 도달하는 하한까지 소비합니다. 이 옵션은 Trigger.Once()와 함께 사용하면 효과가 없습니다(Trigger.Once()는 사용되지 않음).Databricks Runtime 18.0 이상에서 이 옵션은 동적으로 구성되며 수동으로 설정할 필요가 없습니다. |
cloudFiles.maxFileAge |
None | 중복 제거를 위해 파일 이벤트를 추적하는 기간입니다. Databricks는 이 매개 변수를 튜닝하는 것을 권장하지 않습니다. 단, 시간당 수백만 개의 파일로 데이터를 수집하는 경우에는 예외입니다. 자세한 내용은 파일 이벤트 추적 섹션을 참조하세요.cloudFiles.maxFileAge를 지나치게 과감하게 튜닝하면 중복 수집이나 파일 누락과 같은 데이터 품질 문제가 발생할 수 있습니다. 따라서 Databricks는 다른 데이터 수집 솔루션의 권장 사항과 유사하게 cloudFiles.maxFileAge를 90일과 같은 보수적인 값으로 설정할 것을 권장합니다. |
cloudFiles.maxFilesPerTrigger |
1000 |
모든 트리거에서 처리할 새 파일의 최대 수입니다.
cloudFiles.maxBytesPerTrigger와 함께 사용하면 Azure Databricks는 cloudFiles.maxFilesPerTrigger 또는 cloudFiles.maxBytesPerTrigger 중 먼저 도달하는 하한까지 소비합니다. 이 옵션은 Trigger.Once()와 함께 사용하면 효과가 없습니다(사용되지 않음).Databricks Runtime 18.0 이상에서 이 옵션은 동적으로 구성되며 수동으로 설정할 필요가 없습니다. |
cloudFiles.partitionColumns |
None | 파일의 디렉터리 구조에서 유추하려는 Hive 스타일 파티션 열의 쉼표로 구분된 목록입니다. Hive 스타일 파티션 열은 같은 같음 기호 <base-path>/a=x/b=1/c=y/file.format와 결합된 키-값 쌍입니다. 이 예제에서 파티션 열은 a, b그리고 c입니다. 기본적으로, 스키마 유추를 사용하는 경우 <base-path>을 사용하여 데이터를 로드할 수 있는 출처를 제공하면 이러한 열이 스키마에 자동으로 추가됩니다. 스키마를 제공하면 자동 로더는 이러한 열이 스키마에 포함될 것으로 예상합니다. 이러한 열을 스키마의 일부로 사용하지 않으려면 ""를 지정하여 이러한 열을 무시할 수 있습니다. 또한 아래 예와 같이 복잡한 디렉터리 구조에서 열이 파일 경로로 유추되기를 원할 때 이 옵션을 사용할 수 있습니다.<base-path>/year=2022/week=1/file1.csv<base-path>/year=2022/month=2/day=3/file2.csv<base-path>/year=2022/month=2/day=4/file3.csvcloudFiles.partitionColumns을 year,month,day로 지정하면 year=2022에 대해 file1.csv가 반환되지만, month 및 day 열은 null입니다.month와 day는 file2.csv와 file3.csv에 대해 올바르게 구문 분석됩니다. |
cloudFiles.schemaEvolutionMode |
addNewColumns 스키마가 제공되지 none 않으면 |
데이터에서 새 열이 검색될 때 스키마를 발전시키는 모드입니다. 기본적으로 열은 JSON 데이터 세트를 유추할 때 문자열로 유추됩니다. 자세한 내용은 스키마 진화 를 참조하세요. |
cloudFiles.schemaHints |
None | 스키마 유추 중에 자동 로더에 제공하는 스키마 정보입니다. 자세한 내용은 스키마 힌트를 참조하세요. |
cloudFiles.schemaLocation |
없음(스키마를 유추하는 데 필요) | 유추된 스키마 및 후속 변경 내용을 저장할 위치입니다. 자세한 내용은 스키마 유추 를 참조하세요. |
cloudFiles.useStrictGlobber |
false |
Apache Spark에서 다른 파일 원본의 기본 globbing 동작과 일치하는 엄격한 글로버를 사용할지 여부입니다. 자세한 내용은 일반 데이터 로드 패턴을 참조하세요 . Databricks Runtime 12.2 LTS 이상에서 지원됩니다. |
cloudFiles.validateOptions |
true |
자동 로더 옵션의 유효성을 검사하고 알 수 없거나 일치하지 않는 옵션에 대해 오류를 반환할지 여부입니다. |
디렉터리 목록
파일 알림
필요한 클라우드 권한, 설정 지침 및 인증 방법을 포함하여 파일 알림 모드를 구성하는 방법에 대한 자세한 내용은 파일 알림 모드에서 자동 로더 스트림 구성을 참조하세요.
| Key | 기본값 | 설명 |
|---|---|---|
cloudFiles.fetchParallelism |
1 |
큐 서비스에서 메시지를 가져올 때 사용할 스레드 수입니다. 로 설정된 cloudFiles.useManagedFileEvents경우 true 사용하지 마세요. |
cloudFiles.pathRewrites |
None | 여러 S3 버킷에서 파일 알림을 수신하는 파일을 지정 queueUrl 하고 이러한 컨테이너의 데이터에 액세스하도록 구성된 탑재 지점을 사용하려는 경우에만 필요합니다.
bucket/key 경로의 접두사를 마운트 포인트로 변경하려면 이 옵션을 사용하십시오. 접두사만 다시 쓸 수 있습니다. 예를 들어, 구성 {"<databricks-mounted-bucket>/path": "dbfs:/mnt/data-warehouse"}의 경우, 경로 s3://<databricks-mounted-bucket>/path/2017/08/fileA.json는 dbfs:/mnt/data-warehouse/2017/08/fileA.json로 다시 작성됩니다.로 설정된 cloudFiles.useManagedFileEvents경우 true 사용하지 마세요. |
cloudFiles.resourceTag |
None | 관련 리소스를 연결하고 식별할 수 있도록 하는 일련의 키-값 태그 쌍입니다. 예를 들면 다음과 같습니다.cloudFiles.option("cloudFiles.resourceTag.myFirstKey", "myFirstValue") .option("cloudFiles.resourceTag.mySecondKey", "mySecondValue")AWS에 대한 자세한 내용은 Amazon SQS 비용 할당 태그 및 Amazon SNS 토픽에 대한 태그 구성을 참조하세요. (1) Azure에 대한 자세한 내용은 GCP에 대한 자세한 내용은 레이블을 사용하여 사용 현황 보고를 참조하세요. (1) 로 설정된 cloudFiles.useManagedFileEvents경우 true 사용하지 마세요. 대신 클라우드 공급자 콘솔을 사용하여 리소스 태그를 설정합니다. |
cloudFiles.useManagedFileEvents |
false |
설정 true되면 자동 로더는 파일 이벤트 서비스를 사용하여 외부 위치에서 파일을 검색합니다. 로드 경로가 파일 이벤트를 사용하도록 설정된 외부 위치에 있는 경우에만 이 옵션을 사용할 수 있습니다.
파일 이벤트와 함께 파일 알림 모드 사용을 참조하세요.자동 로더는 마지막 실행 후 새 파일을 검색할 수 있으므로 파일 이벤트는 파일 검색에서 알림 수준 성능을 제공합니다. 디렉터리 목록과 달리 이 프로세스는 디렉터리의 모든 파일을 나열할 필요가 없습니다. 파일 이벤트 옵션을 사용하도록 설정하더라도 자동 로더가 디렉터리 목록을 사용하는 경우가 있습니다.
자동 로더가 이 옵션과 함께 디렉터리 목록을 사용하는 경우의 포괄적인 상황 목록은 파일 이벤트가 있는 자동 로더가 디렉터리 목록을 사용하는 경우를 참조하세요. Databricks Runtime 14.3 LTS 이상에서 사용할 수 있습니다. |
cloudFiles.listOnStart |
false |
설정하면 true자동 로더는 검사점에서 연속 토큰으로 시작하는 대신 스트림이 시작될 때 전체 디렉터리 목록을 수행합니다. 다음과 같은 CF_MANAGED_FILE_EVENTS_INVALID_CONTINUATION_TOKEN오류에서 복구하려면 이 옵션을 사용합니다.
오류에서 복구하려면 어떻게 해야 하나요?를 CF_MANAGED_FILE_EVENTS_INVALID_CONTINUATION_TOKEN 참조하세요. |
cloudFiles.useNotifications |
false |
파일 알림 모드를 사용하여 새 파일이 있는지 확인할지 여부입니다.
false인 경우 디렉터리 나열 모드를 사용합니다.
자동 로더 파일 검색 모드 비교를 참조하세요.로 설정된 cloudFiles.useManagedFileEvents경우 true 사용하지 마세요. |
(1) 자동 로더는 기본적으로 최상의 노력으로 다음 키-값 태그 쌍을 추가합니다.
-
vendor:Databricks -
path: 데이터가 로드되는 위치입니다. 레이블 지정 제한으로 인해 GCP에서 사용할 수 없습니다. -
checkpointLocation: 스트림 검사점의 위치입니다. 레이블 지정 제한으로 인해 GCP에서 사용할 수 없습니다. -
streamId: 스트림의 전역 고유 식별자입니다.
Databricks는 이러한 키 이름을 예약하며 해당 값을 덮어쓸 수 없습니다.
클라우드 관련
자동 로더는 파일 알림 모드에 대한 클라우드 인프라를 구성하는 옵션을 제공합니다. 필요한 클라우드 권한 및 설정 지침은 파일 알림 모드에서 자동 로더 스트림 구성을 참조하세요.
Aws
자동 로더를 선택하여 cloudFiles.useNotifications = true 알림 서비스를 설정하려는 경우에만 다음 옵션을 제공합니다.
| Key | 기본값 | 설명 |
|---|---|---|
cloudFiles.region |
EC2 인스턴스의 지역 | 원본 S3 버킷이 있는 지역 및 AWS SNS 및 SQS 서비스를 만들려는 지역입니다. |
| Key | 기본값 | 설명 |
|---|---|---|
cloudFiles.restrictNotificationSetupToSameAWSAccountId |
false |
SNS 토픽과 동일한 계정에서 AWS S3 버킷의 이벤트 알림만 허용합니다. true이면 자동 로더는 SNS 토픽과 동일한 계정의 AWS S3 버킷에서 이벤트 알림만 허용합니다. 액세스 false정책이 계정 간 버킷 및 SNS 토픽 설정을 제한하지 않는 경우 이는 SNS 토픽 및 버킷 경로가 다른 계정과 연결된 경우에 유용합니다.Databricks Runtime 17.2 이상에서 사용할 수 있습니다. |
cloudFiles.useNotifications
=
true를 선택하고 자동 로더가 이미 설정한 큐를 사용하도록 하려는 경우에만 다음 옵션을 제공합니다.
| Key | 기본값 | 설명 |
|---|---|---|
cloudFiles.queueUrl |
None | SQS 큐의 URL입니다. 제공되는 경우 자동 로더는 자체 AWS SNS 및 SQS 서비스를 설정하는 대신 이 큐의 이벤트를 직접 사용합니다. |
AWS 인증 옵션
Databricks 서비스 자격 증명을 사용하려면 다음 인증 옵션을 제공합니다.
| Key | 기본값 | 설명 |
|---|---|---|
databricks.serviceCredential |
None | Databricks 서비스 자격 증명의 이름입니다. Databricks Runtime 16.1 이상에서 사용할 수 있습니다. |
Databricks 서비스 자격 증명 또는 IAM 역할을 사용할 수 없는 경우 대신 다음 인증 옵션을 제공할 수 있습니다.
| Key | 기본값 | 설명 |
|---|---|---|
cloudFiles.awsAccessKey |
None | 사용자의 AWS 액세스 키 ID입니다.
cloudFiles.awsSecretKey이 제공되어야 합니다. |
cloudFiles.awsSecretKey |
None | 사용자의 AWS 비밀 액세스 키입니다.
cloudFiles.awsAccessKey이 제공되어야 합니다. |
cloudFiles.roleArn |
None | 필요한 경우 사용할 IAM 역할의 ARN입니다. 역할은 클러스터의 인스턴스 프로필에서 또는 자격 증명과 함께 cloudFiles.awsAccessKey 제공하여 가정할 수 있습니다 cloudFiles.awsSecretKey. |
cloudFiles.roleExternalId |
None |
cloudFiles.roleArn을 사용하여 역할을 가정하는 동안 제공할 식별자입니다. |
cloudFiles.roleSessionName |
None | 를 사용하여 cloudFiles.roleArn역할을 가정하는 동안 사용할 선택적 세션 이름입니다. |
cloudFiles.stsEndpoint |
None |
cloudFiles.roleArn을 사용하여 역할을 가정할 때 AWS STS에 액세스하기 위해 제공하는 선택적 엔드포인트입니다. |
Azure
cloudFiles.useNotifications
=
true를 지정하고 자동 로더를 통해 알림 서비스를 설정하려는 경우 다음과 같은 모든 옵션의 값을 제공해야 합니다.
| Key | 기본값 | 설명 |
|---|---|---|
cloudFiles.resourceGroup |
None | 스토리지 계정이 만들어지는 Azure 리소스 그룹입니다. |
cloudFiles.subscriptionId |
None | 리소스 그룹을 만드는 Azure 구독 ID입니다. |
databricks.serviceCredential |
None | Databricks 서비스 자격 증명의 이름입니다. Databricks Runtime 16.1 이상에서 사용할 수 있습니다. |
Databricks 서비스 자격 증명을 사용할 수 없는 경우 다음 인증 옵션을 대신 제공할 수 있습니다.
| Key | 기본값 | 설명 |
|---|---|---|
cloudFiles.clientId |
None | 서비스 주체의 클라이언트 ID 또는 애플리케이션 ID입니다. |
cloudFiles.clientSecret |
None | 서비스 주체의 클라이언트 비밀입니다. |
cloudFiles.connectionString |
None | 계정 액세스 키 또는 SAS(공유 액세스 서명)를 기반으로 하는 스토리지 계정의 연결 문자열입니다. |
cloudFiles.tenantId |
None | 서비스 주체가 만들어지는 Azure 테넌트 ID입니다. |
자동 로더가 기존 큐를 사용하도록 설정한 cloudFiles.useNotifications = true 경우에만 다음 옵션을 제공합니다.
| Key | 기본값 | 설명 |
|---|---|---|
cloudFiles.queueName |
None | Azure 대기열의 이름입니다. 제공된 경우 클라우드 파일 원본은 고유한 Azure Event Grid 및 Queue Storage 서비스를 설정하는 대신 이 큐의 이벤트를 직접 사용합니다. 이 경우 databricks.serviceCredential 또는 cloudFiles.connectionString 큐에 대한 읽기 권한만 필요합니다. |
Gcp
자동 로더는 Databricks 서비스 자격 증명을 활용하여 자동으로 알림 서비스를 설정할 수 있습니다. Databricks 서비스 자격 증명으로 만든 서비스 계정에는 파일 알림 모드에서 자동 로더 스트림 구성에 지정된 권한이 필요합니다.
| Key | 기본값 | 설명 |
|---|---|---|
cloudFiles.projectId |
None | GCS 버킷이 있는 프로젝트의 ID입니다. Google Cloud Pub/Sub 구독도 이 프로젝트 내에서 만들어집니다. |
databricks.serviceCredential |
None | Databricks 서비스 자격 증명의 이름입니다. Databricks Runtime 16.1 이상에서 사용할 수 있습니다. |
Databricks 서비스 자격 증명을 사용할 수 없는 경우 Google 서비스 계정을 직접 사용할 수 있습니다. Google 서비스 설정에 따라 서비스 계정을 가정하도록 클러스터를 구성하거나 다음 인증 옵션을 직접 제공할 수 있습니다.
| Key | 기본값 | 설명 |
|---|---|---|
cloudFiles.client |
None | Google 서비스 계정의 클라이언트 ID입니다. |
cloudFiles.clientEmail |
None | Google 서비스 계정의 이메일입니다. |
cloudFiles.privateKey |
None | Google 서비스 계정에 대해 생성된 프라이빗 키입니다. |
cloudFiles.privateKeyId |
None | Google 서비스 계정에 대해 생성된 프라이빗 키의 ID입니다. |
cloudFiles.useNotifications
=
true를 선택하고 자동 로더가 이미 설정한 큐를 사용하도록 하려는 경우에만 다음 옵션을 제공합니다.
| Key | 기본값 | 설명 |
|---|---|---|
cloudFiles.subscription |
None | Google Cloud Pub/Sub 구독의 이름입니다. 제공되는 경우 클라우드 파일 원본은 자체 GCS 알림 및 Google Cloud Pub/Sub 서비스를 설정하는 대신 이 큐의 이벤트를 사용합니다. |
Delta Lake
다음 옵션은 .를 사용하여 spark.readStreamDelta Lake 테이블에서 읽을 때 적용됩니다.
| Key | 기본값 | 설명 |
|---|---|---|
allowSourceColumnDrop |
None | 델타 테이블 버전 번호로 설정하거나 "always" 원본 테이블 스키마에서 열을 삭제한 후에도 스트림을 계속할 수 있도록 합니다. 버전 번호로 설정하면 해당 버전까지 모든 스키마 변경 내용을 승인합니다.
schemaTrackingLocation가 필요합니다.
Delta Lake 열 매핑을 사용하여 열 이름 바꾸기 및 삭제를 참조하세요. |
allowSourceColumnRename |
None | 델타 테이블 버전 번호로 설정하거나 "always" 원본 테이블에서 열의 이름을 바꾼 후에도 스트림을 계속할 수 있도록 합니다. 버전 번호로 설정하면 해당 버전까지 모든 스키마 변경 내용을 승인합니다.
schemaTrackingLocation가 필요합니다.
Delta Lake 열 매핑을 사용하여 열 이름 바꾸기 및 삭제를 참조하세요. |
allowSourceColumnTypeChange |
None | 델타 테이블 버전 번호로 설정하거나 "always" 원본 테이블에서 열 형식이 변경된 후에도 스트림을 계속할 수 있도록 합니다. 버전 번호로 설정하면 해당 버전까지 모든 스키마 변경 내용을 승인합니다.
schemaTrackingLocation가 필요합니다.
타입 확장을 참조하세요. |
excludeRegex |
None | 정규식 패턴입니다. 경로가 패턴과 일치하는 파일은 스트리밍 읽기에서 제외됩니다. 예상된 명명 규칙을 준수하지 않는 파일을 필터링하는 데 유용합니다. |
failOnDataLoss |
true |
로그 보존(logRetentionDuration)으로 인해 원본 데이터가 삭제된 경우 스트리밍 쿼리에 실패할지 여부입니다. 누락된 데이터를 건너뛰고 처리를 계속하도록 false 설정합니다.
시간 이동 쿼리에 대한 데이터 보존 구성을 참조하세요. |
ignoreChanges(더 이상 사용되지 않음) |
false |
Databricks Runtime 11.3 LTS 이하에서 사용할 수 있습니다. , 또는 OVERWRITE. 등의 수정 작업 DELETEUPDATEMERGE INTO후에 다시 쓴 데이터 파일을 다시 내보냅니다. 변경되지 않은 행을 새 행과 함께 내보낼 수 있으므로 다운스트림 소비자는 중복 항목을 처리해야 합니다. 삭제는 다운스트림으로 전파되지 않습니다. Databricks Runtime 12.2 LTS 이상으로 대체 skipChangeCommits 되었습니다. |
ignoreDeletes(더 이상 사용되지 않음) |
false |
파티션 경계에서 데이터를 삭제하는 트랜잭션을 무시합니다(전체 파티션 삭제만 해당). 파티션이 아닌 삭제, 업데이트 또는 기타 수정은 처리하지 않습니다.
skipChangeCommits를 대신 사용하세요. |
readChangeFeed 또는 readChangeData |
false |
스트리밍 쿼리에 대한 변경 데이터 피드 읽기를 사용하도록 설정할지 여부입니다. 사용하도록 설정하면 스트림은 추가 메타데이터 열과 함께 행 수준 변경 내용(삽입, 업데이트 및 삭제)을 내보낸다. Azure Databricks에서 Delta Lake 변경 데이터 피드를 사용하는 방법을 참조하세요. |
schemaTrackingLocation |
None | Delta Lake가 스트리밍 읽기에 대한 스키마 변경 내용을 추적하는 디렉터리의 경로입니다. 열 매핑이 사용하도록 설정된 테이블에서 스트리밍하고 스키마 진화를 처리하는 옵션을 사용할 allowSourceColumn* 때 필요합니다. 스트리밍 쿼리 내에 checkpointLocation 있어야 합니다.
Delta Lake 열 매핑을 사용하여 열 이름 바꾸기 및 삭제를 참조하세요. |
skipChangeCommits |
false |
기존 레코드를 삭제하거나 수정하는 트랜잭션을 무시하고 추가만 처리합니다. Databricks는 변경 데이터 피드를 사용하지 않는 대부분의 워크로드에 이 옵션을 권장합니다. Databricks Runtime 12.2 LTS 이상에서 지원됩니다.
를 사용하여 업스트림 변경 커밋 건너뛰기 skipChangeCommits참조 |
startingTimestamp |
최신 버전 사용 가능 | 읽기를 시작할 타임스탬프입니다. 스트림은 지정된 타임스탬프에서 또는 그 이후에 커밋된 모든 테이블 변경 내용을 읽습니다. 타임스탬프가 사용 가능한 모든 테이블 커밋 앞에 오는 경우 스트림은 사용 가능한 가장 빠른 커밋에서 시작됩니다. 와 함께 startingVersion사용할 수 없습니다. 스트리밍 검사점이 이미 있는 경우 무시됩니다.유효한 값: 타임스탬프 문자열(예: "2019-01-01T00:00:00.000Z" )이거나 날짜 문자열(예: .)"2019-01-01" |
startingVersion |
최신 버전 사용 가능 | 읽기를 시작할 델타 테이블 버전입니다. 스트림은 지정된 버전에서 또는 그 이후에 커밋된 모든 변경 내용을 읽습니다. 가장 최근의 변경 내용에서만 시작하도록 지정 "latest" 합니다. 와 함께 startingTimestamp사용할 수 없습니다. 스트리밍 검사점이 이미 있는 경우 무시됩니다.
테이블 기록 작업을 참조하세요. |
withEventTimeOrder |
false |
레코드가 런타임 이벤트로 잘못 표시되고 워터마크가 있는 상태 저장 쿼리에서 삭제되지 않도록 초기 테이블 스냅샷을 이벤트 시간 버킷으로 나눕니다. 검사점을 삭제하지 않고 초기 스냅샷 처리를 시작한 후에는 변경할 수 없습니다. Databricks Runtime 11.3 LTS 이상에서 지원됩니다. 데이터를 삭제하지 않고 프로세스 초기 스냅샷을 참조하세요. |
Kafka
다음 옵션 중 하나 spark.readStream.format("kafka") 또는 spark.read.format("kafka")다음을 사용합니다.
| Key | 기본값 | 설명 |
|---|---|---|
assign |
None | 사용할 특정 파티션입니다. 또는 subscribePatternassign 옵션 중 subscribe하나를 정확히 지정해야 합니다. 유효한 값: JSON 문자열(예: {"topicA":[0,1],"topicB":[2,4]}. |
failOnDataLoss |
true |
예를 들어 삭제된 토픽 또는 오프셋 잘림으로 인해 데이터가 손실되었을 수 있는 경우 쿼리에 실패할지 여부입니다. 누락된 데이터를 건너뛰고 계속하도록 false 설정합니다. 유효한 값: true, . falseDatabricks는 데이터가 손실되었는지 여부를 보수적으로 추정합니다. 그러나 이로 인해 거짓 경보가 발생할 수 있습니다. |
fetchoffset.numretries |
3 |
Kafka 오프셋을 가져올 때 다시 시도 횟수가 실패합니다. 유효한 값: 음수가 아닌 정수입니다. |
fetchoffset.retryintervalms |
1000 |
오프셋 인출 재시도 사이의 간격(밀리초)입니다. 유효한 값: 음수가 아닌 정수입니다. |
groupIdPrefix |
spark-kafka-source (스트리밍), spark-kafka-relation (일괄 처리) |
자동 생성된 Kafka 소비자 그룹 ID에 사용할 사용자 지정된 접두사입니다. 명시적으로 설정된 경우 kafka.group.id 커넥터는 이 옵션을 무시합니다. 유효한 값: 모든 문자열입니다. |
includeHeaders |
false |
Kafka 메시지 헤더를 출력에 열로 포함할지 여부입니다. 유효한 값: true, . false |
kafkaconsumer.polltimeoutms |
None | Kafka 소비자 poll() 호출에 대한 시간 제한(밀리초)입니다. 유효한 값: 양의 정수입니다. |
kafka.bootstrap.servers |
None | Kafka broker에 대한 호스트:포트 주소의 쉼표로 구분된 목록입니다. Kafka 클라이언트의 bootstrap.servers 속성을 설정합니다.Kafka의 데이터가 없는 경우 이 broker 주소 목록에서 잘못된 주소를 확인합니다. broker 주소 목록이 올바르지 않으면 오류가 없을 수 있습니다. Kafka 클라이언트는 브로커를 최종적으로 사용할 수 있고 네트워크 오류가 발생할 때 영원히 다시 시도한다고 가정합니다. |
maxRecordsPerPartition |
None | 각 Spark 파티션에 대한 최대 레코드 수입니다. 설정하면 커넥터는 Kafka 파티션을 분할하여 각 Spark 파티션이 대부분의 레코드를 읽도록 합니다. 유효한 값: 양의 정수입니다. 이 옵션은 .와 함께 minPartitions사용할 수도 있습니다. 두 옵션을 모두 설정하면 Spark는 더 많은 파티션이 생성되는 옵션을 사용합니다. |
minPartitions |
None | Kafka에서 읽을 Spark 파티션의 최소 수입니다. 설정하면 커넥터는 큰 Kafka 파티션을 분할하여 병렬 처리를 증가합니다. 설정하지 않으면 Spark는 각 Kafka 토픽 파티션에 대해 하나의 파티션을 만듭니다. 데이터 기울이기 또는 최대 로드 처리에 유용합니다. 유효한 값: 양의 정수입니다. 이 옵션은 각 트리거에 대해 Kafka 소비자를 다시 초기화하여 SSL의 성능에 영향을 줄 수 있습니다. |
startingOffsets |
latest (스트리밍), earliest (일괄 처리) |
쿼리가 읽기를 시작하는 오프셋입니다. 유효한 값: earliest- latest또는 각 파티션에 대한 오프셋의 JSON 문자열(예: {"topicA":{"0":23,"1":-2}}. JSON 문자열 -1 에서 최신 오프셋입니다.
-2 는 가장 빠른 오프셋입니다.스트리밍 쿼리의 경우 이 옵션은 새 쿼리가 시작될 때만 적용됩니다. 다시 시작된 쿼리는 항상 검사점을 사용합니다. 쿼리하는 동안 새 파티션은 가장 빠른 오프셋에서 읽기 시작합니다. 일괄 처리 쿼리의 latest 경우 허용되지 않습니다. |
startingOffsetsByTimestamp |
None | 타임스탬프로 지정된 각 파티션의 시작 오프셋 목록(밀리초)입니다. 타임스탬프에 대한 오프셋이 없으면 쿼리 동작은 .에 의해 startingOffsetsByTimestampStrategy결정됩니다. 유효한 값: 각 파티션에 대한 타임스탬프의 JSON 문자열(예: {"topicA":{"0":1000,"1":2000}}.스트리밍 쿼리의 경우 이 옵션은 새 쿼리가 시작될 때만 적용됩니다. 다시 시작된 쿼리는 항상 검사점을 사용합니다. 쿼리하는 동안 새 파티션은 가장 빠른 오프셋에서 읽기 시작합니다. |
startingOffsetsByTimestampStrategy |
error |
또는 에 지정된 startingOffsetsByTimestampstartingTimestamp타임스탬프에 대한 오프셋을 찾을 수 없을 때 사용하는 전략입니다. 유효한 값: error (예외 발생), latest (사용 가능한 최신 오프셋 사용). |
startingTimestamp |
None | 모든 파티션에 적용되는 전역 시작 타임스탬프(밀리초)입니다. 타임스탬프에 대한 오프셋이 없으면 동작이 에 의해 startingOffsetsByTimestampStrategy제어됩니다. 유효한 값: 음수가 아닌 정수입니다. |
subscribe |
None | 구독할 항목입니다. 또는 subscribePatternassign 옵션 중 subscribe하나를 정확히 지정해야 합니다. 유효한 값: 토픽 이름의 쉼표로 구분된 목록입니다. |
subscribePattern |
None | 토픽을 구독하는 데 사용되는 패턴입니다. 또는 subscribePatternassign 옵션 중 subscribe하나를 정확히 지정해야 합니다.
topic.*을 예로 들 수 있습니다. 유효한 값: 모든 Java 정규식 문자열입니다. |
다음 옵션은 다음을 사용하여 스트리밍 읽기 spark.readStream.format("kafka")에만 적용합니다.
| Key | 기본값 | 설명 |
|---|---|---|
bytesEstimateWindowLength |
300s |
메트릭의 나머지 바이트를 estimatedTotalBytesBehindLatest 예측하는 데 사용되는 시간 창입니다. 유효한 값: 기간 문자열(예: 10m 또는 600s.
Kafka 메트릭 검색을 참조하세요. |
maxOffsetsPerTrigger |
None | 트리거 간격당 처리할 최대 오프셋 수입니다. 오프셋은 토픽 파티션에 비례하여 분산됩니다. 유효한 값: 양의 정수입니다. |
maxTriggerDelay |
15m |
트리거하기 전에 누적되기를 minOffsetsPerTrigger 기다리는 최대 시간입니다. 유효한 값: 기간 문자열(예: 10m 또는 600s. |
minOffsetsPerTrigger |
None | 마이크로 일괄 처리를 트리거하기 전에 누적할 최소 오프셋 수입니다. 도달하면 maxTriggerDelay 마이크로 일괄 처리는 관계없이 실행됩니다. 유효한 값: 양의 정수입니다. |
일괄 처리 읽기 spark.read.format("kafka")에만 적용되는 오프셋 옵션은 DataFrameReader Kafka 옵션을 참조하세요.
Kafka 클라이언트(kafka.*) 및 인증 옵션은 옵션을 참조하세요.
DataFrameWriter 옵션
이러한 옵션을 DataFrameWriter.option() 및 DataFrameWriterV2.option() 사용하여 Azure Databricks 데이터를 쓰는 방법을 제어합니다.
Example
다음 예제에서는 Delta Lake 테이블을 작성하도록 True 설정합니다mergeSchema.
Python
df.write.format("delta").option("mergeSchema", True).saveAsTable("my_table")
스칼라
df.write.format("delta").option("mergeSchema", "true").saveAsTable("my_table")
Avro
| Key | 기본값 | 설명 |
|---|---|---|
avroSchema |
None | 전체 Avro 스키마를 JSON 문자열로 사용합니다. Spark SQL 형식을 특정 Avro 형식으로 변환하려면 이 옵션을 사용합니다. Avro 파일에 적용됩니다. |
avroSchemaUrl |
None | Avro 스키마 파일을 가리키는 URL입니다. 스키마가 외부에 저장되는 경우 대신 avroSchema 사용합니다.
avroSchema와 상호 배타적입니다.
Avro 파일에 적용됩니다. |
compression |
snappy |
작성할 때 사용할 압축 코덱입니다. 유효한 값: uncompressed,deflate, snappy, bzip2xz, zstandard.
Avro 파일에 적용됩니다. |
recordName |
topLevelRecord |
출력 Avro 스키마의 최상위 레코드 이름입니다. Avro 파일에 적용됩니다. |
positionalFieldMatching |
false |
Spark 스키마와 Avro 스키마 사이의 열을 이름 대신 필드 위치별로 일치시킬지 여부입니다. Avro 파일에 적용됩니다. |
recordNamespace |
빈 문자열 | 출력 Avro 스키마의 최상위 레코드에 대한 네임스페이스입니다. Avro 파일에 적용됩니다. |
Delta Lake 및 Apache Iceberg
| Key | 기본값 | 설명 |
|---|---|---|
clusterByAuto |
false |
자동 liquid 클러스터링을 사용하도록 설정할지 여부, 여기서 Azure Databricks 쿼리 패턴에 따라 클러스터링 열을 선택합니다. 에서만 유효합니다 mode("overwrite"). 모드에서 append 사용할 수 없습니다. Databricks Runtime 16.4 이상에서 사용할 수 있습니다.
테이블에 액체 클러스터링 사용에 적용됩니다. |
mergeSchema |
None | 쓰기 작업에 스키마 진화를 사용하도록 설정할지 여부입니다. 원본 DataFrame의 새 열이 대상 테이블 스키마에 추가됩니다. 일괄 처리 및 스트리밍 추가에 적용됩니다. 업데이트 테이블 스키마에 적용됩니다. |
overwriteSchema |
None | 덮어쓸 때 테이블 스키마 및 분할을 바꿀지 여부입니다. 을 사용하지 않아도 mode("overwrite")됩니다replaceWhere. 와 함께 partitionOverwriteMode사용할 수 없습니다.
업데이트 테이블 스키마에 적용됩니다. |
partitionOverwriteMode |
None | 파티션 덮어쓰기 모드입니다.
dynamic 새 데이터가 포함된 파티션만 덮어쓰도록 설정하고 다른 모든 파티션은 변경되지 않습니다. 레거시 모드로, 서버리스 컴퓨팅 또는 Databricks SQL에서는 지원되지 않습니다. 유효한 값: static, . dynamic
Delta Lake를 사용하여 데이터를 선택적으로 덮어쓰는 데 적용됩니다. |
replaceOn |
None | 대상 테이블의 행과 일치하여 원본 쿼리의 행으로 바꾸는 부울 식입니다. 대상 테이블과 원본 쿼리 모두에서 열을 참조할 수 있습니다. 원본 행과 일치하는 대상의 행이 삭제되고 바뀝니다. 원본이 비어 있으면 삭제가 발생하지 않습니다. 열 참조를 명확하게 하는 데 사용합니다 targetAlias . Databricks Runtime 17.1 이상에서 사용할 수 있습니다.
Delta Lake를 사용하여 데이터를 선택적으로 덮어쓰는 데 적용됩니다. |
replaceUsing |
None | 대상 테이블과 원본 쿼리 사이의 행을 일치시킬 때 사용되는 열 이름의 쉼표로 구분된 목록입니다. 대상과 원본 모두 나열된 모든 열을 포함해야 합니다. 같음 비교에서 원본 행과 일치하는 대상의 행이 삭제되고 바뀝니다.
NULL 값은 같지 않은 것으로 처리되며 일치하지 않습니다. Databricks Runtime 16.3 이상에서 사용할 수 있습니다.
Delta Lake를 사용하여 데이터를 선택적으로 덮어쓰는 데 적용됩니다. |
replaceWhere |
None | 조건자 식입니다. 조건자와 일치하는 레코드만 원자성으로 덮어씁니다. Delta Lake를 사용하여 데이터를 선택적으로 덮어쓰는 데 적용됩니다. |
targetAlias |
None | 대상 테이블의 문자열 별칭입니다. 조건이 대상 테이블과 원본 쿼리 모두에서 열을 참조할 때 열 참조와 함께 replaceOn 사용하거나 replaceWhere 명확하게 지정합니다.
Delta Lake를 사용하여 데이터를 선택적으로 덮어쓰는 데 적용됩니다. |
txnAppId |
None | 작업에서 foreachBatch idempotent 쓰기에 대한 애플리케이션을 식별하는 고유 문자열입니다. 여러 Delta Lake 테이블에 정확히 한 번 쓰는지 확인하려면 함께 txnVersion 사용합니다.
idempotent 테이블 쓰기에 적용 foreachBatch 됩니다. |
txnVersion |
None | 작업에서 foreachBatch idempotent 쓰기의 트랜잭션 버전으로 사용되는 단조로 증가하는 수입니다. 여러 Delta Lake 테이블에 정확히 한 번 쓰는지 확인하려면 함께 txnAppId 사용합니다.
idempotent 테이블 쓰기에 적용 foreachBatch 됩니다. |
optimizeWrite |
None | 이 쓰기 작업에 대해 쓰기 자동 최적화를 사용하도록 설정할지 여부입니다. 구성을 재정의 spark.databricks.delta.optimizeWrite.enabled 합니다.
Azure Databricks Delta Lake는 무엇인가요? 적용됩니다. |
userMetadata |
None | 쓰기 작업에 대한 커밋 메타데이터에 추가된 사용자 정의 문자열입니다. 의 출력 DESCRIBE HISTORY에 표시됩니다.
사용자 지정 메타데이터를 사용하여 보강 테이블에 적용됩니다. |
Csv
| Key | 기본값 | 설명 |
|---|---|---|
charToEscapeQuoteEscaping |
\0 (사용하도록 설정되지 않음) |
따옴표 문자와 다를 때 이스케이프 문자를 이스케이프하는 데 사용되는 문자입니다. csv(DataFrameWriter)에 적용됩니다. |
compression |
none |
작성할 때 사용할 압축 코덱입니다. 유효한 값: none,bzip2, gzip, lz4snappy, deflatezstd.
csv(DataFrameWriter)에 적용됩니다. |
dateFormat |
yyyy-MM-dd |
날짜 열 값에 대한 문자열 서식을 지정합니다. csv(DataFrameWriter)에 적용됩니다. |
emptyValue |
빈 문자열 | 빈(null이 아닌) 값에 대해 작성된 문자열입니다. csv(DataFrameWriter)에 적용됩니다. |
encoding |
UTF-8 |
출력 파일의 문자 인코딩입니다. csv(DataFrameWriter)에 적용됩니다. |
escape |
\ |
따옴표 붙은 값을 이스케이프하는 데 사용되는 문자입니다. csv(DataFrameWriter)에 적용됩니다. |
escapeQuotes |
true |
따옴표 붙은 필드 값 내에서 따옴표 문자를 이스케이프할지 여부입니다. csv(DataFrameWriter)에 적용됩니다. |
header |
false |
열 이름을 출력의 첫 줄로 쓸지 여부입니다. csv(DataFrameWriter)에 적용됩니다. |
ignoreLeadingWhiteSpace |
false |
작성할 때 선행 공백을 값에서 트리밍할지 여부입니다. csv(DataFrameWriter)에 적용됩니다. |
ignoreTrailingWhiteSpace |
false |
쓸 때 값에서 후행 공백을 트리밍할지 여부입니다. csv(DataFrameWriter)에 적용됩니다. |
lineSep |
\n |
레코드 간에 사용되는 줄 구분 기호 문자열입니다. csv(DataFrameWriter)에 적용됩니다. |
locale |
en-US |
식별자 java.util.Locale입니다. 작성할 때 날짜 및 타임스탬프 값의 서식에 영향을 줍니다. |
nullValue |
빈 문자열 | null 값에 대해 작성된 문자열입니다. csv(DataFrameWriter)에 적용됩니다. |
quote |
" |
구분 기호가 포함된 필드 값을 인용하는 데 사용되는 문자입니다. csv(DataFrameWriter)에 적용됩니다. |
quoteAll |
false |
콘텐츠에 관계없이 모든 필드 값을 따옴표로 묶을지 여부입니다. csv(DataFrameWriter)에 적용됩니다. |
sep |
, |
필드 구분 기호 문자입니다. csv(DataFrameWriter)에 적용됩니다. |
timestampFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] |
타임스탬프 열 값의 형식 문자열입니다. csv(DataFrameWriter)에 적용됩니다. |
timestampNTZFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS] |
표준 시간대(TimestampNTZType) 열 값이 없는 타임스탬프의 형식 문자열입니다. |
Excel
| Key | 기본값 | 설명 |
|---|---|---|
dataAddress |
None | 쓰기의 시트 이름 또는 시작 셀입니다. 생략하면 셀A1에서 시작하는 시트 Sheet1 에 씁니다. 시트 이름("SheetName") 또는 단일 셀 참조("SheetName!A1")를 허용합니다. 셀 범위는 쓰기에 지원되지 않습니다. |
dateFormatInWrite |
yyyy-mm-dd |
Date 열에 적용된 셀 서식 문자열을 Excel. Excel 형식 구문을 사용합니다. |
headerRows |
0 |
열 이름을 첫 번째 행으로 쓸지 여부입니다. 유효한 값: 0, . 1 |
timestampNTZFormat |
yyyy-mm-dd hh:mm:ss |
TimestampNTZ 및 Timestamp 열에 적용된 셀 서식 문자열을 Excel. Excel 형식 구문을 사용합니다. |
version |
xlsx |
쓸 Excel 파일 형식 버전입니다. 유효한 값: xlsx, . xls |
Json
| Key | 기본값 | 설명 |
|---|---|---|
compression |
none |
작성할 때 사용할 압축 코덱입니다. 유효한 값: none,bzip2, gzip, lz4snappy, deflatezstd.
json에 적용됩니다(DataFrameWriter). |
dateFormat |
yyyy-MM-dd |
날짜 열 값에 대한 문자열 서식을 지정합니다. json에 적용됩니다(DataFrameWriter). |
encoding |
UTF-8 |
출력 파일의 문자 인코딩입니다. json에 적용됩니다(DataFrameWriter). |
ignoreNullFields |
의 값 spark.sql.jsonGenerator.ignoreNullFields |
JSON 출력에서 null 값이 있는 필드를 생략할지 여부입니다. json에 적용됩니다(DataFrameWriter). |
lineSep |
\n |
레코드 간에 사용되는 줄 구분 기호 문자열입니다. json에 적용됩니다(DataFrameWriter). |
locale |
en-US |
식별자 java.util.Locale입니다. 작성할 때 날짜 및 타임스탬프 값의 서식에 영향을 줍니다. |
pretty |
false |
예쁜(들여쓰기, 여러 줄) JSON 출력을 사용하도록 설정할지 여부입니다. |
sortKeys |
false |
출력에서 JSON 개체의 키를 사전순으로 정렬할지 여부입니다. 결정적 출력을 생성하는 데 유용합니다. |
timestampFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] |
타임스탬프 열 값의 형식 문자열입니다. json에 적용됩니다(DataFrameWriter). |
timestampNTZFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS] |
표준 시간대(TimestampNTZType) 열 값이 없는 타임스탬프의 형식 문자열입니다. |
writeNonAsciiCharacterAsCodePoint |
false |
출력에서 리터럴 UTF-8 문자 대신 비 ASCII 문자를 유니코드 이스케이프 시퀀스로 \uXXXX 인코딩할지 여부입니다. |
오크
| Key | 기본값 | 설명 |
|---|---|---|
compression |
zstd |
작성할 때 사용할 압축 코덱입니다. 유효한 값: none, uncompressed,snappy, zliblzo, zstd, lz4brotli.
orc(DataFrameWriter)에 적용됩니다. |
쪽모이 세공 마루
| Key | 기본값 | 설명 |
|---|---|---|
compression |
snappy |
작성할 때 사용할 압축 코덱입니다. 유효한 값: none,uncompressed, ,gzipsnappylzo, brotli, lz4, lz4_rawzstd.
parquet에 적용됩니다(DataFrameWriter). |
spark.sql.parquet.outputTimestampType |
INT96 |
타임스탬프 열을 인코딩하는 데 사용되는 실제 형식입니다. 유효한 값: INT96, TIMESTAMP_MICROS. TIMESTAMP_MILLIS 표준 타임스탬프 형식을 지원하지 않는 레거시 Parquet 판독기와의 호환성을 위해 사용합니다 INT96 . |
텍스트
| Key | 기본값 | 설명 |
|---|---|---|
compression |
none |
작성할 때 사용할 압축 코덱입니다. 유효한 값: none,bzip2, gzip, lz4snappy, deflatezstd. 텍스트에 적용됩니다 (DataFrameWriter). |
encoding |
UTF-8 |
출력 파일의 문자 인코딩입니다. |
lineSep |
\n |
레코드 간에 사용되는 줄 구분 기호 문자열입니다. 텍스트에 적용됩니다 (DataFrameWriter). |
Xml
| Key | 기본값 | 설명 |
|---|---|---|
arrayElementName |
item |
명시적 이름이 없는 배열 요소의 요소 이름입니다. xml에 적용됩니다 (DataFrameWriter). |
attributePrefix |
_ |
XML 특성에 해당하는 필드 이름 앞에 접두사입니다. xml에 적용됩니다 (DataFrameWriter). |
compression |
none |
작성할 때 사용할 압축 코덱입니다. 유효한 값: none,bzip2, gzip, lz4snappy, deflatezstd. xml에 적용됩니다 (DataFrameWriter). |
dateFormat |
yyyy-MM-dd |
날짜 열 값에 대한 문자열 서식을 지정합니다. xml에 적용됩니다 (DataFrameWriter). |
declaration |
version="1.0" encoding="UTF-8" standalone="yes" |
각 출력 파일의 맨 위에 기록된 XML 선언 문자열입니다. 선언을 표시하지 않는 빈 문자열로 설정합니다. xml에 적용됩니다 (DataFrameWriter). |
encoding |
UTF-8 |
출력 파일의 문자 인코딩입니다. xml에 적용됩니다 (DataFrameWriter). |
indent |
공백 4개 | 출력에서 자식 요소를 들여쓰는 데 사용되는 문자열입니다. 들여쓰기를 해제하고 각 행을 한 줄에 쓰려면 빈 문자열로 설정합니다. |
locale |
en-US |
식별자 java.util.Locale입니다. 작성할 때 날짜 및 타임스탬프 값의 서식에 영향을 줍니다. |
nullValue |
null |
null 값에 대해 작성된 문자열입니다. 로 null설정하면 null 필드에 대한 특성 및 자식 요소가 생략됩니다. xml에 적용됩니다 (DataFrameWriter). |
rootTag |
ROWS |
출력의 모든 행 요소를 래핑하는 루트 요소 태그입니다. xml에 적용됩니다 (DataFrameWriter). |
rowTag |
ROW |
출력의 행을 나타내는 요소 태그입니다. xml에 적용됩니다 (DataFrameWriter). |
singleVariantColumn |
None | XML 파일에 쓸 단일 Variant 열의 이름입니다. xml에 적용됩니다 (DataFrameWriter). |
timestampFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] |
타임스탬프 열 값의 형식 문자열입니다. xml에 적용됩니다 (DataFrameWriter). |
timestampNTZFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS] |
표준 시간대 열 값이 없는 타임스탬프의 형식 문자열입니다. xml에 적용됩니다 (DataFrameWriter). |
validateName |
true |
열 이름이 유효한 XML 요소 식별자가 아닌 경우 예외를 throw할지 여부입니다. xml에 적용됩니다 (DataFrameWriter). |
valueTag |
_VALUE |
특성 또는 자식 요소가 있는 XML 요소의 문자 데이터에 사용되는 필드 이름입니다. xml에 적용됩니다 (DataFrameWriter). |
DataStreamWriter 옵션
이러한 옵션을 DataStreamWriter.option() 사용하여 스트리밍 쓰기를 구성합니다.
Example
다음 예제에서는 스트림의 검사점 위치를 설정합니다.
Python
(df.writeStream
.format("delta")
.option("checkpointLocation", "/path/to/checkpoint")
.start("/path/to/table"))
스칼라
df.writeStream
.format("delta")
.option("checkpointLocation", "/path/to/checkpoint")
.start("/path/to/table")
일반적인
| Key | 기본값 | 설명 |
|---|---|---|
checkpointLocation |
없음(필수) | 스트리밍 쿼리에 대한 검사점 디렉터리의 경로입니다. 내결함성 및 정확히 한 번 처리 보장에 필요합니다. 각 스트리밍 쿼리는 고유한 검사점 위치를 사용해야 합니다. Databricks는 Unity 카탈로그 볼륨 또는 클라우드 스토리지 경로에 검사점을 저장하는 것이 좋습니다. 구조적 스트리밍 검사점을 참조하세요. |
path |
None | Parquet과 같은 파일 기반 스트리밍 싱크의 출력 경로입니다. 파일 기반 형식에만 적용됩니다. |
콘솔 싱크
| Key | 기본값 | 설명 |
|---|---|---|
numRows |
20 |
콘솔 싱크에 쓸 때 각 마이크로 일괄 처리에 대해 표시할 행 수입니다. |
truncate |
true |
행을 표시할 때 긴 문자열을 잘릴지 여부입니다. 전체 문자열 값을 표시하도록 false 설정합니다. |
Delta Lake
다음 옵션은 을 사용하여 format("delta")Delta Lake 테이블에 스트림을 쓸 때 적용됩니다. 와 같은 overwriteSchemareplaceWhere덮어쓰기 전용 옵션이며 partitionOverwriteMode 스트리밍 쓰기에는 지원되지 않습니다.
| Key | 기본값 | 설명 |
|---|---|---|
mergeSchema |
false |
스트리밍 DataFrame에 새 열이 포함될 때 Delta Lake 테이블 스키마를 개발할지 여부입니다. 추가 출력 모드에만 적용됩니다. 업데이트 테이블 스키마에 적용됩니다. |
userMetadata |
None | 쓰기 작업에 대한 커밋 메타데이터에 추가된 사용자 정의 문자열입니다. 의 출력 DESCRIBE HISTORY에 표시됩니다.
사용자 지정 메타데이터를 사용하여 보강 테이블에 적용됩니다. |
파일 싱크
다음 옵션은 파일 기반 형식(Parquet, JSON, CSV, ORC, text)에 스트림을 쓸 때 적용됩니다. 형식별 옵션은 DataFrameWriter 옵션을 참조하세요.
| Key | 기본값 | 설명 |
|---|---|---|
retention |
None | 내결함성 및 압축에 사용되는 싱크 메타데이터 파일을 보존하는 기간입니다. 와 같은 7 days 시간 문자열을 허용합니다 24 hours. 설정하지 않으면 메타데이터 파일이 무기한 보존됩니다. |
Kafka 싱크
Kafka에 스트림을 쓰기 위한 옵션의 전체 목록은 옵션을 참조하세요.
| Key | 기본값 | 설명 |
|---|---|---|
kafka.bootstrap.servers |
None | 필수입니다. Kafka broker host:port 주소의 쉼표로 구분된 목록입니다. |
topic |
None | 모든 행에 대한 대상 Kafka 토픽입니다. DataFrame에 열이 없는 경우 필수입니다 topic . |
kafka.* |
None | 접두사로 kafka.붙는 Kafka 생산자 구성
kafka.compression.type을 예로 들 수 있습니다. |
메모리 싱크
| Key | 기본값 | 설명 |
|---|---|---|
queryName |
없음(필수) | 쿼리가 쓰는 메모리 내 테이블의 이름입니다. 메모리 싱크에 필요합니다. 을 통해 .queryName()구성할 수도 있습니다. |
mode |
exactlyonce |
메모리 싱크에 대한 배달 보장입니다.
exactlyonce 는 정확히 한 번 의미 체계를 사용하여 마이크로 일괄 처리 모드를 사용합니다.
atleastonce 는 한 번 이상의 의미 체계를 사용하여 연속 모드를 사용합니다. 유효한 값: exactlyonce, . atleastonce |
Spark 함수 옵션
일부 Spark SQL 기본 제공 함수는 구문 분석 또는 serialization 동작을 제어하는 맵을 허용 options 합니다. 옵션을 Python dict 또는 Scala Map[String, String] 전달합니다.
Example
다음 예제에서는 형식이 잘못된 레코드를 삭제하는 동안 JSON 열을 구문 분석합니다.
Python
from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StructField, StringType
schema = StructType([StructField("name", StringType())])
df = df.withColumn("parsed", from_json("json_col", schema, {"mode": "DROPMALFORMED"}))
스칼라
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._
val schema = StructType(Seq(StructField("name", StringType)))
val df = df.withColumn("parsed", from_json(col("json_col"), schema, Map("mode" -> "DROPMALFORMED")))
Avro
Avro 함수는 해당 DataFrame 옵션과 동일한 옵션을 허용합니다.
-
from_avroDataFrameReaderschema_of_avroAvro 옵션을 사용합니다. -
to_avro에서는 DataFrameWriter Avro 옵션을 사용합니다.
Example
다음 예제에서는 스키마 진화를 사용하도록 설정된 Avro 열을 디코딩합니다.
Python
from pyspark.sql.functions import from_avro
df = df.withColumn("decoded", from_avro("avro_col", json_schema, {"avroSchemaEvolutionMode": "restart"}))
스칼라
import org.apache.spark.sql.avro.functions.from_avro
val df = df.withColumn("decoded", from_avro(col("avro_col"), jsonSchema, Map("avroSchemaEvolutionMode" -> "restart")))
또한 스키마 레지스트리의 변형은 from_avro 다음과 같은 옵션을 적용합니다 to_avro .
| Key | 기본값 | 설명 |
|---|---|---|
schemaId |
None | 호환되지 않는 jsonFormatSchema스키마로 인코딩된 Avro 데이터를 디코딩할 때 사용할 Confluent 스키마 레지스트리의 스키마 ID입니다. 적용 대상 from_avro 입니다. |
confluent.schema.registry.* |
None | Confluent Schema Registry 클라이언트 구성 속성입니다. 이 접두사를 사용하여 Confluent SR 클라이언트 속성을 전달합니다(예 confluent.schema.registry.basic.auth.user.info : 기본 인증 자격 증명). 및 .의 스키마 레지스트리 변형에 from_avroto_avro필요합니다. |
Csv
CSV 함수는 해당 DataFrame 옵션과 동일한 옵션을 허용합니다.
-
from_csvDataFrameReaderschema_of_csvCSV 옵션을 사용합니다. -
to_csv에서는 DataFrameWriter CSV 옵션을 사용합니다.
Example
다음 예제에서는 사용자 지정 구분 기호 및 NULL 값이 있는 CSV를 읽습니다.
Python
from pyspark.sql.functions import from_csv
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
schema = StructType([StructField("id", IntegerType()), StructField("name", StringType())])
df = df.withColumn("parsed", from_csv("csv_col", schema, {"sep": "|", "nullValue": "N/A"}))
스칼라
import org.apache.spark.sql.functions.from_csv
import org.apache.spark.sql.types._
val schema = StructType(Seq(StructField("id", IntegerType), StructField("name", StringType)))
val df = df.withColumn("parsed", from_csv(col("csv_col"), schema, Map("sep" -> "|", "nullValue" -> "N/A")))
Json
JSON 함수는 해당 DataFrame 옵션과 동일한 옵션을 허용합니다.
-
from_jsonDataFrameReaderschema_of_jsonJSON 옵션을 사용합니다. -
to_json에서는 DataFrameWriter JSON 옵션을 사용합니다.
Example
다음 예제에서는 필드가 무시되고 서식이 매우 좋은 JSON NULL 을 씁니다.
Python
from pyspark.sql.functions import to_json
df = df.withColumn("json_str", to_json("struct_col", {"pretty": "true", "ignoreNullFields": "true"}))
스칼라
import org.apache.spark.sql.functions.to_json
val df = df.withColumn("json_str", to_json(col("struct_col"), Map("pretty" -> "true", "ignoreNullFields" -> "true")))
Protobuf
from_protobuf 파일 to_protobuf 기반 DataSource를 사용하지 않습니다. Protobuf 데이터는 항상 이러한 함수를 사용하여 이진 열로 읽고 작성됩니다. 옵션은 a Map[String, String] 로 전달되며 대/소문자를 구분합니다.
Example
다음 예제에서는 PERMISSIVE 모드를 사용하여 Protobuf 열을 디코딩합니다.
Python
from pyspark.sql.functions import from_protobuf
df = df.withColumn("decoded", from_protobuf("proto_col", "MyMessage", "/path/to/descriptor.desc",
{"mode": "PERMISSIVE", "enums.as.ints": "true"}))
스칼라
import org.apache.spark.sql.protobuf.functions.from_protobuf
val df = df.withColumn("decoded", from_protobuf(col("proto_col"), "MyMessage", "/path/to/descriptor.desc",
Map("mode" -> "PERMISSIVE", "enums.as.ints" -> "true")))
Protobuf 함수는 다음 옵션을 사용합니다.
| Key | 기본값 | 설명 |
|---|---|---|
mode |
FAILFAST |
손상된 레코드를 처리하는 방법입니다.
FAILFAST 예외를 throw합니다.
PERMISSIVE 형식이 잘못된 필드를 null로 설정합니다. 유효한 값: FAILFAST, . PERMISSIVE 적용 대상 from_protobuf입니다. |
recursive.fields.max.depth |
-1 (사용 안 함) |
재귀 Protobuf 필드의 최대 재귀 깊이입니다. 재귀 필드 지원을 끄도록 0 설정합니다. 유효한 값: 0 to .10 적용 대상 from_protobuf입니다. |
convert.any.fields.to.json |
false |
Protobuf Any 필드를 STRUCT. 적용 대상 from_protobuf입니다. |
emit.default.values |
false |
0 또는 기본값(proto3 의미 체계)을 사용하여 필드를 내보내는지 여부입니다. 이 경우 false기본값이 있는 필드가 출력에서 생략됩니다. 적용 대상 from_protobuf입니다. |
enums.as.ints |
false |
열거형 필드를 문자열 대신 정수 값으로 렌더링할지 여부입니다. 적용 대상 from_protobuf입니다. |
upcast.unsigned.ints |
false |
정수 오버플로를 Longuint64 방지하기 위해 Decimal(20,0) 업캐스트 uint32 할지 여부입니다. 적용 대상 from_protobuf입니다. |
unwrap.primitive.wrapper.types |
false |
래퍼 형식(예Int32Value: 및StringValue)을 해당 기본 Spark 형식으로 래핑 google.protobuf 해제할지 여부입니다. 적용 대상 from_protobuf입니다. |
retain.empty.message.types |
false |
더미 열을 삽입하여 출력 스키마에 빈 Protobuf 메시지 형식을 유지할지 여부입니다. 적용 대상 from_protobuf입니다. |
schema.registry.subject |
None | 스키마 레지스트리 주체 이름입니다. 및 .의 스키마 레지스트리 변형을 from_protobufto_protobuf사용할 때 필요합니다. |
schema.registry.address |
None | 스키마 레지스트리 주소(호스트 및 포트). 및 .의 스키마 레지스트리 변형을 from_protobufto_protobuf사용할 때 필요합니다. |
schema.registry.protobuf.name |
None | 스키마 레지스트리 제목에 여러 메시지가 포함될 때 사용할 Protobuf 메시지를 지정합니다. Optional. |
Xml
XML 함수는 해당 DataFrame 옵션과 동일한 옵션을 허용합니다.
-
from_xmlDataFrameReaderschema_of_xmlXML 옵션을 사용합니다. -
to_xml에서는 DataFrameWriter XML 옵션을 사용합니다.
Example
다음 예제에서는 사용자 지정 루트 및 행 태그를 사용하여 XML을 씁니다.
Python
from pyspark.sql.functions import to_xml
df = df.withColumn("xml_str", to_xml("struct_col", {"rootTag": "records", "rowTag": "record"}))
스칼라
import org.apache.spark.sql.functions.to_xml
val df = df.withColumn("xml_str", to_xml(col("struct_col"), Map("rootTag" -> "records", "rowTag" -> "record")))