Übung: Integrieren eines Notebooks mit Azure Synapse-Pipelines

Abgeschlossen

In dieser Lerneinheit erstellen Sie ein Azure Synapse Spark-Notebook, um von einem Zuordnungsdatenfluss geladene Daten zu analysieren und zu transformieren. Anschließend speichern Sie die Daten in einem Data Lake. Sie erstellen eine Parameterzelle, die einen Zeichenfolgenparameter akzeptiert, der den Ordnernamen für die Daten definiert, die das Notebook in den Data Lake schreibt.

Dann fügen Sie dieses Notebook einer Synapse-Pipeline hinzu und übergeben die eindeutige Ausführungs-ID der Pipeline an den Notebookparameter, damit Sie die Pipelineausführung später mit den von der Aktivität „Notebook“ gespeicherten Daten korrelieren können.

Schließlich verwenden Sie den Hub Überwachung in Synapse Studio, um die Pipelineausführung zu überwachen, die Ausführungs-ID abzurufen und dann die entsprechenden Dateien zu suchen, die im Data Lake gespeichert sind.

Informationen zu Apache Spark und Notebooks

Apache Spark ist ein Framework für die Parallelverarbeitung, das In-Memory-Verarbeitung unterstützt, um die Leistung von Big Data-Analyseanwendungen zu steigern. Apache Spark in Azure Synapse Analytics ist eine der cloudbasierten Apache Spark-Implementierungen von Microsoft.

Ein Apache Spark-Notebook in Synapse Studio ist eine Webschnittstelle, mit der Sie Dateien erstellen können, die Livecode, Visualisierungen und narrativen Text enthalten. Notebooks sind ein guter Ausgangspunkt, um Ideen zu überprüfen und schnelle Experimente zu verwenden, um Erkenntnisse aus Ihren Daten zu gewinnen. Notebooks werden auch häufig bei der Datenvorbereitung, Datenvisualisierung, Machine Learning und andere Big Data-Szenarien verwendet.

Erstellen eines Synapse Spark-Notebooks

Angenommen, Sie haben einen Zuordnungsdatenfluss in Synapse Analytics erstellt, um Benutzerprofildaten zu verarbeiten, zu verknüpfen und zu importieren. Nun möchten Sie für die letzten zwölf Monate die fünf bevorzugten, beliebtesten und meistgekauften Produkte aller Benutzer*innen finden. Anschließend möchten Sie die fünf beliebtesten Produkte insgesamt berechnen.

In dieser Übung erstellen Sie ein Synapse Spark-Notebook, um diese Berechnungen anzustellen.

  1. Öffnen Synapse Analytics Studio (https://web.azuresynapse.net/), und navigieren Sie zum Hub https://web.azuresynapse.net/.

    Das Menüelement „Daten“ ist hervorgehoben.

  2. Wählen Sie die Registerkarte Verknüpft(1) aus, und erweitern Sie das primäre Data Lake-Speicherkonto (2) unterhalb von Azure Data Lake Storage Gen2. Wählen Sie den Container wwi-02(3) aus, und öffnen Sie den Ordner top-products(4). Klicken Sie mit der rechten Maustaste auf eine beliebige Parquet-Datei (5), und wählen Sie das Menüelement Neues Notebook(6) und dann In Datenframe laden (7) aus. Wenn der Ordner nicht angezeigt wird, klicken Sie auf Refresh.

    Die Parquet-Datei und die Option „Neues Notebook“ sind hervorgehoben.

  3. Vergewissern Sie sich, dass das Notebook an Ihren Spark-Pool angefügt ist.

    Das Menüelement „An Spark-Pool anfügen“ ist hervorgehoben.

  4. Ersetzen Sie den Parquet-Dateinamen durch *.parquet (1), um alle Parquet-Dateien im Ordner top-products auszuwählen. Der Pfad sollte z. B. in etwa wie der folgende lauten: abfss://wwi-02@YOUR_DATALAKE_NAME.dfs.core.windows.net/top-products/*.parquet.

    Der Dateiname ist hervorgehoben.

  5. Wählen Sie auf der Symbolleiste des Notebooks Alle ausführen aus, um das Notebook auszuführen.

    Die Zellenergebnisse werden angezeigt.

    Hinweis

    Wenn Sie ein Notebook zum ersten Mal in einem Spark-Pool ausführen, erstellt Synapse eine neue Sitzung. Dieser Vorgang kann etwa drei bis fünf Minuten dauern.

    Hinweis

    Wenn Sie nur die Zelle ausführen möchten, zeigen Sie auf diese, und klicken Sie links neben der Zelle auf das Symbol Zelle ausführen, oder wählen Sie die Zelle aus, und drücken Sie STRG+EINGABETASTE.

  6. Erstellen Sie darunter eine neue Zelle, indem Sie auf die Schaltfläche + und dann auf das Element + (Codezelle) klicken. Die Schaltfläche + befindet sich unter der Notebookzelle auf der linken Seite. Alternativ können Sie auch das Menü + Zelle in der Notebooksymbolleiste erweitern und das Element Codezelle auswählen.

    Die Menüoption „Code hinzufügen“ ist hervorgehoben.

  7. Führen Sie den folgenden Befehl in der neuen Zelle aus, um einen neuen Dataframe namens topPurchases aufzufüllen, eine neue temporäre Ansicht namens top_purchases zu erstellen, und die ersten 100 Zeilen anzuzeigen:

    topPurchases = df.select(
        "UserId", "ProductId",
        "ItemsPurchasedLast12Months", "IsTopProduct",
        "IsPreferredProduct")
    
    # Populate a temporary view so we can query from SQL
    topPurchases.createOrReplaceTempView("top_purchases")
    
    topPurchases.show(100)
    

    Die Ausgabe sollte in etwa wie folgt aussehen:

    +------+---------+--------------------------+------------+------------------+
    |UserId|ProductId|ItemsPurchasedLast12Months|IsTopProduct|IsPreferredProduct|
    +------+---------+--------------------------+------------+------------------+
    |   148|     2717|                      null|       false|              true|
    |   148|     4002|                      null|       false|              true|
    |   148|     1716|                      null|       false|              true|
    |   148|     4520|                      null|       false|              true|
    |   148|      951|                      null|       false|              true|
    |   148|     1817|                      null|       false|              true|
    |   463|     2634|                      null|       false|              true|
    |   463|     2795|                      null|       false|              true|
    |   471|     1946|                      null|       false|              true|
    |   471|     4431|                      null|       false|              true|
    |   471|      566|                      null|       false|              true|
    |   471|     2179|                      null|       false|              true|
    |   471|     3758|                      null|       false|              true|
    |   471|     2434|                      null|       false|              true|
    |   471|     1793|                      null|       false|              true|
    |   471|     1620|                      null|       false|              true|
    |   471|     1572|                      null|       false|              true|
    |   833|      957|                      null|       false|              true|
    |   833|     3140|                      null|       false|              true|
    |   833|     1087|                      null|       false|              true|
    
  8. Führen Sie den folgenden Befehl in einer neuen Zelle aus, um mithilfe von SQL eine neue temporäre Ansicht zu erstellen:

    %%sql
    
    CREATE OR REPLACE TEMPORARY VIEW top_5_products
    AS
        select UserId, ProductId, ItemsPurchasedLast12Months
        from (select *,
                    row_number() over (partition by UserId order by ItemsPurchasedLast12Months desc) as seqnum
            from top_purchases
            ) a
        where seqnum <= 5 and IsTopProduct == true and IsPreferredProduct = true
        order by a.UserId
    

    Hinweis

    Für diese Abfrage gibt es keine Ausgabe.

    Die Abfrage verwendet die temporäre Ansicht top_purchases als Quelle und wendet eine row_number() over-Methode an, um eine Zeilennummer für die Datensätze der einzelnen Benutzer anzuwenden, wobei ItemsPurchasedLast12Months am größten ist. Die where-Klausel filtert die Ergebnisse, sodass wir nur bis zu fünf Produkte abrufen, bei denen sowohl IsTopProduct als auch IsPreferredProduct auf „true“ festgelegt sind. Dadurch erhalten wir die fünf meistgekauften Produkte für jeden Benutzer, bei denen diese Produkte auch entsprechend ihrem Benutzerprofil, das in Azure Cosmos DB gespeichert ist, als ihre bevorzugten Produkte identifiziert werden.

  9. Führen Sie den folgenden Befehl in einer neuen Zelle aus, um einen neuen Dataframe zu erstellen und anzuzeigen, in dem die Ergebnisse der temporären Ansicht top_5_products gespeichert werden, die Sie in der vorherigen Zelle erstellt haben:

    top5Products = sqlContext.table("top_5_products")
    
    top5Products.show(100)
    

    Es sollte eine Ausgabe ähnlich der folgenden angezeigt werden, in der die fünf bevorzugten Produkte pro Benutzer angezeigt werden:

    Die fünf bevorzugten Produkte werden pro Benutzer angezeigt.

  10. Berechnen Sie die fünf wichtigsten Produkte insgesamt, basierend auf den Produkten, die sowohl von Kunden bevorzugt, als auch am meisten gekauft wurden. Führen Sie hierzu den folgenden Befehl in einer neuen Zelle aus:

    top5ProductsOverall = (top5Products.select("ProductId","ItemsPurchasedLast12Months")
        .groupBy("ProductId")
        .agg( sum("ItemsPurchasedLast12Months").alias("Total") )
        .orderBy( col("Total").desc() )
        .limit(5))
    
    top5ProductsOverall.show()
    

    In dieser Zelle haben wir die fünf bevorzugten Produkte nach Produkt-ID geordnet, die Gesamtanzahl der in den letzten 12 Monaten gekauften Artikel addiert, diesen Wert in absteigender Reihenfolge sortiert, und die ersten fünf Ergebnisse zurückgegeben. Die Ausgabe sollte in etwa wie folgt aussehen:

    +---------+-----+
    |ProductId|Total|
    +---------+-----+
    |     2107| 4538|
    |     4833| 4533|
    |      347| 4523|
    |     3459| 4233|
    |     4246| 4155|
    +---------+-----+
    

Erstellen einer Parameterzelle

Azure Synapse-Pipelines suchen nach der Parameterzelle und behandeln diese Zelle als Standard für die Parameter, die zur Ausführungszeit übergeben werden. Die Ausführungs-Engine fügt eine neue Zelle mit Eingabeparametern unter der Parameterzelle hinzu, um die Standardwerte zu überschreiben. Wenn keine Parameterzelle angegeben ist, wird die Zelle ganz oben im Notebook eingefügt.

  1. Wir führen dieses Notebook aus einer Pipeline aus. Wir möchten einen Parameter übergeben, der einen runId-Variablenwert festlegt, der zum Benennen der Parquet-Datei verwendet wird. Führen Sie den folgenden Befehl in einer neuen Zelle aus:

    import uuid
    
    # Generate random GUID
    runId = uuid.uuid4()
    

    Wir verwenden die in Spark integrierte uuid-Bibliothek, um eine zufällige GUID zu generieren. Wir möchten die Variable runId mit einem Parameter überschreiben, der von der Pipeline übergeben wird. Dazu müssen wir diese Zelle als Parameterzelle umschalten.

  2. Wählen Sie oben rechts in der Zelle (1) auf die Auslassungspunkte (...) und dann auf Toggle parameter cell (2) (Parameterzelle umschalten).

    Die Menüoption ist hervorgehoben.

    Nachdem Sie diese Option umgeschaltet haben, ist das Tag Parameters an die Zelle angefügt.

    Die Zelle ist so konfiguriert, dass sie Parameter akzeptiert.

  3. Fügen Sie den folgenden Code in eine neue Zelle ein, um die Variable runId als Parquet-Dateinamen im Pfad /top5-products/ im primären Data Lake-Konto zu verwenden. Ersetzen Sie YOUR_DATALAKE_NAME im Pfad mit dem Namen Ihres primären Data Lake-Kontos. Diesen finden Sie oben auf der Seite in Zelle 1(1). Kopieren Sie das Data Lake-Speicherkonto aus dem Pfad (2). Fügen Sie diesen Wert als Ersatz für YOUR_DATALAKE_NAME in den Pfad YOUR_DATALAKE_NAME in die neue Zelle ein, und führen Sie den Befehl in der Zelle aus.

    %%pyspark
    
    top5ProductsOverall.write.parquet('abfss://wwi-02@YOUR_DATALAKE_NAME.dfs.core.windows.net/top5-products/' + str(runId) + '.parquet')
    

    Der Pfad wird mit dem Namen des primären Data Lake-Kontos aktualisiert.

  4. Überprüfen Sie, ob die Datei in den Data Lake geschrieben wurde. Navigieren Sie zum Hub Daten, und wählen Sie die Registerkarte Verknüpft(1) aus. Erweitern Sie das primäre Data-Lake-Speicherkonto, und wählen Sie den Container wwi-02(2) aus. Navigieren Sie zum Ordner top5-products(3). Im Verzeichnis sollte ein Ordner für die Parquet-Datei mit einer GUID als Dateiname (4) angezeigt werden.

    Die Parquet-Datei ist hervorgehoben.

    Die Parquet-Schreibmethode für den Dataframe in der Notebookzelle hat dieses Verzeichnis erstellt, da es noch nicht vorhanden war.

Hinzufügen des Notebooks zu einer Synapse-Pipeline

Wir kommen noch einmal zurück auf den Zuordnungsdatenfluss vom Beginn der Übung: Nehmen wir an, dass Sie dieses Notebook ausführen möchten, nachdem der Datenfluss im Rahmen Ihres Orchestrierungsprozesses ausgeführt wurde. Dazu fügen Sie dieses Notebook einer Pipeline als neue Notebook-Aktivität hinzu.

  1. Kehren Sie zum Notebook zurück. Klicken Sie oben rechts im Notebook auf Eigenschaften (1), und geben Sie dann Calculate Top 5 Products bei Name (2) ein.

    Das Blatt „Eigenschaften“ wird angezeigt.

  2. Klicken Sie oben rechts im Notebook auf Add to pipeline (Zu Pipeline hinzufügen) (1) und dann auf Existing pipeline (2) (Vorhandene Pipeline).

    Die Schaltfläche „Zur Pipeline hinzufügen“ ist hervorgehoben.

  3. Wählen Sie die Pipeline Write User Profile Data to ASA (Benutzerprofildaten in ASA schreiben) (1) aus, und klicken Sie auf Hinzufügen *(2).

    Die Pipeline ist ausgewählt.

  4. Synapse Studio fügt der Pipeline die Notebook-Aktivität hinzu. Ordnen Sie die Notebook-Aktivität neu an, sodass sie sich rechts neben der Datenflussaktivität befindet. Wählen Sie die Aktivität „Datenfluss“ aus, und ziehen Sie für die Pipelineverbindung ein grünes Feld der Aktivität Erfolg zur Aktivität „Notebook“.

    Der grüne Pfeil ist hervorgehoben.

    Der Pfeil der Aktivität „Erfolg“ weist die Pipeline an, die Aktivität „Notebook“ auszuführen, nachdem die Aktivität „Datenfluss“ erfolgreich ausgeführt wurde.

  5. Wählen Sie die Aktivität „Notebook“ (1) und dann die Registerkarte Einstellungen(2) aus, erweitern Sie die Basisparameter (3), und klicken Sie auf + Neu (4). Geben runId Sie in das Feld runId(5) ein. Wählen Sie für Typ (6) die Option Zeichenfolge aus. Wählen Sie für den Wert die Option Dynamischen Inhalt hinzufügen (7) aus.

    Die Einstellungen werden angezeigt.

  6. Wählen Sie unter Systemvariablen (1) die Option Pipelineausführungs-ID aus. Dadurch wird @pipeline().RunId dem dynamischen Inhaltsfeld @pipeline().RunId hinzugefügt. Klicken Sie auf Fertig stellen (3), um das Dialogfeld zu schließen.

    Das Formular für dynamischen Inhalt wird angezeigt.

    Der Wert „Pipelineausführungs-ID“ ist eine eindeutige GUID, die jeder Pipelineausführung zugewiesen wird. Wir verwenden diesen Wert für den Namen der Parquet-Datei, indem wir diesen Wert als runId Notebook-Parameter übergeben. Wir können dann den Ausführungsverlauf der Pipeline durchsuchen, und die spezifische Parquet-Datei finden, die für die jeweilige Pipelineausführung erstellt wird.

  7. Wählen Sie Alle veröffentlichen und dann Veröffentlichen aus, um Ihre Änderungen zu speichern.

    „Alle veröffentlichen“ ist hervorgehoben.

  8. Wählen Sie nach Abschluss der Veröffentlichung Trigger hinzufügen (1) und dann Jetzt auslösen (2) aus, um die aktualisierte Pipeline auszuführen.

    Das Menüelement für Trigger ist hervorgehoben.

  9. Wählen Sie OK aus, um den Trigger auszuführen.

    Die Schaltfläche „OK“ ist hervorgehoben.

Überwachen der Pipelineausführung

Mit dem Hub Überwachung können Sie aktuelle und vergangene Aktivitäten für SQL, Apache Spark und Pipelines überwachen.

  1. Wechseln Sie zum Hub Überwachen.

    Das Menüelement „Überwachen“ ist ausgewählt.

  2. Klicken Sie auf Pipelineausführungen (1), und warten Sie, bis die Pipeline erfolgreich ausgeführt wurde (2). Möglicherweise müssen Sie die Ansicht aktualisieren (3).

    Die Pipelineausführung war erfolgreich.

  3. Wählen Sie den Namen der Pipeline aus, um die Aktivitätsausführungen der Pipeline anzuzeigen.

    Der Pipelinename ist ausgewählt.

  4. Beachten Sie sowohl die Aktivität Datenfluss als auch die neue Aktivität Notebook(1). Notieren Sie sich den Wert der Pipelineausführungs-ID(2). Diesen vergleichen wir mit dem Vom Notebook generierten Parquet-Dateinamen. Wählen Sie den Notebooknamen Calculate Top 5 Products (Die 5 wichtigsten Produkte berechnen) aus, um dessen Details anzuzeigen (3).

    Die Details zur Pipelineausführung werden angezeigt.

  5. Hier sehen wir die Details zur Notebookausführung. Sie können auf Wiedergabe (1) klicken, um den Fortschritts für Aufträge (2) wiederzugeben. Unten können Sie die Diagnose und Protokolle mit unterschiedlichen Filteroptionen (3) anzeigen. Auf der rechten Seite können wir die Ausführungsdetails wie Dauer, Livy-ID, Details zum Spark-Pool usw. anzeigen. Klicken Sie für einen Auftrag auf den Link Details anzeigen, um die zugehörigen Details (5) anzuzeigen.

    Die Ausführungsdetails werden angezeigt.

  6. Die Benutzeroberfläche der Spark-Anwendung wird in einer neuen Registerkarte geöffnet, auf der die Phasendetails angezeigt werden. Erweitern Sie die DAG-Visualisierung, um die Phasendetails anzuzeigen.

    Die Details zur Spark-Phase werden angezeigt.

  7. Kehren Sie zum Hub Daten zurück.

    Der Hub „Daten“.

  8. Wählen Sie die Registerkarte Verknüpft(1) und dann den Container wwi-02(2) im primären Data-Lake-Speicherkonto aus. Navigieren Sie zum Ordner top5-products(3), und überprüfen Sie, ob ein Ordner für die Parquet-Datei vorhanden ist, deren Name der Pipelineausführungs-ID entspricht.

    Die Datei ist hervorgehoben.

    Wie Sie sehen können, haben wir eine Datei, deren Name der zuvor notierten Pipelineausführungs-ID entspricht:

    Die ID der Pipelineausführung ist hervorgehoben.

    Diese Werte stimmen überein, da wir die Pipelineausführungs-ID an den runId Parameter für die Notebook-Aktivität übergeben haben.