Sdílet prostřednictvím


Kurz: Spuštění kompletního analytického kanálu Lakehouse

V tomto kurzu se dozvíte, jak nastavit komplexní analytický kanál pro Azure Databricks Lakehouse.

Důležité

Tento kurz používá interaktivní poznámkové bloky k dokončení běžných úloh ETL v Pythonu v clusterech s podporou katalogu Unity. Pokud nepoužíváte katalog Unity, přečtěte si téma Spuštění první úlohy ETL v Azure Databricks.

Úkoly v tomto kurzu

Na konci tohoto článku se budete cítit pohodlně:

  1. Spuštění výpočetního clusteru s povoleným katalogem Unity
  2. Vytvoření poznámkového bloku Databricks
  3. Zápis a čtení dat z externího umístění katalogu Unity
  4. Konfigurace přírůstkového příjmu dat do tabulky katalogu Unity pomocí automatického zavaděče
  5. Spouštění buněk poznámkového bloku pro zpracování, dotazování a náhled dat
  6. Plánování poznámkového bloku jako úlohy Databricks
  7. Dotazování tabulek katalogu Unity z Databricks SQL

Azure Databricks poskytuje sadu nástrojů připravených pro produkční prostředí, které odborníkům na data umožňují rychle vyvíjet a nasazovat kanály extrakce, transformace a načítání (ETL). Katalog Unity umožňuje správci dat konfigurovat a zabezpečit přihlašovací údaje úložiště, externí umístění a databázové objekty pro uživatele v celé organizaci. Databricks SQL umožňuje analytikům spouštět dotazy SQL na stejné tabulky, které se používají v produkčních úlohách ETL, což umožňuje ve velkém měřítku business intelligence v reálném čase.

K sestavení kanálů ETL můžete také použít rozdílové živé tabulky. Databricks vytvořila dynamické tabulky Delta, aby se snížila složitost sestavování, nasazování a údržby produkčních kanálů ETL. Viz kurz: Spuštění prvního kanálu dynamických tabulek Delta.

Požadavky

Poznámka:

Pokud nemáte oprávnění ke kontrole clusteru, můžete většinu následujících kroků dokončit, pokud máte přístup ke clusteru.

Krok 1: Vytvoření clusteru

Pokud chcete provádět průzkumnou analýzu dat a přípravu dat, vytvořte cluster, který poskytuje výpočetní prostředky potřebné ke spouštění příkazů.

  1. Na bočním panelu klikněte na Ikona výpočetních prostředků Výpočty.
  2. Na bočním panelu klikněte na Nová ikona Nový a pak vyberte Cluster. Otevře se stránka Nový cluster nebo výpočetní prostředky.
  3. Zadejte jedinečný název clusteru.
  4. Vyberte přepínač s jedním uzlem.
  5. V rozevíracím seznamu Režim přístupu vyberte jednoho uživatele.
  6. Ujistěte se, že je vaše e-mailová adresa viditelná v poli Jeden uživatel .
  7. Vyberte požadovanou verzi modulu runtime Databricks 11.1 nebo vyšší, abyste mohli použít katalog Unity.
  8. Kliknutím na Vytvořit výpočetní prostředky vytvořte cluster.

Další informace o clusterech Databricks najdete v tématu Výpočty.

Krok 2: Vytvoření poznámkového bloku Databricks

Chcete-li vytvořit poznámkový blok v pracovním prostoru, klepněte na tlačítko Nová ikona Nový na bočním panelu a potom klepněte na příkaz Poznámkový blok. V pracovním prostoru se otevře prázdný poznámkový blok.

Další informace o vytváření a správě poznámkových bloků najdete v tématu Správa poznámkových bloků.

Krok 3: Zápis a čtení dat z externího umístění spravovaného katalogem Unity

Databricks doporučuje používat automatický zavaděč pro přírůstkové příjem dat. Auto Loader automaticky rozpozná a zpracuje nové soubory při jejich doručení do cloudového úložiště objektů.

Pomocí katalogu Unity můžete spravovat zabezpečený přístup k externím umístěním. Uživatelé nebo instanční objekty s oprávněními READ FILES k externímu umístění můžou k příjmu dat použít automatický zavaděč.

Za normálních okolností data přijdou do externího umístění kvůli zápisům z jiných systémů. V této ukázce můžete simulovat doručení dat tím, že do externího umístění zapíšete soubory JSON.

Zkopírujte následující kód do buňky poznámkového bloku. Nahraďte hodnotu catalog řetězce názvem katalogu a USE CATALOG oprávněnímiCREATE CATALOG. Nahraďte hodnotu external_location řetězce cestou pro externí umístění znakem READ FILES, WRITE FILESa CREATE EXTERNAL TABLE oprávněními.

Externí umístění se dají definovat jako celý kontejner úložiště, ale často odkazují na adresář vnořený do kontejneru.

Správný formát cesty k externímu umístění je "abfss://container_name@storage_account.dfs.core.windows.net/path/to/external_location".


 external_location = "<your-external-location>"
 catalog = "<your-catalog>"

 dbutils.fs.put(f"{external_location}/filename.txt", "Hello world!", True)
 display(dbutils.fs.head(f"{external_location}/filename.txt"))
 dbutils.fs.rm(f"{external_location}/filename.txt")

 display(spark.sql(f"SHOW SCHEMAS IN {catalog}"))

Spuštění této buňky by mělo vytisknout řádek, který čte 12 bajtů, vytiskne řetězec "Hello world!" a zobrazí všechny databáze, které jsou v katalogu. Pokud se vám nedaří tuto buňku spustit, ověřte, že jste v pracovním prostoru s povoleným katalogem Unity, a požádejte správce pracovního prostoru o správná oprávnění k dokončení tohoto kurzu.

Níže uvedený kód Pythonu používá vaši e-mailovou adresu k vytvoření jedinečné databáze v zadaném katalogu a jedinečného umístění úložiště v externím umístění. Spuštěním této buňky odeberete všechna data přidružená k tomuto kurzu, což vám umožní spustit tento příklad idempotentním způsobem. Třída je definována a vytvořena instance, kterou použijete k simulaci dávek dat přicházejících z připojeného systému do vašeho zdrojového externího umístění.

Zkopírujte tento kód do nové buňky v poznámkovém bloku a spusťte ho a nakonfigurujte prostředí.

Poznámka:

Proměnné definované v tomto kódu by vám měly umožnit bezpečné spuštění bez rizika konfliktu s existujícími prostředky pracovního prostoru nebo jinými uživateli. Omezená oprávnění k síti nebo úložišti způsobí chyby při provádění tohoto kódu; Požádejte správce pracovního prostoru o řešení těchto omezení.


from pyspark.sql.functions import col

# Set parameters for isolation in workspace and reset demo
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
database = f"{catalog}.e2e_lakehouse_{username}_db"
source = f"{external_location}/e2e-lakehouse-source"
table = f"{database}.target_table"
checkpoint_path = f"{external_location}/_checkpoint/e2e-lakehouse-demo"

spark.sql(f"SET c.username='{username}'")
spark.sql(f"SET c.database={database}")
spark.sql(f"SET c.source='{source}'")

spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
spark.sql("CREATE DATABASE ${c.database}")
spark.sql("USE ${c.database}")

# Clear out data from previous demo execution
dbutils.fs.rm(source, True)
dbutils.fs.rm(checkpoint_path, True)

# Define a class to load batches of data to source
class LoadData:

    def __init__(self, source):
        self.source = source

    def get_date(self):
        try:
            df = spark.read.format("json").load(source)
        except:
            return "2016-01-01"
        batch_date = df.selectExpr("max(distinct(date(tpep_pickup_datetime))) + 1 day").first()[0]
        if batch_date.month == 3:
            raise Exception("Source data exhausted")
        return batch_date

    def get_batch(self, batch_date):
        return (
            spark.table("samples.nyctaxi.trips")
            .filter(col("tpep_pickup_datetime").cast("date") == batch_date)
        )

    def write_batch(self, batch):
        batch.write.format("json").mode("append").save(self.source)

    def land_batch(self):
        batch_date = self.get_date()
        batch = self.get_batch(batch_date)
        self.write_batch(batch)

RawData = LoadData(source)

Teď můžete získat dávku dat zkopírováním následujícího kódu do buňky a jeho spuštěním. Tuto buňku můžete spustit ručně až 60krát a aktivovat tak nové doručení dat.

RawData.land_batch()

Krok 4: Konfigurace automatického zavaděče pro příjem dat do katalogu Unity

Databricks doporučuje ukládat data pomocí Delta Lake. Delta Lake je opensourcová vrstva úložiště, která poskytuje transakce ACID a umožňuje datové jezero. Delta Lake je výchozí formát pro tabulky vytvořené v Databricks.

Pokud chcete nakonfigurovat automatický zavaděč pro příjem dat do tabulky Katalogu Unity, zkopírujte a vložte následující kód do prázdné buňky v poznámkovém bloku:

# Import functions
from pyspark.sql.functions import col, current_timestamp

# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", checkpoint_path)
  .load(file_path)
  .select("*", col("_metadata.file_path").alias("source_file"), current_timestamp().alias("processing_time"))
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .trigger(availableNow=True)
  .option("mergeSchema", "true")
  .toTable(table))

Další informace o automatickém zavaděči najdete v tématu Co je automatický zavaděč?.

Další informace o strukturovaném streamování pomocí katalogu Unity najdete v tématu Použití katalogu Unity se strukturovaným streamováním.

Krok 5: Zpracování dat a interakce s nimi

Poznámkové bloky spouštějí logické buňky po buňce. Pomocí těchto kroků spusťte logiku v buňce:

  1. Pokud chcete buňku, kterou jste dokončili v předchozím kroku, spustit, vyberte buňku a stiskněte SHIFT+ENTER.

  2. Pokud chcete zadat dotaz na tabulku, kterou jste právě vytvořili, zkopírujte následující kód a vložte ho do prázdné buňky a stisknutím kombinace kláves SHIFT+ENTER buňku spusťte.

    df = spark.read.table(table_name)
    
  3. Pokud chcete zobrazit náhled dat v datovém rámci, zkopírujte a vložte následující kód do prázdné buňky a potom buňku spusťte stisknutím kombinace kláves SHIFT+ENTER .

    display(df)
    

Další informace o interaktivních možnostech vizualizace dat najdete v tématu Vizualizace v poznámkových blocích Databricks.

Krok 6: Naplánování úlohy

Poznámkové bloky Databricks můžete spustit jako produkční skripty tak, že je přidáte jako úlohu do úlohy Databricks. V tomto kroku vytvoříte novou úlohu, kterou můžete aktivovat ručně.

Naplánování poznámkového bloku jako úkolu:

  1. Na pravé straně záhlaví klikněte na Plán .
  2. Zadejte jedinečný název pro název úlohy.
  3. Klikněte na Ruční.
  4. V rozevíracím seznamu Cluster vyberte cluster, který jste vytvořili v kroku 1.
  5. Klikněte na Vytvořit.
  6. V zobrazeném okně klikněte na Spustit.
  7. Pokud chcete zobrazit výsledky spuštění úlohy, klikněte na Externí odkaz ikonu vedle časového razítka posledního spuštění .

Další informace o úlohách najdete v tématu Co jsou úlohy Azure Databricks?

Krok 7: Dotazování tabulky z Databricks SQL

Každý, kdo má USE CATALOG oprávnění k aktuálnímu katalogu, USE SCHEMA oprávnění k aktuálnímu schématu a SELECT oprávnění v tabulce, může dotazovat obsah tabulky z preferovaného rozhraní Databricks API.

Ke spouštění dotazů v Databricks SQL potřebujete přístup ke spuštěné službě SQL Warehouse.

Tabulka, kterou jste vytvořili dříve v tomto kurzu, má název target_table. Můžete se na něj dotazovat pomocí katalogu, který jste zadali v první buňce a databázi s paternem e2e_lakehouse_<your-username>. Průzkumníka katalogu můžete použít k vyhledání datových objektů, které jste vytvořili.

Další integrace

Další informace o integracích a nástrojích pro přípravu dat pomocí Azure Databricks: