Sdílet prostřednictvím


Spuštění první úlohy ETL v Azure Databricks

Zjistěte, jak pomocí nástrojů připravených pro produkční prostředí z Azure Databricks vyvíjet a nasazovat první kanály extrakce, transformace a načítání (ETL) pro orchestraci dat.

Na konci tohoto článku se budete cítit pohodlně:

  1. Spuštění výpočetního clusteru Databricks pro všechny účely
  2. Vytvoření poznámkového bloku Databricks
  3. Konfigurace přírůstkového příjmu dat do Delta Lake pomocí automatického zavaděče
  4. Spouštění buněk poznámkového bloku pro zpracování, dotazování a náhled dat
  5. Plánování poznámkového bloku jako úlohy Databricks

Tento kurz používá interaktivní poznámkové bloky k dokončení běžných úloh ETL v Pythonu nebo Scala.

K sestavení kanálů ETL můžete také použít rozdílové živé tabulky. Databricks vytvořila dynamické tabulky Delta, aby se snížila složitost sestavování, nasazování a údržby produkčních kanálů ETL. Viz kurz: Spuštění prvního kanálu dynamických tabulek Delta.

K vytvoření prostředků tohoto článku můžete použít také zprostředkovatele Databricks Terraform. Viz Vytváření clusterů, poznámkových bloků a úloh pomocí Terraformu.

Požadavky

Poznámka:

Pokud nemáte oprávnění ke kontrole clusteru, můžete většinu následujících kroků dokončit, pokud máte přístup ke clusteru.

Krok 1: Vytvoření clusteru

Pokud chcete provádět průzkumnou analýzu dat a přípravu dat, vytvořte cluster, který poskytuje výpočetní prostředky potřebné ke spouštění příkazů.

  1. Na bočním panelu klikněte na Ikona výpočetních prostředků Výpočty.
  2. Na stránce Compute klikněte na Vytvořit cluster. Otevře se stránka Nový cluster.
  3. Zadejte jedinečný název clusteru, ponechte zbývající hodnoty ve výchozím stavu a klikněte na Vytvořit cluster.

Další informace o clusterech Databricks najdete v tématu Výpočty.

Krok 2: Vytvoření poznámkového bloku Databricks

Chcete-li vytvořit poznámkový blok v pracovním prostoru, klepněte na tlačítko Nová ikona Nový na bočním panelu a potom klepněte na příkaz Poznámkový blok. V pracovním prostoru se otevře prázdný poznámkový blok.

Další informace o vytváření a správě poznámkových bloků najdete v tématu Správa poznámkových bloků.

Krok 3: Konfigurace automatického zavaděče pro příjem dat do Delta Lake

Databricks doporučuje používat automatický zavaděč pro přírůstkové příjem dat. Auto Loader automaticky rozpozná a zpracuje nové soubory při jejich doručení do cloudového úložiště objektů.

Databricks doporučuje ukládat data pomocí Delta Lake. Delta Lake je opensourcová vrstva úložiště, která poskytuje transakce ACID a umožňuje datové jezero. Delta Lake je výchozí formát pro tabulky vytvořené v Databricks.

Pokud chcete nakonfigurovat automatický zavaděč pro příjem dat do tabulky Delta Lake, zkopírujte a vložte do prázdné buňky v poznámkovém bloku následující kód:

Python

# Import functions
from pyspark.sql.functions import col, current_timestamp

# Define variables used in code below
file_path = "/databricks-datasets/structured-streaming/events"
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
table_name = f"{username}_etl_quickstart"
checkpoint_path = f"/tmp/{username}/_checkpoint/etl_quickstart"

# Clear out data from previous demo execution
spark.sql(f"DROP TABLE IF EXISTS {table_name}")
dbutils.fs.rm(checkpoint_path, True)

# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", checkpoint_path)
  .load(file_path)
  .select("*", col("_metadata.file_path").alias("source_file"), current_timestamp().alias("processing_time"))
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .trigger(availableNow=True)
  .toTable(table_name))

Scala

// Imports
import org.apache.spark.sql.functions.current_timestamp
import org.apache.spark.sql.streaming.Trigger
import spark.implicits._

// Define variables used in code below
val file_path = "/databricks-datasets/structured-streaming/events"
val username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first.get(0)
val table_name = s"${username}_etl_quickstart"
val checkpoint_path = s"/tmp/${username}/_checkpoint"

// Clear out data from previous demo execution
spark.sql(s"DROP TABLE IF EXISTS ${table_name}")
dbutils.fs.rm(checkpoint_path, true)

// Configure Auto Loader to ingest JSON data to a Delta table
spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", checkpoint_path)
  .load(file_path)
  .select($"*", $"_metadata.file_path".as("source_file"), current_timestamp.as("processing_time"))
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .trigger(Trigger.AvailableNow)
  .toTable(table_name)

Poznámka:

Proměnné definované v tomto kódu by vám měly umožnit bezpečné spuštění bez rizika konfliktu s existujícími prostředky pracovního prostoru nebo jinými uživateli. Omezená oprávnění k síti nebo úložišti způsobí chyby při provádění tohoto kódu; Požádejte správce pracovního prostoru o řešení těchto omezení.

Další informace o automatickém zavaděči najdete v tématu Co je automatický zavaděč?.

Krok 4: Zpracování dat a interakce s nimi

Poznámkové bloky spouštějí logické buňky po buňce. Spuštění logiky v buňce:

  1. Pokud chcete buňku, kterou jste dokončili v předchozím kroku, spustit, vyberte buňku a stiskněte SHIFT+ENTER.

  2. Pokud chcete zadat dotaz na tabulku, kterou jste právě vytvořili, zkopírujte následující kód a vložte ho do prázdné buňky a stisknutím kombinace kláves SHIFT+ENTER buňku spusťte.

    Python

    df = spark.read.table(table_name)
    

    Scala

    val df = spark.read.table(table_name)
    
  3. Pokud chcete zobrazit náhled dat v datovém rámci, zkopírujte a vložte následující kód do prázdné buňky a potom buňku spusťte stisknutím kombinace kláves SHIFT+ENTER .

    Python

    display(df)
    

    Scala

    display(df)
    

Další informace o interaktivních možnostech vizualizace dat najdete v tématu Vizualizace v poznámkových blocích Databricks.

Krok 5: Naplánování úlohy

Poznámkové bloky Databricks můžete spustit jako produkční skripty tak, že je přidáte jako úlohu do úlohy Databricks. V tomto kroku vytvoříte novou úlohu, kterou můžete aktivovat ručně.

Naplánování poznámkového bloku jako úkolu:

  1. Na pravé straně záhlaví klikněte na Plán .
  2. Zadejte jedinečný název pro název úlohy.
  3. Klikněte na Ruční.
  4. V rozevíracím seznamu Cluster vyberte cluster, který jste vytvořili v kroku 1.
  5. Klikněte na Vytvořit.
  6. V zobrazeném okně klikněte na Spustit.
  7. Pokud chcete zobrazit výsledky spuštění úlohy, klikněte na Externí odkaz ikonu vedle časového razítka posledního spuštění .

Další informace o úlohách najdete v tématu Co jsou úlohy Databricks?

Další integrace

Další informace o integracích a nástrojích pro přípravu dat pomocí Azure Databricks: