Oktatóanyag: Adatok kinyerása, átalakítása és betöltése az Azure Databricks használatával

Ebben az oktatóanyagban ETL-műveletet (adatok kinyerése, átalakítása és betöltése) hajt végre az Azure Databricks használatával. Adatokat nyerhet ki az Azure Data Lake Storage Gen2-ből az Azure Databricksbe, átalakításokat futtathat az Azure Databricksben lévő adatokon, és betöltheti az átalakított adatokat az Azure Synapse Analyticsbe.

Az oktatóanyag lépései az Azure Databricks Azure Synapse-összekötőjével továbbítják az adatokat az Azure Databricksbe. Ez az összekötő az Azure Blob Storage-t használja ideiglenes tárolóként az Azure Databricks-fürt és az Azure Synapse közötti adatátvitelhez.

Az alábbi ábrán az alkalmazásfolyam látható:

Azure Databricks with Data Lake Store and Azure Synapse

Ez az oktatóanyag a következő feladatokat mutatja be:

  • Hozzon létre egy Azure Databricks-szolgáltatást.
  • Spark-fürt létrehozása az Azure Databricksben.
  • Hozzon létre egy fájlrendszert a Data Lake Storage Gen2-fiókban.
  • Mintaadatok feltöltése az Azure Data Lake Storage Gen2-fiókba.
  • Hozzon létre egy szolgáltatásnevet.
  • Adatok kinyerése az Azure Data Lake Storage Gen2-fiókból.
  • Adatok átalakítása az Azure Databricksben.
  • Adatok betöltése az Azure Synapse-be.

Ha még nincs Azure-előfizetése, kezdés előtt hozzon létre egy ingyenes fiókot.

Feljegyzés

Ez az oktatóanyag nem végezhető el ingyenes Azure-próbaverziós előfizetéssel. Ha ingyenes fiókja van, nyissa meg a profilját, és módosítsa előfizetését használatalapú fizetésre. További információkért lásd az ingyenes Azure-fiókot ismertető cikket. Ezután törölje a költségkeretet, majd igényeljen kvótaemelést a vCPU-k esetében a régiójában. Az Azure Databricks-munkaterület létrehozásakor kiválaszthatja a Próbaverzió (Prémium – 14 napos ingyenes DBU-k) tarifacsomagot, hogy a munkaterület 14 napig ingyenes Prémium Szintű Azure Databricks DBU-khoz férhessen hozzá.

Előfeltételek

Az oktatóanyag megkezdése előtt végezze el ezeket a feladatokat:

Gyűjtse össze a szükséges információkat

Győződjön meg arról, hogy teljesíti az oktatóanyag előfeltételeit.

Mielőtt hozzákezdene, az alábbi információkra van szüksége:

✔️ Az Azure Synapse adatbázisneve, adatbázis-kiszolgálójának neve, felhasználóneve és jelszava.

✔️ A Blob Storage-fiók hozzáférési kulcsa.

✔️ A Data Lake Storage Gen2-tárfiók neve.

✔️ Az előfizetés bérlőazonosítója.

✔️ A Microsoft Entra ID-val (korábban Azure Active Directory) regisztrált alkalmazás alkalmazásazonosítója.

✔️ A Microsoft Entra ID azonosítóval (korábban Azure Active Directory) regisztrált alkalmazás hitelesítési kulcsa.

Azure Databricks-szolgáltatás létrehozása

Ebben a szakaszban egy Azure Databricks-szolgáltatást hoz létre az Azure Portal használatával.

  1. Az Azure Portal menüjében válassza az Erőforrás létrehozása elemet.

    Create a resource on Azure portal

    Ezután válassza az Analytics>Azure Databricks lehetőséget.

    Create Azure Databricks on Azure portal

  2. Az Azure Databricks Service alatt adja meg a következő értékeket egy Databricks-szolgáltatás létrehozásához:

    Tulajdonság Leírás
    Munkaterület neve Adja meg a Databricks-munkaterület nevét.
    Előfizetés Válassza ki a legördülő menüből a saját Azure-előfizetését.
    Erőforráscsoport Adja meg, hogy új erőforráscsoportot kíván-e létrehozni, vagy egy meglévőt szeretne használni. Az erőforráscsoport egy tároló, amely Azure-megoldásokhoz kapcsolódó erőforrásokat tárol. További információért olvassa el az Azure-erőforráscsoportok áttekintését.
    Helyen Válassza az USA 2. nyugati régióját. A további elérhető régiókért tekintse meg az elérhető Azure-szolgáltatások régiók szerinti bontását.
    Tarifacsomag Válassza a Standard lehetőséget.
  3. A fiók létrehozása eltarthat néhány percig. A művelet állapotának figyeléséhez tekintse meg a folyamatjelző sávot a tetején.

  4. Válassza a Rögzítés az irányítópulton, majd a Létrehozás lehetőséget.

Spark-fürt létrehozása az Azure Databricksben

  1. Az Azure Portalon nyissa meg a létrehozott Databricks szolgáltatást, és válassza a Munkaterület indítása lehetőséget.

  2. A rendszer átirányítja az Azure Databricks portálra. A portálon válassza a Fürt elemet.

    Databricks on Azure

  3. Az Új fürt lapon adja meg a fürt létrehozásához szükséges értékeket.

    Create Databricks Spark cluster on Azure

  4. Adjon meg értékeket a következő mezőkben, és fogadja el az alapértelmezett értékeket a többi mezőben:

    • Adjon egy nevet a fürtnek.

    • Jelölje be a Megszakítás __ perc inaktivitás után jelölőnégyzetet. Ha a fürt nincs használatban, adjon meg egy időtartamot (percben) a fürt leállításához.

    • Válassza a Fürt létrehozása lehetőséget. A fürt futtatása után jegyzetfüzeteket csatolhat a fürthöz, és Spark-feladatokat futtathat.

Fájlrendszer létrehozása az Azure Data Lake Storage Gen2-fiókban

Ebben a szakaszban létrehoz egy jegyzetfüzetet az Azure Databricks-munkaterületen, majd kódrészleteket futtat a tárfiók konfigurálásához

  1. Az Azure Portalon nyissa meg a létrehozott Azure Databricks szolgáltatást, és válassza a Munkaterület indítása lehetőséget.

  2. A bal oldalon válassza a Munkaterület lehetőséget. A Munkaterület legördülő menüből válassza a Létrehozás>Jegyzetfüzet lehetőséget.

    Create a notebook in Databricks

  3. A Jegyzetfüzet létrehozása párbeszédpanelen adja meg a jegyzetfüzet nevét. Válassza a Scala nyelvet, majd válassza ki a korábban létrehozott Spark-fürtöt.

    Provide details for a notebook in Databricks

  4. Válassza a Létrehozás lehetőséget.

  5. Az alábbi kódblokk alapértelmezett szolgáltatásnév-hitelesítő adatokat állít be a Spark-munkamenetben elért ADLS Gen 2-fiókhoz. A második kódblokk hozzáfűzi a fiók nevét a beállításhoz egy adott ADLS Gen 2-fiók hitelesítő adatainak megadásához. Másolja és illessze be bármelyik kódblokkot az Azure Databricks-jegyzetfüzet első cellájába.

    Munkamenet-konfiguráció

    val appID = "<appID>"
    val secret = "<secret>"
    val tenantID = "<tenant-id>"
    
    spark.conf.set("fs.azure.account.auth.type", "OAuth")
    spark.conf.set("fs.azure.account.oauth.provider.type", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
    spark.conf.set("fs.azure.account.oauth2.client.id", "<appID>")
    spark.conf.set("fs.azure.account.oauth2.client.secret", "<secret>")
    spark.conf.set("fs.azure.account.oauth2.client.endpoint", "https://login.microsoftonline.com/<tenant-id>/oauth2/token")
    spark.conf.set("fs.azure.createRemoteFileSystemDuringInitialization", "true")
    

    Fiók konfigurálása

    val storageAccountName = "<storage-account-name>"
    val appID = "<app-id>"
    val secret = "<secret>"
    val fileSystemName = "<file-system-name>"
    val tenantID = "<tenant-id>"
    
    spark.conf.set("fs.azure.account.auth.type." + storageAccountName + ".dfs.core.windows.net", "OAuth")
    spark.conf.set("fs.azure.account.oauth.provider.type." + storageAccountName + ".dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
    spark.conf.set("fs.azure.account.oauth2.client.id." + storageAccountName + ".dfs.core.windows.net", "" + appID + "")
    spark.conf.set("fs.azure.account.oauth2.client.secret." + storageAccountName + ".dfs.core.windows.net", "" + secret + "")
    spark.conf.set("fs.azure.account.oauth2.client.endpoint." + storageAccountName + ".dfs.core.windows.net", "https://login.microsoftonline.com/" + tenantID + "/oauth2/token")
    spark.conf.set("fs.azure.createRemoteFileSystemDuringInitialization", "true")
    dbutils.fs.ls("abfss://" + fileSystemName  + "@" + storageAccountName + ".dfs.core.windows.net/")
    spark.conf.set("fs.azure.createRemoteFileSystemDuringInitialization", "false")
    
  6. Ebben a kódblokkban cserélje le a <app-id>kódblokkban lévő , <secret>, , <tenant-id>és <storage-account-name> helyőrző értékeket az oktatóanyag előfeltételeinek teljesítése során gyűjtött értékekre. Cserélje le a <file-system-name> helyőrző értéket a fájlrendszernek adni kívánt névre.

    • Az <app-id>, és <secret> azon alkalmazásból származnak, amelyet egy szolgáltatásnév létrehozása során regisztrált az Active Directoryban.

    • Az <tenant-id> előfizetésből származik.

    • Az <storage-account-name> Azure Data Lake Storage Gen2-tárfiók neve.

  7. Nyomja le a SHIFT + ENTER billentyűket a kód ebben a blokkban való futtatásához.

Mintaadatok betöltése az Azure Data Lake Storage Gen2-fiókba

Mielőtt ehhez a szakaszhoz hozzáfogna, a következő előfeltételeknek kell eleget tennie:

Írja be az alábbi kódot egy jegyzetfüzetcellába:

%sh wget -P /tmp https://raw.githubusercontent.com/Azure/usql/master/Examples/Samples/Data/json/radiowebsite/small_radio_json.json

A cellában nyomja le a SHIFT + ENTER billentyűkombinációt a kód futtatásához.

Most egy új cellába írja be a következő kódot, és cserélje le a szögletes zárójelben megjelenő értékeket a korábban használt értékekkel:

dbutils.fs.cp("file:///tmp/small_radio_json.json", "abfss://" + fileSystemName + "@" + storageAccountName + ".dfs.core.windows.net/")

A cellában nyomja le a SHIFT + ENTER billentyűkombinációt a kód futtatásához.

Adatok kinyerése az Azure Data Lake Storage Gen2-fiókból

  1. Most már betöltheti a minta json-fájlt adatkeretként az Azure Databricksben. Illessze be a következő kódot egy új cellába. Cserélje le a szögletes zárójelben látható helyőrzőket az értékekre.

    val df = spark.read.json("abfss://" + fileSystemName + "@" + storageAccountName + ".dfs.core.windows.net/small_radio_json.json")
    
  2. Nyomja le a SHIFT + ENTER billentyűket a kód ebben a blokkban való futtatásához.

  3. Futtassa a következő kódot az adatkeret tartalmának megtekintéséhez:

    df.show()
    

    Az alábbi kódrészlethez hasonló kimenet jelenik meg:

    +---------------------+---------+---------+------+-------------+----------+---------+-------+--------------------+------+--------+-------------+---------+--------------------+------+-------------+------+
    |               artist|     auth|firstName|gender|itemInSession|  lastName|   length|  level|            location|method|    page| registration|sessionId|                song|status|           ts|userId|
    +---------------------+---------+---------+------+-------------+----------+---------+-------+--------------------+------+--------+-------------+---------+--------------------+------+-------------+------+
    | El Arrebato         |Logged In| Annalyse|     F|            2|Montgomery|234.57914| free  |  Killeen-Temple, TX|   PUT|NextSong|1384448062332|     1879|Quiero Quererte Q...|   200|1409318650332|   309|
    | Creedence Clearwa...|Logged In|   Dylann|     M|            9|    Thomas|340.87138| paid  |       Anchorage, AK|   PUT|NextSong|1400723739332|       10|        Born To Move|   200|1409318653332|    11|
    | Gorillaz            |Logged In|     Liam|     M|           11|     Watts|246.17751| paid  |New York-Newark-J...|   PUT|NextSong|1406279422332|     2047|                DARE|   200|1409318685332|   201|
    ...
    ...
    

    Ezzel kinyerte az adatokat a 2. generációs Azure Data Lake Storage-ből az Azure Databricksbe.

Adatok átalakítása az Azure Databricksben

A nyers mintaadatok small_radio_json.json fájl rögzíti a rádióállomás célközönségét, és számos oszlopot tartalmaz. Ebben a szakaszban úgy alakíthatja át az adatokat, hogy csak bizonyos oszlopokat kérjen le az adathalmazból.

  1. Először csak a firstName, a lastName, a nem, a hely és a szint oszlopokat kérje le a létrehozott adatkeretből.

    val specificColumnsDf = df.select("firstname", "lastname", "gender", "location", "level")
    specificColumnsDf.show()
    

    A kimenet az alábbi kódrészletben látható módon jelenik meg:

    +---------+----------+------+--------------------+-----+
    |firstname|  lastname|gender|            location|level|
    +---------+----------+------+--------------------+-----+
    | Annalyse|Montgomery|     F|  Killeen-Temple, TX| free|
    |   Dylann|    Thomas|     M|       Anchorage, AK| paid|
    |     Liam|     Watts|     M|New York-Newark-J...| paid|
    |     Tess|  Townsend|     F|Nashville-Davidso...| free|
    |  Margaux|     Smith|     F|Atlanta-Sandy Spr...| free|
    |     Alan|     Morse|     M|Chicago-Napervill...| paid|
    |Gabriella|   Shelton|     F|San Jose-Sunnyval...| free|
    |   Elijah|  Williams|     M|Detroit-Warren-De...| paid|
    |  Margaux|     Smith|     F|Atlanta-Sandy Spr...| free|
    |     Tess|  Townsend|     F|Nashville-Davidso...| free|
    |     Alan|     Morse|     M|Chicago-Napervill...| paid|
    |     Liam|     Watts|     M|New York-Newark-J...| paid|
    |     Liam|     Watts|     M|New York-Newark-J...| paid|
    |   Dylann|    Thomas|     M|       Anchorage, AK| paid|
    |     Alan|     Morse|     M|Chicago-Napervill...| paid|
    |   Elijah|  Williams|     M|Detroit-Warren-De...| paid|
    |  Margaux|     Smith|     F|Atlanta-Sandy Spr...| free|
    |     Alan|     Morse|     M|Chicago-Napervill...| paid|
    |   Dylann|    Thomas|     M|       Anchorage, AK| paid|
    |  Margaux|     Smith|     F|Atlanta-Sandy Spr...| free|
    +---------+----------+------+--------------------+-----+
    
  2. Az adatok további átalakításához nevezze át a level oszlopot a következőre: subscription_type.

    val renamedColumnsDF = specificColumnsDf.withColumnRenamed("level", "subscription_type")
    renamedColumnsDF.show()
    

    A kimenet az alábbi kódrészletben látható módon jelenik meg.

    +---------+----------+------+--------------------+-----------------+
    |firstname|  lastname|gender|            location|subscription_type|
    +---------+----------+------+--------------------+-----------------+
    | Annalyse|Montgomery|     F|  Killeen-Temple, TX|             free|
    |   Dylann|    Thomas|     M|       Anchorage, AK|             paid|
    |     Liam|     Watts|     M|New York-Newark-J...|             paid|
    |     Tess|  Townsend|     F|Nashville-Davidso...|             free|
    |  Margaux|     Smith|     F|Atlanta-Sandy Spr...|             free|
    |     Alan|     Morse|     M|Chicago-Napervill...|             paid|
    |Gabriella|   Shelton|     F|San Jose-Sunnyval...|             free|
    |   Elijah|  Williams|     M|Detroit-Warren-De...|             paid|
    |  Margaux|     Smith|     F|Atlanta-Sandy Spr...|             free|
    |     Tess|  Townsend|     F|Nashville-Davidso...|             free|
    |     Alan|     Morse|     M|Chicago-Napervill...|             paid|
    |     Liam|     Watts|     M|New York-Newark-J...|             paid|
    |     Liam|     Watts|     M|New York-Newark-J...|             paid|
    |   Dylann|    Thomas|     M|       Anchorage, AK|             paid|
    |     Alan|     Morse|     M|Chicago-Napervill...|             paid|
    |   Elijah|  Williams|     M|Detroit-Warren-De...|             paid|
    |  Margaux|     Smith|     F|Atlanta-Sandy Spr...|             free|
    |     Alan|     Morse|     M|Chicago-Napervill...|             paid|
    |   Dylann|    Thomas|     M|       Anchorage, AK|             paid|
    |  Margaux|     Smith|     F|Atlanta-Sandy Spr...|             free|
    +---------+----------+------+--------------------+-----------------+
    

Adatok betöltése az Azure Synapse-be

Ebben a szakaszban feltölti az átalakított adatokat az Azure Synapse-be. Az Azure Databricks Azure Synapse-összekötőjével közvetlenül feltölthet egy adatkeretet táblázatként egy Synapse Spark-készletbe.

Ahogy korábban említettük, az Azure Synapse-összekötő az Azure Blob Storage-t használja ideiglenes tárolóként az Adatok feltöltéséhez az Azure Databricks és az Azure Synapse között. Ezért első lépésként adja meg a tárfiókhoz való csatlakozáshoz szükséges konfigurációt. A cikk előfeltételeinek részeként már létre kell hoznia a fiókot.

  1. Adja meg az Azure Storage-fiók Azure Databricksből való eléréséhez szükséges konfigurációt.

    val blobStorage = "<blob-storage-account-name>.blob.core.windows.net"
    val blobContainer = "<blob-container-name>"
    val blobAccessKey =  "<access-key>"
    
  2. Adjon meg egy ideiglenes mappát, amelyet az adatok Az Azure Databricks és az Azure Synapse közötti áthelyezés során használhat.

    val tempDir = "wasbs://" + blobContainer + "@" + blobStorage +"/tempDirs"
    
  3. Futtassa az alábbi kódrészletet az Azure Blob Storage hozzáférési kulcsainak a konfigurációban való tárolásához. Ez a művelet biztosítja, hogy ne kelljen egyszerű szövegben tartania a jegyzetfüzet hozzáférési kulcsát.

    val acntInfo = "fs.azure.account.key."+ blobStorage
    sc.hadoopConfiguration.set(acntInfo, blobAccessKey)
    
  4. Adja meg az Azure Synapse-példányhoz való csatlakozáshoz szükséges értékeket. Előfeltételként létre kell hoznia egy Azure Synapse Analytics-szolgáltatást. Használja a dwServer teljes kiszolgálónevét. Például: <servername>.database.windows.net.

    //Azure Synapse related settings
    val dwDatabase = "<database-name>"
    val dwServer = "<database-server-name>"
    val dwUser = "<user-name>"
    val dwPass = "<password>"
    val dwJdbcPort =  "1433"
    val dwJdbcExtraOptions = "encrypt=true;trustServerCertificate=true;hostNameInCertificate=*.database.windows.net;loginTimeout=30;"
    val sqlDwUrl = "jdbc:sqlserver://" + dwServer + ":" + dwJdbcPort + ";database=" + dwDatabase + ";user=" + dwUser+";password=" + dwPass + ";$dwJdbcExtraOptions"
    val sqlDwUrlSmall = "jdbc:sqlserver://" + dwServer + ":" + dwJdbcPort + ";database=" + dwDatabase + ";user=" + dwUser+";password=" + dwPass
    
  5. Futtassa az alábbi kódrészletet az átalakított, átnevezettColumnsDF nevű adatkeret azure Synapse-táblázatként való betöltéséhez. Ez a kódrészlet létrehoz egy SampleTable nevű táblát az SQL-adatbázisban.

    spark.conf.set(
        "spark.sql.parquet.writeLegacyFormat",
        "true")
    
    renamedColumnsDF.write.format("com.databricks.spark.sqldw").option("url", sqlDwUrlSmall).option("dbtable", "SampleTable")       .option( "forward_spark_azure_storage_credentials","True").option("tempdir", tempDir).mode("overwrite").save()
    

    Feljegyzés

    Ez a minta a jelzőt forward_spark_azure_storage_credentials használja, amely miatt az Azure Synapse hozzáférési kulcs használatával fér hozzá a Blob Storage-ból származó adatokhoz. Ez az egyetlen támogatott hitelesítési módszer.

    Ha az Azure Blob Storage csak virtuális hálózatok kiválasztására korlátozódik, az Azure Synapse a hozzáférési kulcsok helyett felügyeltszolgáltatás-identitást igényel. Ez "Ez a kérés nem jogosult a művelet végrehajtására".

  6. Csatlakozás az SQL-adatbázishoz, és ellenőrizze, hogy megjelenik-e egy névvel ellátott adatbázisSampleTable.

    Verify the sample table

  7. Futtasson egy választó lekérdezést a tábla tartalmának ellenőrzéséhez. A táblának ugyanazokat az adatokat kell megadnia, mint az átnevezettColumnsDF adatkeretnek.

    Verify the sample table content

Az erőforrások eltávolítása

Az oktatóanyag befejezése után megszakíthatja a fürtöt. Az Azure Databricks-munkaterületen válassza a bal oldali Fürtök lehetőséget. Ahhoz, hogy a fürt leálljon, a Műveletek csoportban mutasson a három pontra (...), és válassza a Leállítás ikont.

Stop a Databricks cluster

Ha nem állítja le manuálisan a fürtöt, az automatikusan leáll, feltéve, hogy a fürt létrehozásakor bejelölte a Megszakítás __ perc inaktivitás után jelölőnégyzetet. Ilyen esetben a fürt automatikusan leáll, ha a megadott ideig inaktív.

Következő lépések

Ez az oktatóanyag bemutatta, hogyan végezheti el az alábbi műveleteket:

  • Azure Databricks-szolgáltatás létrehozása
  • Spark-fürt létrehozása az Azure Databricksben
  • Jegyzetfüzet létrehozása az Azure Databricksben
  • Adatok kinyerása a Data Lake Storage Gen2-fiókból
  • Adatok átalakítása az Azure Databricksben
  • Adatok betöltése az Azure Synapse-be