Tutorial: Azure Data Lake Storage Gen2, Azure Databricks e Apache Spark

Este tutorial mostra como conectar seu cluster do Azure Databricks aos dados armazenados em uma conta de armazenamento do Azure que tenha o Azure Data Lake Storage Gen2 habilitado. Essa conexão permite que você execute nativamente consultas e análises do cluster em seus dados.

Neste tutorial, vai:

  • Ingerir dados não estruturados numa conta de armazenamento
  • Execute análises em seus dados no armazenamento de Blob

Se não tiver uma subscrição do Azure, crie uma conta gratuita antes de começar.

Pré-requisitos

Criar um espaço de trabalho, cluster e bloco de anotações do Azure Databricks

  1. Crie um espaço de trabalho do Azure Databricks. Consulte Criar um espaço de trabalho do Azure Databricks.

  2. Crie um cluster. Consulte Criar um cluster.

  3. Crie um bloco de notas. Consulte Criar um bloco de notas. Escolha Python como o idioma padrão do bloco de anotações.

Mantenha o seu bloco de notas aberto. Você usá-lo nas seções a seguir.

Transferir os dados de voos

Este tutorial usa dados de voo de desempenho pontual para janeiro de 2016 do Bureau of Transportation Statistics para demonstrar como executar uma operação ETL. Você deve baixar esses dados para concluir o tutorial.

  1. Transfira o ficheiro On_Time_Reporting_Carrier_On_Time_Performance_1987_present_2016_1.zip . Este ficheiro contém os dados do voo.

  2. Descompacte o conteúdo do arquivo compactado e anote o nome do arquivo e o caminho do arquivo. Você precisa dessas informações em uma etapa posterior.

Se você quiser saber mais sobre as informações capturadas nos dados de desempenho de relatórios pontuais, você pode ver as descrições de campo no site do Bureau of Transportation Statistics.

Ingerir dados

Nesta seção, você carrega os dados de voo .csv em sua conta do Azure Data Lake Storage Gen2 e, em seguida, monta a conta de armazenamento em seu cluster Databricks. Finalmente, você usa o Databricks para ler os dados de voo .csv e gravá-los de volta no armazenamento no formato de parquet Apache.

Carregue os dados do voo na sua conta de armazenamento

Use o AzCopy para copiar seu arquivo de .csv para sua conta do Azure Data Lake Storage Gen2. Use o azcopy make comando para criar um contêiner em sua conta de armazenamento. Em seguida, você usa o azcopy copy comando para copiar os dados csv que você acabou de baixar para um diretório nesse contêiner.

Nas etapas a seguir, você precisa inserir nomes para o contêiner que deseja criar e o diretório e blob para os quais deseja carregar os dados de voo no contêiner. Você pode usar os nomes sugeridos em cada etapa ou especificar seus próprios observando as convenções de nomenclatura para contêineres, diretórios e blobs.

  1. Abra uma janela de prompt de comando e digite o seguinte comando para entrar no Azure Ative Directory para acessar sua conta de armazenamento.

    azcopy login
    

    Siga as instruções que aparecem na janela do prompt de comando para autenticar sua conta de usuário.

  2. Para criar um contêiner em sua conta de armazenamento para armazenar os dados de voo, digite o seguinte comando:

    azcopy make  "https://<storage-account-name>.dfs.core.windows.net/<container-name>" 
    
    • Substitua o valor de espaço reservado <storage-account-name> pelo nome da sua conta de armazenamento.

    • Substitua o espaço reservado por um nome para o <container-name> contêiner que você deseja criar para armazenar os dados csv , por exemplo, flight-data-container.

  3. Para carregar (copiar) os dados csv para sua conta de armazenamento, digite o seguinte comando.

    azcopy copy "<csv-folder-path>" https://<storage-account-name>.dfs.core.windows.net/<container-name>/<directory-name>/On_Time.csv
    
    • Substitua o valor de espaço reservado pelo caminho para o <csv-folder-path>arquivo .csv .

    • Substitua o valor de espaço reservado <storage-account-name> pelo nome da sua conta de armazenamento.

    • Substitua o espaço reservado <container-name> pelo nome do contêiner em sua conta de armazenamento.

    • Substitua o espaço reservado <directory-name> pelo nome de um diretório para armazenar seus dados no contêiner, por exemplo, jan2016.

Monte sua conta de armazenamento no cluster Databricks

Nesta seção, você monta seu armazenamento de objetos na nuvem do Azure Data Lake Storage Gen2 no Sistema de Arquivos Databricks (DBFS). Você usa o princípio de serviço do Azure AD criado anteriormente para autenticação com a conta de armazenamento. Para obter mais informações, consulte Montagem do armazenamento de objetos na nuvem no Azure Databricks.

  1. Anexe o bloco de notas ao cluster.

    1. No bloco de notas que criou anteriormente, selecione o botão Ligar no canto superior direito da barra de ferramentas do bloco de notas. Este botão abre o seletor de computação. (Se você já tiver conectado seu bloco de anotações a um cluster, o nome desse cluster será mostrado no texto do botão em vez deConecte-se).

    2. No menu suspenso do cluster, selecione o cluster criado anteriormente.

    3. Observe que o texto no seletor de cluster muda para iniciar. Aguarde até que o cluster termine de iniciar e que o nome do cluster apareça no botão antes de continuar.

  2. Copie e cole o seguinte bloco de código na primeira célula, mas ainda não execute esse código.

    configs = {"fs.azure.account.auth.type": "OAuth",
           "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
           "fs.azure.account.oauth2.client.id": "<appId>",
           "fs.azure.account.oauth2.client.secret": "<clientSecret>",
           "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/<tenantId>/oauth2/token",
           "fs.azure.createRemoteFileSystemDuringInitialization": "true"}
    
    dbutils.fs.mount(
    source = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<directory-name>",
    mount_point = "/mnt/flightdata",
    extra_configs = configs)
    
  3. Neste bloco de código:

    • No configs, substitua os <appId>valores , e de espaço reservado pela ID do aplicativo, <clientSecret>segredo do cliente e <tenantId> ID do locatário que você copiou quando criou a entidade de serviço nos pré-requisitos.

    • source No URI, substitua os valores , <container-name>e espaço reservado pelo nome da sua conta de armazenamento do Azure Data Lake Storage Gen2 e o nome do contêiner e <directory-name> diretório que você especificou quando carregou os <storage-account-name>dados de voo para a conta de armazenamento.

      Nota

      O identificador de esquema no URI, , abfssdiz ao Databricks para usar o driver do Sistema de Arquivos de Blob do Azure com TLS (Transport Layer Security). Para saber mais sobre o URI, consulte Usar o URI do Azure Data Lake Storage Gen2.

  4. Certifique-se de que o cluster terminou de iniciar antes de continuar.

  5. Pressione as teclas SHIFT + ENTER para executar o código neste bloco.

O contêiner e o diretório onde você carregou os dados de voo em sua conta de armazenamento agora estão acessíveis em seu notebook através do ponto de montagem, /mnt/flightdata.

Utilize o Databricks Notebook para converter CSV em Parquet

Agora que os dados de voo csv estão acessíveis por meio de um ponto de montagem DBFS, você pode usar um Apache Spark DataFrame para carregá-los em seu espaço de trabalho e gravá-los novamente no formato Apache parquet em seu armazenamento de objetos do Azure Data Lake Storage Gen2.

  • Um Spark DataFrame é uma estrutura de dados rotulada bidimensional com colunas de tipos potencialmente diferentes. Você pode usar um DataFrame para ler e gravar dados facilmente em vários formatos suportados. Com um DataFrame, você pode carregar dados do armazenamento de objetos na nuvem e executar análises e transformações dentro do cluster de computação sem afetar os dados subjacentes no armazenamento de objetos na nuvem. Para saber mais, consulte Trabalhar com DataFrames PySpark no Azure Databricks.

  • Apache parquet é um formato de arquivo colunar com otimizações que aceleram as consultas. É um formato de arquivo mais eficiente do que CSV ou JSON. Para saber mais, consulte Arquivos do Parquet.

No bloco de anotações, adicione uma nova célula e cole o código a seguir nela.

# Use the previously established DBFS mount point to read the data.
# Create a DataFrame to read the csv data.
# The header option specifies that the first row of data should be used as the DataFrame column names
# The inferschema option specifies that the column data types should be inferred from the data in the file
flight_df = spark.read.format('csv').options(
    header='true', inferschema='true').load("/mnt/flightdata/*.csv")

# Read the airline csv file and write the output to parquet format for easy query.
flight_df.write.mode("append").parquet("/mnt/flightdata/parquet/flights")
print("Done")

Pressione as teclas SHIFT + ENTER para executar o código neste bloco.

Antes de prosseguir para a próxima seção, certifique-se de que todos os dados do parquet foram gravados e "Concluído" aparece na saída.

Explorar dados

Nesta seção, você usa o utilitário do sistema de arquivos Databricks para explorar seu armazenamento de objetos do Azure Data Lake Storage Gen2 usando o ponto de montagem DBFS criado na seção anterior.

Em uma nova célula, cole o código a seguir para obter uma lista dos arquivos no ponto de montagem. O primeiro comando gera uma lista de arquivos e diretórios. O segundo comando exibe a saída em formato tabular para facilitar a leitura.

dbutils.fs.ls("/mnt/flightdata")
display(dbutils.fs.ls("/mnt/flightdata"))

Pressione as teclas SHIFT + ENTER para executar o código neste bloco.

Observe que o diretório parquet aparece na listagem. Você salvou os dados de voo .csv em formato parquet no diretório parquet/flights na seção anterior. Para listar arquivos no diretório parquet/flights , cole o seguinte código em uma nova célula e execute-o:

display(dbutils.fs.ls("/mnt/flightdata/parquet/flights"))

Para criar um novo arquivo e listá-lo, cole o seguinte código em uma nova célula e execute-o:

dbutils.fs.put("/mnt/flightdata/mydirectory/mysubdirectory/1.txt", "Hello, World!", True)
display(dbutils.fs.ls("/mnt/flightdata/mydirectory/mysubdirectory"))

Como você não precisa do arquivo 1.txt neste tutorial, você pode colar o código a seguir em uma célula e executá-lo para excluir recursivamente mydirectory. O True parâmetro indica uma exclusão recursiva.

dbutils.fs.rm("/mnt/flightdata/mydirectory", True)

Como uma conveniência, você pode usar o comando help para aprender detalhes sobre outros comandos.

dbutils.fs.help("rm")

Com esses exemplos de código, você explorou a natureza hierárquica do HDFS usando dados armazenados em uma conta de armazenamento com o Azure Data Lake Storage Gen2 habilitado.

Consultar os dados

Em seguida, pode começar a consultar os dados que carregou para a sua conta de armazenamento. Insira cada um dos seguintes blocos de código em uma nova célula e pressione SHIFT + ENTER para executar o script Python.

DataFrames fornecem um rico conjunto de funções (selecionar colunas, filtrar, juntar, agregar) que permitem resolver problemas comuns de análise de dados de forma eficiente.

Para carregar um DataFrame a partir dos dados de voo do parquet salvos anteriormente e explorar algumas das funcionalidades suportadas, insira esse script em uma nova célula e execute-o.

# Read the existing parquet file for the flights database that was created earlier
flight_df = spark.read.parquet("/mnt/flightdata/parquet/flights")

# Print the schema of the dataframe
flight_df.printSchema()

# Print the flight database size
print("Number of flights in the database: ", flight_df.count())

# Show the first 25 rows (20 is the default)
# To show the first n rows, run: df.show(n)
# The second parameter indicates that column lengths shouldn't be truncated (default is 20 characters)
flight_df.show(25, False)

# You can also use the DataFrame to run simple queries. Results are returned in a DataFrame.
# Show the first 25 rows of the results of a query that returns selected colums for all flights originating from airports in Texas
flight_df.select("FlightDate", "Reporting_Airline", "Flight_Number_Reporting_Airline", "OriginCityName", "DepTime", "DestCityName", "ArrTime", "ArrDelay").filter("OriginState = 'TX'").show(258, False)

# Use display to run visualizations
# Preferably run this in a separate cmd cell
display(flight_df)

Insira esse script em uma nova célula para executar algumas consultas básicas de análise nos dados. Você pode optar por executar o script inteiro (SHIFT + ENTER), realçar cada consulta e executá-la separadamente com CTRL + SHIFT + ENTER, ou inserir cada consulta em uma célula separada e executá-la lá.

# create a temporary sql view for querying flight information
flight_data = spark.read.parquet('/mnt/flightdata/parquet/flights')
flight_data.createOrReplaceTempView('FlightTable')

# Print the total number of flights in Jan 2016 (the number of rows in the flight data).
print("Number of flights in Jan 2016: ", flight_data.count())

# Using spark sql, query the parquet file to return the total flights of each airline
num_flights_by_airline=spark.sql("SELECT Reporting_Airline, count(*) AS NumFlights FROM FlightTable GROUP BY Reporting_Airline ORDER BY NumFlights DESC")
num_flights_by_airline.show()

# List out all the airports in Texas
airports_in_texas = spark.sql(
    "SELECT DISTINCT(OriginCityName) FROM FlightTable WHERE OriginStateName = 'Texas'")
print('Airports in Texas: ', airports_in_texas.count())
airports_in_texas.show(100, False)

# Find all airlines that fly from Texas
airlines_flying_from_texas = spark.sql(
    "SELECT DISTINCT(Reporting_Airline) FROM FlightTable WHERE OriginStateName='Texas'")
print('Airlines that fly to/from Texas: ', airlines_flying_from_texas.count())
airlines_flying_from_texas.show(100, False)

# List airlines by average arrival delay (negative values indicate early flights)
avg_arrival_delay=spark.sql(
    "SELECT Reporting_Airline, count(*) AS NumFlights, avg(DepDelay) AS AverageDepDelay, avg(ArrDelay) AS AverageArrDelay FROM FlightTable GROUP BY Reporting_Airline ORDER BY AverageArrDelay DESC")
print("Airlines by average arrival delay")
avg_arrival_delay.show()

# List airlines by the highest percentage of delayed flights. A delayed flight is one with a  departure or arrival delay that is greater than 15 minutes
spark.sql("DROP VIEW IF EXISTS totalFlights")
spark.sql("DROP VIEW IF EXISTS delayedFlights")
spark.sql(
    "CREATE TEMPORARY VIEW totalFlights AS SELECT Reporting_Airline, count(*) AS NumFlights FROM FlightTable GROUP BY Reporting_Airline")
spark.sql(
    "CREATE TEMPORARY VIEW delayedFlights AS SELECT Reporting_Airline, count(*) AS NumDelayedFlights FROM FlightTable WHERE DepDelay>15 or ArrDelay>15 GROUP BY Reporting_Airline")
percent_delayed_flights=spark.sql(
    "SELECT totalFlights.Reporting_Airline, totalFlights.NumFlights, delayedFlights.NumDelayedFlights, delayedFlights.NumDelayedFlights/totalFlights.NumFlights*100 AS PercentFlightsDelayed FROM totalFlights INNER JOIN delayedFlights ON totalFlights.Reporting_Airline = delayedFlights.Reporting_Airline ORDER BY PercentFlightsDelayed DESC")
print("Airlines by percentage of flights delayed")
percent_delayed_flights.show()

Resumo

Neste tutorial:

  • Recursos do Azure criados, incluindo uma conta de armazenamento do Azure Data Lake Storage Gen2 e uma entidade de serviço do Azure AD, e permissões atribuídas para acessar a conta de armazenamento.

  • Criou um espaço de trabalho, um bloco de anotações e um cluster de computação do Azure Databricks.

  • AzCopy usado para carregar dados de voo .csv não estruturados para a conta de armazenamento do Azure Data Lake Storage Gen2.

  • Funções usadas do utilitário Sistema de Arquivos Databricks para montar sua conta de armazenamento do Azure Data Lake Storage Gen2 e explorar seu sistema de arquivos hierárquico.

  • Usou o Apache Spark DataFrames para transformar seus dados de voo .csv para o formato Apache parquet e armazená-los de volta em sua conta de armazenamento do Azure Data Lake Storage Gen2.

  • DataFrames usados para explorar os dados de voo e realizar uma consulta simples.

  • Usado o Apache Spark SQL para consultar os dados de voo para o número total de voos de cada companhia aérea em janeiro de 2016, os aeroportos no Texas, as companhias aéreas que voam do Texas, o atraso médio de chegada em minutos para cada companhia aérea nacionalmente e a porcentagem de voos de cada companhia aérea que atrasaram partidas ou chegadas.

Clean up resources (Limpar recursos)

Se você quiser preservar o bloco de anotações e voltar a ele mais tarde, é uma boa ideia desligar (encerrar) o cluster para evitar cobranças. Para encerrar o cluster, selecione-o no seletor de computação localizado no canto superior direito da barra de ferramentas do bloco de anotações, selecione Encerrar no menu e confirme sua seleção. (Por padrão, o cluster será encerrado automaticamente após 120 minutos de inatividade.)

Se quiser excluir recursos individuais do espaço de trabalho, como blocos de anotações e clusters, você pode fazê-lo na barra lateral esquerda do espaço de trabalho. Para obter instruções detalhadas, consulte Excluir um cluster ou Excluir um bloco de anotações.

Quando não forem mais necessários, exclua o grupo de recursos e todos os recursos relacionados. Para fazer isso no portal do Azure, selecione o grupo de recursos para a conta de armazenamento e o espaço de trabalho e selecione Excluir.

Próximos passos