자습서: Azure Data Lake Storage Gen2, Azure Databricks 및 Spark

이 자습서에서는 Azure Databricks 클러스터를 Azure Data Lake Storage Gen2가 활성화된 Azure 스토리지 계정에 저장된 데이터에 연결하는 방법을 보여줍니다. 이 연결을 통해 기본적으로 사용자의 클러스터에서 데이터에 대한 쿼리 및 분석을 실행할 수 있습니다.

이 자습서에서는 다음을 수행합니다.

  • 스토리지 계정으로 비구조적 데이터 수집
  • Blob 스토리지의 데이터에 대한 분석 실행

Azure 구독이 아직 없는 경우 시작하기 전에 체험 계정을 만듭니다.

필수 조건

Azure Databricks 작업 영역, 클러스터, Notebook 만들기

  1. Azure Databricks 작업 영역 만들기 Azure Databricks 작업 영역 만들기를 참조하세요.

  2. 클러스터를 만듭니다. 클러스터 만들기를 참조하세요.

  3. Notebook을 만듭니다. Notebook 만들기를 참조하세요. Python을 Notebook의 기본 언어로 선택합니다.

Notebook을 열어 두세요. 다음 섹션에서 필요합니다.

비행 데이터 다운로드

이 자습서에서는 교통통계국의 2016년 1월 정시 성능 비행 데이터를 사용하여 ETL 작업을 수행하는 방법을 보여줍니다. 자습서를 완료하려면 이 데이터를 다운로드해야 합니다.

  1. On_Time_Reporting_Carrier_On_Time_Performance_1987_present_2016_1.zip 파일을 다운로드합니다. 이 파일에는 플라이트 데이터가 포함되어 있습니다.

  2. 압축된 파일을 풀고, 파일 이름 및 파일 경로를 적어 둡니다. 이후 단계에서 이 정보가 필요합니다.

정시 보고 성능 데이터에 캡처된 정보에 대해 알아보려면 교통통계국 웹 사이트에서 필드 설명을 볼 수 있습니다.

데이터 수집

이 섹션에서는 .csv 비행 데이터를 Azure Data Lake Storage Gen2 계정에 업로드한 다음, 스토리지 계정을 Databricks 클러스터에 탑재합니다. 마지막으로 Databricks를 사용하여 .csv 비행 데이터를 읽고 Apache parquet 형식의 스토리지에 다시 씁니다.

스토리지 계정에 비행 데이터를 업로드합니다.

AzCopy를 사용하여 .csv 파일을 Azure Data Lake Storage Gen2 계정으로 복사합니다. 이 azcopy make 명령을 사용하여 스토리지 계정에 컨테이너를 만듭니다. 그런 다음, azcopy copy 명령을 사용하여 방금 다운로드한 csv데이터를 해당 컨테이너의 디렉터리에 복사합니다.

다음 단계에서는 만들려는 컨테이너의 이름과 비행 데이터를 업로드할 디렉터리 및 Blob을 컨테이너에 입력해야 합니다. 각 단계에서 제안된 이름을 사용하거나 컨테이너, 디렉터리 및 Blob에 대한 명명 규칙을 관찰하는 고유한 이름을 지정할 수 있습니다.

  1. 명령 프롬프트 창을 열고 다음 명령을 입력하여 Azure Active Directory에 로그인하여 스토리지 계정에 액세스합니다.

    azcopy login
    

    명령 프롬프트 창에 나타나는 지침에 따라 사용자 계정을 인증합니다.

  2. 스토리지 계정에 비행 데이터를 저장할 컨테이너를 만들려면 다음 명령을 입력합니다.

    azcopy make  "https://<storage-account-name>.dfs.core.windows.net/<container-name>" 
    
    • <storage-account-name> 자리 표시자 값을 스토리지 계정 이름으로 바꿉니다.

    • <container-name> 자리 표시자를 csv 데이터를 저장하기 위해 만들려는 컨테이너의 이름(예: flight-data-container)으로 바꿉니다.

  3. csv 데이터를 스토리지 계정에 업로드(복사) 하려면 다음 명령을 입력합니다.

    azcopy copy "<csv-folder-path>" https://<storage-account-name>.dfs.core.windows.net/<container-name>/<directory-name>/On_Time.csv
    
    • <csv-folder-path> 자리 표시자 값을 .csv 파일의 경로로 바꿉니다.

    • <storage-account-name> 자리 표시자 값을 스토리지 계정 이름으로 바꿉니다.

    • <container-name> 자리 표시자를 스토리지 계정의 컨테이너 이름으로 바꿉니다.

    • <directory-name> 자리 표시자를 컨테이너에 데이터를 저장할 디렉터리의 이름으로 바꿉니다(예: jan2016).

Databricks 클러스터에 스토리지 계정을 탑재합니다.

이 섹션에서는 Azure Data Lake Storage Gen2 클라우드 개체 스토리지를 DBFS(Databricks 파일 시스템)에 탑재합니다. 스토리지 계정으로 인증하기 위해 이전에 만든 Azure AD 서비스 원칙을 사용합니다. 자세한 내용은 Azure Databricks에 클라우드 개체 스토리지 탑재를 참조하세요.

  1. 클러스터에 Notebook을 연결합니다.

    1. 이전에 만든 Notebook에서 Notebook 도구 모음의 오른쪽 위 모서리에 있는 연결 단추를 선택합니다. 이 단추는 컴퓨팅 선택기를 엽니다. (Notebook을 클러스터에 이미 연결한 경우 해당 클러스터의 이름이 연결이 아닌 단추 문자에 표시됩니다).

    2. 클러스터 드롭다운 메뉴에서 이전에 만든 클러스터를 선택합니다.

    3. 클러스터 선택기에서 문자가 시작으로 변경됩니다. 계속하기 전에 클러스터 시작이 완료되고 클러스터 이름이 단추에 나타날 때까지 기다립니다.

  2. 다음 코드 블록을 복사하여 첫 번째 셀에 붙여넣습니다. 하지만 이 코드를 아직 실행하지 마십시오.

    configs = {"fs.azure.account.auth.type": "OAuth",
           "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
           "fs.azure.account.oauth2.client.id": "<appId>",
           "fs.azure.account.oauth2.client.secret": "<clientSecret>",
           "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/<tenantId>/oauth2/token",
           "fs.azure.createRemoteFileSystemDuringInitialization": "true"}
    
    dbutils.fs.mount(
    source = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<directory-name>",
    mount_point = "/mnt/flightdata",
    extra_configs = configs)
    
  3. 이 코드 블록에서:

    • configs 에서, <appId>, <clientSecret><tenantId> 자리 표시자 값을 필수 구성 요소에서 서비스 주체를 만들 때 복사한 애플리케이션 ID, 클라이언트 암호 및 테넌트 ID로 바꿉니다.

    • source URI에서 자리표시자 값 <storage-account-name>, <container-name><directory-name>을(를) Azure Data Lake Storage Gen2 스토리지 계정의 이름과 비행 데이터를 스토리지 계정에 업로드할 때 지정한 컨테이너 및 디렉터리의 이름으로 바꿉니다.

      참고 항목

      URI의 체계 식별자 abfss은(는) Databricks에 TLS(전송 계층 보안)와 함께 Azure Blob File System 드라이버를 사용하도록 지시합니다. URI에 대한 자세한 내용은 Azure Data Lake Storage Gen2 URI 사용을 참조하세요.

  4. 계속하기 전에 클러스터 시작이 완료되었는지 확인합니다.

  5. 이 블록에서 코드를 실행하려면 SHIFT + ENTER 키를 누릅니다.

이제 스토리지 계정에서 비행 데이터를 업로드한 컨테이너 및 디렉터리가 탑재 지점 /mnt/flightdata를 통해 Notebook에서 액세스할 수 있습니다.

Databricks Notebook을 사용하여 CSV를 Parquet로 변환

이제 DBFS 탑재 지점을 통해 csv 비행 데이터에 액세스할 수 있으므로 Apache Spark DataFrame을 사용하여 작업 영역에 로드하고 Apache parquet 형식으로 Azure Data Lake Storage Gen2 개체 스토리지에 다시 쓸 수 있습니다.

  • Spark DataFrame은 잠재적으로 서로 다른 형식의 열이 있는 2차원 레이블이 지정된 데이터 구조입니다. DataFrame을 사용하여 지원되는 다양한 형식으로 데이터를 쉽게 읽고 쓸 수 있습니다. DataFrame을 사용하면 클라우드 개체 스토리지의 기본 데이터에 영향을 주지 않고 클라우드 개체 스토리지에서 데이터를 로드하고 컴퓨팅 클러스터 내에서 분석 및 변환을 수행할 수 있습니다. 자세히 알아보려면 Azure Databricks에서 PySpark DataFrames과 함께 작업을 참조하세요.

  • Apache Parquet은 쿼리 속도를 높이기 위한 최적화를 제공하는 열 파일 형식입니다. CSV 또는 JSON보다 효율적인 파일 형식입니다. 자세히 알아보려면 Parquet Files를 참조하세요.

Notebook에 새 셀을 추가하고 다음 코드를 붙여넣습니다.

# Use the previously established DBFS mount point to read the data.
# Create a DataFrame to read the csv data.
# The header option specifies that the first row of data should be used as the DataFrame column names
# The inferschema option specifies that the column data types should be inferred from the data in the file
flight_df = spark.read.format('csv').options(
    header='true', inferschema='true').load("/mnt/flightdata/*.csv")

# Read the airline csv file and write the output to parquet format for easy query.
flight_df.write.mode("append").parquet("/mnt/flightdata/parquet/flights")
print("Done")

이 블록에서 코드를 실행하려면 SHIFT + ENTER 키를 누릅니다.

다음 섹션으로 진행하기 전에 모든 parquet 데이터가 기록되었는지 확인하고 출력에 "완료"가 표시되는지 확인합니다.

데이터 탐색

이 섹션에서는 Databricks 파일 시스템 유틸리티를 사용하여 이전 섹션에서 만든 DBFS 탑재 지점을 사용하여 Azure Data Lake Storage Gen2 개체 스토리지를 탐색합니다.

새 셀에 다음 코드를 붙여넣어 탑재 지점의 파일 목록을 가져옵니다. 첫 번째 명령은 파일 및 디렉터리 목록을 출력합니다. 두 번째 명령은 더 쉽게 읽을 수 있는 테이블 형식으로 출력을 표시합니다.

dbutils.fs.ls("/mnt/flightdata")
display(dbutils.fs.ls("/mnt/flightdata"))

이 블록에서 코드를 실행하려면 SHIFT + ENTER 키를 누릅니다.

parquet 디렉터리가 목록에 표시됩니다. 이전 섹션의 parquet/flights 디렉터리에 .csv 비행 데이터를 parquet 형식으로 저장했습니다. parquet/flights 디렉터리에 파일을 나열하려면 다음 코드를 새 셀에 붙여넣고 실행합니다.

display(dbutils.fs.ls("/mnt/flightdata/parquet/flights"))

새 파일을 만들고 나열하려면 다음 코드를 새 셀에 붙여넣고 실행합니다.

dbutils.fs.put("/mnt/flightdata/mydirectory/mysubdirectory/1.txt", "Hello, World!", True)
display(dbutils.fs.ls("/mnt/flightdata/mydirectory/mysubdirectory"))

이 자습서에서는 1.txt 파일이 필요하지 않으므로 다음 코드를 셀에 붙여넣고 실행하여 mydirectory를 재귀적으로 삭제할 수 있습니다. True 매개 변수는 재귀 삭제를 나타냅니다.

dbutils.fs.rm("/mnt/flightdata/mydirectory", True)

편의를 위해 도움말 명령을 사용하여 다른 명령에 대한 세부 정보를 알아볼 수 있습니다.

dbutils.fs.help("rm")

이러한 코드 샘플을 통해 Azure Data Lake Storage Gen2가 사용되는 스토리지 계정에 저장된 데이터를 사용하여 HDFS의 계층적 특성을 살펴보았습니다.

데이터 쿼리

다음으로, 스토리지 계정에 업로드한 데이터를 쿼리할 수 있습니다. 다음 코드 블록을 각각 새 셀에 입력하고, SHIFT + ENTER를 눌러 Python 스크립트를 실행합니다.

DataFrames는 일반적인 데이터 분석 문제를 효율적으로 해결할 수 있는 다양한 함수 집합(열 선택, 필터, 조인, 집계)를 제공합니다.

이전에 저장된 parquet 비행 데이터에서 DataFrame을 로드하고 지원되는 기능 중 일부를 탐색하려면 이 스크립트를 새 셀에 입력하고 실행합니다.

# Read the existing parquet file for the flights database that was created earlier
flight_df = spark.read.parquet("/mnt/flightdata/parquet/flights")

# Print the schema of the dataframe
flight_df.printSchema()

# Print the flight database size
print("Number of flights in the database: ", flight_df.count())

# Show the first 25 rows (20 is the default)
# To show the first n rows, run: df.show(n)
# The second parameter indicates that column lengths shouldn't be truncated (default is 20 characters)
flight_df.show(25, False)

# You can also use the DataFrame to run simple queries. Results are returned in a DataFrame.
# Show the first 25 rows of the results of a query that returns selected colums for all flights originating from airports in Texas
flight_df.select("FlightDate", "Reporting_Airline", "Flight_Number_Reporting_Airline", "OriginCityName", "DepTime", "DestCityName", "ArrTime", "ArrDelay").filter("OriginState = 'TX'").show(258, False)

# Use display to run visualizations
# Preferably run this in a separate cmd cell
display(flight_df)

새 셀에 이 스크립트를 입력하여 데이터에 대한 몇 가지 기본 분석 쿼리를 실행합니다. 전체 스크립트(SHIFT + ENTER)를 실행하거나, 각 쿼리를 강조 표시하고, CTRL + SHIFT + ENTER를 사용하여 개별적으로 실행하거나, 각 쿼리를 별도의 셀에 입력하고 실행하도록 선택할 수 있습니다.

# create a temporary sql view for querying flight information
flight_data = spark.read.parquet('/mnt/flightdata/parquet/flights')
flight_data.createOrReplaceTempView('FlightTable')

# Print the total number of flights in Jan 2016 (the number of rows in the flight data).
print("Number of flights in Jan 2016: ", flight_data.count())

# Using spark sql, query the parquet file to return the total flights of each airline
num_flights_by_airline=spark.sql("SELECT Reporting_Airline, count(*) AS NumFlights FROM FlightTable GROUP BY Reporting_Airline ORDER BY NumFlights DESC")
num_flights_by_airline.show()

# List out all the airports in Texas
airports_in_texas = spark.sql(
    "SELECT DISTINCT(OriginCityName) FROM FlightTable WHERE OriginStateName = 'Texas'")
print('Airports in Texas: ', airports_in_texas.count())
airports_in_texas.show(100, False)

# Find all airlines that fly from Texas
airlines_flying_from_texas = spark.sql(
    "SELECT DISTINCT(Reporting_Airline) FROM FlightTable WHERE OriginStateName='Texas'")
print('Airlines that fly to/from Texas: ', airlines_flying_from_texas.count())
airlines_flying_from_texas.show(100, False)

# List airlines by average arrival delay (negative values indicate early flights)
avg_arrival_delay=spark.sql(
    "SELECT Reporting_Airline, count(*) AS NumFlights, avg(DepDelay) AS AverageDepDelay, avg(ArrDelay) AS AverageArrDelay FROM FlightTable GROUP BY Reporting_Airline ORDER BY AverageArrDelay DESC")
print("Airlines by average arrival delay")
avg_arrival_delay.show()

# List airlines by the highest percentage of delayed flights. A delayed flight is one with a  departure or arrival delay that is greater than 15 minutes
spark.sql("DROP VIEW IF EXISTS totalFlights")
spark.sql("DROP VIEW IF EXISTS delayedFlights")
spark.sql(
    "CREATE TEMPORARY VIEW totalFlights AS SELECT Reporting_Airline, count(*) AS NumFlights FROM FlightTable GROUP BY Reporting_Airline")
spark.sql(
    "CREATE TEMPORARY VIEW delayedFlights AS SELECT Reporting_Airline, count(*) AS NumDelayedFlights FROM FlightTable WHERE DepDelay>15 or ArrDelay>15 GROUP BY Reporting_Airline")
percent_delayed_flights=spark.sql(
    "SELECT totalFlights.Reporting_Airline, totalFlights.NumFlights, delayedFlights.NumDelayedFlights, delayedFlights.NumDelayedFlights/totalFlights.NumFlights*100 AS PercentFlightsDelayed FROM totalFlights INNER JOIN delayedFlights ON totalFlights.Reporting_Airline = delayedFlights.Reporting_Airline ORDER BY PercentFlightsDelayed DESC")
print("Airlines by percentage of flights delayed")
percent_delayed_flights.show()

요약

이 자습서에서는 다음을 수행합니다.

  • Azure Data Lake Storage Gen2 스토리지 계정 및 Azure AD 서비스 주체를 비롯한 Azure 리소스를 만들고 스토리지 계정에 액세스할 수 있는 권한을 할당했습니다.

  • Azure Databricks 작업 영역, Notebook 및 컴퓨팅 클러스터를 만들었습니다.

  • AzCopy를 사용하여 비정형 .csv 비행 데이터를 Azure Data Lake Storage Gen2 스토리지 계정에 업로드했습니다.

  • Databricks 파일 시스템 유틸리티 함수를 사용하여 Azure Data Lake Storage Gen2 스토리지 계정을 탑재하고 해당 계층적 파일 시스템을 탐색했습니다.

  • Apache Spark DataFrames를 사용하여 .csv 비행 데이터를 Apache parquet 형식으로 변환하고 Azure Data Lake Storage Gen2 스토리지 계정에 다시 저장했습니다.

  • DataFrame을 사용하여 비행 데이터를 탐색하고 간단한 쿼리를 수행했습니다.

  • Apache Spark SQL을 사용하여 2016년 1월 각 항공사의 총 항공편 수, 텍사스 공항, 텍사스에서 출발하는 항공사, 전국적으로 각 항공사의 평균 도착 지연 시간(분), 출발 또는 도착이 지연된 각 항공사 항공편의 비율을 쿼리했습니다.

리소스 정리

Notebook을 유지하고 나중에 다시 돌아가려면 요금이 부과되지 않도록 클러스터를 종료하는 것이 좋습니다. 클러스터를 종료하려면 Notebook 도구 모음의 오른쪽 위에 있는 컴퓨팅 선택기에서 클러스터를 선택하고 메뉴에서 종료를 선택하고 선택 항목을 확인합니다. (기본적으로 클러스터는 비활성 120분 후에 자동으로 종료됩니다.)

Notebook 및 클러스터와 같은 개별 작업 영역 리소스를 삭제하려면 작업 영역의 왼쪽 사이드바에서 삭제할 수 있습니다. 자세한 지침은 클러스터 삭제 또는 Notebook 삭제를 참조하세요.

더 이상 필요하지 않으면 리소스 그룹 및 모든 관련 리소스를 삭제합니다. 이렇게 하려면 Azure Portal에서 스토리지 계정 및 작업 영역에 대한 리소스 그룹을 선택한 다음, 삭제를 선택합니다.

다음 단계