자습서: Azure Databricks를 사용하여 데이터 추출, 변환 및 로드

이 자습서에서는 Azure Databricks를 사용하여 ETL(데이터 추출, 변환 및 로드) 작업을 수행합니다. Azure Data Lake Storage Gen2에서 Azure Databricks로 데이터를 추출하고, Azure Databricks의 데이터에 대한 변환을 실행하고, 변환된 데이터를 Azure Synapse Analytics에 로드합니다.

이 자습서의 단계에서는 Azure Databricks용 Azure Synapse 커넥터를 사용하여 Azure Databricks로 데이터를 전송합니다. 이 커넥터는 Azure Databricks 클러스터와 Azure Synapse 간에 전송되는 데이터에 대한 임시 스토리지로 Azure Blob Storage를 사용합니다.

다음 그림에서는 애플리케이션 흐름을 보여 줍니다.

Azure Databricks with Data Lake Store and Azure Synapse

이 자습서에서 다루는 작업은 다음과 같습니다.

  • Azure Databricks 서비스를 만듭니다.
  • Azure Databricks에 Spark 클러스터 만들기
  • Data Lake Storage Gen2 계정에 파일 시스템을 만듭니다.
  • Azure Data Lake Storage Gen2 계정에 샘플 데이터를 업로드합니다.
  • 서비스 주체를 생성합니다.
  • Azure Data Lake Storage Gen2 계정에서 데이터를 추출합니다.
  • Azure Databricks에서 데이터를 변환합니다.
  • Azure Synapse에 데이터를 로드합니다.

Azure 구독이 아직 없는 경우 시작하기 전에 체험 계정을 만듭니다.

참고 항목

이 자습서는 Azure 평가판 구독을 사용하여 수행할 수 없습니다. 무료 계정이 있는 경우 프로필로 이동하여 구독을 종량제로 변경합니다. 자세한 내용은 Azure 체험 계정을 참조 하세요. 그런 다음, 지출 한도를 제거하고 해당 지역의 vCPU에 대한 할당량 증가를 요청합니다. Azure Databricks 작업 영역을 만들 때 평가판(프리미엄 - 14일 무료 DPU) 가격 책정 계층을 선택하여 작업 영역에 14일 동안 무료 Premium Azure Databricks DPU에 대한 액세스 권한을 부여할 수 있습니다.

필수 조건

이 자습서를 시작하기 전에 다음 작업을 완료합니다.

필요한 정보 수집

이 자습서의 필수 조건을 완료했는지 확인합니다.

시작하기 전에 다음 정보 항목이 있어야 합니다.

✔️ 데이터베이스 이름, 데이터베이스 서버 이름, 사용자 이름, Azure Synapse의 암호.

✔️ Blob Storage 계정에 대한 액세스 키.

✔️ Azure Data Lake Storage Gen2 스토리지 계정의 이름.

✔️ 구독의 테넌트 ID.

✔️ Microsoft Entra ID(이전의 Azure Active Directory)에 등록한 앱의 애플리케이션 ID입니다.

✔️ Microsoft Entra ID(이전의 Azure Active Directory)에 등록한 앱의 인증 키입니다.

Azure Databricks 서비스 만들기

이 섹션에서는 Azure Portal을 사용하여 Azure Databricks 서비스를 만듭니다.

  1. Azure Portal 메뉴에서 리소스 만들기를 선택합니다.

    Create a resource on Azure portal

    그런 다음, Analytics>Azure Databricks를 선택합니다.

    Create Azure Databricks on Azure portal

  2. Azure Databricks 서비스 아래에서 다음 값을 입력하여 Databricks 서비스를 만듭니다.

    속성 설명
    작업 영역 이름 Databricks 작업 영역에 대한 이름을 제공합니다.
    구독 드롭다운에서 Azure 구독을 선택합니다.
    리소스 그룹 새 리소스 그룹을 만들지, 아니면 기존 그룹을 사용할지 여부를 지정합니다. 리소스 그룹은 Azure 솔루션에 대한 관련 리소스를 저장하는 컨테이너입니다. 자세한 내용은 Azure 리소스 그룹 개요를 참조하세요.
    위치 미국 서부 2를 선택합니다. 사용 가능한 다른 지역은 지역별로 사용할 수 있는 Azure 서비스를 참조하세요.
    가격 책정 계층 표준을 선택합니다.
  3. 계정 생성에는 몇 분 정도가 소요됩니다. 작업 상태 모니터링하려면 위쪽의 진행률 표시줄을 봅니다.

  4. 대시보드에 고정을 선택한 다음 만들기를 선택합니다.

Azure Databricks에서 Spark 클러스터 만들기

  1. Azure Portal에서 만든 Databricks 서비스로 이동하여 작업 영역 시작을 선택합니다.

  2. Azure Databricks 포털로 리디렉션됩니다. 포털에서 클러스터를 선택합니다.

    Databricks on Azure

  3. 새 클러스터 페이지에서 클러스터를 만들 값을 제공합니다.

    Create Databricks Spark cluster on Azure

  4. 다음 필드에 대한 값을 입력하고, 다른 필드에는 기본값을 그대로 적용합니다.

    • 클러스터의 이름을 입력합니다.

    • 비활성 __ 분 후 종료 확인란을 선택해야 합니다. 클러스터를 사용하지 않는 경우 클러스터를 종료하는 기간(분)을 제공합니다.

    • 클러스터 만들기를 선택합니다. 클러스터가 실행되면 Notebook을 클러스터에 연결하고 Spark 작업을 실행할 수 있습니다.

Azure Data Lake Storage Gen2 계정에서 파일 시스템 만들기

이 섹션에서는 Azure Databricks 작업 영역에서 Notebook을 만든 다음 코드 조각을 실행하여 스토리지 계정을 구성합니다.

  1. Azure Portal에서 본인이 만든 Azure Databricks 서비스로 이동한 다음, 작업 영역 시작을 선택합니다.

  2. 왼쪽에서 작업 영역을 선택합니다. 작업 영역 드롭다운에서 전자 필기장 만들기>를 선택합니다.

    Create a notebook in Databricks

  3. 전자 필기 장 만들기 대화 상자에서 전자 필 기장 이름을 입력합니다. 언어로 Scala를 선택한 다음, 이전에 만든 Spark 클러스터를 선택합니다.

    Provide details for a notebook in Databricks

  4. 만들기를 실행합니다.

  5. 다음 코드 블록은 Spark 세션에서 액세스하는 모든 ADLS Gen 2 계정에 대한 기본 서비스 주체 자격 증명을 설정합니다. 두 번째 코드 블록은 설정에 계정 이름을 추가하여 특정 ADLS Gen 2 계정에 대한 자격 증명을 지정합니다. Azure Databricks Notebook의 첫 번째 셀에 코드 블록 중 하나를 복사하여 붙여넣습니다.

    세션 구성

    val appID = "<appID>"
    val secret = "<secret>"
    val tenantID = "<tenant-id>"
    
    spark.conf.set("fs.azure.account.auth.type", "OAuth")
    spark.conf.set("fs.azure.account.oauth.provider.type", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
    spark.conf.set("fs.azure.account.oauth2.client.id", "<appID>")
    spark.conf.set("fs.azure.account.oauth2.client.secret", "<secret>")
    spark.conf.set("fs.azure.account.oauth2.client.endpoint", "https://login.microsoftonline.com/<tenant-id>/oauth2/token")
    spark.conf.set("fs.azure.createRemoteFileSystemDuringInitialization", "true")
    

    계정 구성

    val storageAccountName = "<storage-account-name>"
    val appID = "<app-id>"
    val secret = "<secret>"
    val fileSystemName = "<file-system-name>"
    val tenantID = "<tenant-id>"
    
    spark.conf.set("fs.azure.account.auth.type." + storageAccountName + ".dfs.core.windows.net", "OAuth")
    spark.conf.set("fs.azure.account.oauth.provider.type." + storageAccountName + ".dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
    spark.conf.set("fs.azure.account.oauth2.client.id." + storageAccountName + ".dfs.core.windows.net", "" + appID + "")
    spark.conf.set("fs.azure.account.oauth2.client.secret." + storageAccountName + ".dfs.core.windows.net", "" + secret + "")
    spark.conf.set("fs.azure.account.oauth2.client.endpoint." + storageAccountName + ".dfs.core.windows.net", "https://login.microsoftonline.com/" + tenantID + "/oauth2/token")
    spark.conf.set("fs.azure.createRemoteFileSystemDuringInitialization", "true")
    dbutils.fs.ls("abfss://" + fileSystemName  + "@" + storageAccountName + ".dfs.core.windows.net/")
    spark.conf.set("fs.azure.createRemoteFileSystemDuringInitialization", "false")
    
  6. 이 코드 블록에서 이 코드 블록의 <app-id>, <secret>, <tenant-id><storage-account-name> 자리 표시자 값을 이 자습서의 필수 구성 요소를 완료하는 동안 수집한 값으로 바꿉니다. <file-system-name> 자리 표시자 값을 파일 시스템에 제공하려는 이름으로 바꿉니다.

    • <app-id><secret>는 서비스 주체 만들기의 일환으로 활성 디렉터리에 등록한 앱에서 가져온 것입니다.

    • 구독 <tenant-id> 에서 온 것입니다.

    • <storage-account-name>은 Azure Data Lake Storage Gen2 스토리지 계정의 이름입니다.

  7. SHIFT + ENTER 키를 눌러 이 블록에서 코드를 실행합니다.

Azure Data Lake Storage Gen2 계정에 샘플 데이터 수집

이 섹션을 시작하기 전에 다음 필수 구성 요소를 완료해야 합니다.

노트북 셀에 다음 코드를 입력합니다.

%sh wget -P /tmp https://raw.githubusercontent.com/Azure/usql/master/Examples/Samples/Data/json/radiowebsite/small_radio_json.json

셀에서 Shift+Enter를 눌러 코드를 실행합니다.

이제 이 셀 아래의 새 셀에서 다음 코드를 입력하고 대괄호 안에 나타나는 값을 이전에 사용한 것과 동일한 값으로 바꿉니다.

dbutils.fs.cp("file:///tmp/small_radio_json.json", "abfss://" + fileSystemName + "@" + storageAccountName + ".dfs.core.windows.net/")

셀에서 Shift+Enter를 눌러 코드를 실행합니다.

Azure Data Lake Storage Gen2 계정에서 데이터 추출

  1. 이제 Azure Databricks에서 샘플 json 파일을 데이터 프레임으로 로드할 수 있습니다. 다음 코드를 새 셀에 붙여넣습니다. 대괄호 안에 표시된 자리 표시자를 사용자 고유의 값으로 바꿉니다.

    val df = spark.read.json("abfss://" + fileSystemName + "@" + storageAccountName + ".dfs.core.windows.net/small_radio_json.json")
    
  2. SHIFT + ENTER 키를 눌러 이 블록에서 코드를 실행합니다.

  3. 다음 코드를 실행하여 데이터 프레임의 내용을 확인합니다.

    df.show()
    

    다음 코드 조각과 유사한 출력이 표시됩니다.

    +---------------------+---------+---------+------+-------------+----------+---------+-------+--------------------+------+--------+-------------+---------+--------------------+------+-------------+------+
    |               artist|     auth|firstName|gender|itemInSession|  lastName|   length|  level|            location|method|    page| registration|sessionId|                song|status|           ts|userId|
    +---------------------+---------+---------+------+-------------+----------+---------+-------+--------------------+------+--------+-------------+---------+--------------------+------+-------------+------+
    | El Arrebato         |Logged In| Annalyse|     F|            2|Montgomery|234.57914| free  |  Killeen-Temple, TX|   PUT|NextSong|1384448062332|     1879|Quiero Quererte Q...|   200|1409318650332|   309|
    | Creedence Clearwa...|Logged In|   Dylann|     M|            9|    Thomas|340.87138| paid  |       Anchorage, AK|   PUT|NextSong|1400723739332|       10|        Born To Move|   200|1409318653332|    11|
    | Gorillaz            |Logged In|     Liam|     M|           11|     Watts|246.17751| paid  |New York-Newark-J...|   PUT|NextSong|1406279422332|     2047|                DARE|   200|1409318685332|   201|
    ...
    ...
    

    이제 Azure Data Lake Storage Gen2에서 Azure Databricks로 데이터를 추출했습니다.

Azure Databricks에서 데이터 변환

원시 샘플 데이터 small_radio_json.json 파일은 라디오 방송국의 대상 그룹을 캡처하고 다양한 열을 가립니다. 이 섹션에서는 데이터 세트에서 특정 열만 검색하도록 데이터를 변환합니다.

  1. 먼저 만든 데이터 프레임에서 firstName, lastName, gender, locationlevel만 검색합니다.

    val specificColumnsDf = df.select("firstname", "lastname", "gender", "location", "level")
    specificColumnsDf.show()
    

    다음 코드 조각과 같이 출력을 받습니다.

    +---------+----------+------+--------------------+-----+
    |firstname|  lastname|gender|            location|level|
    +---------+----------+------+--------------------+-----+
    | Annalyse|Montgomery|     F|  Killeen-Temple, TX| free|
    |   Dylann|    Thomas|     M|       Anchorage, AK| paid|
    |     Liam|     Watts|     M|New York-Newark-J...| paid|
    |     Tess|  Townsend|     F|Nashville-Davidso...| free|
    |  Margaux|     Smith|     F|Atlanta-Sandy Spr...| free|
    |     Alan|     Morse|     M|Chicago-Napervill...| paid|
    |Gabriella|   Shelton|     F|San Jose-Sunnyval...| free|
    |   Elijah|  Williams|     M|Detroit-Warren-De...| paid|
    |  Margaux|     Smith|     F|Atlanta-Sandy Spr...| free|
    |     Tess|  Townsend|     F|Nashville-Davidso...| free|
    |     Alan|     Morse|     M|Chicago-Napervill...| paid|
    |     Liam|     Watts|     M|New York-Newark-J...| paid|
    |     Liam|     Watts|     M|New York-Newark-J...| paid|
    |   Dylann|    Thomas|     M|       Anchorage, AK| paid|
    |     Alan|     Morse|     M|Chicago-Napervill...| paid|
    |   Elijah|  Williams|     M|Detroit-Warren-De...| paid|
    |  Margaux|     Smith|     F|Atlanta-Sandy Spr...| free|
    |     Alan|     Morse|     M|Chicago-Napervill...| paid|
    |   Dylann|    Thomas|     M|       Anchorage, AK| paid|
    |  Margaux|     Smith|     F|Atlanta-Sandy Spr...| free|
    +---------+----------+------+--------------------+-----+
    
  2. 이 데이터를 추가로 변환하여 열 수준의이름을 subscription_type 수 있습니다.

    val renamedColumnsDF = specificColumnsDf.withColumnRenamed("level", "subscription_type")
    renamedColumnsDF.show()
    

    다음 코드 조각과 같이 출력을 받습니다.

    +---------+----------+------+--------------------+-----------------+
    |firstname|  lastname|gender|            location|subscription_type|
    +---------+----------+------+--------------------+-----------------+
    | Annalyse|Montgomery|     F|  Killeen-Temple, TX|             free|
    |   Dylann|    Thomas|     M|       Anchorage, AK|             paid|
    |     Liam|     Watts|     M|New York-Newark-J...|             paid|
    |     Tess|  Townsend|     F|Nashville-Davidso...|             free|
    |  Margaux|     Smith|     F|Atlanta-Sandy Spr...|             free|
    |     Alan|     Morse|     M|Chicago-Napervill...|             paid|
    |Gabriella|   Shelton|     F|San Jose-Sunnyval...|             free|
    |   Elijah|  Williams|     M|Detroit-Warren-De...|             paid|
    |  Margaux|     Smith|     F|Atlanta-Sandy Spr...|             free|
    |     Tess|  Townsend|     F|Nashville-Davidso...|             free|
    |     Alan|     Morse|     M|Chicago-Napervill...|             paid|
    |     Liam|     Watts|     M|New York-Newark-J...|             paid|
    |     Liam|     Watts|     M|New York-Newark-J...|             paid|
    |   Dylann|    Thomas|     M|       Anchorage, AK|             paid|
    |     Alan|     Morse|     M|Chicago-Napervill...|             paid|
    |   Elijah|  Williams|     M|Detroit-Warren-De...|             paid|
    |  Margaux|     Smith|     F|Atlanta-Sandy Spr...|             free|
    |     Alan|     Morse|     M|Chicago-Napervill...|             paid|
    |   Dylann|    Thomas|     M|       Anchorage, AK|             paid|
    |  Margaux|     Smith|     F|Atlanta-Sandy Spr...|             free|
    +---------+----------+------+--------------------+-----------------+
    

Azure Synapse에 데이터 로드

이 섹션에서는 변환된 데이터를 Azure Synapse에 업로드합니다. Azure Databricks용 Azure Synapse 커넥터를 사용하여 Synapse Spark 풀에서 데이터 프레임을 테이블로 직접 업로드합니다.

앞에서 멘션 Azure Synapse 커넥터는 Azure Blob Storage를 임시 스토리지로 사용하여 Azure Databricks와 Azure Synapse 간에 데이터를 업로드합니다. 따라서 먼저 스토리지 계정에 연결할 구성을 제공합니다. 이 문서의 필수 구성 요소의 일부로 계정을 이미 만들었어야 합니다.

  1. Azure Databricks에서 Azure Storage 계정에 액세스하는 구성을 제공합니다.

    val blobStorage = "<blob-storage-account-name>.blob.core.windows.net"
    val blobContainer = "<blob-container-name>"
    val blobAccessKey =  "<access-key>"
    
  2. Azure Databricks와 Azure Synapse 간에 데이터를 이동하는 동안 사용할 임시 폴더를 지정합니다.

    val tempDir = "wasbs://" + blobContainer + "@" + blobStorage +"/tempDirs"
    
  3. 다음 코드 조각을 실행하여 구성에 Azure Blob Storage 액세스 키를 저장합니다. 이렇게 하면 Notebook의 액세스 키를 일반 텍스트로 유지할 필요가 없습니다.

    val acntInfo = "fs.azure.account.key."+ blobStorage
    sc.hadoopConfiguration.set(acntInfo, blobAccessKey)
    
  4. Azure Synapse 인스턴스에 연결할 값을 제공합니다. 필수 조건으로 Azure Synapse Analytics 서비스를 만들었어야 합니다. dwServer의 정규화된 서버 이름을 사용합니다. 예: <servername>.database.windows.net.

    //Azure Synapse related settings
    val dwDatabase = "<database-name>"
    val dwServer = "<database-server-name>"
    val dwUser = "<user-name>"
    val dwPass = "<password>"
    val dwJdbcPort =  "1433"
    val dwJdbcExtraOptions = "encrypt=true;trustServerCertificate=true;hostNameInCertificate=*.database.windows.net;loginTimeout=30;"
    val sqlDwUrl = "jdbc:sqlserver://" + dwServer + ":" + dwJdbcPort + ";database=" + dwDatabase + ";user=" + dwUser+";password=" + dwPass + ";$dwJdbcExtraOptions"
    val sqlDwUrlSmall = "jdbc:sqlserver://" + dwServer + ":" + dwJdbcPort + ";database=" + dwDatabase + ";user=" + dwUser+";password=" + dwPass
    
  5. 다음 코드 조각을 실행하여 변환된 데이터 프레임인 RenamedColumnsDF를 Azure Synapse의 테이블로 로드합니다. 이 코드 조각은 SQL 데이터베이스에 SampleTable이라는 테이블을 만듭니다.

    spark.conf.set(
        "spark.sql.parquet.writeLegacyFormat",
        "true")
    
    renamedColumnsDF.write.format("com.databricks.spark.sqldw").option("url", sqlDwUrlSmall).option("dbtable", "SampleTable")       .option( "forward_spark_azure_storage_credentials","True").option("tempdir", tempDir).mode("overwrite").save()
    

    참고 항목

    이 샘플에서는 플래그를 forward_spark_azure_storage_credentials 사용하여 Azure Synapse가 액세스 키를 사용하여 Blob Storage의 데이터에 액세스합니다. 이것이 유일하게 지원되는 인증 방법입니다.

    Azure Blob Storage가 가상 네트워크를 선택하도록 제한된 경우 Azure Synapse에는 액세스 키 대신 관리 서비스 ID가 필요합니다. 이렇게 하면 "이 요청은 작업을 수행할 권한이 없습니다." 오류가 발생합니다.

  6. SQL 데이터베이스에 연결하여 SampleTable이라는 데이터베이스가 있는지 확인합니다.

    Verify the sample table

  7. 선택 쿼리를 실행하여 테이블의 내용을 확인합니다. 이 테이블에 renamedColumnsDF 데이터 프레임과 똑같은 데이터가 있어야 합니다.

    Verify the sample table content

리소스 정리

자습서를 마친 후에는 클러스터를 종료해도 됩니다. Azure Databricks 작업 영역에서 왼쪽의 클러스터를 선택합니다. 클러스터가 종료되도록 하려면 작업 아래에서 줄임표(...)를 가리키고 종료 아이콘을 선택합니다.

Stop a Databricks cluster

클러스터를 만들 때 비활성 __ 분 후 종료 확인란을 선택한 경우 클러스터를 수동으로 종료하지 않으면 클러스터가 자동으로 중지됩니다. 이 경우 지정된 시간 동안 비활성 상태인 경우 클러스터가 자동으로 중지됩니다.

다음 단계

이 자습서에서는 다음 작업 방법을 알아보았습니다.

  • Azure Databricks 서비스 만들기
  • Azure Databricks에서 Spark 클러스터 만들기
  • Azure Databricks에서 Notebook 만들기
  • Data Lake Storage Gen2 계정에서 데이터 추출
  • Azure Databricks에서 데이터 변환
  • Azure Synapse에 데이터 로드