Apache Airflow로 Azure Databricks 작업 조정

이 문서에서는 Azure Databricks를 사용하여 데이터 파이프라인을 오케스트레이션하기 위한 Apache Airflow 지원에 대해 설명하고, Airflow를 로컬로 설치 및 구성하기 위한 지침을 제공하며, Airflow를 사용하여 Azure Databricks 워크플로를 배포하고 실행하는 예제를 제공합니다.

데이터 파이프라인의 작업 오케스트레이션

데이터 처리 파이프라인을 개발하고 배포하려면 태스크 간의 복잡한 종속성을 관리해야 하는 경우가 많습니다. 예를 들어 파이프라인은 원본에서 데이터를 읽고, 데이터를 클린, 클린 데이터를 변환하고, 변환된 데이터를 대상에 쓸 수 있습니다. 또한 파이프라인을 운영할 때 오류 테스트, 예약 및 문제 해결을 지원해야 합니다.

워크플로 시스템은 작업 간의 종속성을 정의하고, 파이프라인이 실행되는 시기를 예약하고, 워크플로를 모니터링할 수 있도록 하여 이러한 문제를 해결합니다. Apache Airflow는 데이터 파이프라인을 관리하고 예약하기 위한 오픈 소스 솔루션입니다. Airflow는 데이터 파이프라인을 작업의 지시된 DAG(방향성 비순환 그래프)로 나타냅니다. Python 파일에서 워크플로를 정의하고 Airflow는 예약 및 실행을 관리합니다. Airflow Azure Databricks 연결을 사용하면 Airflow의 예약 기능을 통해 Azure Databricks에서 제공하는 최적화된 Spark 엔진을 활용할 수 있습니다.

요구 사항

  • Airflow와 Azure Databricks 간의 통합을 위해서는 Airflow 버전 2.5.0 이상이 필요합니다. 이 문서의 예제는 Airflow 버전 2.6.1로 테스트됩니다.
  • 공기 흐름에는 Python 3.8, 3.9, 3.10 또는 3.11이 필요합니다. 이 문서의 예제는 Python 3.8을 사용하여 테스트됩니다.
  • Airflow를 설치하고 실행하려면 이 문서의 지침에 따라 Python 가상 환경을 만들려면 pipenv필요합니다.

Databricks에 대한 Airflow 연산자

Airflow DAG는 각 태스크가 Airflow 운영자를 실행하는 작업으로 구성됩니다. Databricks에 통합을 지원하는 Airflow 운영자는 Databricks 공급자에서 구현됩니다.

Databricks 공급자에는 데이터 가져오기, SQL 쿼리 실행, Databricks Git 폴더 작업 Azure Databricks 작업 영역에 대해 여러 작업을 실행하는 연산자가 포함되어 있습니다.

Databricks 공급자는 작업을 트리거하는 두 개의 연산자를 구현합니다.

  • DatabricksRunNowOperator에는 기존 Azure Databricks 작업이 필요하며 POST /api/2.1/jobs/run-now API 요청을 사용하여 실행을 트리거합니다. Databricks는 작업 정의의 중복을 줄이고 이 연산자를 사용하여 트리거된 작업 실행을 작업 UI에서 찾을 수 있기 때문에 사용하는 것이 좋습니다DatabricksRunNowOperator.
  • DatabricksSubmitRunOperatorAzure Databricks에 있는 작업이 필요하지 않으며 POST /api/2.1/jobs/runs/submit API 요청을 사용하여 작업 사양을 제출하고 실행을 트리거합니다.

새 Azure Databricks 작업을 만들거나 기존 작업을 다시 설정하기 위해 Databricks 공급자는 DatabricksCreateJobsOperator를 구현합니다. POST DatabricksCreateJobsOperator /api/2.1/jobs/createPOST /api/2.1/jobs/reset API 요청을 사용합니다. 이 DatabricksCreateJobsOperatorDatabricksRunNowOperator 기능을 사용하여 작업을 만들고 실행할 수 있습니다.

참고 항목

Databricks 연산자를 사용하여 작업을 트리거하려면 Databricks 연결 구성에서 자격 증명을 제공해야 합니다. Airflow에 대한 Azure Databricks 개인용 액세스 토큰 만들기를 참조하세요.

Databricks Airflow 연산자는 작업 실행 페이지 URL을 에어플로 로그에 마다 polling_period_seconds 씁니다(기본값은 30초). 자세한 내용은 Airflow 웹 사이트의 apache-airflow-providers-databricks 패키지 페이지를 참조하세요.

Airflow Azure Databricks 통합을 로컬로 설치

테스트 및 개발을 위해 Airflow 및 Databricks 공급자를 로컬로 설치하려면 다음 단계를 사용합니다. 프로덕션 설치 만들기를 비롯한 다른 Airflow 설치 옵션은 Airflow 설명서의 설치를 참조하세요.

터미널을 열고 다음 명령을 실행합니다.

mkdir airflow
cd airflow
pipenv --python 3.8
pipenv shell
export AIRFLOW_HOME=$(pwd)
pipenv install apache-airflow
pipenv install apache-airflow-providers-databricks
mkdir dags
airflow db init
airflow users create --username admin --firstname <firstname> --lastname <lastname> --role Admin --email <email>

<lastname>사용자 이름 및 <email> 전자 메일로 대체<firstname>합니다. 관리자의 암호를 입력하라는 메시지가 표시됩니다. Airflow UI에 로그인해야 하므로 이 암호를 저장해야 합니다.

이 스크립트는 다음 단계를 수행합니다.

  1. 이름이 지정된 airflow 디렉터리를 만들고 해당 디렉터리로 변경합니다.
  2. Python 가상 환경을 만들고 생성하는 데 사용합니다 pipenv . Databricks는 Python 가상 환경을 사용하여 패키지 버전 및 코드 종속성을 해당 환경으로 격리하는 것이 좋습니다. 이러한 격리는 예기치 않은 패키지 버전 불일치 및 코드 종속성 충돌을 줄이는 데 도움이 됩니다.
  3. 디렉터리의 경로로 설정된 환경 AIRFLOW_HOME 변수를 airflow 초기화합니다.
  4. Airflow 및 Airflow Databricks 공급자 패키지를 설치합니다.
  5. airflow/dags 디렉터리를 만듭니다. Airflow는 dags 디렉터리를 사용하여 DAG 정의를 저장합니다.
  6. Airflow에서 메타데이터를 추적하는 데 사용하는 SQLite 데이터베이스를 초기화합니다. 프로덕션 Airflow 배포에서는 표준 데이터베이스를 사용하여 Airflow를 구성합니다. Airflow 배포에 대한 SQLite 데이터베이스 및 기본 구성이 airflow 디렉터리에서 초기화됩니다.
  7. Airflow에 대한 관리 사용자를 만듭니다.

Databricks 공급자의 설치를 확인하려면 Airflow 설치 디렉터리에서 다음 명령을 실행합니다.

airflow providers list

Airflow 웹 서버 및 스케줄러 시작

Airflow UI를 보려면 Airflow 웹 서버가 필요합니다. 웹 서버를 시작하려면 Airflow 설치 디렉터리에서 터미널을 열고 다음 명령을 실행합니다.

참고 항목

포트 충돌로 인해 Airflow 웹 서버를 시작하지 못하는 경우 Airflow 구성에서 기본 포트를 변경할 수 있습니다.

pipenv shell
export AIRFLOW_HOME=$(pwd)
airflow webserver

스케줄러는 DAG를 예약하는 Airflow 구성 요소입니다. 스케줄러를 시작하려면 Airflow 설치 디렉터리에서 새 터미널을 열고 다음 명령을 실행합니다.

pipenv shell
export AIRFLOW_HOME=$(pwd)
airflow scheduler

Airflow 설치 테스트

Airflow 설치를 확인하려면 Airflow에 포함된 예제 DAG 중 하나를 실행할 수 있습니다.

  1. 브라우저 창에서 http://localhost:8080/home을(를) 엽니다. Airflow를 설치할 때 만든 사용자 이름 및 암호를 사용하여 Airflow UI에 로그인합니다. Airflow DAG 페이지가 나타납니다.
  2. DAG 일시 중지/일시 중지 해제 토글을 클릭하여 예제 DAG 중 하나를 일시 중지 해제합니다(예example_python_operator).
  3. DAG 트리거 단추를 클릭하여 예제 DAG트리거합니다.
  4. DAG의 실행 상태를 포함하여 세부 정보를 보려면 DAG 이름을 클릭합니다.

Airflow에 대한 Azure Databricks 개인용 액세스 토큰 만들기

Airflow는 Azure Databricks PAT(개인용 액세스 토큰)를 사용하여 Databricks에 연결합니다. PAT를 만들려면 다음을 수행합니다.

  1. Azure Databricks 작업 영역의 위쪽 표시줄에서 Azure Databricks 사용자 이름을 클릭한 다음 드롭다운에서 설정 선택합니다.
  2. 개발자를 클릭합니다.
  3. 액세스 토큰 옆에 있는 관리를 클릭합니다.
  4. 새 토큰 생성을 클릭합니다.
  5. (선택 사항) 나중에 이 토큰을 식별할 수 있도록 하는 설명을 입력하고 토큰의 기본 수명을 90일로 변경합니다. 수명이 없는 토큰을 만들려면(권장하지 않음) 수명(일) 상자를 비워 둡니다(공백).
  6. 생성을 클릭합니다.
  7. 표시된 토큰을 안전한 위치에 복사한 다음 완료를 클릭합니다.

참고 항목

복사한 토큰을 안전한 위치에 저장합니다. 복사한 토큰을 다른 사용자와 공유하지 마세요. 복사한 토큰을 분실하면 정확히 동일한 토큰을 다시 생성할 수 없습니다. 대신 이 프로시저를 반복하여 새 토큰을 만들어야 합니다. 복사한 토큰이 손실되었거나 토큰이 손상되었다고 생각되는 경우 Databricks는 액세스 토큰 페이지의 토큰 옆에 있는 휴지통(해지) 아이콘을 클릭하여 작업 영역에서 해당 토큰 을 즉시 삭제하는 것이 좋습니다.

작업 영역에서 토큰을 만들거나 사용할 수 없는 경우 작업 영역 관리자가 토큰을 사용하지 않도록 설정했거나 토큰을 만들거나 사용할 수 있는 권한을 부여하지 않았기 때문일 수 있습니다. 작업 영역 관리자 또는 다음을 참조하세요.

참고 항목

보안 모범 사례로, 자동화된 도구, 시스템, 스크립트 및 앱을 사용하여 인증하는 경우 Databricks는 작업 영역 사용자 대신 서비스 주체에 속한 개인용 액세스 토큰을 사용하는 것이 좋습니다. 서비스 주체에 대한 토큰을 만들려면 서비스 주체에 대한 토큰 관리를 참조하세요.

Microsoft Entra ID(이전의 Azure Active Directory) 토큰을 사용하여 Azure Databricks에 인증할 수도 있습니다. Airflow 설명서의 Databricks 커넥트ion을 참조하세요.

Azure Databricks 연결 구성

Airflow 설치에는 Azure Databricks에 대한 기본 연결이 포함되어 있습니다. 위에서 만든 개인용 액세스 토큰을 사용하여 작업 영역에 연결하도록 연결을 업데이트하려면 다음을 수행합니다.

  1. 브라우저 창에서 http://localhost:8080/connection/list/을(를) 엽니다. 로그인하라는 메시지가 표시되면 관리자 사용자 이름과 암호를 입력합니다.
  2. Conn ID에서 databricks_default를 찾아 레코드 편집 단추를 클릭합니다.
  3. 예를 들어 호스트 필드의 값을 Azure Databricks 배포의 작업 영역 인스턴스 이름으로 바꿉니다.https://adb-123456789.cloud.databricks.com
  4. 암호 필드에 Azure Databricks 개인용 액세스 토큰을 입력합니다.
  5. 저장을 클릭합니다.

Microsoft Entra ID 토큰을 사용하는 경우 인증 구성에 대한 자세한 내용은 Airflow 설명서의 Databricks 커넥트ion을 참조하세요.

예: Azure Databricks 작업을 실행하는 Airflow DAG 만들기

다음 예제에서는 로컬 컴퓨터에서 실행되고 Azure Databricks에서 실행을 트리거하는 예제 DAG를 배포하는 간단한 Airflow 배포를 만드는 방법을 보여 줍니다. 이 예에서는 다음을 수행합니다.

  1. 새 Notebook을 만들고 구성된 매개 변수를 기반으로 인사말을 인쇄하는 코드를 추가합니다.
  2. Notebook을 실행하는 단일 작업을 사용하여 Azure Databricks 작업을 만듭니다.
  3. Azure Databricks 작업 영역에 대한 Airflow 연결을 구성합니다.
  4. Airflow DAG를 만들어 Notebook 작업을 트리거합니다. DatabricksRunNowOperator을(를) 사용하여 Python 스크립트에서 DAG를 정의합니다.
  5. Airflow UI를 사용하여 DAG를 트리거하고 실행 상태를 확인합니다.

Notebook 만들기

이 예제에서는 두 개의 셀이 포함된 Notebook을 사용합니다.

  • 첫 번째 셀에는 기본값 world(으)로 설정된 변수 greeting을(를) 정의하는 Databricks Utilities 텍스트 위젯이 포함되어 있습니다.
  • 두 번째 셀은 접두사 hello로 된 greeting 변수의 값을 인쇄합니다.

Notebook을 만들려면 다음을 수행합니다.

  1. Azure Databricks 작업 영역으로 이동하여 사이드바에서 새로 만들기를 클릭하고새 아이콘 Notebook을 선택합니다.

  2. Notebook에 Hello Airflow와 같은 이름을 지정하고 기본 언어가 Python으로 설정되어 있는지 확인합니다.

  3. 다음 Python 코드를 복사하여 Notebook의 첫 번째 셀에 붙여넣습니다.

    dbutils.widgets.text("greeting", "world", "Greeting")
    greeting = dbutils.widgets.get("greeting")
    
  4. 첫 번째 셀 아래에 새 셀을 추가하고 다음 Python 코드를 복사하여 새 셀에 붙여넣습니다.

    print("hello {}".format(greeting))
    

작업 만들기

  1. 사이드바에서 워크플로를 클릭합니다작업 아이콘.

  2. 작업 만들기 단추을 클릭합니다.

    작업 탭에 작업 만들기 대화 상자가 표시됩니다.

    첫 번째 작업 만들기 대화 상자

  3. 작업 이름 추가...를 작업 이름으로 바꿉니다.

  4. 작업 이름 필드에 작업의 이름(예: greeting-task)을 입력합니다.

  5. 형식 드롭다운 메뉴에서 전자 필기장을 선택합니다.

  6. 원본 드롭다운 메뉴에서 작업 영역을 선택합니다.

  7. 경로 텍스트 상자를 클릭하고 파일 브라우저를 사용하여 만든 전자 필기장을 찾고 전자 필기장 이름을 클릭한 다음 확인을 클릭합니다.

  8. 매개 변수 아래에서 추가를 클릭합니다. 필드에 greeting을(를) 입력합니다. 필드에 Airflow user를 입력합니다.

  9. 작업 만들기를 클릭합니다.

작업 세부 정보 패널에서 작업 ID 값을 복사합니다. 이 값은 Airflow에서 작업을 트리거하는 데 필요합니다.

작업 실행

Azure Databricks 워크플로 UI에서 새 작업을 테스트하려면 오른쪽 위 모서리를 클릭합니다 지금 실행 단추 . 실행이 완료되면 작업 실행 세부 정보를 확인하여 출력을 확인할 수 있습니다.

새 Airflow DAG 만들기

Python 파일에서 Airflow DAG를 정의합니다. 예제 Notebook 작업을 트리거하는 DAG를 만들려면 다음을 수행합니다.

  1. 텍스트 편집기 또는 IDE에서 다음 내용으로 databricks_dag.py라고 명명된 새 파일을 만듭니다.

    from airflow import DAG
    from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator
    from airflow.utils.dates import days_ago
    
    default_args = {
      'owner': 'airflow'
    }
    
    with DAG('databricks_dag',
      start_date = days_ago(2),
      schedule_interval = None,
      default_args = default_args
      ) as dag:
    
      opr_run_now = DatabricksRunNowOperator(
        task_id = 'run_now',
        databricks_conn_id = 'databricks_default',
        job_id = JOB_ID
      )
    

    JOB_ID을(를) 이전에 저장한 작업 ID의 값으로 바꿉니다.

  2. 파일을 airflow/dags 디렉터리에 저장합니다. Airflow는 자동으로 airflow/dags/에 저장된 DAG 파일을 읽고 설치합니다.

Airflow에서 DAG 설치 및 확인

Airflow UI에서 DAG를 트리거하고 확인하려면 다음을 수행합니다.

  1. 브라우저 창에서 http://localhost:8080/home을(를) 엽니다. Airflow DAG 화면이 나타납니다.
  2. databricks_dag을(를) 찾아 DAG 일시 중지/일시 중지 해제 토글을 클릭하여 DAG를 일시 중지 해제합니다.
  3. DAG 트리거 단추를 클릭하여 DAG트리거합니다.
  4. 실행 열에서 실행을 클릭하여 실행의 상태 및 세부 정보를 확인합니다.