Databricks에서 엔드투엔드 데이터 파이프라인 빌드
이 문서에서는 원시 데이터를 수집하고, 데이터를 변환하고, 처리된 데이터에 대한 분석을 실행하는 방법을 포함하여 엔드투엔드 데이터 처리 파이프라인을 만들고 배포하는 방법을 보여 줍니다.
참고 항목
이 문서에서는 Databricks Notebook 및 Azure Databricks 작업을 사용하여 전체 데이터 파이프라인을 만들어 워크플로를 오케스트레이션하는 방법을 보여 주지만 Databricks는 안정적이고 유지 관리 가능하며 테스트 가능한 데이터 처리 파이프라인을 빌드하기 위한 선언적 인터페이스인 Delta Live Tables를 사용하는 것이 좋습니다.
데이터 파이프라인이란?
데이터 파이프라인은 원본 시스템에서 데이터를 이동하고, 요구 사항에 따라 해당 데이터를 변환하고, 대상 시스템에 데이터를 저장하는 데 필요한 단계를 구현합니다. 데이터 파이프라인에는 원시 데이터를 사용자가 사용할 수 있는 준비된 데이터로 전환하는 데 필요한 모든 프로세스가 포함됩니다. 예를 들어 데이터 파이프라인은 데이터 분석가와 데이터 과학자가 분석 및 보고를 통해 데이터에서 가치를 추출할 수 있도록 데이터를 준비할 수 있습니다.
ETL(추출, 변환 및 로드) 워크플로는 데이터 파이프라인의 일반적인 예입니다. ETL 처리에서 데이터는 원본 시스템에서 수집되어 준비 영역에 기록되고, 요구 사항(데이터 품질 보장, 레코드 중복 제거 등)에 따라 변환된 다음, 데이터 웨어하우스 또는 데이터 레이크와 같은 대상 시스템에 기록됩니다.
데이터 파이프라인 단계별 실행
Azure Databricks에서 데이터 파이프라인 빌드를 시작하는 데 도움이 되도록 이 문서에 포함된 예제에서는 데이터 처리 워크플로를 만드는 방법을 안내합니다.
- Azure Databricks 기능을 사용하여 원시 데이터 세트를 탐색합니다.
- 원시 원본 데이터를 수집하고 대상 테이블에 원시 데이터를 쓰는 Databricks Notebook을 만듭니다.
- Databricks Notebook을 만들어 원시 원본 데이터를 변환하고 변환된 데이터를 대상 테이블에 씁니다.
- 데이터를 변환하는 쿼리로 Databricks Notebook 만들기
- Azure Databricks 작업을 사용하여 데이터 파이프라인을 자동화합니다.
요구 사항
- Azure Databricks에 로그인했으며 데이터 과학 및 엔지니어링 작업 영역에 있습니다.
- 클러스터를 만들거나 클러스터에 액세스할 수 있는 권한이 있습니다.
- (선택 사항) Unity 카탈로그에 테이블을 게시하려면 Unity 카탈로그에서 카탈로그 및 스키마를 만들어야 합니다.
예: Million Song 데이터 세트
이 예제에 사용된 데이터 세트는 Million Song 데이터 세트의 하위 집합으로, 현대 음악 트랙에 대한 기능 및 메타데이터 컬렉션입니다. 이 데이터 세트는 Azure Databricks 작업 영역에 포함된 샘플 데이터 세트에서 사용할 수 있습니다.
1단계: 클러스터 만들기
이 예제에서 데이터 처리 및 분석을 수행하려면 명령을 실행하는 데 필요한 컴퓨팅 리소스를 제공하는 클러스터를 만듭니다.
참고 항목
이 예제에서는 DBFS에 저장된 샘플 데이터 세트를 사용하고 테이블을 Unity 카탈로그에 유지하기 때문에 단일 사용자 액세스 모드로 구성된 클러스터를 만듭니다. 단일 사용자 액세스 모드는 DBFS에 대한 모든 권한을 제공하는 동시에 Unity 카탈로그에 대한 액세스를 사용하도록 설정합니다. DBFS 및 Unity 카탈로그 모범 사례를 참조하세요.
- 사이드바에서 컴퓨팅을 클릭합니다.
- 클러스터 페이지에서 클러스터 만들기를 클릭합니다.
- 새 클러스터 페이지에 클러스터의 고유 이름을 입력하세요.
- 액세스 모드에서 단일 사용자를 선택합니다.
- 단일 사용자 또는 서비스 주체 액세스에서 사용자 이름을 선택합니다.
- 기본 상태의 나머지 값을 그대로 두고, 클러스터 만들기를 클릭합니다.
Databricks 클러스터에 대한 자세한 내용은 컴퓨팅을 참조하세요.
2단계: 원본 데이터를 탐색합니다.
Azure Databricks 인터페이스를 사용하여 원시 원본 데이터를 탐색하는 방법을 알아보려면 데이터 파이프라인에 대한 원본 데이터 탐색을 참조하세요. 데이터 수집 및 준비로 직접 이동하려면 3단계: 원시 데이터 수집을 계속합니다.
3단계: 원시 데이터 수집
이 단계에서는 원시 데이터를 테이블에 로드하여 추가 처리에 사용할 수 있도록 합니다. 테이블과 같은 Databricks 플랫폼에서 데이터 자산을 관리하기 위해 Databricks는 Unity 카탈로그를 권장합니다. 그러나 Unity 카탈로그에 테이블을 게시하는 데 필요한 카탈로그 및 스키마를 만들 수 있는 권한이 없는 경우에도 Hive 메타스토어에 테이블을 게시하여 다음 단계를 완료할 수 있습니다.
데이터 수집을 위해, Databricks는 자동 로더를 사용할 것을 권장합니다. 자동 로더는 클라우드 개체 스토리지에 도착하는 새 파일을 자동으로 감지하고 처리합니다.
로드된 데이터의 스키마를 자동으로 감지하도록 자동 로더를 구성하여 데이터 스키마를 명시적으로 선언하지 않고 테이블을 초기화하고, 새 열이 도입될 때 테이블 스키마를 진화시킬 수 있습니다. 이렇게 하면 시간이 지남에 따라 스키마 변경 내용을 수동으로 추적하고 적용할 필요가 없습니다. Databricks는 자동 로더 사용 시 스키마 인터페이스를 권장합니다. 그러나 데이터 탐색 단계에서 볼 수 있듯이 음악 데이터에는 헤더 정보가 포함되지 않습니다. 헤더는 데이터와 함께 저장되지 않으므로 다음 예제와 같이 스키마를 명시적으로 정의해야 합니다.
사이드바에서 새로 만들기를 클릭하고 메뉴에서 Notebook을 선택합니다. Notebook 만들기 대화 상자가 나타납니다.
Notebook 이름을 입력합니다(예:
Ingest songs data
). 기본적으로:- Python은 선택한 언어입니다.
- Notebook이 사용한 마지막 클러스터에 연결됩니다. 이 경우 1단계: 클러스터 만들기에서 만든 클러스터입니다.
다음을 Notebook의 첫 번째 셀에 입력합니다.
from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField # Define variables used in the code below file_path = "/databricks-datasets/songs/data-001/" table_name = "<table-name>" checkpoint_path = "/tmp/pipeline_get_started/_checkpoint/song_data" schema = StructType( [ StructField("artist_id", StringType(), True), StructField("artist_lat", DoubleType(), True), StructField("artist_long", DoubleType(), True), StructField("artist_location", StringType(), True), StructField("artist_name", StringType(), True), StructField("duration", DoubleType(), True), StructField("end_of_fade_in", DoubleType(), True), StructField("key", IntegerType(), True), StructField("key_confidence", DoubleType(), True), StructField("loudness", DoubleType(), True), StructField("release", StringType(), True), StructField("song_hotnes", DoubleType(), True), StructField("song_id", StringType(), True), StructField("start_of_fade_out", DoubleType(), True), StructField("tempo", DoubleType(), True), StructField("time_signature", DoubleType(), True), StructField("time_signature_confidence", DoubleType(), True), StructField("title", StringType(), True), StructField("year", IntegerType(), True), StructField("partial_sequence", IntegerType(), True) ] ) (spark.readStream .format("cloudFiles") .schema(schema) .option("cloudFiles.format", "csv") .option("sep","\t") .load(file_path) .writeStream .option("checkpointLocation", checkpoint_path) .trigger(availableNow=True) .toTable(table_name) )
Unity 카탈로그를 사용하는 경우 카탈로그, 스키마 및 테이블 이름이 수집된 레코드(예:
data_pipelines.songs_data.raw_song_data
)를 포함하도록<table-name>
을(를) 바꿉니다. 그렇지 않은 경우,<table-name>
을(를) 수집된 레코드를 포함하는 테이블의 이름(예:raw_song_data
)으로 바꿉니다.<checkpoint-path>
을(를) 검사점 파일을 유지 관리하기 위한 DBFS의 디렉터리에 대한 경로(예:/tmp/pipeline_get_started/_checkpoint/song_data
)로 바꿉니다.을(를) 클릭하고 셀 실행을 선택합니다. 이 예제에서는
README
의 정보를 사용하여 데이터 스키마를 정의하고,file_path
에 포함된 모든 파일에서 노래 데이터를 수집하고,table_name
에 의하여 지정된 테이블에 데이터를 씁니다.
4단계: 원시 데이터 준비
분석을 위해 원시 데이터를 준비하기 위해 다음 단계에서는 불필요한 열을 필터링하고 새 레코드를 만들기 위한 타임스탬프가 포함된 새 필드를 추가하여 원시 노래 데이터를 변환합니다.
사이드바에서 새로 만들기를 클릭하고 메뉴에서 Notebook을 선택합니다. Notebook 만들기 대화 상자가 나타납니다.
Notebook 이름을 입력하세요. 예들 들어
Prepare songs data
입니다. 기본 언어를 SQL으로 바꿉니다.Notebook의 첫 번째 셀에 다음 코드를 입력합니다.
CREATE OR REPLACE TABLE <table-name> ( artist_id STRING, artist_name STRING, duration DOUBLE, release STRING, tempo DOUBLE, time_signature DOUBLE, title STRING, year DOUBLE, processed_time TIMESTAMP ); INSERT INTO <table-name> SELECT artist_id, artist_name, duration, release, tempo, time_signature, title, year, current_timestamp() FROM <raw-songs-table-name>
Unity 카탈로그를 사용하는 경우
<table-name>
을(를) 필터링 및 변환된 레코드(예:data_pipelines.songs_data.prepared_song_data
)가 포함되도록 카탈로그, 스키마 및 테이블 이름으로 바꿉니다. 그렇지 않은 경우,<table-name>
을(를) 필터링 및 변환된 레코드를 포함하는 테이블의 이름(예:prepared_song_data
)으로 바꿉니다.<raw-songs-table-name>
을(를) 이전 단계에서 수집한 원시 노래 레코드를 포함하는 테이블의 이름으로 바꿉니다.을(를) 클릭하고 셀 실행을 선택합니다.
5단계: 변환된 데이터 쿼리
이 단계에서는 노래 데이터를 분석하는 쿼리를 추가하여 처리 파이프라인을 확장합니다. 이러한 쿼리는 이전 단계에서 만든 준비된 레코드를 사용합니다.
사이드바에서 새로 만들기를 클릭하고 메뉴에서 Notebook을 선택합니다. Notebook 만들기 대화 상자가 나타납니다.
Notebook 이름을 입력하세요. 예들 들어
Analyze songs data
입니다. 기본 언어를 SQL으로 바꿉니다.Notebook의 첫 번째 셀에 다음 코드를 입력합니다.
-- Which artists released the most songs each year? SELECT artist_name, count(artist_name) AS num_songs, year FROM <prepared-songs-table-name> WHERE year > 0 GROUP BY artist_name, year ORDER BY num_songs DESC, year DESC
<prepared-songs-table-name>
을(를) 준비된 데이터가 포함된 테이블의 이름으로 바꿉니다. 예들 들어data_pipelines.songs_data.prepared_song_data
입니다.셀 작업 메뉴에서 을(를) 클릭하고 아래 셀 추가를 선택하고 새 셀에 다음을 입력하세요.
-- Find songs for your DJ list SELECT artist_name, title, tempo FROM <prepared-songs-table-name> WHERE time_signature = 4 AND tempo between 100 and 140;
<prepared-songs-table-name>
을(를) 이전 단계에서 만든 준비된 테이블의 이름으로 바꿉니다. 예들 들어data_pipelines.songs_data.prepared_song_data
입니다.쿼리를 실행하고 출력을 보려면 모두 실행을 클릭합니다.
6단계: 파이프라인을 실행할 Azure Databricks 작업 만들기
Azure Databricks 작업을 사용하여 데이터 수집, 처리 및 분석 단계 실행을 자동화하는 워크플로를 만들 수 있습니다.
- 데이터 과학 및 엔지니어링 작업 영역에서 다음 중 하나를 수행합니다.
- 사이드바에서 워크플로를 클릭하고 를 클릭합니다.
- 사이드바에서 새로 만들기를 클릭하고 작업을 선택합니다.
- 작업 탭에 표시되는 작업 대화 상자에서 작업 이름 추가…를 작업 이름으로 바꿉니다. 예를 들면 "음악 워크플로"입니다.
- 작업 이름에서 첫 번째 작업 이름(예:
Ingest_songs_data
)을 입력합니다. - 형식에서 Notebook 작업 종류를 선택합니다.
- 원본에서 작업 영역을 선택합니다.
- 파일 브라우저를 사용하여 데이터 수집 Notebook을 찾고 Notebook 이름을 클릭한 다음 확인을 클릭합니다.
- 클러스터에서 Shared_job_cluster 또는
Create a cluster
단계에서 만든 클러스터를 선택합니다. - 만들기를 클릭합니다.
- 방금 만든 작업 아래 을(를) 클릭하고 Notebook을 선택합니다.
- 작업 이름에 작업 이름(예:
Prepare_songs_data
)을 입력합니다. - 형식에서 Notebook 작업 종류를 선택합니다.
- 원본에서 작업 영역을 선택합니다.
- 파일 브라우저를 사용하여 데이터 준비 Notebook을 찾고 Notebook 이름을 클릭한 다음 확인을 클릭합니다.
- 클러스터에서 Shared_job_cluster 또는
Create a cluster
단계에서 만든 클러스터를 선택합니다. - 만들기를 클릭합니다.
- 방금 만든 작업 아래 을(를) 클릭하고 Notebook을 선택합니다.
- 작업 이름에 작업 이름(예:
Analyze_songs_data
)을 입력합니다. - 형식에서 Notebook 작업 종류를 선택합니다.
- 원본에서 작업 영역을 선택합니다.
- 파일 브라우저를 사용하여 데이터 분석 Notebook을 찾고 Notebook 이름을 클릭한 다음 확인을 클릭합니다.
- 클러스터에서 Shared_job_cluster 또는
Create a cluster
단계에서 만든 클러스터를 선택합니다. - 만들기를 클릭합니다.
- 워크플로를 실행하려면 을(를) 클릭합니다. 실행 세부 정보를 보려면 작업 실행 보기에서 실행의 시작 시간 열에 있는 링크를 클릭합니다. 각 작업을 클릭하여 작업 실행 세부 정보를 살펴봅니다.
- 워크플로가 완료된 경우 결과를 보려면 최종 데이터 분석 작업을 클릭합니다. 출력 페이지가 나타나고 쿼리 결과가 표시됩니다.
7단계: 데이터 파이프라인 작업 예약
참고 항목
Azure Databricks 작업을 사용하여 예약된 워크플로를 오케스트레이션하는 방법을 보여주기 위해 이 시작 예제는 수집, 준비 및 분석 단계를 별도의 Notebook으로 구분하고 각 Notebook을 사용하여 작업에서 작업을 만듭니다. 모든 처리가 단일 Notebook에 포함된 경우 Azure Databricks Notebook UI에서 직접 Notebook을 쉽게 예약할 수 있습니다. 예약된 Notebook 작업 만들기 및 관리를 참조하세요.
일반적인 요구 사항은 일정에 따라 데이터 파이프라인을 실행하는 것입니다. 파이프라인을 실행하는 작업에 대한 일정을 정의하려면 다음을 수행합니다.
- 사이드바에서 워크플로를 클릭합니다.
- 이름 열에서 작업 이름을 클릭합니다. 측면 패널에 작업 세부 정보가 표시됩니다.
- 작업 세부 정보 패널에서 트리거 추가를 클릭하고 트리거 유형에서 예약을 선택합니다.
- 기간, 시작 시간 및 표준 시간대를 지정합니다. 필요에 따라 Cron 구문 표시 확인란을 선택하여 Quartz Cron 구문에서 일정을 표시하고 편집합니다.
- 저장을 클릭합니다.
자세한 정보
- Databricks Notebook에 대한 자세한 내용은 Databricks Notebooks 소개를 참조하세요.
- Azure Databricks 작업에 대한 자세한 내용은 Databricks 작업이란?을 참조하세요.
- Delta Lake에 대한 자세한 내용은 Delta Lake란?을 참조하세요.
- Delta Live Tables를 사용한 데이터 처리 파이프라인에 대한 자세한 내용은 Delta Live Tables란?을 참조하세요.