Oktatóanyag: Végpontok közötti lakehouse-elemzési folyamat futtatása
Ez az oktatóanyag bemutatja, hogyan állíthat be egy végpontok közötti elemzési folyamatot egy Azure Databricks lakehouse-hoz.
Fontos
Ez az oktatóanyag interaktív jegyzetfüzetekkel végzi el a Pythonban a Unity Catalog-kompatibilis fürtökön végzett gyakori ETL-feladatokat. Ha nem a Unity Catalogot használja, olvassa el az első ETL-számítási feladat futtatása az Azure Databricksben című témakört.
Ebben az oktatóanyagban szereplő feladatok
A cikk végére kényelmesen érezheti magát:
- Unity Catalog-kompatibilis számítási fürt indítása.
- Databricks-jegyzetfüzet létrehozása.
- Adatok írása és olvasása egy Unity-katalógus külső helyről.
- Növekményes adatbetöltés konfigurálása Unity Catalog-táblába az Automatikus betöltővel.
- Jegyzetfüzetcellák végrehajtása az adatok feldolgozásához, lekérdezéséhez és előnézetéhez.
- Jegyzetfüzet ütemezése Databricks-feladatként.
- Unity Catalog-táblák lekérdezése a Databricks SQL-ből
Az Azure Databricks éles üzemre kész eszközöket biztosít, amelyek lehetővé teszik az adatszakértők számára a kinyerési, átalakítási és betöltési (ETL-) folyamatok gyors fejlesztését és üzembe helyezését. A Unity Catalog lehetővé teszi, hogy az adatgondnokok konfigurálják és biztonságossá teszik a tároló hitelesítő adatait, a külső helyeket és az adatbázis-objektumokat a teljes szervezet felhasználói számára. A Databricks SQL lehetővé teszi az elemzők számára, hogy sql-lekérdezéseket futtasson az éles ETL számítási feladatokban használt táblákon, így nagy méretekben valós idejű üzleti intelligenciát tesz lehetővé.
A Delta Live Tables használatával ETL-folyamatokat is létrehozhat. A Databricks delta live tableset hozott létre az éles ETL-folyamatok létrehozásának, üzembe helyezésének és karbantartásának összetettségének csökkentése érdekében. Lásd az oktatóanyagot: Futtassa az első Delta Live Tables-folyamatot.
Követelmények
Feljegyzés
Ha nem rendelkezik fürtvezérlési jogosultságokkal, akkor is elvégezheti az alábbi lépések többségét, amíg hozzáféréssel rendelkezik egy fürthöz.
1. lépés: Fürt létrehozása
Feltáró adatelemzés és adatelemzés elvégzéséhez hozzon létre egy fürtöt, amely biztosítja a parancsok végrehajtásához szükséges számítási erőforrásokat.
- Kattintson a Számítás gombra az oldalsávon.
- Kattintson az Oldalsáv Új gombjára, majd válassza a Fürt lehetőséget. Ekkor megnyílik az Új fürt/Számítás lap.
- Adja meg a fürt egyedi nevét.
- Válassza az egycsomópontos választógombot.
- Válassza az Egy felhasználó lehetőséget a Hozzáférési mód legördülő listából.
- Győződjön meg arról, hogy az e-mail-címe látható az Egyfelhasználós mezőben.
- A Unity Catalog használatához válassza ki a Databricks 11.1-es vagy újabb verzióját.
- Kattintson a Számítás létrehozása elemre a fürt létrehozásához.
A Databricks-fürtökkel kapcsolatos további információkért lásd: Compute.
2. lépés: Databricks-jegyzetfüzet létrehozása
Ha jegyzetfüzetet szeretne létrehozni a munkaterületen, kattintson az Oldalsáv Új gombjára, majd a Jegyzetfüzet elemre. Megnyílik egy üres jegyzetfüzet a munkaterületen.
A jegyzetfüzetek létrehozásáról és kezeléséről további információt a Jegyzetfüzetek kezelése című témakörben talál.
3. lépés: Adatok írása és olvasása a Unity Catalog által felügyelt külső helyről
A Databricks az Automatikus betöltő használatát javasolja a növekményes adatbetöltéshez. Az automatikus betöltő automatikusan észleli és feldolgozza az új fájlokat, amikor megérkeztek a felhőobjektum-tárolóba.
A Unity Catalog használatával kezelheti a külső helyekhez való biztonságos hozzáférést. A külső helyen engedélyekkel rendelkező READ FILES
felhasználók vagy szolgáltatásnevek az Automatikus betöltő használatával tölthetik be az adatokat.
Az adatok általában más rendszerekből származó írások miatt külső helyre érkeznek. Ebben a bemutatóban az adatok érkezését szimulálhatja úgy, hogy JSON-fájlokat ír ki egy külső helyre.
Másolja az alábbi kódot egy jegyzetfüzetcellába. Cserélje le a sztring értékét catalog
a katalógus nevére és USE CATALOG
az engedélyekreCREATE CATALOG
. Cserélje le a sztring értékét external_location
egy külső hely elérési útjára a következőre READ FILES
: , WRITE FILES
és CREATE EXTERNAL TABLE
engedélyek.
A külső helyek definiálhatók teljes tárolóként, de gyakran egy tárolóba ágyazott könyvtárra mutatnak.
A külső hely elérési útjának formátuma a "abfss://container_name@storage_account.dfs.core.windows.net/path/to/external_location"
megfelelő.
external_location = "<your-external-location>"
catalog = "<your-catalog>"
dbutils.fs.put(f"{external_location}/filename.txt", "Hello world!", True)
display(dbutils.fs.head(f"{external_location}/filename.txt"))
dbutils.fs.rm(f"{external_location}/filename.txt")
display(spark.sql(f"SHOW SCHEMAS IN {catalog}"))
A cella végrehajtásához egy 12 bájtos vonalat kell nyomtatnia, ki kell nyomtatnia a "Hello world!" sztringet, és meg kell jelenítenie a katalógusban található összes adatbázist. Ha nem tudja futtatni ezt a cellát, győződjön meg arról, hogy unitykatalógus-kompatibilis munkaterületen van, és megfelelő engedélyeket kér a munkaterület rendszergazdájától az oktatóanyag elvégzéséhez.
Az alábbi Python-kód az e-mail-cím használatával hoz létre egy egyedi adatbázist a megadott katalógusban, és egy egyedi tárolási helyet a megadott külső helyen. A cella végrehajtása eltávolítja az oktatóanyaghoz társított összes adatot, lehetővé téve a példa idempotens végrehajtását. Egy osztály definiálva és példányosítva lesz, amellyel szimulálhatja a csatlakoztatott rendszerből a forrás külső helyére érkező adatkötegeket.
Másolja ezt a kódot a jegyzetfüzet egy új cellájába, és hajtsa végre a környezet konfigurálásához.
Feljegyzés
A kódban definiált változóknak lehetővé kell tenni, hogy biztonságosan végrehajtsa azt anélkül, hogy a meglévő munkaterületi eszközökkel vagy más felhasználókkal ütközhet. A korlátozott hálózati vagy tárolási engedélyek hibát okoznak a kód végrehajtásakor; A korlátozások elhárításához forduljon a munkaterület rendszergazdájához.
from pyspark.sql.functions import col
# Set parameters for isolation in workspace and reset demo
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
database = f"{catalog}.e2e_lakehouse_{username}_db"
source = f"{external_location}/e2e-lakehouse-source"
table = f"{database}.target_table"
checkpoint_path = f"{external_location}/_checkpoint/e2e-lakehouse-demo"
spark.sql(f"SET c.username='{username}'")
spark.sql(f"SET c.database={database}")
spark.sql(f"SET c.source='{source}'")
spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
spark.sql("CREATE DATABASE ${c.database}")
spark.sql("USE ${c.database}")
# Clear out data from previous demo execution
dbutils.fs.rm(source, True)
dbutils.fs.rm(checkpoint_path, True)
# Define a class to load batches of data to source
class LoadData:
def __init__(self, source):
self.source = source
def get_date(self):
try:
df = spark.read.format("json").load(source)
except:
return "2016-01-01"
batch_date = df.selectExpr("max(distinct(date(tpep_pickup_datetime))) + 1 day").first()[0]
if batch_date.month == 3:
raise Exception("Source data exhausted")
return batch_date
def get_batch(self, batch_date):
return (
spark.table("samples.nyctaxi.trips")
.filter(col("tpep_pickup_datetime").cast("date") == batch_date)
)
def write_batch(self, batch):
batch.write.format("json").mode("append").save(self.source)
def land_batch(self):
batch_date = self.get_date()
batch = self.get_batch(batch_date)
self.write_batch(batch)
RawData = LoadData(source)
Most már le is helyezhet egy adatköteget úgy, hogy a következő kódot egy cellába másolja és végrehajtja. Ezt a cellát akár 60 alkalommal manuálisan is végrehajthatja az új adatok érkezésének aktiválásához.
RawData.land_batch()
4. lépés: Az automatikus betöltő konfigurálása adatok Unity-katalógusba való betöltéséhez
A Databricks azt javasolja, hogy tárolja az adatokat a Delta Lake-zel. A Delta Lake egy nyílt forráskód tárolási réteg, amely ACID-tranzakciókat biztosít, és lehetővé teszi a data lakehouse-t. A Databricksben létrehozott táblák alapértelmezett formátuma a Delta Lake.
Az Automatikus betöltő konfigurálásához másolja és illessze be a következő kódot egy üres cellába a jegyzetfüzetben:
# Import functions
from pyspark.sql.functions import col, current_timestamp
# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", checkpoint_path)
.load(source)
.select("*", col("_metadata.source").alias("source_file"), current_timestamp().alias("processing_time"))
.writeStream
.option("checkpointLocation", checkpoint_path)
.trigger(availableNow=True)
.option("mergeSchema", "true")
.toTable(table))
Az automatikus betöltőről további információt a Mi az automatikus betöltő? című témakörben talál.
A Strukturált streamelés és a Unity Catalog használatával kapcsolatos további információkért lásd : A Unity Katalógus használata strukturált streameléssel.
5. lépés: Adatok feldolgozása és használata
A jegyzetfüzetek cellánként hajtják végre a logikai cellákat. A következő lépésekkel hajthatja végre a logikát a cellában:
Az előző lépésben befejezett cella futtatásához jelölje ki a cellát, és nyomja le a SHIFT+ENTER billentyűkombinációt.
Az imént létrehozott táblázat lekérdezéséhez másolja és illessze be a következő kódot egy üres cellába, majd nyomja le a SHIFT+ENTER billentyűkombinációt a cella futtatásához.
df = spark.read.table(table)
A DataFrame adatainak előnézetéhez másolja és illessze be a következő kódot egy üres cellába, majd nyomja le a SHIFT+ENTER billentyűkombinációt a cella futtatásához.
display(df)
Az adatok megjelenítésének interaktív lehetőségeiről további információt a Databricks-jegyzetfüzetek vizualizációi című témakörben talál.
6. lépés: Feladat ütemezése
A Databricks-jegyzetfüzeteket éles szkriptekként futtathatja, ha feladatként adja hozzá őket egy Databricks-feladathoz. Ebben a lépésben létrehoz egy új feladatot, amelyet manuálisan aktiválhat.
A jegyzetfüzet feladatként való ütemezése:
- Kattintson az Ütemezés gombra a fejlécsáv jobb oldalán.
- Adjon meg egy egyedi nevet a feladatnévnek.
- Kattintson a Kézi gombra.
- A Fürt legördülő listában válassza ki az 1. lépésben létrehozott fürtöt.
- Kattintson a Létrehozás gombra.
- A megjelenő ablakban kattintson a Futtatás gombra.
- A feladatfuttatás eredményeinek megtekintéséhez kattintson az Utolsó futtatási időbélyeg melletti ikonra.
További információ a feladatokról: Mik azok a Databricks-feladatok?.
7. lépés: Tábla lekérdezése a Databricks SQL-ből
Bárki, aki rendelkezik az USE CATALOG
aktuális katalógus engedélyével, az USE SCHEMA
aktuális sémára vonatkozó engedéllyel és SELECT
a tábla engedélyeivel, lekérdezheti a tábla tartalmát az előnyben részesített Databricks API-ból.
A Databricks SQL-ben lekérdezések végrehajtásához hozzá kell férnie egy futó SQL-raktárhoz.
Az oktatóanyag korábbi részében létrehozott tábla neve target_table
. Lekérdezheti az első cellában megadott katalógus és a patern e2e_lakehouse_<your-username>
adatbázis használatával. A Katalóguskezelővel megkeresheti a létrehozott adatobjektumokat.
További integrációk
További információ az Azure Databricks adatmérnöki integrációjáról és eszközeiről: