Ćwiczenie — integrowanie notesu w potokach usługi Azure Synapse

Ukończone

W tej lekcji utworzysz notes usługi Azure Synapse Spark do analizowania i przekształcania danych załadowanych przez przepływ danych mapowania oraz przechowywania danych w usłudze Data Lake. Utworzysz komórkę parametru, która akceptuje parametr ciągu, który definiuje nazwę folderu dla danych zapisywanych w notesie w usłudze Data Lake.

Następnie dodasz ten notes do potoku usługi Synapse i przekaż unikatowy identyfikator uruchomienia potoku do parametru notesu, aby później skorelować przebieg potoku z danymi zapisanymi przez działanie notesu.

Na koniec użyjesz centrum Monitor w programie Synapse Studio do monitorowania przebiegu potoku, uzyskania identyfikatora przebiegu, a następnie zlokalizowania odpowiednich plików przechowywanych w usłudze Data Lake.

Informacje o platformie Apache Spark i notesach

Apache Spark jest platformą przetwarzania równoległego, która obsługuje przetwarzanie w pamięci w celu zwiększania wydajności aplikacji do analizy danych big data. Platforma Apache Spark w usłudze Azure Synapse Analytics to jedna z implementacji platformy Apache Spark oferowanych przez firmę Microsoft w chmurze.

Notes platformy Apache Spark w programie Synapse Studio to interfejs internetowy umożliwiający tworzenie plików zawierających kod na żywo, wizualizacje i tekst narracji. Notesy to dobre miejsce do weryfikowania pomysłów i przeprowadzania krótkich eksperymentów w celu uzyskania szczegółowych informacji na podstawie danych. Notesy są również szeroko używane w scenariuszach przygotowywania danych, wizualizacji danych, uczenia maszynowego i innych scenariuszy danych big data.

Tworzenie notesu usługi Synapse Spark

Załóżmy, że utworzono przepływ danych mapowania w usłudze Synapse Analytics w celu przetwarzania, dołączania i importowania danych profilu użytkownika. Teraz chcesz znaleźć pięć najlepszych produktów dla każdego użytkownika, na podstawie tych, które są zarówno preferowane, jak i najlepsze, i mają najwięcej zakupów w ciągu ostatnich 12 miesięcy. Następnie chcesz obliczyć pięć najlepszych produktów.

W tym ćwiczeniu utworzysz notes usługi Synapse Spark, aby wykonać te obliczenia.

  1. Otwórz program Synapse Analytics Studio (https://web.azuresynapse.net/) i przejdź do centrum danych .

    The Data menu item is highlighted.

  2. Wybierz kartę Połączono (1), a następnie rozwiń podstawowe konto usługi Data Lake Storage (2) poniżej usługi Azure Data Lake Storage Gen2. Wybierz kontener wwi-02 (3) i otwórz folder top-products (4). Kliknij prawym przyciskiem myszy dowolny plik Parquet (5), wybierz element menu Nowy notes (6), a następnie wybierz polecenie Załaduj do ramki danych (7). Jeśli nie widzisz folderu, wybierz pozycję Refresh.

    The Parquet file and new notebook option are highlighted.

  3. Upewnij się, że notes jest dołączony do puli platformy Spark.

    The attach to Spark pool menu item is highlighted.

  4. Zastąp nazwę pliku Parquet wartością *.parquet(1), aby wybrać wszystkie pliki Parquet w folderze top-products . Na przykład ścieżka powinna być podobna do: abfss://wwi-02@YOUR_DATALAKE_NAME.dfs.core.windows.net/top-products/*.parquet.

    The filename is highlighted.

  5. Wybierz pozycję Uruchom wszystko na pasku narzędzi notesu, aby wykonać notes.

    The cell results are displayed.

    Uwaga

    Przy pierwszym uruchomieniu notesu w puli Spark usługa Synapse tworzy nową sesję. Może to potrwać od około 3 do 5 minut.

    Uwaga

    Aby uruchomić tylko komórkę, umieść kursor na komórce i wybierz ikonę Uruchom komórkę po lewej stronie komórki lub wybierz komórkę, a następnie naciśnij klawisze Ctrl+Enter.

  6. Utwórz nową komórkę poniżej, wybierając + przycisk i wybierając element komórki Kod. Przycisk + znajduje się pod komórką notesu po lewej stronie. Alternatywnie możesz również rozwinąć menu + Komórka na pasku narzędzi Notes i wybrać element komórki Kod.

    The Add Code menu option is highlighted.

  7. Uruchom następujące polecenie w nowej komórce, aby wypełnić nową ramkę danych o nazwie topPurchases, utworzyć nowy widok tymczasowy o nazwie top_purchasesi wyświetlić pierwsze 100 wierszy:

    topPurchases = df.select(
        "UserId", "ProductId",
        "ItemsPurchasedLast12Months", "IsTopProduct",
        "IsPreferredProduct")
    
    # Populate a temporary view so we can query from SQL
    topPurchases.createOrReplaceTempView("top_purchases")
    
    topPurchases.show(100)
    

    Dane wyjściowe powinny wyglądać mniej więcej tak:

    +------+---------+--------------------------+------------+------------------+
    |UserId|ProductId|ItemsPurchasedLast12Months|IsTopProduct|IsPreferredProduct|
    +------+---------+--------------------------+------------+------------------+
    |   148|     2717|                      null|       false|              true|
    |   148|     4002|                      null|       false|              true|
    |   148|     1716|                      null|       false|              true|
    |   148|     4520|                      null|       false|              true|
    |   148|      951|                      null|       false|              true|
    |   148|     1817|                      null|       false|              true|
    |   463|     2634|                      null|       false|              true|
    |   463|     2795|                      null|       false|              true|
    |   471|     1946|                      null|       false|              true|
    |   471|     4431|                      null|       false|              true|
    |   471|      566|                      null|       false|              true|
    |   471|     2179|                      null|       false|              true|
    |   471|     3758|                      null|       false|              true|
    |   471|     2434|                      null|       false|              true|
    |   471|     1793|                      null|       false|              true|
    |   471|     1620|                      null|       false|              true|
    |   471|     1572|                      null|       false|              true|
    |   833|      957|                      null|       false|              true|
    |   833|     3140|                      null|       false|              true|
    |   833|     1087|                      null|       false|              true|
    
  8. Uruchom następujące polecenie w nowej komórce, aby utworzyć nowy widok tymczasowy przy użyciu języka SQL:

    %%sql
    
    CREATE OR REPLACE TEMPORARY VIEW top_5_products
    AS
        select UserId, ProductId, ItemsPurchasedLast12Months
        from (select *,
                    row_number() over (partition by UserId order by ItemsPurchasedLast12Months desc) as seqnum
            from top_purchases
            ) a
        where seqnum <= 5 and IsTopProduct == true and IsPreferredProduct = true
        order by a.UserId
    

    Uwaga

    Brak danych wyjściowych dla tego zapytania.

    Zapytanie używa widoku tymczasowego top_purchases jako źródła i stosuje metodę row_number() over , aby zastosować numer wiersza dla rekordów dla każdego użytkownika, gdzie ItemsPurchasedLast12Months jest największy. Klauzula where filtruje wyniki, więc pobieramy tylko do pięciu produktów, w których oba IsTopProduct i IsPreferredProduct są ustawione na wartość true. Daje to nam pięć najczęściej zakupionych produktów dla każdego użytkownika, w którym te produkty są również identyfikowane jako ulubione produkty, zgodnie z profilem użytkownika przechowywanym w usłudze Azure Cosmos DB.

  9. Uruchom następujące polecenie w nowej komórce, aby utworzyć i wyświetlić nową ramkę danych, która przechowuje wyniki widoku tymczasowego top_5_products utworzonego w poprzedniej komórce:

    top5Products = sqlContext.table("top_5_products")
    
    top5Products.show(100)
    

    Powinny zostać wyświetlone dane wyjściowe podobne do następujących, które wyświetlają pięć preferowanych produktów na użytkownika:

    The top five preferred products are displayed per user.

  10. Oblicz pięć najlepszych produktów w oparciu o te, które są preferowane przez klientów i kupowane najwięcej. W tym celu uruchom następujące polecenie w nowej komórce:

    top5ProductsOverall = (top5Products.select("ProductId","ItemsPurchasedLast12Months")
        .groupBy("ProductId")
        .agg( sum("ItemsPurchasedLast12Months").alias("Total") )
        .orderBy( col("Total").desc() )
        .limit(5))
    
    top5ProductsOverall.show()
    

    W tej komórce pogrupowaliśmy pięć preferowanych produktów według identyfikatora produktu, podsumowaliśmy łączną liczbę elementów zakupionych w ciągu ostatnich 12 miesięcy, posortowaliśmy tę wartość w kolejności malejącej i zwróciliśmy pięć pierwszych wyników. Dane wyjściowe powinny być podobne do następujących:

    +---------+-----+
    |ProductId|Total|
    +---------+-----+
    |     2107| 4538|
    |     4833| 4533|
    |      347| 4523|
    |     3459| 4233|
    |     4246| 4155|
    +---------+-----+
    

Tworzenie komórki parametru

Potoki usługi Azure Synapse wyszukują komórkę parametrów i traktują tę komórkę jako wartości domyślne dla parametrów przekazanych w czasie wykonywania. Aparat wykonywania doda nową komórkę pod komórką parameters z parametrami wejściowymi, aby zastąpić wartości domyślne. Jeśli komórka parametrów nie zostanie wyznaczona, w górnej części notesu zostanie wstawiona wstrzymywane komórki.

  1. Ten notes zostanie wykonany z potoku. Chcemy przekazać parametr, który ustawia wartość zmiennej runId , która będzie używana do nazywania pliku Parquet. Uruchom następujące polecenie w nowej komórce:

    import uuid
    
    # Generate random GUID
    runId = uuid.uuid4()
    

    Używamy biblioteki dostarczanej uuid z platformą Spark w celu wygenerowania losowego identyfikatora GUID. Chcemy zastąpić runId zmienną parametrem przekazywanym przez potok. Aby to zrobić, musimy przełączyć to jako komórkę parametru.

  2. Wybierz wielokropek akcji (...) w prawym górnym rogu komórki (1), a następnie wybierz pozycję Przełącz komórkę parametru (2).

    The menu item is highlighted.

    Po przełączeniu tej opcji zobaczysz tag Parametry w komórce.

    The cell is configured to accept parameters.

  3. Wklej następujący kod w nowej komórce, aby użyć runId zmiennej jako nazwy pliku Parquet w ścieżce na /top5-products/ koncie podstawowego magazynu data lake. Zastąp YOUR_DATALAKE_NAME ciąg w ścieżce nazwą podstawowego konta usługi Data Lake. Aby to znaleźć, przewiń w górę do komórki 1 w górnej części strony (1). Skopiuj konto magazynu data lake ze ścieżki (2). Wklej tę wartość jako zamiennik YOUR_DATALAKE_NAME w ścieżce (3) wewnątrz nowej komórki, a następnie uruchom polecenie w komórce.

    %%pyspark
    
    top5ProductsOverall.write.parquet('abfss://wwi-02@YOUR_DATALAKE_NAME.dfs.core.windows.net/top5-products/' + str(runId) + '.parquet')
    

    The path is updated with the name of the primary data lake account.

  4. Sprawdź, czy plik został zapisany w usłudze Data Lake. Przejdź do centrum danych i wybierz kartę Połączone (1). Rozwiń podstawowe konto magazynu typu data lake, a następnie wybierz kontener wwi-02 (2). Przejdź do folderu top5-products (3). Powinien zostać wyświetlony folder dla pliku Parquet w katalogu z identyfikatorem GUID jako nazwą pliku (4).

    The parquet file is highlighted.

    Metoda zapisu Parquet w ramce danych w komórce notesu utworzyła ten katalog, ponieważ wcześniej nie istniała.

Dodawanie notesu do potoku usługi Synapse

Nawiązując do Przepływ danych mapowania opisanego na początku ćwiczenia, załóżmy, że chcesz wykonać ten notes po uruchomieniu Przepływ danych w ramach procesu aranżacji. W tym celu należy dodać ten notes do potoku jako nowe działanie notesu.

  1. Wróć do notesu. Wybierz pozycję Właściwości (1) w prawym górnym rogu notesu, a następnie wprowadź Calculate Top 5 Products nazwę (2).

    The properties blade is displayed.

  2. Wybierz pozycję Dodaj do potoku (1) w prawym górnym rogu notesu, a następnie wybierz pozycję Istniejący potok (2).

    The add to pipeline button is highlighted.

  3. Wybierz potok Zapisywania danych profilu użytkownika w potoku USŁUGI ASA (1),a następnie wybierz pozycję Dodaj *(2).

    The pipeline is selected.

  4. Program Synapse Studio dodaje działanie Notes do potoku. Zmień kolejność działania Notes, tak aby znajduje się po prawej stronie działania Przepływu danych. Wybierz działanie Przepływ danych i przeciągnij zielone pole połączenia potoku działania Powodzenie do działania Notes.

    The green arrow is highlighted.

    Strzałka działania Powodzenie instruuje potok, aby uruchamiał działanie notesu po pomyślnym uruchomieniu działania przepływu danych.

  5. Wybierz działanie Notes (1), a następnie wybierz kartę Ustawienia (2), rozwiń węzeł Parametry podstawowe (3), a następnie wybierz pozycję + Nowy (4). Wprowadź runId w polu Nazwa (5). Wybierz pozycję Ciąg dla typu (6). W polu Wartość wybierz pozycję Dodaj zawartość dynamiczną (7).

    The settings are displayed.

  6. Wybierz pozycję Identyfikator uruchomienia potoku w obszarze Zmienne systemowe (1). Spowoduje to dodanie @pipeline().RunId do pola zawartości dynamicznej (2). Wybierz przycisk Zakończ (3), aby zamknąć okno dialogowe.

    The dynamic content form is displayed.

    Wartość Identyfikator przebiegu potoku jest unikatowym identyfikatorem GUID przypisanym do każdego uruchomienia potoku. Użyjemy tej wartości jako nazwy pliku Parquet, przekazując tę wartość jako parametr notesu runId . Następnie możemy przejrzeć historię uruchamiania potoku i znaleźć określony plik Parquet utworzony dla każdego uruchomienia potoku.

  7. Wybierz pozycję Opublikuj wszystko , a następnie pozycję Publikuj , aby zapisać zmiany.

    Publish all is highlighted.

  8. Po zakończeniu publikowania wybierz pozycję Dodaj wyzwalacz (1), a następnie pozycję Wyzwól teraz (2), aby uruchomić zaktualizowany potok.

    The trigger menu item is highlighted.

  9. Wybierz przycisk OK , aby uruchomić wyzwalacz.

    The OK button is highlighted.

Monitorowanie działania potoku

Centrum Monitor umożliwia monitorowanie bieżących i historycznych działań dla usług SQL, Apache Spark i Pipelines.

  1. Przejdź do centrum Monitor .

    The Monitor hub menu item is selected.

  2. Wybierz pozycję Uruchomienia potoku (1)i poczekaj na pomyślne ukończenie przebiegu potoku (2). Może być konieczne odświeżenie widoku (3).

    The pipeline run succeeded.

  3. Wybierz nazwę potoku, aby wyświetlić uruchomienia działania potoku.

    The pipeline name is selected.

  4. Zwróć uwagę zarówno na działanie Przepływu danych, jak i nowe działanie notesu (1).. Zanotuj wartość identyfikatora przebiegu potoku (2). Porównamy to z nazwą pliku Parquet wygenerowaną przez notes. Wybierz nazwę Calculate Top 5 Products Notebook (Oblicz 5 pierwszych produktów), aby wyświetlić jego szczegóły (3).

    The pipeline run details are displayed.

  5. W tym miejscu zobaczymy szczegóły uruchomienia notesu. Możesz wybrać pozycję Odtwarzanie (1), aby obejrzeć odtwarzanie postępu w ramach zadań (2). W dolnej części można wyświetlić opcje diagnostyki i dzienników z różnymi opcjami filtrowania (3). Po prawej stronie możemy wyświetlić szczegóły przebiegu, takie jak czas trwania, identyfikator usługi Livy, szczegóły puli platformy Spark itd. Wybierz link Wyświetl szczegóły zadania, aby wyświetlić jego szczegóły (5).

    The run details are displayed.

  6. Interfejs użytkownika aplikacji platformy Spark zostanie otwarty na nowej karcie, na której można wyświetlić szczegóły etapu. Rozwiń wizualizację języka DAG, aby wyświetlić szczegóły etapu.

    The Spark stage details are displayed.

  7. Wróć do centrum danych .

    Data hub.

  8. Wybierz kartę Połączono (1), a następnie wybierz kontener wwi-02 (2) na podstawowym koncie magazynu typu data lake, przejdź do folderu top5-products (3) i sprawdź, czy istnieje folder dla pliku Parquet, którego nazwa jest zgodna z identyfikatorem uruchomienia potoku.

    The file is highlighted.

    Jak widać, mamy plik, którego nazwa jest zgodna z identyfikatorem uruchomienia potoku, który wcześniej zanotowaliśmy:

    The Pipeline run ID is highlighted.

    Te wartości są zgodne, ponieważ przekazano identyfikator uruchomienia potoku do parametru runId działania Notes.