중요합니다
파이프라인 create_sink API는 공개 미리 보기로 제공됩니다.
이 create_sink() 함수는 선언적 파이프라인에서 Apache Kafka나 Azure Event Hubs와 같은 이벤트 스트리밍 서비스나 델타 테이블에 기록합니다.
create_sink() 함수를 사용하여 싱크를 만든 후, 부가 흐름을(를) 사용하여 싱크에 데이터를 씁니다. 추가 흐름은 create_sink() 함수에서 지원되는 유일한 흐름 형식입니다.
create_auto_cdc_flow같은 다른 흐름 형식은 지원되지 않습니다.
델타 싱크는 Unity 카탈로그 외부 및 관리 테이블 및 Hive 메타스토어 관리 테이블을 지원합니다. 테이블 이름은 완전하게 지정되어야 합니다. 예를 들어 Unity 카탈로그 테이블은 <catalog>.<schema>.<table>3계층 식별자를 사용해야 합니다. Hive 메타스토어 테이블은 <schema>.<table>사용해야 합니다.
비고
- 전체 새로 고침 업데이트 실행해도 싱크에서 데이터가 지워지지 않습니다. 다시 처리된 데이터는 싱크에 추가되고 기존 데이터는 변경되지 않습니다.
- API에서는
sink로 기대를 지원하지 않습니다.
Syntax
from pyspark import pipelines as dp
dp.create_sink(name=<sink_name>, format=<format>, options=<options>)
매개 변수
| 매개 변수 | 유형 | Description |
|---|---|---|
name |
str |
필수 사항입니다. 싱크를 식별하고 싱크를 참조하고 관리하는 데 사용되는 문자열입니다. 싱크 이름은 파이프라인의 일부인 모든 소스 코드 파일을 포함하여 파이프라인에 고유해야 합니다. |
format |
str |
필수 사항입니다. 출력 형식(kafka 또는 delta)을 정의하는 문자열입니다. |
options |
dict |
키와 값이 모두 문자열인 싱크 옵션 목록입니다 {"key": "value"}. Kafka 및 델타 싱크에서 지원하는 모든 Databricks 런타임 옵션이 지원됩니다.
|
예시
from pyspark import pipelines as dp
# Create a Kafka sink
dp.create_sink(
"my_kafka_sink",
"kafka",
{
"kafka.bootstrap.servers": "host:port",
"topic": "my_topic"
}
)
# Create an external Delta table sink with a file path
dp.create_sink(
"my_delta_sink",
"delta",
{ "path": "/path/to/my/delta/table" }
)
# Create a Delta table sink using a table name
dp.create_sink(
"my_delta_sink",
"delta",
{ "tableName": "my_catalog.my_schema.my_table" }
)