이 자습서에서는 Lakeflow Spark 선언적 파이프라인 및 자동 로더를 사용하여 데이터 오케스트레이션을 위한 ETL(추출, 변환 및 로드) 파이프라인을 만들고 배포하는 방법을 설명합니다. ETL 파이프라인은 원본 시스템에서 데이터를 읽고, 데이터 품질 검사 및 중복 제거 기록과 같은 요구 사항에 따라 해당 데이터를 변환하고, 데이터 웨어하우스 또는 데이터 레이크와 같은 대상 시스템에 데이터를 쓰는 단계를 구현합니다.
이 자습서에서는 파이프라인 및 자동 로더를 사용하여 다음을 수행합니다.
- 대상 테이블에 원시 원본 데이터를 적재합니다.
- 원시 원본 데이터를 변환하고 변환된 데이터를 두 개의 대상 구체화된 뷰에 씁니다.
- 변환된 데이터를 쿼리합니다.
- Databricks 작업을 사용하여 ETL 파이프라인을 자동화합니다.
파이프라인 및 자동 로더에 대한 자세한 내용은 Lakeflow Spark 선언적 파이프라인 및자동 로더란?을 참조하세요.
요구 사항
이 자습서를 완료하려면 다음 요구 사항을 충족해야 합니다.
- Azure Databricks 작업 영역에 로그인합니다.
- 작업 영역에 Unity 카탈로그 를 사용하도록 설정합니다.
- 계정에 대해 서버리스 컴퓨팅 을 사용하도록 설정해야 합니다. 서버리스 Lakeflow Spark 선언적 파이프라인은 모든 작업 영역 지역에서 사용할 수 없습니다. 사용 가능한 지역에 대한 지역 가용성이 제한된 기능을 참조하세요.
- 컴퓨팅 리소스를 만들거나 컴퓨팅 리소스에액세스할 수 있는 권한이 있습니다.
-
카탈로그에 새 스키마를 만들 수 있는 권한이 있습니다. 필요한 사용 권한은
ALL PRIVILEGES또는USE CATALOG및CREATE SCHEMA입니다. -
기존 스키마에 새 볼륨을 만들 수 있는 권한이 있습니다. 필요한 사용 권한은
ALL PRIVILEGES또는USE SCHEMA및CREATE VOLUME입니다.
데이터 세트 정보
이 예제에 사용된 데이터 세트는 밀리언 송 데이터 세트의 하위 집합으로, 현대 음악 트랙에 대한 기능 및 메타데이터 컬렉션입니다. 이 데이터 세트는 Azure Databricks 작업 영역에 포함된 샘플 데이터 세트에서 사용할 수 있습니다.
1단계: 파이프라인 만들기
먼저 파이프라인 구문을 사용하여 파일( 소스 코드라고 함)에서 데이터 세트를 정의하여 파이프라인을 만듭니다. 각 소스 코드 파일은 하나의 언어만 포함할 수 있지만 파이프라인에 여러 언어별 파일을 추가할 수 있습니다. 자세한 내용은 Lakeflow Spark 선언적 파이프라인을 참조하세요.
이 자습서에서는 서버리스 컴퓨팅 및 Unity 카탈로그를 사용합니다. 지정되지 않은 모든 구성 옵션에 대해 기본 설정을 사용합니다. 작업 영역에서 서버리스 컴퓨팅을 사용하도록 설정하거나 지원하지 않는 경우 기본 컴퓨팅 설정을 사용하여 작성된 자습서를 완료할 수 있습니다.
새 파이프라인을 만들려면 다음 단계를 수행합니다.
- 작업 영역에서
을 클릭합니다.사이드바에서 새로 고른 다음 ETL 파이프라인을 선택합니다.
- 파이프라인에 고유한 이름을 지정합니다.
- 이름 바로 아래에서 생성하는 데이터의 기본 카탈로그 및 스키마를 선택합니다. 변환에서 다른 대상을 지정할 수 있지만 이 자습서에서는 이러한 기본값을 사용합니다. 만든 카탈로그 및 스키마에 대한 권한이 있어야 합니다. 요구 사항을 참조하세요.
- 이 자습서에서는 빈 파일로 시작을 선택합니다.
- 폴더 경로에서 원본 파일의 위치를 지정하거나 기본값(사용자 폴더)을 적용합니다.
- 첫 번째 소스 파일의 언어로 Python 또는 SQL 을 선택합니다(파이프라인은 언어를 혼합하고 일치시킬 수 있지만 각 파일은 단일 언어여야 합니다).
- 선택을 클릭합니다.
새 파이프라인에 대한 파이프라인 편집기가 나타납니다. 언어에 대한 빈 소스 파일이 만들어지고 첫 번째 변환이 준비됩니다.
2단계: 파이프라인 논리 개발
이 단계에서는 Lakeflow 파이프라인 편집 기를 사용하여 파이프라인에 대한 소스 코드를 대화형으로 개발하고 유효성을 검사합니다.
이 코드는 증분 데이터 수집에 자동 로더를 사용합니다. 자동 로더는 클라우드 개체 스토리지에 도착하는 새 파일을 자동으로 감지하고 처리합니다. 자세한 내용은 자동 로더란?
빈 소스 코드 파일이 파이프라인에 대해 자동으로 만들어지고 구성됩니다. 파일은 파이프라인의 변환 폴더에 만들어집니다. 기본적으로 변환 폴더의 모든 *.py 및 *.sql 파일은 파이프라인 원본의 일부입니다.
다음 코드를 복사하여 소스 파일에 붙여넣습니다. 1단계에서 파일에 대해 선택한 언어를 사용해야 합니다.
파이썬
# Import modules from pyspark import pipelines as dp from pyspark.sql.functions import * from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField # Define the path to the source data file_path = f"/databricks-datasets/songs/data-001/" # Define a streaming table to ingest data from a volume schema = StructType( [ StructField("artist_id", StringType(), True), StructField("artist_lat", DoubleType(), True), StructField("artist_long", DoubleType(), True), StructField("artist_location", StringType(), True), StructField("artist_name", StringType(), True), StructField("duration", DoubleType(), True), StructField("end_of_fade_in", DoubleType(), True), StructField("key", IntegerType(), True), StructField("key_confidence", DoubleType(), True), StructField("loudness", DoubleType(), True), StructField("release", StringType(), True), StructField("song_hotnes", DoubleType(), True), StructField("song_id", StringType(), True), StructField("start_of_fade_out", DoubleType(), True), StructField("tempo", DoubleType(), True), StructField("time_signature", DoubleType(), True), StructField("time_signature_confidence", DoubleType(), True), StructField("title", StringType(), True), StructField("year", IntegerType(), True), StructField("partial_sequence", IntegerType(), True) ] ) @dp.table( comment="Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks." ) def songs_raw(): return (spark.readStream .format("cloudFiles") .schema(schema) .option("cloudFiles.format", "csv") .option("sep","\t") .load(file_path)) # Define a materialized view that validates data and renames a column @dp.materialized_view( comment="Million Song Dataset with data cleaned and prepared for analysis." ) @dp.expect("valid_artist_name", "artist_name IS NOT NULL") @dp.expect("valid_title", "song_title IS NOT NULL") @dp.expect("valid_duration", "duration > 0") def songs_prepared(): return ( spark.read.table("songs_raw") .withColumnRenamed("title", "song_title") .select("artist_id", "artist_name", "duration", "release", "tempo", "time_signature", "song_title", "year") ) # Define a materialized view that has a filtered, aggregated, and sorted view of the data @dp.materialized_view( comment="A table summarizing counts of songs released by the artists who released the most songs each year." ) def top_artists_by_year(): return ( spark.read.table("songs_prepared") .filter(expr("year > 0")) .groupBy("artist_name", "year") .count().withColumnRenamed("count", "total_number_of_songs") .sort(desc("total_number_of_songs"), desc("year")) )SQL (영문)
-- Define a streaming table to ingest data from a volume CREATE OR REFRESH STREAMING TABLE songs_raw COMMENT "Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks." AS SELECT * FROM STREAM read_files( '/databricks-datasets/songs/data-001/part*', format => "csv", header => "false", delimiter => "\t", schema => """ artist_id STRING, artist_lat DOUBLE, artist_long DOUBLE, artist_location STRING, artist_name STRING, duration DOUBLE, end_of_fade_in DOUBLE, key INT, key_confidence DOUBLE, loudness DOUBLE, release STRING, song_hotnes DOUBLE, song_id STRING, start_of_fade_out DOUBLE, tempo DOUBLE, time_signature INT, time_signature_confidence DOUBLE, title STRING, year INT, partial_sequence STRING """, schemaEvolutionMode => "none"); -- Define a materialized view that validates data and renames a column CREATE OR REFRESH MATERIALIZED VIEW songs_prepared( CONSTRAINT valid_artist_name EXPECT (artist_name IS NOT NULL), CONSTRAINT valid_title EXPECT (song_title IS NOT NULL), CONSTRAINT valid_duration EXPECT (duration > 0) ) COMMENT "Million Song Dataset with data cleaned and prepared for analysis." AS SELECT artist_id, artist_name, duration, release, tempo, time_signature, title AS song_title, year FROM songs_raw; -- Define a materialized view that has a filtered, aggregated, and sorted view of the data CREATE OR REFRESH MATERIALIZED VIEW top_artists_by_year COMMENT "A table summarizing counts of songs released by the artists each year, who released the most songs." AS SELECT artist_name, year, COUNT(*) AS total_number_of_songs FROM songs_prepared WHERE year > 0 GROUP BY artist_name, year ORDER BY total_number_of_songs DESC, year DESC;이 소스에는 세 개의 쿼리에 대한 코드가 포함되어 있습니다. 이러한 쿼리를 별도의 파일에 배치하여 파일을 구성하고 원하는 방식으로 코딩할 수도 있습니다.
을 클릭합니다.파일을 실행하거나 파이프라인을 실행하여 연결된 파이프라인에 대한 업데이트를 시작합니다. 파이프라인에 원본 파일이 하나만 있으면 기능적으로 동일합니다.
업데이트가 완료되면 편집기가 파이프라인에 대한 정보로 업데이트됩니다.
- 코드 오른쪽의 사이드바에 있는 DAG(파이프라인 그래프)에는 세 개의 테이블,
songs_rawsongs_prepared및top_artists_by_year. - 업데이트 요약은 파이프라인 자산 브라우저의 맨 위에 표시됩니다.
- 생성된 테이블의 세부 정보는 아래쪽 창에 표시되며, 테이블을 선택하여 테이블에서 데이터를 찾아볼 수 있습니다.
여기에는 원시 및 정리된 데이터뿐만 아니라 연도별 최고 아티스트를 찾기 위한 몇 가지 간단한 분석이 포함됩니다. 다음 단계에서는 파이프라인의 별도 파일에서 추가 분석을 위한 임시 쿼리를 만듭니다.
3단계: 파이프라인에서 만든 데이터 세트 살펴보기
이 단계에서는 ETL 파이프라인에서 처리된 데이터에 대해 임시 쿼리를 수행하여 Databricks SQL 편집기에서 노래 데이터를 분석합니다. 이러한 쿼리는 이전 단계에서 만든 준비된 레코드를 사용합니다.
먼저 1990년 이후 매년 가장 많은 곡을 발표한 아티스트를 찾는 쿼리를 실행합니다.
파이프라인 자산 브라우저 사이드바에서
을 클릭합니다.그런 다음 탐색을 추가합니다.
이름을 입력하고 탐색 파일에 대해 SQL을 선택합니다. SQL Notebook이 새
explorations폴더에 만들어집니다. 폴더의explorations파일은 기본적으로 파이프라인 업데이트의 일부로 실행되지 않습니다. SQL Notebook에는 함께 또는 별도로 실행할 수 있는 셀이 있습니다.1990년 이후 매년 가장 많은 곡을 릴리스하는 아티스트 테이블을 만들려면 새 SQL 파일에 다음 코드를 입력합니다(파일에 샘플 코드가 있는 경우 대체). 이 Notebook은 파이프라인의 일부가 아니므로 기본 카탈로그 및 스키마를 사용하지 않습니다. 파이프라인의
<catalog>.<schema>기본값으로 사용한 카탈로그 및 스키마로 바꿉니다.-- Which artists released the most songs each year in 1990 or later? SELECT artist_name, total_number_of_songs, year -- replace with the catalog/schema you are using: FROM <catalog>.<schema>.top_artists_by_year WHERE year >= 1990 ORDER BY total_number_of_songs DESC, year DESC;을 클릭하거나 키를 눌러
Shift + Enter이 쿼리를 실행합니다.
이제 4/4 비트와 댄스 템포로 노래를 찾는 또 다른 쿼리를 실행합니다.
같은 파일의 다음 셀에 다음 코드를 추가합니다. 다시 파이프라인의
<catalog>.<schema>기본값으로 사용한 카탈로그 및 스키마로 바꿉니다.-- Find songs with a 4/4 beat and danceable tempo SELECT artist_name, song_title, tempo -- replace with the catalog/schema you are using: FROM <catalog>.<schema>.songs_prepared WHERE time_signature = 4 AND tempo between 100 and 140;을 클릭하거나 키를 눌러
Shift + Enter이 쿼리를 실행합니다.
4단계: 파이프라인을 실행하는 작업 만들기
다음으로 일정에 따라 실행되는 Databricks 작업을 사용하여 데이터 수집, 처리 및 분석 단계를 자동화하는 워크플로를 만듭니다.
- 편집기의 맨 위에서 일정 단추를 선택합니다.
- 일정 대화 상자 가 나타나면 일정 추가를 선택합니다.
- 그러면 새 일정 대화 상자가 열리고, 여기서 일정에 따라 파이프라인을 실행하는 작업을 만들 수 있습니다.
- 필요에 따라 작업에 이름을 지정합니다.
- 기본적으로 일정은 하루에 한 번 실행되도록 설정됩니다. 기본값을 수락하거나 사용자 설정 일정을 설정할 수 있습니다. 고급을 선택하면 작업이 실행되는 특정 시간을 설정하는 옵션이 제공됩니다. 추가 옵션을 선택하면 작업이 실행되면 알림을 만들 수 있습니다.
- 만들기를 선택하여 변경 내용을 적용하고 작업을 만듭니다.
이제 작업이 매일 실행되어 파이프라인을 최신 상태로 유지합니다. 일정을 다시 선택하여 일정 목록을 볼 수 있습니다. 일정 추가, 편집 또는 제거를 포함하여 해당 대화 상자에서 파이프라인에 대한 일정을 관리할 수 있습니다.
일정(또는 작업)의 이름을 클릭하면 작업 및 파이프라인 목록의 작업 페이지로 이동합니다. 여기에서 실행 기록을 포함하여 작업 실행에 대한 세부 정보를 보거나 지금 실행 단추를 사용하여 작업을 즉시 실행할 수 있습니다.
작업 실행에 대한 자세한 내용은 Lakeflow 작업의 모니터링 및 관찰 가능성을 참조하세요.
자세한 정보
- 데이터 처리 파이프라인에 대한 자세한 내용은 Lakeflow Spark 선언적 파이프라인을 참조하세요.
- Databricks Notebook에 대한 자세한 내용은 Databricks Notebook을 참조하세요.
- Lakeflow 작업에 대한 자세한 내용은 작업이란?을 참조하세요.
- Delta Lake에 대한 자세한 내용은 Azure Databricks의 Delta Lake란?