Share via


Oktatóanyag: Végpontok közötti lakehouse-elemzési folyamat futtatása

Ez az oktatóanyag bemutatja, hogyan állíthat be egy végpontok közötti elemzési folyamatot egy Azure Databricks lakehouse-hoz.

Fontos

Ez az oktatóanyag interaktív jegyzetfüzetekkel végzi el a Pythonban a Unity Catalog-kompatibilis fürtökön végzett gyakori ETL-feladatokat. Ha nem a Unity Catalogot használja, olvassa el az első ETL-számítási feladat futtatása az Azure Databricksben című témakört.

Ebben az oktatóanyagban szereplő feladatok

A cikk végére kényelmesen érezheti magát:

  1. Unity Catalog-kompatibilis számítási fürt indítása.
  2. Databricks-jegyzetfüzet létrehozása.
  3. Adatok írása és olvasása egy Unity-katalógus külső helyről.
  4. Növekményes adatbetöltés konfigurálása Unity Catalog-táblába az Automatikus betöltővel.
  5. Jegyzetfüzetcellák végrehajtása az adatok feldolgozásához, lekérdezéséhez és előnézetéhez.
  6. Jegyzetfüzet ütemezése Databricks-feladatként.
  7. Unity Catalog-táblák lekérdezése a Databricks SQL-ből

Az Azure Databricks éles üzemre kész eszközöket biztosít, amelyek lehetővé teszik az adatszakértők számára a kinyerési, átalakítási és betöltési (ETL-) folyamatok gyors fejlesztését és üzembe helyezését. A Unity Catalog lehetővé teszi, hogy az adatgondnokok konfigurálják és biztonságossá teszik a tároló hitelesítő adatait, a külső helyeket és az adatbázis-objektumokat a teljes szervezet felhasználói számára. A Databricks SQL lehetővé teszi az elemzők számára, hogy sql-lekérdezéseket futtasson az éles ETL számítási feladatokban használt táblákon, így nagy méretekben valós idejű üzleti intelligenciát tesz lehetővé.

A Delta Live Tables használatával ETL-folyamatokat is létrehozhat. A Databricks delta live tableset hozott létre az éles ETL-folyamatok létrehozásának, üzembe helyezésének és karbantartásának összetettségének csökkentése érdekében. Lásd az oktatóanyagot: Futtassa az első Delta Live Tables-folyamatot.

Követelmények

Feljegyzés

Ha nem rendelkezik fürtvezérlési jogosultságokkal, akkor is elvégezheti az alábbi lépések többségét, amíg hozzáféréssel rendelkezik egy fürthöz.

1. lépés: Fürt létrehozása

Feltáró adatelemzés és adatelemzés elvégzéséhez hozzon létre egy fürtöt, amely biztosítja a parancsok végrehajtásához szükséges számítási erőforrásokat.

  1. Kattintson a Számítás gombra számítási ikonaz oldalsávon.
  2. Kattintson Új ikonaz Oldalsáv Új gombjára, majd válassza a Fürt lehetőséget. Ekkor megnyílik az Új fürt/Számítás lap.
  3. Adja meg a fürt egyedi nevét.
  4. Válassza az egycsomópontos választógombot.
  5. Válassza az Egy felhasználó lehetőséget a Hozzáférési mód legördülő listából.
  6. Győződjön meg arról, hogy az e-mail-címe látható az Egyfelhasználós mezőben.
  7. A Unity Catalog használatához válassza ki a Databricks 11.1-es vagy újabb verzióját.
  8. Kattintson a Számítás létrehozása elemre a fürt létrehozásához.

A Databricks-fürtökkel kapcsolatos további információkért lásd: Compute.

2. lépés: Databricks-jegyzetfüzet létrehozása

Ha hozzá szeretne kezdeni interaktív kód írásához és végrehajtásához az Azure Databricksben, hozzon létre egy jegyzetfüzetet.

  1. Kattintson Új ikonaz Oldalsáv Új elemére, majd a Jegyzetfüzet gombra.
  2. A Jegyzetfüzet létrehozása lapon:
    • Adjon meg egy egyedi nevet a jegyzetfüzetnek.
    • Győződjön meg arról, hogy az alapértelmezett nyelv Pythonra van állítva.
    • A Csatlakozás legördülő menüben válassza ki az 1. lépésben létrehozott fürtöt a Fürt legördülő listából.

A jegyzetfüzet egy üres cellával nyílik meg.

A jegyzetfüzetek létrehozásáról és kezeléséről további információt a Jegyzetfüzetek kezelése című témakörben talál.

3. lépés: Adatok írása és olvasása a Unity Catalog által felügyelt külső helyről

A Databricks az Automatikus betöltő használatát javasolja a növekményes adatbetöltéshez. Az automatikus betöltő automatikusan észleli és feldolgozza az új fájlokat, amikor megérkeztek a felhőobjektum-tárolóba.

A Unity Catalog használatával kezelheti a külső helyekhez való biztonságos hozzáférést. A külső helyen engedélyekkel rendelkező READ FILES felhasználók vagy szolgáltatásnevek az Automatikus betöltő használatával tölthetik be az adatokat.

Az adatok általában más rendszerekből származó írások miatt külső helyre érkeznek. Ebben a bemutatóban az adatok érkezését szimulálhatja úgy, hogy JSON-fájlokat ír ki egy külső helyre.

Másolja az alábbi kódot egy jegyzetfüzetcellába. Cserélje le a sztring értékét catalog a katalógus nevére és USE CATALOG az engedélyekreCREATE CATALOG. Cserélje le a sztring értékét external_location egy külső hely elérési útjára a következőre READ FILES: , WRITE FILESés CREATE EXTERNAL TABLE engedélyek.

A külső helyek definiálhatók teljes tárolóként, de gyakran egy tárolóba ágyazott könyvtárra mutatnak.

A külső hely elérési útjának formátuma a "abfss://container_name@storage_account.dfs.core.windows.net/path/to/external_location"megfelelő.


 external_location = "<your-external-location>"
 catalog = "<your-catalog>"

 dbutils.fs.put(f"{external_location}/filename.txt", "Hello world!", True)
 display(dbutils.fs.head(f"{external_location}/filename.txt"))
 dbutils.fs.rm(f"{external_location}/filename.txt")

 display(spark.sql(f"SHOW SCHEMAS IN {catalog}"))

A cella végrehajtásához egy 12 bájtos vonalat kell nyomtatnia, ki kell nyomtatnia a "Hello world!" sztringet, és meg kell jelenítenie a katalógusban található összes adatbázist. Ha nem tudja futtatni ezt a cellát, győződjön meg arról, hogy unitykatalógus-kompatibilis munkaterületen van, és megfelelő engedélyeket kér a munkaterület rendszergazdájától az oktatóanyag elvégzéséhez.

Az alábbi Python-kód az e-mail-cím használatával hoz létre egy egyedi adatbázist a megadott katalógusban, és egy egyedi tárolási helyet a megadott külső helyen. A cella végrehajtása eltávolítja az oktatóanyaghoz társított összes adatot, lehetővé téve a példa idempotens végrehajtását. Egy osztály definiálva és példányosítva lesz, amellyel szimulálhatja a csatlakoztatott rendszerből a forrás külső helyére érkező adatkötegeket.

Másolja ezt a kódot a jegyzetfüzet egy új cellájába, és hajtsa végre a környezet konfigurálásához.

Feljegyzés

A kódban definiált változóknak lehetővé kell tenni, hogy biztonságosan végrehajtsa azt anélkül, hogy a meglévő munkaterületi eszközökkel vagy más felhasználókkal ütközhet. A korlátozott hálózati vagy tárolási engedélyek hibát okoznak a kód végrehajtásakor; A korlátozások elhárításához forduljon a munkaterület rendszergazdájához.


from pyspark.sql.functions import col

# Set parameters for isolation in workspace and reset demo
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
database = f"{catalog}.e2e_lakehouse_{username}_db"
source = f"{external_location}/e2e-lakehouse-source"
table = f"{database}.target_table"
checkpoint_path = f"{external_location}/_checkpoint/e2e-lakehouse-demo"

spark.sql(f"SET c.username='{username}'")
spark.sql(f"SET c.database={database}")
spark.sql(f"SET c.source='{source}'")

spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
spark.sql("CREATE DATABASE ${c.database}")
spark.sql("USE ${c.database}")

# Clear out data from previous demo execution
dbutils.fs.rm(source, True)
dbutils.fs.rm(checkpoint_path, True)

# Define a class to load batches of data to source
class LoadData:

    def __init__(self, source):
        self.source = source

    def get_date(self):
        try:
            df = spark.read.format("json").load(source)
        except:
            return "2016-01-01"
        batch_date = df.selectExpr("max(distinct(date(tpep_pickup_datetime))) + 1 day").first()[0]
        if batch_date.month == 3:
            raise Exception("Source data exhausted")
        return batch_date

    def get_batch(self, batch_date):
        return (
            spark.table("samples.nyctaxi.trips")
            .filter(col("tpep_pickup_datetime").cast("date") == batch_date)
        )

    def write_batch(self, batch):
        batch.write.format("json").mode("append").save(self.source)

    def land_batch(self):
        batch_date = self.get_date()
        batch = self.get_batch(batch_date)
        self.write_batch(batch)

RawData = LoadData(source)

Most már le is helyezhet egy adatköteget úgy, hogy a következő kódot egy cellába másolja és végrehajtja. Ezt a cellát akár 60 alkalommal manuálisan is végrehajthatja az új adatok érkezésének aktiválásához.

RawData.land_batch()

4. lépés: Az automatikus betöltő konfigurálása adatok Unity-katalógusba való betöltéséhez

A Databricks azt javasolja, hogy tárolja az adatokat a Delta Lake-zel. A Delta Lake egy nyílt forráskód tárolási réteg, amely ACID-tranzakciókat biztosít, és lehetővé teszi a data lakehouse-t. A Databricksben létrehozott táblák alapértelmezett formátuma a Delta Lake.

Az Automatikus betöltő konfigurálásához másolja és illessze be a következő kódot egy üres cellába a jegyzetfüzetben:

# Import functions
from pyspark.sql.functions import col, current_timestamp

# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", checkpoint_path)
  .load(file_path)
  .select("*", col("_metadata.file_path").alias("source_file"), current_timestamp().alias("processing_time"))
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .trigger(availableNow=True)
  .option("mergeSchema", "true")
  .toTable(table))

Az automatikus betöltőről további információt a Mi az automatikus betöltő? című témakörben talál.

A Strukturált streamelés és a Unity Catalog használatával kapcsolatos további információkért lásd : A Unity Katalógus használata strukturált streameléssel.

5. lépés: Adatok feldolgozása és használata

A jegyzetfüzetek cellánként hajtják végre a logikai cellákat. A következő lépésekkel hajthatja végre a logikát a cellában:

  1. Az előző lépésben befejezett cella futtatásához jelölje ki a cellát, és nyomja le a SHIFT+ENTER billentyűkombinációt.

  2. Az imént létrehozott táblázat lekérdezéséhez másolja és illessze be a következő kódot egy üres cellába, majd nyomja le a SHIFT+ENTER billentyűkombinációt a cella futtatásához.

    df = spark.read.table(table_name)
    
  3. A DataFrame adatainak előnézetéhez másolja és illessze be a következő kódot egy üres cellába, majd nyomja le a SHIFT+ENTER billentyűkombinációt a cella futtatásához.

    display(df)
    

Az adatok megjelenítésének interaktív lehetőségeiről további információt a Databricks-jegyzetfüzetek vizualizációi című témakörben talál.

6. lépés: Feladat ütemezése

A Databricks-jegyzetfüzeteket éles szkriptekként futtathatja, ha feladatként adja hozzá őket egy Databricks-feladathoz. Ebben a lépésben létrehoz egy új feladatot, amelyet manuálisan aktiválhat.

A jegyzetfüzet feladatként való ütemezése:

  1. Kattintson az Ütemezés gombra a fejlécsáv jobb oldalán.
  2. Adjon meg egy egyedi nevet a feladatnévnek.
  3. Kattintson a Kézi gombra.
  4. A Fürt legördülő listában válassza ki az 1. lépésben létrehozott fürtöt.
  5. Kattintson a Létrehozás gombra.
  6. A megjelenő ablakban kattintson a Futtatás gombra.
  7. A feladatfuttatás eredményeinek megtekintéséhez kattintson az Külső hivatkozás Utolsó futtatási időbélyeg melletti ikonra.

A feladatokról további információt az Azure Databricks-feladatok ismertetése című témakörben talál.

7. lépés: Tábla lekérdezése a Databricks SQL-ből

Bárki, aki rendelkezik az USE CATALOG aktuális katalógus engedélyével, az USE SCHEMA aktuális sémára vonatkozó engedéllyel és SELECT a tábla engedélyeivel, lekérdezheti a tábla tartalmát az előnyben részesített Databricks API-ból.

A Databricks SQL-ben lekérdezések végrehajtásához hozzá kell férnie egy futó SQL-raktárhoz.

Az oktatóanyag korábbi részében létrehozott tábla neve target_table. Lekérdezheti az első cellában megadott katalógus és a patern e2e_lakehouse_<your-username>adatbázis használatával. A Katalóguskezelővel megkeresheti a létrehozott adatobjektumokat.

További integrációk

További információ az Azure Databricks adatmérnöki integrációjáról és eszközeiről: