파이프라인을 사용하여 Azure Databricks의 Apache Spark에서 지원하는 모든 데이터 원본에서 데이터를 로드할 수 있습니다. Streaming DataFrames 및 Pandas for Spark DataFrames를 포함하여 Spark DataFrame을 반환하는 쿼리에 대해 Lakeflow Spark 선언적 파이프라인에서 데이터 세트(테이블 및 뷰)를 정의할 수 있습니다. 데이터 수집 작업의 경우 Databricks는 대부분의 사용 사례에 스트리밍 테이블을 사용하는 것이 좋습니다. 스트리밍 테이블은 자동 로더를 사용하거나 Kafka와 같은 메시지 버스에서 클라우드 개체 스토리지에서 데이터를 수집하는 데 적합합니다.
비고
- 모든 데이터 원본에서 수집에 대한 SQL 지원이 있는 것은 아닙니다. 파이프라인에서 SQL 및 Python 원본을 혼합하여 필요한 경우 Python을 사용하고 동일한 파이프라인의 다른 작업에는 SQL을 사용할 수 있습니다.
- 기본적으로 Lakeflow Spark 선언적 파이프라인에 패키지되지 않은 라이브러리를 사용하는 방법에 대한 자세한 내용은 파이프라인에 대한 Python 종속성 관리를 참조하세요.
- Azure Databricks의 수집에 대한 일반적인 내용은 Lakeflow Connect의 표준 커넥터를 참조하세요.
아래 예제에서는 몇 가지 일반적인 패턴을 보여 줍니다.
기존 테이블에서 로드
Azure Databricks의 기존 테이블에서 데이터를 로드합니다. 쿼리를 사용하여 데이터를 변환하거나 파이프라인에서 추가 처리를 위해 테이블을 로드할 수 있습니다.
다음 예제에서는 기존 테이블에서 데이터를 읽습니다.
파이썬
@dp.table(
comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
return (
spark.read.table("baby_names_prepared")
.filter(expr("Year_Of_Birth == 2021"))
.groupBy("First_Name")
.agg(sum("Count").alias("Total_Count"))
.sort(desc("Total_Count"))
)
SQL
CREATE OR REFRESH MATERIALIZED VIEW top_baby_names_2021
COMMENT "A table summarizing counts of the top baby names for New York for 2021."
AS SELECT
First_Name,
SUM(Count) AS Total_Count
FROM baby_names_prepared
WHERE Year_Of_Birth = 2021
GROUP BY First_Name
ORDER BY Total_Count DESC
클라우드 개체 스토리지에서 파일 로드
Databricks는 클라우드 개체 스토리지 또는 Unity 카탈로그 볼륨의 파일에서 대부분의 데이터 수집 작업에 대해 파이프라인에서 자동 로더를 사용하는 것이 좋습니다. 자동 로더와 파이프라인은 클라우드 스토리지에 도달하는 지속적으로 증가하는 데이터를 점진적이고 멱등적으로 로드하도록 설계되었습니다.
자동 로더란? 개체 스토리지에서 데이터 로드를 참조하세요.
다음 예제에서는 자동 로더를 사용하여 클라우드 스토리지에서 데이터를 읽습니다.
파이썬
@dp.table
def customers():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis/*/*/*.json")
)
SQL
CREATE OR REFRESH STREAMING TABLE sales
AS SELECT *
FROM STREAM read_files(
'abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis/*/*/*.json',
format => "json"
);
다음 예제에서는 자동 로더를 사용하여 Unity 카탈로그 볼륨의 CSV 파일에서 데이터 세트를 만듭니다.
파이썬
@dp.table
def customers():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/Volumes/my_catalog/retail_org/customers/")
)
SQL
CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM STREAM read_files(
"/Volumes/my_catalog/retail_org/customers/",
format => "csv"
)
비고
- 파일 알림과 함께 자동 로더를 사용하고 파이프라인 또는 스트리밍 테이블에 대해 전체 새로 고침을 실행하는 경우 리소스를 수동으로 정리해야 합니다. Notebook에서 CloudFilesResourceManager 사용하여 정리를 수행할 수 있습니다.
- Unity 카탈로그 사용 파이프라인에서 자동 로더를 사용하여 파일을 로드하려면 외부 위치를 사용해야 합니다. 파이프라인과 함께 Unity 카탈로그를 사용하는 방법에 대한 자세한 내용은 파이프라인에서 Unity 카탈로그 사용을 참조하세요.
메시지 버스에서 데이터 로드
메시지 버스에서 데이터를 수집하도록 파이프라인을 구성할 수 있습니다. Databricks는 메시지 버스에서 발생하는 대기 시간이 짧은 로드를 위해 연속적 실행과 향상된 자동 크기 조정 기능을 갖춘 스트리밍 테이블을 사용하는 것이 효율적이라고 권장합니다. 자동 크기 조정을 사용하여 Lakeflow Spark 선언적 파이프라인의 클러스터 사용률 최적화를 참조하세요.
예를 들어 다음 코드는 read_kafka 함수를 사용하여 Kafka에서 데이터를 수집하도록 스트리밍 테이블을 구성합니다.
파이썬
from pyspark import pipelines as dp
@dp.table
def kafka_raw():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "kafka_server:9092")
.option("subscribe", "topic1")
.load()
)
SQL
CREATE OR REFRESH STREAMING TABLE kafka_raw AS
SELECT *
FROM STREAM read_kafka(
bootstrapServers => 'kafka_server:9092',
subscribe => 'topic1'
);
다른 메시지 버스 원본에서 수집하려면 다음을 참조하세요.
- 키네시스: read_kinesis
- Pub/Sub 항목: read_pubsub
- 펄서: read_pulsar
Azure Event Hubs에서 데이터 로드
Azure Event Hubs는 Apache Kafka 호환 인터페이스를 제공하는 데이터 스트리밍 서비스입니다. Lakeflow Spark 선언적 파이프라인 런타임에 포함된 구조적 스트리밍 Kafka 커넥터를 사용하여 Azure Event Hubs에서 메시지를 로드할 수 있습니다. Azure Event Hubs에서 메시지를 로드하고 처리하는 방법에 대한 자세한 내용은 Azure Event Hubs를 파이프라인 데이터 원본으로 사용하세요.
외부 시스템에서 데이터 로드
Lakeflow Spark 선언적 파이프라인은 Azure Databricks에서 지원하는 모든 데이터 원본에서 데이터 로드를 지원합니다. 데이터 원본 및 외부 서비스에 대한 연결을 참조하세요. Lakehouse Federation을 사용하여 지원되는 데이터 원본 의 외부 데이터를 로드할 수도 있습니다. Lakehouse Federation에는 Databricks Runtime 13.3 LTS 이상이 필요하므로 Lakehouse Federation을 사용하려면 파이프라인이 미리 보기 채널사용하도록 구성되어야 합니다.
일부 데이터 원본은 SQL에서 동등한 지원을 제공하지 않습니다. 이러한 데이터 원본 중 하나와 함께 Lakehouse Federation을 사용할 수 없는 경우 Python을 사용하여 원본에서 데이터를 수집할 수 있습니다. Python 및 SQL 원본 파일을 동일한 파이프라인에 추가할 수 있습니다. 다음 예제에서는 구체화된 뷰를 선언하여 원격 PostgreSQL 테이블의 현재 데이터 상태에 액세스합니다.
import dp
@dp.table
def postgres_raw():
return (
spark.read
.format("postgresql")
.option("dbtable", table_name)
.option("host", database_host_url)
.option("port", 5432)
.option("database", database_name)
.option("user", username)
.option("password", password)
.load()
)
클라우드 개체 스토리지에서 작은 데이터 세트 또는 정적 데이터 세트 로드
Apache Spark 로드 구문을 사용하여 작거나 정적 데이터 세트를 로드할 수 있습니다. Lakeflow Spark 선언적 파이프라인은 Azure Databricks의 Apache Spark에서 지원하는 모든 파일 형식을 지원합니다. 전체 목록은 데이터 형식 옵션을 참조하세요.
다음 예제에서는 테이블을 만들기 위해 JSON을 로드하는 방법을 보여 줍니다.
파이썬
@dp.table
def clickstream_raw():
return (spark.read.format("json").load("/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"))
SQL
CREATE OR REFRESH MATERIALIZED VIEW clickstream_raw
AS SELECT * FROM read_files(
"/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"
)
비고
read_files SQL 함수는 Azure Databricks의 모든 SQL 환경에 공통적으로 적용됩니다. 파이프라인에서 SQL을 사용하는 직접 파일 액세스에 권장되는 패턴입니다. 자세한 내용은 옵션참조하세요.
Python 사용자 지정 데이터 원본에서 데이터 로드
Python 사용자 지정 데이터 원본을 사용하면 사용자 지정 형식으로 데이터를 로드할 수 있습니다. 특정 외부 데이터 원본에서 읽고 쓰는 코드를 작성하거나 기존 시스템의 기존 Python 코드를 활용하여 자체 내부 시스템에서 데이터를 읽을 수 있습니다. Python 데이터 원본 개발에 대한 자세한 내용은 PySpark 사용자 지정 데이터 원본을 참조하세요.
Python 사용자 지정 데이터 원본을 사용하여 파이프라인에 데이터를 로드하려면 다음과 같은 my_custom_datasource형식 이름으로 등록한 다음 해당 데이터 원본에서 읽습니다.
from pyspark import pipelines as dp
# Assume `my_custom_datasource` is a custom Python custom data
# source that supports both batch and streaming reads, and has
# been registered using `spark.dataSource.register`.
# This creates a materialized view
@dp.table(name = "read_from_batch")
def read_from_batch():
return spark.read.format("my_custom_datasource").load()
# This creates a streaming table
@dp.table(name = "read_from_streaming")
def read_from_streaming():
return spark.readStream.format("my_custom_datasource").load()
원본 스트리밍 테이블의 변경 내용을 무시하도록 스트리밍 테이블 구성
비고
-
skipChangeCommits플래그는spark.readStream함수를 사용하여option()과만 작동합니다.dp.read_stream()함수에서는 이 플래그를 사용할 수 없습니다. - 원본 스트리밍 테이블이
skipChangeCommitscreate_auto_cdc_flow() 함수의 대상으로 정의되면 플래그를 사용할 수 없습니다.
기본적으로 스트리밍 테이블에는 추가 전용 원본이 필요합니다. 스트리밍 테이블이 다른 스트리밍 테이블을 원본으로 사용하고 원본 스트리밍 테이블에 업데이트 또는 삭제가 필요한 경우(예: GDPR "잊혀질 권리" 처리) 원본 스트리밍 테이블을 읽을 때 해당 변경 내용을 무시하도록 skipChangeCommits 플래그를 설정할 수 있습니다. 이 플래그에 대한 자세한 내용은 업데이트 및 삭제 무시를 참조하세요.
@dp.table
def b():
return spark.readStream.option("skipChangeCommits", "true").table("A")
비밀을 사용하여 파이프라인에서 스토리지 자격 증명에 안전하게 액세스하기
Azure Databricks 비밀 사용하여 액세스 키 또는 암호와 같은 자격 증명을 저장할 수 있습니다. 파이프라인에서 비밀을 구성하려면 파이프라인 설정 클러스터 구성에서 Spark 속성을 사용합니다. 파이프라인에 대한 클래식 컴퓨팅 구성을 참조하세요.
다음 예제에서는 비밀을 사용하여 자동 로더사용하여 ADLS(Azure Data Lake Storage) 스토리지 계정에서 입력 데이터를 읽는 데 필요한 액세스 키를 저장합니다. 이 동일한 방법을 사용하여 파이프라인에 필요한 비밀(예: S3에 액세스하기 위한 AWS 키 또는 Apache Hive 메타스토어에 대한 암호)을 구성할 수 있습니다.
Azure Data Lake Storage 및 Blob Storage 작업에 대해 자세히 알아보려면 Connect to Azure Data Lake Storage 및 Blob Storage을(를) 참조하세요.
비고
비밀 값을 설정하는 spark.hadoop. 구성 키에 spark_conf 접두사를 추가해야 합니다.
{
"id": "43246596-a63f-11ec-b909-0242ac120002",
"clusters": [
{
"spark_conf": {
"spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
},
"autoscale": {
"min_workers": 1,
"max_workers": 5,
"mode": "ENHANCED"
}
}
],
"development": true,
"continuous": false,
"libraries": [
{
"notebook": {
"path": "/Users/user@databricks.com/:re[LDP] Notebooks/:re[LDP] quickstart"
}
}
],
"name": ":re[LDP] quickstart using ADLS2"
}
Replace
-
<storage-account-name>에 ADLS 스토리지 계정 이름을 지정합니다. -
<scope-name>를 Azure Databricks 비밀 범위 이름으로. - Azure Storage 계정 액세스 키를 포함하는 키의 이름이
<secret-name>입니다.
from pyspark import pipelines as dp
json_path = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path-to-input-dataset>"
@dp.create_table(
comment="Data ingested from an ADLS2 storage account."
)
def read_from_ADLS2():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load(json_path)
)
Replace
-
<container-name>을(를) 입력 데이터를 저장하는 Azure Storage 계정 컨테이너의 이름으로 사용하십시오. -
<storage-account-name>에 ADLS 스토리지 계정 이름을 지정합니다. - 입력 데이터 세트의 경로를
<path-to-input-dataset>에 지정합니다.