Az első ETL-számítási feladat futtatása az Azure Databricksben

Megtudhatja, hogyan fejlesztheti és helyezheti üzembe az első kinyerési, átalakító és betöltési (ETL-) folyamatokat az Azure Databricks éles üzemre kész eszközeinek használatával az adatvezényléshez.

A cikk végére kényelmesen érezheti magát:

  1. Databricks teljes célú számítási fürt indítása.
  2. Databricks-jegyzetfüzet létrehozása.
  3. Növekményes adatbetöltés konfigurálása a Delta Lake-be automatikus betöltővel.
  4. Jegyzetfüzetcellák végrehajtása az adatok feldolgozásához, lekérdezéséhez és előnézetéhez.
  5. Jegyzetfüzet ütemezése Databricks-feladatként.

Ez az oktatóanyag interaktív jegyzetfüzeteket használ a Pythonban vagy a Scalában végzett gyakori ETL-feladatok elvégzéséhez.

A Delta Live Tables használatával ETL-folyamatokat is létrehozhat. A Databricks delta live tableset hozott létre az éles ETL-folyamatok létrehozásának, üzembe helyezésének és karbantartásának összetettségének csökkentése érdekében. Lásd az oktatóanyagot: Futtassa az első Delta Live Tables-folyamatot.

A Databricks Terraform-szolgáltatóval is létrehozhatja a cikk erőforrásait. Lásd: Fürtök, jegyzetfüzetek és feladatok létrehozása a Terraformmal.

Követelmények

  • A rendszer bejelentkezik egy Azure Databricks-munkaterületre.
  • Van engedélye fürt létrehozására.

Feljegyzés

Ha nem rendelkezik fürtvezérlési jogosultságokkal, akkor is elvégezheti az alábbi lépések többségét, amíg hozzáféréssel rendelkezik egy fürthöz.

1. lépés: Fürt létrehozása

Feltáró adatelemzés és adatelemzés elvégzéséhez hozzon létre egy fürtöt, amely biztosítja a parancsok végrehajtásához szükséges számítási erőforrásokat.

  1. Kattintson a Számítás gombra számítási ikonaz oldalsávon.
  2. A Számítás lapon kattintson a Fürt létrehozása elemre. Ekkor megnyílik az Új fürt lap.
  3. Adjon meg egy egyedi nevet a fürtnek, hagyja meg a fennmaradó értékeket az alapértelmezett állapotban, és kattintson a Fürt létrehozása gombra.

A Databricks-fürtökkel kapcsolatos további információkért lásd: Compute.

2. lépés: Databricks-jegyzetfüzet létrehozása

Ha hozzá szeretne kezdeni interaktív kód írásához és végrehajtásához az Azure Databricksben, hozzon létre egy jegyzetfüzetet.

  1. Kattintson Új ikonaz Oldalsáv Új elemére, majd a Jegyzetfüzet gombra.
  2. A Jegyzetfüzet létrehozása lapon:
    • Adjon meg egy egyedi nevet a jegyzetfüzetnek.
    • Győződjön meg arról, hogy az alapértelmezett nyelv Python vagy Scala.
    • Válassza ki az 1. lépésben létrehozott fürtöt a Fürt legördülő listából.
    • Kattintson a Létrehozás gombra.

Megnyílik egy jegyzetfüzet, amelynek tetején egy üres cella látható.

A jegyzetfüzetek létrehozásáról és kezeléséről további információt a Jegyzetfüzetek kezelése című témakörben talál.

3. lépés: Az automatikus betöltő konfigurálása adatok Delta Lake-be való betöltéséhez

A Databricks az Automatikus betöltő használatát javasolja a növekményes adatbetöltéshez. Az automatikus betöltő automatikusan észleli és feldolgozza az új fájlokat, amikor megérkeztek a felhőobjektum-tárolóba.

A Databricks azt javasolja, hogy tárolja az adatokat a Delta Lake-zel. A Delta Lake egy nyílt forráskód tárolási réteg, amely ACID-tranzakciókat biztosít, és lehetővé teszi a data lakehouse-t. A Databricksben létrehozott táblák alapértelmezett formátuma a Delta Lake.

Ha úgy szeretné konfigurálni az Automatikus betöltőt, hogy adatokat töltsen be egy Delta Lake-táblába, másolja és illessze be a következő kódot a jegyzetfüzet üres cellájába:

Python

# Import functions
from pyspark.sql.functions import col, current_timestamp

# Define variables used in code below
file_path = "/databricks-datasets/structured-streaming/events"
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
table_name = f"{username}_etl_quickstart"
checkpoint_path = f"/tmp/{username}/_checkpoint/etl_quickstart"

# Clear out data from previous demo execution
spark.sql(f"DROP TABLE IF EXISTS {table_name}")
dbutils.fs.rm(checkpoint_path, True)

# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", checkpoint_path)
  .load(file_path)
  .select("*", col("_metadata.file_path").alias("source_file"), current_timestamp().alias("processing_time"))
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .trigger(availableNow=True)
  .toTable(table_name))

Scala

// Imports
import org.apache.spark.sql.functions.current_timestamp
import org.apache.spark.sql.streaming.Trigger
import spark.implicits._

// Define variables used in code below
val file_path = "/databricks-datasets/structured-streaming/events"
val username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first.get(0)
val table_name = s"${username}_etl_quickstart"
val checkpoint_path = s"/tmp/${username}/_checkpoint"

// Clear out data from previous demo execution
spark.sql(s"DROP TABLE IF EXISTS ${table_name}")
dbutils.fs.rm(checkpoint_path, true)

// Configure Auto Loader to ingest JSON data to a Delta table
spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", checkpoint_path)
  .load(file_path)
  .select($"*", $"_metadata.file_path".as("source_file"), current_timestamp.as("processing_time"))
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .trigger(Trigger.AvailableNow)
  .toTable(table_name)

Feljegyzés

A kódban definiált változóknak lehetővé kell tenni, hogy biztonságosan végrehajtsa azt anélkül, hogy a meglévő munkaterületi eszközökkel vagy más felhasználókkal ütközhet. A korlátozott hálózati vagy tárolási engedélyek hibát okoznak a kód végrehajtásakor; A korlátozások elhárításához forduljon a munkaterület rendszergazdájához.

Az automatikus betöltőről további információt a Mi az automatikus betöltő? című témakörben talál.

4. lépés: Adatok feldolgozása és használata

A jegyzetfüzetek cellánként hajtják végre a logikai cellákat. A logika végrehajtása a cellában:

  1. Az előző lépésben befejezett cella futtatásához jelölje ki a cellát, és nyomja le a SHIFT+ENTER billentyűkombinációt.

  2. Az imént létrehozott táblázat lekérdezéséhez másolja és illessze be a következő kódot egy üres cellába, majd nyomja le a SHIFT+ENTER billentyűkombinációt a cella futtatásához.

    Python

    df = spark.read.table(table_name)
    

    Scala

    val df = spark.read.table(table_name)
    
  3. A DataFrame adatainak előnézetéhez másolja és illessze be a következő kódot egy üres cellába, majd nyomja le a SHIFT+ENTER billentyűkombinációt a cella futtatásához.

    Python

    display(df)
    

    Scala

    display(df)
    

Az adatok megjelenítésének interaktív lehetőségeiről további információt a Databricks-jegyzetfüzetek vizualizációi című témakörben talál.

5. lépés: Feladat ütemezése

A Databricks-jegyzetfüzeteket éles szkriptekként futtathatja, ha feladatként adja hozzá őket egy Databricks-feladathoz. Ebben a lépésben létrehoz egy új feladatot, amelyet manuálisan aktiválhat.

A jegyzetfüzet feladatként való ütemezése:

  1. Kattintson az Ütemezés gombra a fejlécsáv jobb oldalán.
  2. Adjon meg egy egyedi nevet a feladatnévnek.
  3. Kattintson a Kézi gombra.
  4. A Fürt legördülő listában válassza ki az 1. lépésben létrehozott fürtöt.
  5. Kattintson a Létrehozás gombra.
  6. A megjelenő ablakban kattintson a Futtatás gombra.
  7. A feladatfuttatás eredményeinek megtekintéséhez kattintson az Külső hivatkozás Utolsó futtatási időbélyeg melletti ikonra.

A feladatokról további információt az Azure Databricks-feladatok ismertetése című témakörben talál.

További integrációk

További információ az Azure Databricks adatmérnöki integrációjáról és eszközeiről: