Delta Live Tables Python 언어 참조

이 문서에서는 Delta Live Tables Python 프로그래밍 인터페이스에 대한 세부 정보를 제공합니다.

SQL API에 대한 자세한 내용은 Delta Live Tables SQL 언어 참조를 참조하세요.

자동 로더 구성과 관련된 자세한 내용은 자동 로더란?을 참조하세요.

제한 사항

Delta Live Tables Python 인터페이스에는 다음과 같은 제한 사항이 있습니다.

  • Python tableview 함수는 DataFrame을 반환해야 합니다. DataFrames에서 작동하는 일부 함수는 DataFrames를 반환하지 않으며 사용하지 않아야 합니다. DataFrame 변환은 전체 데이터 흐름 그래프가 확인된 후에 실행되므로 이러한 작업을 사용하면 의도하지 않은 부작용이 발생할 수 있습니다. 이러한 작업에는 collect(), count(), toPandas(), save()saveAsTable()과 같은 함수가 포함됩니다. 그러나 이 코드는 그래프 초기화 단계에서 한 번 실행되므로 이러한 함수를 table 또는 view 함수 정의 외부에 포함할 수 있습니다.
  • pivot() 함수는 지원되지 않습니다. Spark에서 pivot 작업을 수행하려면 출력의 스키마를 계산하기 위해 입력 데이터를 즉시 로드해야 합니다. 이 기능은 Delta Live Tables에서 지원되지 않습니다.

dlt Python 모듈 가져오기

Delta Live Tables Python 함수는 dlt 모듈에 정의되어 있습니다. Python API를 사용하여 구현된 파이프라인은 다음 모듈을 가져와야 합니다.

import dlt

Delta Live Tables 구체화된 뷰 또는 스트리밍 테이블 만들기

Python에서 Delta Live Tables는 정의 쿼리를 기반으로 데이터 세트를 구체화된 뷰로 업데이트할지 아니면 스트리밍 테이블로 업데이트할지를 결정합니다. @table 데코레이터는 구체화된 뷰와 스트리밍 테이블을 모두 정의하는 데 사용됩니다.

Python에서 구체화된 뷰를 정의하려면 데이터 원본에 대해 정적 읽기를 수행하는 쿼리에 적용 @table 합니다. 스트리밍 테이블을 정의하려면 데이터 원본에 대해 스트리밍 읽기를 수행하는 쿼리에 적용 @table 합니다. 두 데이터 세트 형식의 구문 사양은 다음과 같습니다.

import dlt

@dlt.table(
  name="<name>",
  comment="<comment>",
  spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  path="<storage-location-path>",
  partition_cols=["<partition-column>", "<partition-column>"],
  schema="schema-definition",
  temporary=False)
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
    return (<query>)

델타 라이브 테이블 보기 만들기

Python에서 뷰를 정의하려면 @view 데코레이터를 적용합니다. 데코레이터와 @table 마찬가지로 정적 또는 스트리밍 데이터 세트에 델타 라이브 테이블의 보기를 사용할 수 있습니다. 다음은 Python을 사용하여 뷰를 정의하는 구문입니다.

import dlt

@dlt.view(
  name="<name>",
  comment="<comment>")
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
    return (<query>)

예: 테이블 및 뷰 정의

Python에서 테이블 또는 뷰를 정의하려면 함수에 @dlt.view 또는 @dlt.table 데코레이터를 적용합니다. 함수 이름 또는 name 매개 변수를 사용하여 테이블 또는 뷰 이름을 할당할 수 있습니다. 다음 예제는 JSON 파일을 입력 원본으로 사용하는 taxi_raw라는 뷰와 taxi_raw 뷰를 입력으로 사용하는 filtered_data라는 테이블의 두 가지 데이터 세트를 정의합니다.

import dlt

@dlt.view
def taxi_raw():
  return spark.read.format("json").load("/databricks-datasets/nyctaxi/sample/json/")

# Use the function name as the table name
@dlt.table
def filtered_data():
  return dlt.read("taxi_raw").where(...)

# Use the name parameter as the table name
@dlt.table(
  name="filtered_data")
def create_filtered_data():
  return dlt.read("taxi_raw").where(...)

예: 동일한 파이프라인에 정의된 데이터 세트에 액세스

외부 데이터 원본에서 읽는 것 외에도 Delta Live Tables read() 함수를 사용하여 동일한 파이프라인에 정의된 데이터 세트에 액세스할 수 있습니다. 다음 예제에서는 함수를 customers_filtered 사용하여 데이터 세트를 만드는 방법을 read() 보여 줍니다.

@dlt.table
def customers_raw():
  return spark.read.format("csv").load("/data/customers.csv")

@dlt.table
def customers_filteredA():
  return dlt.read("customers_raw").where(...)

함수를 spark.table() 사용하여 동일한 파이프라인에 정의된 데이터 세트에 액세스할 수도 있습니다. spark.table() 함수를 사용하여 파이프라인에 정의된 데이터 세트에 액세스하는 경우 함수 인수에서 데이터 세트 이름 앞에 LIVE 키워드를 추가합니다.

@dlt.table
def customers_raw():
  return spark.read.format("csv").load("/data/customers.csv")

@dlt.table
def customers_filteredB():
  return spark.table("LIVE.customers_raw").where(...)

예: 메타스토어에 등록된 테이블에서 읽기

Hive 메타스토어에 등록된 테이블에서 데이터를 읽으려면 함수 인수에서 키워드(keyword) 생략 LIVE 하고 필요에 따라 테이블 이름을 데이터베이스 이름으로 한정합니다.

@dlt.table
def customers():
  return spark.table("sales.customers").where(...)

Unity 카탈로그 테이블에서 읽는 예제는 Unity 카탈로그 파이프라인으로 데이터 수집을 참조하세요.

예: 를 사용하여 데이터 세트에 액세스 spark.sql

쿼리 함수에서 spark.sql 식을 사용하여 데이터 세트를 반환할 수도 있습니다. 내부 데이터 세트에서 읽으려면 데이터 세트 이름 앞에 LIVE.을 추가합니다.

@dlt.table
def chicago_customers():
  return spark.sql("SELECT * FROM LIVE.customers_cleaned WHERE city = 'Chicago'")

스트리밍 작업의 대상으로 사용할 테이블 만들기

이 함수를 create_streaming_table() 사용하여 apply_changes() @append_flow 출력 레코드를 포함하여 스트리밍 작업으로 레코드 출력에 대한 대상 테이블을 만듭니다.

참고 항목

create_target_table() 함수 및 create_streaming_live_table() 함수는 더 이상 사용되지 않습니다. Databricks에서는 create_streaming_table() 함수를 사용하도록 기존 코드를 업데이트할 것을 권장합니다.

create_streaming_table(
  name = "<table-name>",
  comment = "<comment>"
  spark_conf={"<key>" : "<value", "<key" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  partition_cols=["<partition-column>", "<partition-column>"],
  path="<storage-location-path>",
  schema="schema-definition",
  expect_all = {"<key>" : "<value", "<key" : "<value>"},
  expect_all_or_drop = {"<key>" : "<value", "<key" : "<value>"},
  expect_all_or_fail = {"<key>" : "<value", "<key" : "<value>"}
)
인수
name

유형: str

테이블 이름.

이 매개 변수는 필수입니다.
comment

유형: str

테이블에 대한 선택적 설명입니다.
spark_conf

유형: dict

이 쿼리를 실행하기 위한 선택적 Spark 구성 목록입니다.
table_properties

유형: dict

테이블에 대한 테이블 속성의 선택적 목록입니다.
partition_cols

유형: array

테이블 분할에 사용할 하나 이상의 열에 대한 선택적 목록입니다.
path

유형: str

테이블 데이터에 대한 선택적 스토리지 위치입니다. 설정하지 않으면 시스템은 기본적으로 파이프라인 스토리지 위치로 설정됩니다.
schema

형식: str 또는 StructType

테이블에 대한 선택적 스키마 정의입니다. 스키마는 SQL DDL 문자열 또는 Python으로 정의할 수 있습니다.
StructType.
expect_all
expect_all_or_drop
expect_all_or_fail

유형: dict

테이블에 대한 선택적 데이터 품질 제약 조건입니다. 여러 기대를 확인 합니다.

테이블 구체화 방법 제어

테이블은 또한 구체화에 대한 추가 제어를 제공합니다.

  • partition_cols를 사용하여 테이블을 분할하는 방법을 지정합니다. 분할을 사용하여 쿼리 속도를 높일 수 있습니다.
  • 뷰 또는 테이블을 정의할 때 테이블 속성을 설정할 수 있습니다. Delta Live Tables 테이블 속성을 참조 하세요.
  • path 설정을 사용하여 테이블 데이터의 스토리지 위치를 설정합니다. 기본적으로 테이블 데이터는 path이 설정되지 않은 경우 파이프라인 스토리지 위치에 저장됩니다.
  • 스키마 정의에서 생성된 열을 사용할 수 있습니다. 예제 : 스키마 및 파티션 열을 지정합니다.

참고 항목

크기가 1TB 미만인 테이블의 경우 Databricks는 Delta Live Tables가 데이터 조직을 제어하도록 하는 것이 좋습니다. 테이블이 테라바이트 이상으로 증가할 것으로 예상하지 않는 한 일반적으로 파티션 열을 지정해서는 안 됩니다.

예: 스키마 및 파티션 열 지정

선택적으로 Python StructType 또는 SQL DDL 문자열을 사용하여 테이블 스키마를 지정할 수 있습니다. DDL 문자열로 지정하면 정의에 생성된 열이 포함될 수 있습니다.

다음 예제에서는 PythonStructType을 사용하여 지정된 스키마를 사용하여 호출 sales 되는 테이블을 만듭니다.

sales_schema = StructType([
  StructField("customer_id", StringType(), True),
  StructField("customer_name", StringType(), True),
  StructField("number_of_line_items", StringType(), True),
  StructField("order_datetime", StringType(), True),
  StructField("order_number", LongType(), True)]
)

@dlt.table(
  comment="Raw data on sales",
  schema=sales_schema)
def sales():
  return ("...")

다음 예제에서는 DDL 문자열을 사용하여 테이블에 대한 스키마를 지정하고, 생성된 열을 정의하고, 파티션 열을 정의합니다.

@dlt.table(
  comment="Raw data on sales",
  schema="""
    customer_id STRING,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
    """,
  partition_cols = ["order_day_of_week"])
def sales():
  return ("...")

기본적으로 Delta Live Tables는 스키마를 지정하지 않으면 table 정의에서 스키마를 유추합니다.

원본 스트리밍 테이블의 변경 내용을 무시하도록 스트리밍 테이블 구성

참고 항목

  • 플래그는 skipChangeCommits 함수를 사용하는 경우에만 spark.readStream 작동합니다 option() . 함수에서는 이 플래그를 dlt.read_stream() 사용할 수 없습니다.
  • 원본 스트리밍 테이블이 skipChangeCommits apply_changes() 함수의 대상으로 정의되면 플래그를 사용할 수 없습니다.

기본적으로 스트리밍 테이블에는 추가 전용 원본이 필요합니다. 스트리밍 테이블이 다른 스트리밍 테이블을 원본으로 사용하고 원본 스트리밍 테이블에 업데이트 또는 삭제가 필요한 경우(예: GDPR "잊혀질 권리" 처리 skipChangeCommits ) 이러한 변경 내용을 무시하기 위해 원본 스트리밍 테이블을 읽을 때 플래그를 설정할 수 있습니다. 이 플래그에 대한 자세한 내용은 업데이트 및 삭제 무시를 참조 하세요.

@table
def b():
   return spark.readStream.option("skipChangeCommits", "true").table("LIVE.A")

Python Delta Live Tables 속성

다음 표에서는 Delta Live Tables를 사용하여 테이블 및 뷰를 정의하는 동안 지정할 수 있는 옵션 및 속성에 대해 설명합니다.

@table 또는 @view
name

유형: str

테이블 또는 뷰의 선택적 이름입니다. 정의되지 않은 경우 함수 이름이 테이블 또는 뷰 이름으로 사용됩니다.
comment

유형: str

테이블에 대한 선택적 설명입니다.
spark_conf

유형: dict

이 쿼리를 실행하기 위한 선택적 Spark 구성 목록입니다.
table_properties

유형: dict

테이블에 대한 테이블 속성의 선택적 목록입니다.
path

유형: str

테이블 데이터에 대한 선택적 스토리지 위치입니다. 설정하지 않으면 시스템은 기본적으로 파이프라인 스토리지 위치로 설정됩니다.
partition_cols

유형: a collection of str

선택적 컬렉션(예: list테이블 분할에 사용할 하나 이상의 열)입니다.
schema

형식: str 또는 StructType

테이블에 대한 선택적 스키마 정의입니다. 스키마는 SQL DDL 문자열 또는 Python으로 정의할 수 있습니다.
StructType.
temporary

유형: bool

테이블을 만들지만 테이블에 대한 메타데이터는 게시하지 않습니다. temporary 키워드(keyword) 파이프라인에서 사용할 수 있지만 파이프라인 외부에서 액세스해서는 안 되는 테이블을 만들도록 Delta Live Tables에 지시합니다. 처리 시간을 줄이기 위해 임시 테이블은 단일 업데이트가 아니라 해당 테이블을 만드는 파이프라인의 수명 동안 유지됩니다.

기본값은 ‘False’입니다.
테이블 또는 뷰 정의
def <function-name>()

데이터 세트를 정의하는 Python 함수입니다. name 매개 변수가 설정되지 않은 경우 <function-name>이 대상 데이터 세트 이름으로 사용됩니다.
query

Spark Dataset 또는 Koalas DataFrame을 반환하는 Spark SQL 문입니다.

dlt.read() 또는 spark.table()을 사용하여 동일한 파이프라인에 정의된 데이터 세트에서 전체 읽기를 수행합니다. spark.table() 함수를 사용하여 동일한 파이프라인에 정의된 데이터 세트에서 읽을 때 함수 인수의 데이터 세트 이름 앞에 LIVE 키워드를 추가합니다. 예를 들어, customers라는 데이터 세트에서 읽으려면 다음을 수행합니다.

spark.table("LIVE.customers")

또한 spark.table() 함수를 사용하여 LIVE 키워드를 생략하고 필요에 따라 테이블 이름을 데이터베이스 이름으로 한정하여 메타스토어에 등록된 테이블에서 읽을 수 있습니다.

spark.table("sales.customers")

dlt.read_stream()을 사용하여 동일한 파이프라인에 정의된 데이터 세트에서 스트리밍 읽기를 수행합니다.

spark.sql 함수를 사용하여 SQL 쿼리를 정의하여 반환 데이터 세트를 만듭니다.

PySpark 구문을 사용하여 Python으로 Delta Live Tables 쿼리를 정의합니다.
기대치
@expect("description", "constraint")

다음으로 식별되는 데이터 품질 제약 조건을 선언합니다.
description. 행이 예상을 위반하는 경우 대상 데이터 세트에 행을 포함합니다.
@expect_or_drop("description", "constraint")

다음으로 식별되는 데이터 품질 제약 조건을 선언합니다.
description. 행이 예상을 위반하는 경우 대상 데이터 세트에서 행을 삭제합니다.
@expect_or_fail("description", "constraint")

다음으로 식별되는 데이터 품질 제약 조건을 선언합니다.
description. 행이 예상을 위반하는 경우 즉시 실행을 중지합니다.
@expect_all(expectations)

하나 이상의 데이터 품질 제약 조건을 선언합니다.
expectations는 키가 예상 설명이고 값이 예상 제약 조건인 Python 사전입니다. 행이 예상을 위반하는 경우 대상 데이터 세트에 행을 포함합니다.
@expect_all_or_drop(expectations)

하나 이상의 데이터 품질 제약 조건을 선언합니다.
expectations는 키가 예상 설명이고 값이 예상 제약 조건인 Python 사전입니다. 행이 예상을 위반하는 경우 대상 데이터 세트에서 행을 삭제합니다.
@expect_all_or_fail(expectations)

하나 이상의 데이터 품질 제약 조건을 선언합니다.
expectations는 키가 예상 설명이고 값이 예상 제약 조건인 Python 사전입니다. 행이 예상을 위반하는 경우 즉시 실행을 중지합니다.

Delta Live Tables에서 Python을 사용하여 데이터 캡처 변경

Python API의 apply_changes() 함수를 사용하여 Delta Live Tables CDC 기능을 사용합니다. Delta Live Tables Python 인터페이스는 create_streaming_table() 함수도 제공합니다. 이 함수를 사용하여 apply_changes() 함수에 필요한 대상 테이블을 만들 수 있습니다.

apply_changes(
  target = "<target-table>",
  source = "<data-source>",
  keys = ["key1", "key2", "keyN"],
  sequence_by = "<sequence-column>",
  ignore_null_updates = False,
  apply_as_deletes = None,
  apply_as_truncates = None,
  column_list = None,
  except_column_list = None,
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None
)

참고 항목

INSERTUPDATE 이벤트의 기본 동작은 원본의 CDC 이벤트를 upsert하는 것입니다. 즉, 대상 테이블에서 지정된 키와 일치하는 모든 행을 업데이트하거나, 대상 테이블에 일치하는 레코드가 없는 경우 새 행을 삽입합니다. DELETE 이벤트에 대한 처리는 APPLY AS DELETE WHEN 조건을 사용하여 지정할 수 있습니다.

Important

변경 내용을 적용하려면 대상 스트리밍 테이블을 선언해야 합니다. 필요에 따라 대상 테이블에 대한 스키마를 지정할 수 있습니다. apply_changes 대상 테이블의 스키마를 지정할 때 sequence_by 필드와 데이터 형식이 동일한 __START_AT__END_AT 열도 포함해야 합니다.

APPLY CHANGES API: Delta Live Tables에서 변경 데이터 캡처 간소화를 참조하세요.

인수
target

유형: str

업데이트할 테이블의 이름입니다. 함수를 실행하기 전에 create_streaming_table() 함수를 사용하여 대상 테이블을 만들 수 있습니다 apply_changes() .

이 매개 변수는 필수입니다.
source

유형: str

CDC 레코드를 포함하는 데이터 원본입니다.

이 매개 변수는 필수입니다.
keys

유형: list

원본 데이터의 행을 고유하게 식별하는 열 또는 열의 조합입니다. 대상 테이블에서 특정 레코드에 적용되는 CDC 이벤트를 식별하는 데 사용됩니다.

다음 중 하나를 지정할 수 있습니다.

* 문자열 목록: ["userId", "orderId"]
* Spark SQL col() 함수 목록: [col("userId"), col("orderId"]

col() 함수의 인수는 한정자를 포함할 수 없습니다. 예를 들어, col(userId)는 사용할 수 있지만 col(source.userId)는 사용할 수 없습니다.

이 매개 변수는 필수입니다.
sequence_by

형식: str 또는 col()

원본 데이터에서 CDC 이벤트의 논리적 순서를 지정하는 열 이름입니다. Delta Live Tables는 이 시퀀싱을 사용하여 순서가 맞지 않게 도착한 변경 이벤트를 처리합니다.

다음 중 하나를 지정할 수 있습니다.

* 문자열: "sequenceNum"
* Spark SQL col() 함수: col("sequenceNum")

col() 함수의 인수는 한정자를 포함할 수 없습니다. 예를 들어, col(userId)는 사용할 수 있지만 col(source.userId)는 사용할 수 없습니다.

이 매개 변수는 필수입니다.
ignore_null_updates

유형: bool

대상 열의 하위 집합을 포함하는 업데이트를 수집할 수 있습니다. CDC 이벤트가 기존 행과 일치하고 ignore_null_updatesTrue인 경우, null을 포함하는 열은 대상에서 기존 값으로 유지됩니다. null 값을 갖는 중첩된 열에도 적용됩니다. ignore_null_updatesFalse인 경우 기존 값이 null 값으로 덮어써집니다.

이 매개 변수는 선택 사항입니다.

기본값은 False입니다.
apply_as_deletes

형식: str 또는 expr()

CDC 이벤트를 upsert가 아닌 DELETE로 취급해야 하는 경우를 지정합니다. 순서가 맞지 않는 데이터를 처리하기 위해, 삭제된 행은 기본 Delta 테이블에서 일시적으로 삭제 표식으로 유지되고 메타스토어에서 이러한 삭제 표식이 제외된 뷰가 만들어집니다. 보존 간격은
pipelines.cdc.tombstoneGCThresholdInSeconds테이블 속성을 사용하여 구성할 수 있습니다.

다음 중 하나를 지정할 수 있습니다.

* 문자열: "Operation = 'DELETE'"
* Spark SQL expr() 함수: expr("Operation = 'DELETE'")

이 매개 변수는 선택 사항입니다.
apply_as_truncates

형식: str 또는 expr()

CDC 이벤트가 전체 테이블 TRUNCATE로 처리되어야 하는 경우를 지정합니다. 이 절은 대상 테이블의 전체 자르기를 트리거하므로 이 기능이 필요한 특정 사용 사례에만 사용해야 합니다.

apply_as_truncates 매개 변수는 SCD 형식 1에 대해서만 지원됩니다. SCD 형식 2는 자르기를 지원하지 않습니다.

다음 중 하나를 지정할 수 있습니다.

* 문자열: "Operation = 'TRUNCATE'"
* Spark SQL expr() 함수: expr("Operation = 'TRUNCATE'")

이 매개 변수는 선택 사항입니다.
column_list

except_column_list

유형: list

대상 테이블에 포함할 열의 하위 집합입니다. 포함할 열의 전체 목록을 지정하려면 column_list를 사용합니다. 제외할 열을 지정하려면 except_column_list를 사용합니다. 값은 문자열 목록이나 Spark SQL col() 함수 목록으로 지정할 수 있습니다.

* column_list = ["userId", "name", "city"].
* column_list = [col("userId"), col("name"), col("city")]
* except_column_list = ["operation", "sequenceNum"]
* except_column_list = [col("operation"), col("sequenceNum")

col() 함수의 인수는 한정자를 포함할 수 없습니다. 예를 들어, col(userId)는 사용할 수 있지만 col(source.userId)는 사용할 수 없습니다.

이 매개 변수는 선택 사항입니다.

기본값은 함수에 column_list 또는 except_column_list 인수가 전달되지 않은 경우 대상 테이블의 모든 열을 포함하는 것입니다.
stored_as_scd_type

형식: str 또는 int

레코드를 SCD 형식 1 또는 SCD 형식 2로 저장할지 여부.

SCD 유형 1의 경우 1, SCD 유형 2의 경우 2로 설정합니다.

이 절은 옵션입니다.

기본값은 SCD 형식 1입니다.
track_history_column_list

track_history_except_column_list

유형: list

대상 테이블의 기록에 대해 추적할 출력 열의 하위 집합입니다. 추적할 열의 전체 목록을 지정하는 데 사용합니다 track_history_column_list . 사용
track_history_except_column_list 추적에서 제외할 열을 지정합니다. 값은 문자열 목록이나 Spark SQL col() 함수 목록(track_history_column_list = ["userId", "name", "city"])으로 지정할 수 있습니다. - track_history_column_list = [col("userId"), col("name"), col("city")] - track_history_except_column_list = ["operation", "sequenceNum"] - track_history_except_column_list = [col("operation"), col("sequenceNum")

col() 함수의 인수는 한정자를 포함할 수 없습니다. 예를 들어, col(userId)는 사용할 수 있지만 col(source.userId)는 사용할 수 없습니다.

이 매개 변수는 선택 사항입니다.

기본값은 함수에 track_history_column_list 또는
track_history_except_column_list 인수가 전달되지 않은 경우 대상 테이블의 모든 열을 포함하는 것입니다.