Lakeflow 선언적 파이프라인 및 자동 로더를 사용하여 데이터 오케스트레이션을 위한 ETL(추출, 변환 및 로드) 파이프라인을 만들고 배포하는 방법을 알아봅니다. ETL 파이프라인은 원본 시스템에서 데이터를 읽고, 데이터 품질 검사 및 중복 제거 기록과 같은 요구 사항에 따라 해당 데이터를 변환하고, 데이터 웨어하우스 또는 데이터 레이크와 같은 대상 시스템에 데이터를 쓰는 단계를 구현합니다.
이 자습서에서는 Lakeflow 선언적 파이프라인 및 자동 로더를 사용하여 다음을 수행합니다.
- 대상 테이블에 원시 원본 데이터를 적재합니다.
- 원시 원본 데이터를 변환하고 변환된 데이터를 두 개의 대상 구체화된 뷰에 씁니다.
- 변환된 데이터를 쿼리합니다.
- Databricks 작업을 사용하여 ETL 파이프라인을 자동화합니다.
Lakeflow 선언적 파이프라인 및 자동 로더에 대한 자세한 내용은 Lakeflow 선언적 파이프라인 및 자동 로더란?을 참조하세요.
요구 사항
이 자습서를 완료하려면 다음 요구 사항을 충족해야 합니다.
- Azure Databricks 작업 영역에 로그인합니다.
- 작업 영역에 Unity 카탈로그 를 사용하도록 설정합니다.
- 계정에 대해 서버리스 컴퓨팅 을 사용하도록 설정해야 합니다. 서버리스 Lakeflow 선언적 파이프라인은 모든 작업 영역 지역에서 사용할 수 없습니다. 사용 가능한 지역에 대한 지역 가용성이 제한된 기능을 참조하세요.
- 컴퓨팅 리소스를 만들거나 컴퓨팅 리소스에액세스할 수 있는 권한이 있습니다.
-
카탈로그에 새 스키마를 만들 수 있는 권한이 있습니다. 필요한 사용 권한은
ALL PRIVILEGES
또는USE CATALOG
및CREATE SCHEMA
입니다. -
기존 스키마에 새 볼륨을 만들 수 있는 권한이 있습니다. 필요한 사용 권한은
ALL PRIVILEGES
또는USE SCHEMA
및CREATE VOLUME
입니다.
데이터 세트 정보
이 예제에 사용된 데이터 세트는 밀리언 송 데이터 세트의 하위 집합으로, 현대 음악 트랙에 대한 기능 및 메타데이터 컬렉션입니다. 이 데이터 세트는 Azure Databricks 작업 영역에 포함된 샘플 데이터 세트에서 사용할 수 있습니다.
1단계: 파이프라인 만들기
먼저 Lakeflow 선언적 파이프라인에서 ETL 파이프라인을 만듭니다. Lakeflow 선언적 파이프라인은 Lakeflow 선언적 파이프라인 구문을 사용하여 Notebook 또는 파일( 소스 코드라고 함)에 정의된 종속성을 확인하여 파이프라인을 만듭니다. 각 소스 코드 파일에는 하나의 언어만 포함될 수 있지만 파이프라인에 여러 언어별 Notebook 또는 파일을 추가할 수 있습니다. 자세한 내용은 Lakeflow 선언적 파이프라인을 참조하세요.
중요합니다
소스 코드 필드를 비워 두고 소스 코드 작성을 위한 Notebook을 자동으로 만들고 구성합니다.
이 자습서에서는 서버리스 컴퓨팅 및 Unity 카탈로그를 사용합니다. 지정되지 않은 모든 구성 옵션에 대해 기본 설정을 사용합니다. 작업 영역에서 서버리스 컴퓨팅을 사용하도록 설정하거나 지원하지 않는 경우 기본 컴퓨팅 설정을 사용하여 작성된 자습서를 완료할 수 있습니다. 기본 컴퓨팅 설정을 사용하는 경우 파이프라인 UI 만들기의 대상 섹션에 있는 스토리지 옵션 아래에서 Unity 카탈로그를 수동으로 선택해야 합니다.
Lakeflow 선언적 파이프라인에서 새 ETL 파이프라인을 만들려면 다음 단계를 수행합니다.
- 작업 영역에서
사이드바의 작업 및 파이프라인입니다.
- 새로 만들기에서 ETL 파이프라인을 클릭합니다.
- 파이프라인 이름에 고유한 파이프라인 이름을 입력합니다.
- 서버리스 확인란을 선택합니다.
- 대상에서 테이블이 게시되는 Unity 카탈로그 위치를 구성하려면 기존 카탈로그를 선택하고 스키마에 새 이름을 작성하여 카탈로그에 새 스키마를 만듭니다.
- 만들기를 클릭합니다.
새 파이프라인에 대한 파이프라인 UI가 나타납니다.
2단계: 파이프라인 개발
중요합니다
Notebook은 단일 프로그래밍 언어만 포함할 수 있습니다. 파이프라인 소스 코드 Notebook에서 Python 및 SQL 코드를 혼합하지 마세요.
이 단계에서는 Databricks Notebooks를 사용하여 대화형으로 Lakeflow 선언적 파이프라인에 대한 소스 코드를 개발하고 유효성을 검사합니다.
이 코드는 증분 데이터 수집에 자동 로더를 사용합니다. 자동 로더는 클라우드 개체 스토리지에 도착하는 새 파일을 자동으로 감지하고 처리합니다. 자세한 내용은 자동 로더란?
빈 소스 코드 노트북이 파이프라인 용도로 자동 생성되고 구성됩니다. 노트는 사용자 디렉터리 안에 새로운 디렉터리가 생성되어 만들어집니다. 새 디렉터리 및 파일의 이름이 파이프라인의 이름과 일치합니다. 예들 들어 /Users/someone@example.com/my_pipeline/my_pipeline
입니다.
파이프라인을 개발할 때 Python 또는 SQL을 선택할 수 있습니다. 두 언어 모두에 대한 예제가 포함되어 있습니다. 언어 선택에 따라 기본 전자 필기장 언어를 선택해야 합니다. Lakeflow 선언적 파이프라인 코드 개발을 위한 Notebook 지원에 대한 자세한 내용은 Lakeflow 선언적 파이프라인에서 Notebook을 사용하여 ETL 파이프라인 개발 및 디버그를 참조하세요.
이 Notebook에 액세스할 수 있는 링크는 파이프라인 세부 정보 패널의 소스 코드 필드 아래에 있습니다. 다음 단계로 진행하기 전에 링크를 클릭하여 전자 필기장을 엽니다.
오른쪽 위에서 연결을 클릭하여 컴퓨팅 구성 메뉴를 엽니다.
1단계에서 만든 파이프라인의 이름을 마우스로 가리킵니다.
연결을 클릭합니다.
맨 위에 있는 Notebook의 제목 옆에 있는 Notebook의 기본 언어(Python 또는 SQL)를 선택합니다.
다음 코드를 복사하여 Notebook의 셀에 붙여넣습니다.
파이썬
# Import modules import dlt from pyspark.sql.functions import * from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField # Define the path to the source data file_path = f"/databricks-datasets/songs/data-001/" # Define a streaming table to ingest data from a volume schema = StructType( [ StructField("artist_id", StringType(), True), StructField("artist_lat", DoubleType(), True), StructField("artist_long", DoubleType(), True), StructField("artist_location", StringType(), True), StructField("artist_name", StringType(), True), StructField("duration", DoubleType(), True), StructField("end_of_fade_in", DoubleType(), True), StructField("key", IntegerType(), True), StructField("key_confidence", DoubleType(), True), StructField("loudness", DoubleType(), True), StructField("release", StringType(), True), StructField("song_hotnes", DoubleType(), True), StructField("song_id", StringType(), True), StructField("start_of_fade_out", DoubleType(), True), StructField("tempo", DoubleType(), True), StructField("time_signature", DoubleType(), True), StructField("time_signature_confidence", DoubleType(), True), StructField("title", StringType(), True), StructField("year", IntegerType(), True), StructField("partial_sequence", IntegerType(), True) ] ) @dlt.table( comment="Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks." ) def songs_raw(): return (spark.readStream .format("cloudFiles") .schema(schema) .option("cloudFiles.format", "csv") .option("sep","\t") .option("inferSchema", True) .load(file_path)) # Define a materialized view that validates data and renames a column @dlt.table( comment="Million Song Dataset with data cleaned and prepared for analysis." ) @dlt.expect("valid_artist_name", "artist_name IS NOT NULL") @dlt.expect("valid_title", "song_title IS NOT NULL") @dlt.expect("valid_duration", "duration > 0") def songs_prepared(): return ( spark.read.table("songs_raw") .withColumnRenamed("title", "song_title") .select("artist_id", "artist_name", "duration", "release", "tempo", "time_signature", "song_title", "year") ) # Define a materialized view that has a filtered, aggregated, and sorted view of the data @dlt.table( comment="A table summarizing counts of songs released by the artists who released the most songs each year." ) def top_artists_by_year(): return ( spark.read.table("songs_prepared") .filter(expr("year > 0")) .groupBy("artist_name", "year") .count().withColumnRenamed("count", "total_number_of_songs") .sort(desc("total_number_of_songs"), desc("year")) )
SQL (영문)
-- Define a streaming table to ingest data from a volume CREATE OR REFRESH STREAMING TABLE songs_raw ( artist_id STRING, artist_lat DOUBLE, artist_long DOUBLE, artist_location STRING, artist_name STRING, duration DOUBLE, end_of_fade_in DOUBLE, key INT, key_confidence DOUBLE, loudness DOUBLE, release STRING, song_hotnes DOUBLE, song_id STRING, start_of_fade_out DOUBLE, tempo DOUBLE, time_signature INT, time_signature_confidence DOUBLE, title STRING, year INT, partial_sequence STRING, value STRING ) COMMENT "Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks." AS SELECT * FROM STREAM read_files( '/databricks-datasets/songs/data-001/'); -- Define a materialized view that validates data and renames a column CREATE OR REFRESH MATERIALIZED VIEW songs_prepared( CONSTRAINT valid_artist_name EXPECT (artist_name IS NOT NULL), CONSTRAINT valid_title EXPECT (song_title IS NOT NULL), CONSTRAINT valid_duration EXPECT (duration > 0) ) COMMENT "Million Song Dataset with data cleaned and prepared for analysis." AS SELECT artist_id, artist_name, duration, release, tempo, time_signature, title AS song_title, year FROM songs_raw; -- Define a materialized view that has a filtered, aggregated, and sorted view of the data CREATE OR REFRESH MATERIALIZED VIEW top_artists_by_year COMMENT "A table summarizing counts of songs released by the artists each year, who released the most songs." AS SELECT artist_name, year, COUNT(*) AS total_number_of_songs FROM songs_prepared WHERE year > 0 GROUP BY artist_name, year ORDER BY total_number_of_songs DESC, year DESC
시작을 클릭하여 연결된 파이프라인에 대한 업데이트를 시작합니다.
3단계: 변환된 데이터 쿼리
이 단계에서는 ETL 파이프라인에서 처리된 데이터를 쿼리하여 노래 데이터를 분석합니다. 이러한 쿼리는 이전 단계에서 만든 준비된 레코드를 사용합니다.
먼저 1990년 이후 매년 가장 많은 곡을 발표한 아티스트를 찾는 쿼리를 실행합니다.
사이드바에서
SQL 편집기를 클릭합니다.
새 탭 아이콘을 클릭하고 메뉴에서 새 쿼리 만들기를 선택합니다.
다음을 입력합니다.
-- Which artists released the most songs each year in 1990 or later? SELECT artist_name, total_number_of_songs, year FROM <catalog>.<schema>.top_artists_by_year WHERE year >= 1990 ORDER BY total_number_of_songs DESC, year DESC
<catalog>
및<schema>
을(를) 테이블이 있는 카탈로그 및 스키마의 이름으로 바꾸십시오. 예들 들어data_pipelines.songs_data.top_artists_by_year
입니다.선택한 실행을 클릭합니다.
이제 4/4 비트와 댄스 템포로 노래를 찾는 또 다른 쿼리를 실행합니다.
새 탭 아이콘을 클릭하고 메뉴에서 새 쿼리 만들기를 선택합니다.
다음 코드를 입력합니다.
-- Find songs with a 4/4 beat and danceable tempo SELECT artist_name, song_title, tempo FROM <catalog>.<schema>.songs_prepared WHERE time_signature = 4 AND tempo between 100 and 140;
<catalog>
및<schema>
을(를) 테이블이 있는 카탈로그 및 스키마의 이름으로 바꾸십시오. 예들 들어data_pipelines.songs_data.songs_prepared
입니다.선택한 실행을 클릭합니다.
4단계: 파이프라인을 실행하는 작업 만들기
다음으로 Databricks 작업을 사용하여 데이터 수집, 처리 및 분석 단계를 자동화하는 워크플로를 만듭니다.
- 작업 영역에서
사이드바의 작업 및 파이프라인입니다.
- 새로 만들기에서 작업을 클릭합니다.
- 작업 제목 상자에서 새 작업 <날짜 및 시간을> 작업 이름으로 바꿉 있습니다. 예들 들어
Songs workflow
입니다. -
작업 이름에 첫 번째 작업의 이름을 입력합니다(예:
ETL_songs_data
.). - 형식에서 파이프라인을 선택합니다.
- 파이프라인에서 1단계에서 만든 파이프라인을 선택합니다.
- 만들기를 클릭합니다.
- 워크플로를 실행하려면 지금 실행을 클릭합니다. 실행에 대한 세부 정보를 보려면 실행 탭을 클릭합니다. 작업을 클릭하여 작업 실행에 대한 세부 정보를 봅니다.
- 워크플로가 완료될 때 결과를 보려면 성공한 최신 실행으로 이동 또는 작업 실행 시작 시간을 클릭합니다. 출력 페이지가 나타나고 쿼리 결과가 표시됩니다.
작업 실행에 대한 자세한 내용은 Lakeflow 작업의 모니터링 및 관찰 가능성을 참조하세요.
5단계: 파이프라인 작업 예약
일정에 따라 ETL 파이프라인을 실행하려면 다음 단계를 수행합니다.
- 작업과 동일한 Azure Databricks 작업 영역에서 작업 및 파이프라인 UI로 이동합니다.
- 필요에 따라 작업 필터와 내 소유 필터를 선택합니다.
- 이름 열에서 작업 이름을 클릭합니다. 측면 패널에는 작업 세부 정보가 표시됩니다.
- 일정 및 트리거 패널에서 트리거 추가를 클릭하고 트리거 유형에서 예약을 선택합니다.
- 기간, 시작 시간 및 표준 시간대를 지정합니다.
- 저장을 클릭합니다.
자세한 정보
- Lakeflow 선언적 파이프라인을 사용한 데이터 처리 파이프라인에 대한 자세한 내용은 Lakeflow 선언적 파이프라인을 참조하세요.
- Databricks Notebooks에 대한 자세한 내용은 Databricks Notebook 소개를 참조하세요.
- Lakeflow 작업에 대한 자세한 내용은 작업이란?을 참조하세요.
- Delta Lake에 대한 자세한 내용은 Azure Databricks의 Delta Lake란?