다음을 통해 공유


create_sink

중요합니다

파이프라인 create_sink API는 공개 미리 보기로 제공됩니다.

create_sink() 함수는 선언적 파이프라인에서 Apache Kafka나 Azure Event Hubs와 같은 이벤트 스트리밍 서비스나 델타 테이블에 기록합니다. create_sink() 함수를 사용하여 싱크를 만든 후, 부가 흐름을(를) 사용하여 싱크에 데이터를 씁니다. 추가 흐름은 create_sink() 함수에서 지원되는 유일한 흐름 형식입니다. create_auto_cdc_flow같은 다른 흐름 형식은 지원되지 않습니다.

델타 싱크는 Unity 카탈로그 외부 및 관리 테이블 및 Hive 메타스토어 관리 테이블을 지원합니다. 테이블 이름은 완전하게 지정되어야 합니다. 예를 들어 Unity 카탈로그 테이블은 <catalog>.<schema>.<table>3계층 식별자를 사용해야 합니다. Hive 메타스토어 테이블은 <schema>.<table>사용해야 합니다.

비고

  • 전체 새로 고침 업데이트 실행해도 싱크에서 데이터가 지워지지 않습니다. 다시 처리된 데이터는 싱크에 추가되고 기존 데이터는 변경되지 않습니다.
  • API에서는 sink로 기대를 지원하지 않습니다.

Syntax

from pyspark import pipelines as dp

dp.create_sink(name=<sink_name>, format=<format>, options=<options>)

매개 변수

매개 변수 유형 Description
name str 필수 사항입니다. 싱크를 식별하고 싱크를 참조하고 관리하는 데 사용되는 문자열입니다. 싱크 이름은 파이프라인의 일부인 모든 소스 코드 파일을 포함하여 파이프라인에 고유해야 합니다.
format str 필수 사항입니다. 출력 형식(kafka 또는 delta)을 정의하는 문자열입니다.
options dict 키와 값이 모두 문자열인 싱크 옵션 목록입니다 {"key": "value"}. Kafka 및 델타 싱크에서 지원하는 모든 Databricks 런타임 옵션이 지원됩니다.

예시

from pyspark import pipelines as dp

# Create a Kafka sink
dp.create_sink(
  "my_kafka_sink",
  "kafka",
  {
    "kafka.bootstrap.servers": "host:port",
    "topic": "my_topic"
  }
)

# Create an external Delta table sink with a file path
dp.create_sink(
  "my_delta_sink",
    "delta",
    { "path": "/path/to/my/delta/table" }
)

# Create a Delta table sink using a table name
dp.create_sink(
  "my_delta_sink",
    "delta",
    { "tableName": "my_catalog.my_schema.my_table" }
)