Tutoriel : Azure Data Lake Storage Gen2, Azure Databricks et Spark
Ce tutoriel vous montre comment connecter un cluster Azure Databricks aux données contenues dans un compte de stockage Azure compatible avec Azure Data Lake Storage Gen2. Cette connexion vous permet d’exécuter en mode natif des requêtes et analyses sur des données à partir de votre cluster.
Ce didacticiel présente les procédures suivantes :
- Ingérer des données non structurées dans un compte de stockage
- Exécuter une analytique sur vos données dans le stockage Blob
Si vous n’avez pas d’abonnement Azure, créez un compte gratuit avant de commencer.
Prérequis
Créez un compte de stockage qui possède un espace de noms hiérarchique (Azure Data Lake Storage Gen2)
Consultez Créer un compte de stockage à utiliser avec Azure Data Lake Storage Gen2.
Vérifiez que le rôle Contributeur aux données Blob du stockage est attribué à votre compte d’utilisateur.
Installez AzCopy v10. Consultez Transférer des données avec AzCopy v10.
Créez un principal de service et une clé secrète client, puis accordez au principal de service l’accès au compte de stockage.
Consultez Tutoriel : Se connecter à Azure Data Lake Storage Gen2 (étapes 1 à 3). Après avoir effectué ces étapes, veillez à coller les valeurs d’ID de locataire, d’ID d’application et de clé secrète client dans un fichier texte. Vous en aurez besoin bientôt.
Téléchargement des données de vol
Ce tutoriel utilise des données de vol issues du Bureau of Transportation Statistics pour montrer comment effectuer une opération ETL. Vous devez télécharger ces données pour suivre ce tutoriel.
Téléchargez le fichier On_Time_Reporting_Carrier_On_Time_Performance_1987_present_2016_1.zip. Ce fichier contient les données de vol.
Décompressez le contenu du fichier compressé et notez le nom du fichier et son chemin. Vous aurez besoin de ces informations lors d’une étape ultérieure.
Réception de données
Copier des données sources dans le compte de stockage
Utilisez AzCopy pour copier des données de votre fichier .csv dans votre compte Data Lake Storage Gen2.
Ouvrez une fenêtre d’invite de commandes et entrez la commande suivante pour vous connecter à votre compte de stockage.
azcopy login
Suivez les instructions qui apparaissent dans la fenêtre d’invite de commandes pour authentifier votre compte d’utilisateur.
Pour copier des données du compte .csv, entrez la commande suivante.
azcopy cp "<csv-folder-path>" https://<storage-account-name>.dfs.core.windows.net/<container-name>/folder1/On_Time.csv
Remplacez la valeur d’espace réservé
<csv-folder-path>
par le chemin du fichier .csv.Remplacez la valeur d’espace réservé
<storage-account-name>
par le nom de votre compte de stockage.Remplacez la valeur d’espace réservé
<container-name>
par le nom d’un conteneur dans votre compte de stockage.
Créer un espace de travail, un cluster et un notebook Azure Databricks
Créer un espace de travail Azure Databricks. Consultez Créer un espace de travail Azure Databricks.
Créer un cluster. Voir Créez un cluster.
Créez un bloc-notes. Consultez Création d’un notebook. Choisissez Python comme langage par défaut du notebook.
Créer et monter un conteneur
Dans la liste déroulante Cluster, vérifiez que le cluster que vous avez créé précédemment est sélectionné.
Cliquez sur Créer. Le notebook s’ouvre avec une cellule vide en haut.
Copiez et collez le bloc de code suivant dans la première cellule, mais n’exécutez pas ce code pour l’instant.
configs = {"fs.azure.account.auth.type": "OAuth", "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider", "fs.azure.account.oauth2.client.id": "<appId>", "fs.azure.account.oauth2.client.secret": "<clientSecret>", "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/<tenant>/oauth2/token", "fs.azure.createRemoteFileSystemDuringInitialization": "true"} dbutils.fs.mount( source = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/folder1", mount_point = "/mnt/flightdata", extra_configs = configs)
Dans ce bloc de code, remplacez les valeurs d’espace réservé
appId
,clientSecret
,tenant
etstorage-account-name
par celles que vous avez collectées au moment de la finalisation des prérequis de ce tutoriel. Remplacez la valeur d’espace réservécontainer-name
par le nom du conteneur.Appuyez sur les touches Maj +Entrée pour exécuter le code de ce bloc.
Laissez ce notebook ouvert car vous allez y ajouter des commandes plus tard.
Utiliser Databricks Notebook pour convertir CSV en Parquet
Dans le notebook que vous avez créé précédemment, ajoutez une nouvelle cellule et collez-y le code suivant.
# Use the previously established DBFS mount point to read the data.
# create a data frame to read data.
flightDF = spark.read.format('csv').options(
header='true', inferschema='true').load("/mnt/flightdata/*.csv")
# read the airline csv file and write the output to parquet format for easy query.
flightDF.write.mode("append").parquet("/mnt/flightdata/parquet/flights")
print("Done")
Explorer des données
Dans une nouvelle cellule, collez le code suivant pour obtenir la liste des fichiers CSV téléchargés par le biais d’AzCopy.
import os.path
import IPython
from pyspark.sql import SQLContext
display(dbutils.fs.ls("/mnt/flightdata"))
Pour créer un nouveau fichier et répertorier les fichiers dans le dossier parquet/flights dossier, exécutez ce script :
dbutils.fs.put("/mnt/flightdata/1.txt", "Hello, World!", True)
dbutils.fs.ls("/mnt/flightdata/parquet/flights")
Avec ces exemples de code, vous avez exploré la nature hiérarchique de HDFS avec des données stockées dans un compte de stockage compatible avec Azure Data Lake Storage Gen2.
Interroger les données
Vous pouvez ensuite commencer à interroger les données que vous avez chargées dans votre compte de stockage. Entrez chacun des blocs de code suivants dans Cmd 1 et appuyez sur Cmd + Entrée pour exécuter le script Python.
Pour créer des trames de données pour vos sources de données, exécutez le script suivant :
- Remplacez la valeur d’espace réservé
<csv-folder-path>
par le chemin du fichier .csv.
# Copy this into a Cmd cell in your notebook.
acDF = spark.read.format('csv').options(
header='true', inferschema='true').load("/mnt/flightdata/On_Time.csv")
acDF.write.parquet('/mnt/flightdata/parquet/airlinecodes')
# read the existing parquet file for the flights database that was created earlier
flightDF = spark.read.format('parquet').options(
header='true', inferschema='true').load("/mnt/flightdata/parquet/flights")
# print the schema of the dataframes
acDF.printSchema()
flightDF.printSchema()
# print the flight database size
print("Number of flights in the database: ", flightDF.count())
# show the first 20 rows (20 is the default)
# to show the first n rows, run: df.show(n)
acDF.show(100, False)
flightDF.show(20, False)
# Display to run visualizations
# preferably run this in a separate cmd cell
display(flightDF)
Entrez ce script pour exécuter des requêtes d’analyse basiques sur les données.
# Run each of these queries, preferably in a separate cmd cell for separate analysis
# create a temporary sql view for querying flight information
FlightTable = spark.read.parquet('/mnt/flightdata/parquet/flights')
FlightTable.createOrReplaceTempView('FlightTable')
# create a temporary sql view for querying airline code information
AirlineCodes = spark.read.parquet('/mnt/flightdata/parquet/airlinecodes')
AirlineCodes.createOrReplaceTempView('AirlineCodes')
# using spark sql, query the parquet file to return total flights in January and February 2016
out1 = spark.sql("SELECT * FROM FlightTable WHERE Month=1 and Year= 2016")
NumJan2016Flights = out1.count()
out2 = spark.sql("SELECT * FROM FlightTable WHERE Month=2 and Year= 2016")
NumFeb2016Flights = out2.count()
print("Jan 2016: ", NumJan2016Flights, " Feb 2016: ", NumFeb2016Flights)
Total = NumJan2016Flights+NumFeb2016Flights
print("Total flights combined: ", Total)
# List out all the airports in Texas
out = spark.sql(
"SELECT distinct(OriginCityName) FROM FlightTable where OriginStateName = 'Texas'")
print('Airports in Texas: ', out.show(100))
# find all airlines that fly from Texas
out1 = spark.sql(
"SELECT distinct(Reporting_Airline) FROM FlightTable WHERE OriginStateName='Texas'")
print('Airlines that fly to/from Texas: ', out1.show(100, False))
Nettoyer les ressources
Lorsque vous n’en avez plus besoin, supprimez le groupe de ressources et toutes les ressources associées. Pour ce faire, sélectionnez le groupe de ressources du compte de stockage, puis sélectionnez Supprimer.