Wywoływanie programów Spark z potoków Azure Data Factory

Uwaga

Ten artykuł dotyczy wersji 1 usługi Azure Data Factory, która jest ogólnie dostępna. Jeśli używasz bieżącej wersji usługi Data Factory, zobacz Przekształcanie danych przy użyciu działania platformy Apache Spark w usłudze Data Factory.

Wprowadzenie

Działanie platformy Spark to jedno z działań przekształcania danych obsługiwanych przez usługę Data Factory. To działanie uruchamia określony program Spark w klastrze Spark w usłudze Azure HDInsight.

Ważne

  • Działanie platformy Spark nie obsługuje klastrów spark usługi HDInsight, które używają usługi Azure Data Lake Store jako magazynu podstawowego.
  • Działanie platformy Spark obsługuje tylko istniejące (własne) klastry platformy Spark w usłudze HDInsight. Nie obsługuje ona połączonej usługi HDInsight na żądanie.

Przewodnik: tworzenie potoku za pomocą działania platformy Spark

Poniżej przedstawiono typowe kroki tworzenia potoku fabryki danych przy użyciu działania platformy Spark:

  • Tworzenie fabryki danych.
  • Utwórz połączoną usługę Azure Storage, aby połączyć magazyn skojarzony z klastrem Spark usługi HDInsight z fabryką danych.
  • Utwórz połączoną usługę HDInsight, aby połączyć klaster Spark w usłudze HDInsight z fabryką danych.
  • Utwórz zestaw danych odwołujący się do połączonej usługi Storage. Obecnie należy określić wyjściowy zestaw danych dla działania, nawet jeśli nie ma generowanych danych wyjściowych.
  • Utwórz potok za pomocą działania platformy Spark, który odwołuje się do utworzonej połączonej usługi HDInsight. Działanie jest konfigurowane przy użyciu zestawu danych utworzonego w poprzednim kroku jako wyjściowego zestawu danych. Wyjściowy zestaw danych określa harmonogram (co godzinę, codziennie). W związku z tym należy określić wyjściowy zestaw danych, mimo że działanie naprawdę nie generuje danych wyjściowych.

Wymagania wstępne

  1. Utwórz konto magazynu ogólnego przeznaczenia, postępując zgodnie z instrukcjami w temacie Tworzenie konta magazynu.

  2. Utwórz klaster Spark w usłudze HDInsight, postępując zgodnie z instrukcjami w samouczku Tworzenie klastra Spark w usłudze HDInsight. Skojarz konto magazynu utworzone w kroku 1 z tym klastrem.

  3. Pobierz i przejrzyj plik skryptu języka Python test.py znajdujący się w lokalizacji https://adftutorialfiles.blob.core.windows.net/sparktutorial/test.py.

  4. Przekaż test.py do folderu pyFiles w kontenerze adfspark w magazynie obiektów blob. Utwórz kontener i folder, jeśli nie istnieją.

Tworzenie fabryki danych

Aby utworzyć fabrykę danych, wykonaj następujące kroki:

  1. Zaloguj się w witrynie Azure Portal.

  2. Wybierz pozycję Nowe>dane i analiza>Data Factory.

  3. W bloku Nowa fabryka danych w obszarze Nazwa wprowadź wartość SparkDF.

    Ważne

    Nazwa fabryki danych Azure musi być globalnie unikatowa. Jeśli zostanie wyświetlony błąd "Nazwa fabryki danych SparkDF jest niedostępna", zmień nazwę fabryki danych. Na przykład użyj nazwy yournameSparkDFdate i ponownie utwórz fabrykę danych. Aby uzyskać więcej informacji na temat reguł nazewnictwa, zobacz Data Factory: Naming rules (Fabryka danych: reguły nazewnictwa).

  4. W obszarze Subskrypcja wybierz subskrypcję platformy Azure, w ramach której chcesz utworzyć fabrykę danych.

  5. Wybierz istniejącą grupę zasobów lub utwórz grupę zasobów platformy Azure.

  6. Zaznacz pole wyboru Przypnij do pulpitu nawigacyjnego.

  7. Wybierz przycisk Utwórz.

    Ważne

    Aby utworzyć wystąpienia usługi Data Factory, musisz być członkiem roli współautora usługi Data Factory na poziomie subskrypcji/grupy zasobów.

  8. Na pulpicie nawigacyjnym Azure Portal zostanie wyświetlona fabryka danych.

  9. Po utworzeniu fabryki danych zostanie wyświetlona strona Fabryka danych z zawartością fabryki danych. Jeśli nie widzisz strony Fabryka danych , wybierz kafelek fabryki danych na pulpicie nawigacyjnym.

    Blok Fabryka danych

Tworzenie połączonych usług

W tym kroku utworzysz dwie połączone usługi. Jedna usługa łączy klaster Spark z fabryką danych, a druga usługa łączy magazyn z fabryką danych.

Tworzenie połączonej usługi Storage

W tym kroku opisano łączenie konta magazynu z fabryką danych. Zestaw danych utworzony w kroku w dalszej części tego przewodnika odwołuje się do tej połączonej usługi. Połączona usługa HDInsight zdefiniowana w następnym kroku odnosi się również do tej połączonej usługi.

  1. W bloku Fabryka danych wybierz pozycję Utwórz i wdróż. Zostanie wyświetlony Edytor fabryki danych.

  2. Wybierz pozycję Nowy magazyn danych, a następnie opcję Azure Storage.

    Nowy magazyn danych

  3. Skrypt JSON używany do tworzenia połączonej usługi Storage jest wyświetlany w edytorze.

    AzureStorageLinkedService

  4. Zastąp wartości nazwa konta i klucz konta nazwą i kluczem dostępu konta magazynu. Aby dowiedzieć się, jak uzyskać klucz dostępu do magazynu, zobacz Zarządzanie kluczami dostępu do konta magazynu.

  5. Aby wdrożyć połączoną usługę, wybierz pozycję Wdróż na pasku poleceń. Po pomyślnym wdrożeniu połączonej usługi okno Wersja robocza-1 zniknie. W widoku drzewa po lewej stronie pojawi się wartość AzureStorageLinkedService.

Tworzenie połączonej usługi HDInsight

W tym kroku utworzysz połączoną usługę HDInsight, aby połączyć klaster HDInsight Spark z fabryką danych. Klaster usługi HDInsight służy do uruchamiania programu Spark określonego w działaniu Spark potoku w tym przykładzie.

  1. W Edytorze fabryki danych wybierz pozycję Więcej>nowego klastra obliczeniowego usługi>HDInsight.

    Tworzenie połączonej usługi HDInsight

  2. Skopiuj i wklej poniższy fragment kodu do okna Wersja robocza-1. W edytorze JSON wykonaj następujące czynności:

    1. Określ identyfikator URI klastra Spark usługi HDInsight. Na przykład: https://<sparkclustername>.azurehdinsight.net/.

    2. Określ nazwę użytkownika, który ma dostęp do klastra Spark.

    3. Podaj hasło użytkownika.

    4. Określ połączoną usługę Storage skojarzona z klastrem SPARK usługi HDInsight. W tym przykładzie jest to AzureStorageLinkedService.

    {
        "name": "HDInsightLinkedService",
        "properties": {
            "type": "HDInsight",
            "typeProperties": {
                "clusterUri": "https://<sparkclustername>.azurehdinsight.net/",
                "userName": "admin",
                "password": "**********",
                "linkedServiceName": "AzureStorageLinkedService"
            }
        }
    }
    

    Ważne

    • Działanie platformy Spark nie obsługuje klastrów spark usługi HDInsight, które używają usługi Azure Data Lake Store jako magazynu podstawowego.
    • Działanie platformy Spark obsługuje tylko istniejące (własne) klastry platformy Spark w usłudze HDInsight. Nie obsługuje ona połączonej usługi HDInsight na żądanie.

    Aby uzyskać więcej informacji na temat połączonej usługi HDInsight, zobacz Połączona usługa HDInsight.

  3. Aby wdrożyć połączoną usługę, wybierz pozycję Wdróż na pasku poleceń.

Tworzenie wyjściowego zestawu danych

Wyjściowy zestaw danych określa harmonogram (co godzinę, codziennie). W związku z tym należy określić wyjściowy zestaw danych dla działania platformy Spark w potoku, mimo że działanie nie generuje żadnych danych wyjściowych. Określanie wejściowego zestawu danych dla działania jest opcjonalne.

  1. W edytorze fabryki danych wybierz pozycję Więcej>nowego zestawu danych>usługi Azure Blob Storage.

  2. Skopiuj i wklej poniższy fragment kodu do okna Wersja robocza-1. Fragment kodu JSON definiuje zestaw danych o nazwie OutputDataset. Ponadto należy określić, że wyniki są przechowywane w kontenerze obiektów blob o nazwie adfspark i folderze o nazwie pyFiles/output. Jak wspomniano wcześniej, ten zestaw danych jest fikcyjnym zestawem danych. Program Spark w tym przykładzie nie generuje żadnych danych wyjściowych. Sekcja dostępności określa, że wyjściowy zestaw danych jest generowany codziennie.

    {
        "name": "OutputDataset",
        "properties": {
            "type": "AzureBlob",
            "linkedServiceName": "AzureStorageLinkedService",
            "typeProperties": {
                "fileName": "sparkoutput.txt",
                "folderPath": "adfspark/pyFiles/output",
                "format": {
                    "type": "TextFormat",
                    "columnDelimiter": "\t"
                }
            },
            "availability": {
                "frequency": "Day",
                "interval": 1
            }
        }
    }
    
  3. Aby wdrożyć zestaw danych, wybierz pozycję Wdróż na pasku poleceń.

Tworzenie potoku

W tym kroku utworzysz potok z działaniem HDInsightSpark. W tym przypadku wyjściowy zestaw danych jest elementem wpływającym na ustawienia harmonogramu, więc musisz utworzyć wyjściowy zestaw danych nawet wtedy, gdy działanie nie generuje żadnych danych wyjściowych. Jeśli w działaniu nie są używane żadne dane wejściowe, możesz pominąć tworzenie zestawu danych wejściowych. W związku z tym w tym przykładzie nie określono żadnego wejściowego zestawu danych.

  1. W Edytorze fabryki danych wybierz pozycję Więcej>nowego potoku.

  2. Zastąp skrypt w oknie Draft-1 następującym skryptem:

    {
        "name": "SparkPipeline",
        "properties": {
            "activities": [
                {
                    "type": "HDInsightSpark",
                    "typeProperties": {
                        "rootPath": "adfspark\\pyFiles",
                        "entryFilePath": "test.py",
                        "getDebugInfo": "Always"
                    },
                    "outputs": [
                        {
                            "name": "OutputDataset"
                        }
                    ],
                    "name": "MySparkActivity",
                    "linkedServiceName": "HDInsightLinkedService"
                }
            ],
            "start": "2017-02-05T00:00:00Z",
            "end": "2017-02-06T00:00:00Z"
        }
    }
    

    Pamiętaj o następujących kwestiach:

    1. Właściwość type jest ustawiona na HDInsightSpark.

    2. Właściwość rootPath jest ustawiona na adfspark\pyFiles , gdzie adfspark jest kontenerem obiektów blob, a pyFiles jest folderem plików w tym kontenerze. W tym przykładzie magazyn obiektów blob jest tym, który jest skojarzony z klastrem Spark. Plik można przekazać do innego konta magazynu. Jeśli to zrobisz, utwórz połączoną usługę Storage, aby połączyć to konto magazynu z fabryką danych. Następnie określ nazwę połączonej usługi jako wartość właściwości sparkJobLinkedService . Aby uzyskać więcej informacji o tej właściwości i innych właściwościach obsługiwanych przez działanie platformy Spark, zobacz Właściwości działania platformy Spark.

    3. Właściwość entryFilePath jest ustawiona na test.py, czyli plik języka Python.

    4. Właściwość getDebugInfo jest ustawiona na Zawsze, co oznacza, że pliki dziennika są zawsze generowane (powodzenie lub niepowodzenie).

      Ważne

      Nie zalecamy ustawiania tej właściwości na Always w środowisku produkcyjnym, chyba że rozwiązujesz problem.

    5. Sekcja dane wyjściowe zawiera jeden wyjściowy zestaw danych. Musisz określić wyjściowy zestaw danych, nawet jeśli program Spark nie generuje żadnych danych wyjściowych. Wyjściowy zestaw danych napędza harmonogram potoku (co godzinę, codziennie).

    Aby uzyskać więcej informacji o właściwościach obsługiwanych przez działanie platformy Spark, zobacz sekcję Właściwości działania platformy Spark.

  3. Aby wdrożyć potok, wybierz pozycję Wdróż na pasku poleceń.

Monitorowanie potoku

  1. W bloku Fabryka danych wybierz pozycję Monitorowanie & zarządzaj , aby uruchomić aplikację monitorowania na innej karcie.

    Monitorowanie & kafelka Zarządzanie

  2. Zmień filtr Godzina rozpoczęcia u góry na 2.1.2017, a następnie wybierz pozycję Zastosuj.

  3. Zostanie wyświetlone tylko jedno okno działania, ponieważ między rozpoczęciem (2017-02-01) i czasem zakończenia (2017-02-02) potoku. Upewnij się, że wycinek danych jest w stanie Gotowe .

    Monitor the pipeline (Monitorowanie potoku)

  4. Na liście Okna działania wybierz przebieg działania, aby wyświetlić szczegółowe informacje o nim. Jeśli wystąpi błąd, zobaczysz szczegółowe informacje o nim w okienku po prawej stronie.

Weryfikowanie wyników

  1. Uruchom Jupyter Notebook dla klastra spark usługi HDInsight, przechodząc do https://CLUSTERNAME.azurehdinsight.net/jupyterwitryny . Możesz również otworzyć pulpit nawigacyjny klastra dla klastra SPARK usługi HDInsight, a następnie uruchomić Jupyter Notebook.

  2. Wybierz pozycję Nowy>PySpark , aby uruchomić nowy notes.

    Nowy notes Jupyter

  3. Uruchom następujące polecenie, kopiując i wklejając tekst, naciskając klawisze Shift+Enter na końcu drugiej instrukcji:

    %%sql
    
    SELECT buildingID, (targettemp - actualtemp) AS temp_diff, date FROM hvac WHERE date = \"6/1/13\"
    
  4. Upewnij się, że są widoczne dane z tabeli hvac.

    Wyniki zapytania Jupyter

Aby uzyskać szczegółowe instrukcje, zobacz sekcję Uruchamianie zapytania Spark SQL.

Rozwiązywanie problemów

Ponieważ ustawisz wartość getDebugInfo na Zawsze, w folderze pyFiles w kontenerze obiektów blob zostanie wyświetlony podfolder dziennika. Plik dziennika w folderze dziennika zawiera dodatkowe informacje. Ten plik dziennika jest szczególnie przydatny w przypadku wystąpienia błędu. W środowisku produkcyjnym może być konieczne ustawienie go na Błąd.

Aby uzyskać więcej informacji na temat rozwiązywania problemów, wykonaj następujące czynności:

  1. Przejdź do witryny https://<CLUSTERNAME>.azurehdinsight.net/yarnui/hn/cluster.

    Aplikacja interfejsu użytkownika usługi YARN

  2. Wybierz pozycję Dzienniki dla jednej z prób uruchomienia.

    Strona aplikacji

  3. Na stronie dziennika są widoczne następujące dodatkowe informacje o błędzie:

    Błąd dziennika

W poniższych sekcjach przedstawiono informacje o jednostkach fabryki danych do korzystania z klastra Spark i działania platformy Spark w fabryce danych.

Właściwości działania platformy Spark

Oto przykładowa definicja JSON potoku z działaniem platformy Spark:

{
    "name": "SparkPipeline",
    "properties": {
        "activities": [
            {
                "type": "HDInsightSpark",
                "typeProperties": {
                    "rootPath": "adfspark\\pyFiles",
                    "entryFilePath": "test.py",
                    "arguments": [ "arg1", "arg2" ],
                    "sparkConfig": {
                        "spark.python.worker.memory": "512m"
                    },
                    "getDebugInfo": "Always"
                },
                "outputs": [
                    {
                        "name": "OutputDataset"
                    }
                ],
                "name": "MySparkActivity",
                "description": "This activity invokes the Spark program",
                "linkedServiceName": "HDInsightLinkedService"
            }
        ],
        "start": "2017-02-01T00:00:00Z",
        "end": "2017-02-02T00:00:00Z"
    }
}

W poniższej tabeli opisano właściwości JSON używane w definicji JSON.

Właściwość Opis Wymagane
name Nazwa działania w potoku. Tak
description (opis) Tekst opisujący działanie. Nie
typ Ta właściwość musi być ustawiona na HDInsightSpark. Tak
linkedServiceName Nazwa połączonej usługi HDInsight, w której działa program Spark. Tak
rootPath Kontener obiektów blob i folder zawierający plik Spark. W nazwie pliku jest uwzględniana wielkość liter. Tak
entryFilePath Ścieżka względna do folderu głównego kodu/pakietu Spark. Tak
Classname Główna klasa Java/Spark aplikacji. Nie
Argumenty Lista argumentów wiersza polecenia programu Spark. Nie
proxyUser Konto użytkownika do personifikacji w celu wykonania programu Spark. Nie
sparkConfig Określ wartości właściwości konfiguracji platformy Spark wymienionych w konfiguracji platformy Spark: właściwości aplikacji. Nie
getDebugInfo Określa, kiedy pliki dziennika spark są kopiowane do magazynu używanego przez klaster usługi HDInsight (lub) określony przez sparkJobLinkedService. Dozwolone wartości to None, Always lub Failure. Wartość domyślna to Brak. Nie
sparkJobLinkedService Połączona usługa Storage, która przechowuje plik zadania platformy Spark, zależności i dzienniki. Jeśli nie określisz wartości dla tej właściwości, zostanie użyty magazyn skojarzony z klastrem usługi HDInsight. Nie

Struktura folderów

Działanie platformy Spark nie obsługuje skryptu wbudowanego, ponieważ działania pig i Hive są wykonywane. Zadania platformy Spark są również bardziej rozszerzalne niż zadania Pig/Hive. W przypadku zadań platformy Spark można podać wiele zależności, takich jak pakiety jar (umieszczone w klasie CLASSPATH języka Java), pliki języka Python (umieszczone w ścieżce PYTHONPATH) i inne pliki.

Utwórz następującą strukturę folderów w magazynie obiektów blob, do których odwołuje się połączona usługa HDInsight. Następnie przekaż pliki zależne do odpowiednich podfolderów w folderze głównym reprezentowanym przez parametr entryFilePath. Na przykład przekaż pliki języka Python do podfolderu pyFiles i plików jar do podfolderu plików jar folderu głównego. W czasie wykonywania usługa Data Factory oczekuje następującej struktury folderów w magazynie obiektów blob:

Ścieżka Opis Wymagane Typ
. Ścieżka główna zadania Spark w połączonej usłudze magazynu. Tak Folder
<zdefiniowany przez użytkownika > Ścieżka wskazująca plik wejściowy zadania platformy Spark. Tak Plik
./Słoiki Wszystkie pliki w tym folderze są przekazywane i umieszczane w ścieżce klas Java klastra. Nie Folder
./pyFiles Wszystkie pliki w tym folderze są przekazywane i umieszczane w ścieżce PYTHONPATH klastra. Nie Folder
./Pliki Wszystkie pliki w tym folderze są przekazywane i umieszczane w katalogu roboczym funkcji wykonawczej. Nie Folder
./Archiwum Wszystkie pliki w tym folderze są nieskompresowane. Nie Folder
./Dzienniki Folder, w którym są przechowywane dzienniki z klastra Spark. Nie Folder

Oto przykład magazynu zawierającego dwa pliki zadań platformy Spark w magazynie obiektów blob, do których odwołuje się połączona usługa HDInsight:

SparkJob1
    main.jar
    files
        input1.txt
        input2.txt
    jars
        package1.jar
        package2.jar
    logs

SparkJob2
    main.py
    pyFiles
        scrip1.py
        script2.py
    logs