다음을 통해 공유


데이터 세트를 정의하는 함수

여기서 별칭으로 pyspark.pipelines지정된 모듈은 dp 데코레이터를 사용하여 대부분의 핵심 기능을 구현합니다. 이러한 데코레이터는 스트리밍 또는 일괄 처리 쿼리를 정의하고 Apache Spark DataFrame을 반환하는 함수를 허용합니다. 다음 구문은 파이프라인 데이터 세트를 정의하는 간단한 예제를 보여 줍니다.

from pyspark import pipelines as dp

@dp.table()
def function_name(): # This is the function decorated
  return (<query>) # This is the query logic that defines the dataset

이 페이지에서는 파이프라인에서 데이터 세트를 정의하는 함수 및 쿼리에 대한 개요를 제공합니다. 사용 가능한 데코레이터의 전체 목록은 파이프라인 개발자 참조를 참조하세요.

데이터 세트를 정의하는 데 사용하는 함수에는 타사 API 호출을 포함하여 데이터 세트와 관련이 없는 임의의 Python 논리가 포함되어서는 안 됩니다. 파이프라인은 계획, 유효성 검사 및 업데이트 중에 이러한 함수를 여러 번 실행합니다. 임의의 논리를 포함하면 예기치 않은 결과가 발생할 수 있습니다.

데이터를 읽어 데이터 세트 정의를 시작합니다.

파이프라인 데이터 세트를 정의하는 데 사용되는 함수는 일반적으로 spark.read이나 spark.readStream 연산으로 시작합니다. 이러한 읽기 작업은 DataFrame을 반환하기 전에 추가 변환을 정의하는 데 사용하는 정적 또는 스트리밍 DataFrame 개체를 반환합니다. DataFrame을 반환하는 Spark 작업의 다른 예는 다음과 같습니다spark.tablespark.range.

함수는 함수 외부에서 정의된 DataFrame을 참조해서는 안 됩니다. 다른 범위에서 정의된 DataFrame을 참조하려고 하면 예기치 않은 동작이 발생할 수 있습니다. 여러 테이블을 만들기 위한 메타프로그래밍 패턴의 예는 루프에서 테이블 만들기를 for참조하세요.

다음 예제에서는 일괄 처리 또는 스트리밍 논리를 사용하여 데이터를 읽기 위한 기본 구문을 보여 줍니다.

from pyspark import pipelines as dp

# Batch read on a table
@dp.materialized_view()
def function_name():
  return spark.read.table("catalog_name.schema_name.table_name")

# Batch read on a path
@dp.materialized_view()
def function_name():
  return spark.read.format("parquet").load("/Volumes/catalog_name/schema_name/volume_name/data_path")


# Streaming read on a table
@dp.table()
def function_name():
  return spark.readStream.table("catalog_name.schema_name.table_name")

# Streaming read on a path
@dp.table()
def function_name():
  return (spark.read
    .format("cloudFiles")
    .option("cloudFile.format", "parquet")
    .load("/Volumes/catalog_name/schema_name/volume_name/data_path")
  )

외부 REST API에서 데이터를 읽어야 하는 경우 Python 사용자 지정 데이터 원본을 사용하여 이 연결을 구현합니다. PySpark 사용자 지정 데이터 원본을 참조하세요.

비고

pandas 데이터프레임, 사전 및 리스트를 포함하여 Python 데이터 컬렉션에서 임의의 Apache Spark DataFrames를 만들 수 있습니다. 이러한 패턴은 개발 및 테스트 중에 유용할 수 있지만 대부분의 프로덕션 파이프라인 데이터 세트 정의는 파일, 외부 시스템 또는 기존 테이블 또는 뷰에서 데이터를 로드하는 것으로 시작해야 합니다.

연쇄 변환

파이프라인은 거의 모든 Apache Spark DataFrame 변환을 지원합니다. 데이터 세트 정의 함수에 원하는 수의 변환을 포함할 수 있지만 사용하는 메서드가 항상 DataFrame 개체를 반환하는지 확인해야 합니다.

여러 다운스트림 워크로드를 구동하는 중간 변환이 있지만 테이블로 구체화할 필요가 없는 경우 파이프라인에 임시 보기를 추가하는 데 사용합니다 @dp.temporary_view() . 그런 다음, 여러 다운스트림 데이터 세트 정의에서 spark.read.table("temp_view_name")을 사용하여 이 보기를 참조할 수 있습니다. 다음 구문은 이 패턴을 보여 줍니다.

from pyspark import pipelines as dp

@dp.temporary_view()
def a():
  return spark.read.table("source").filter(...)

@dp.materialized_view()
def b():
  return spark.read.table("a").groupBy(...)

@dp.materialized_view()
def c():
  return spark.read.table("a").groupBy(...)

이렇게 하면 파이프라인이 파이프라인 계획 중에 뷰의 변환을 완전히 인식하고 데이터 세트 정의 외부에서 실행되는 임의 Python 코드와 관련된 잠재적인 문제를 방지할 수 있습니다.

함수 내에서 다음 예제와 같이 증분 결과를 뷰, 구체화된 뷰 또는 스트리밍 테이블로 작성하지 않고도 DataFrame을 함께 연결하여 새 DataFrame을 만들 수 있습니다.

from pyspark import pipelines as dp

@dp.table()
def multiple_transformations():
  df1 = spark.read.table("source").filter(...)
  df2 = df1.groupBy(...)
  return df2.filter(...)

모든 DataFrame이 일괄 처리 논리를 사용하여 초기 읽기를 수행하는 경우 반환 결과는 정적 DataFrame입니다. 스트리밍하는 쿼리가 있는 경우 반환 결과는 스트리밍 데이터 프레임입니다.

DataFrame를 반환합니다

@dp.table를 사용하여 스트리밍 읽기의 결과에서 스트리밍 테이블을 만드십시오. @dp.materialized_view을 사용하여 일괄 처리 읽기 결과에서 물리적 뷰를 만듭니다. 대부분의 다른 데코레이터는 스트리밍 및 정적 DataFrame 모두에서 작동하지만 일부 데코레이터에는 스트리밍 DataFrame이 필요합니다.

데이터 세트를 정의하는 데 사용되는 함수는 Spark DataFrame을 반환해야 합니다. 파이프라인 데이터 세트 코드의 일부로 파일 또는 테이블에 저장하거나 쓰는 메서드를 사용하지 마세요.

파이프라인 코드에서 사용해서는 안 되는 Apache Spark 작업의 예:

  • collect()
  • count()
  • toPandas()
  • save()
  • saveAsTable()
  • start()
  • toTable()

비고

또한 파이프라인은 데이터 세트 정의 함수에 Spark의 Pandas 사용을 지원합니다. Spark의 Pandas API를 참조하세요.

Python 파이프라인에서 SQL 사용

PySpark는 SQL을 spark.sql 사용하여 DataFrame 코드를 작성하는 연산자를 지원합니다. 파이프라인 소스 코드에서 이 패턴을 사용하면 구체화된 뷰 또는 스트리밍 테이블로 컴파일됩니다.

다음 코드 예제는 데이터 세트 쿼리 논리에 사용하는 spark.read.table("catalog_name.schema_name.table_name") 것과 같습니다.

@dp.materialized_view
def my_table():
  return spark.sql("SELECT * FROM catalog_name.schema_name.table_name")

dlt.readdlt.read_stream (레거시)

이전 dlt 모듈에는 레거시 파이프라인 게시 모드에서 기능을 지원하기 위해 도입된 dlt.read()dlt.read_stream() 함수가 포함되어 있습니다. Databricks는 이러한 메서드도 지원하지만, 다음과 같은 이유로 항상 spark.read.table()spark.readStream.table() 함수를 사용할 것을 권장합니다.

  • 함수는 dlt 현재 파이프라인 외부에서 정의된 데이터 세트를 읽기 위한 제한된 지원을 제공합니다.
  • spark 함수는 읽기 작업에 대한 skipChangeCommits 옵션을 지정할 수 있습니다. 옵션 지정을 dlt 함수에서 지원하지 않습니다.
  • dlt 모듈은 pyspark.pipelines 모듈로 대체되었습니다. Databricks는 Python에서 파이프라인 코드를 작성할 때 from pyspark import pipelines as dp을(를) 사용하기 위해 pyspark.pipelines을(를) 가져오는 것을 권장합니다.