Przekształcanie danych przy użyciu działania przesyłania strumieniowego usługi Hadoop w Azure Data Factory

Uwaga

Ten artykuł dotyczy wersji 1 usługi Data Factory. Jeśli używasz bieżącej wersji usługi Data Factory, zobacz przekształcanie danych przy użyciu działania przesyłania strumieniowego usługi Hadoop w usłudze Data Factory.

Możesz użyć działania HDInsightStreamingActivity wywołać zadanie przesyłania strumieniowego hadoop z potoku Azure Data Factory. Poniższy fragment kodu JSON przedstawia składnię używania elementu HDInsightStreamingActivity w pliku JSON potoku.

Działanie przesyłania strumieniowego usługi HDInsight w potoku usługi Data Factory wykonuje programy przesyłania strumieniowego hadoop na własnym lub na żądanie opartego na systemie Windows/Linux klastra usługi HDInsight. Ten artykuł opiera się na artykule dotyczącym działań przekształcania danych , który przedstawia ogólne omówienie transformacji danych i obsługiwanych działań przekształcania.

Uwaga

Jeśli dopiero zaczynasz Azure Data Factory, zapoznaj się z artykułem Wprowadzenie do Azure Data Factory i wykonaj samouczek: Tworzenie pierwszego potoku danych przed przeczytaniem tego artykułu.

Przykład JSON

Klaster usługi HDInsight jest automatycznie wypełniany przykładowymi programami (wc.exe i cat.exe) oraz danymi (davinci.txt). Domyślnie nazwa kontenera używanego przez klaster usługi HDInsight to nazwa samego klastra. Jeśli na przykład nazwa klastra to myhdicluster, nazwa skojarzonego kontenera obiektów blob będzie myhdicluster.

{
    "name": "HadoopStreamingPipeline",
    "properties": {
        "description": "Hadoop Streaming Demo",
        "activities": [
            {
                "type": "HDInsightStreaming",
                "typeProperties": {
                    "mapper": "cat.exe",
                    "reducer": "wc.exe",
                    "input": "wasb://<nameofthecluster>@spestore.blob.core.windows.net/example/data/gutenberg/davinci.txt",
                    "output": "wasb://<nameofthecluster>@spestore.blob.core.windows.net/example/data/StreamingOutput/wc.txt",
                    "filePaths": [
                        "<nameofthecluster>/example/apps/wc.exe",
                        "<nameofthecluster>/example/apps/cat.exe"
                    ],
                    "fileLinkedService": "AzureStorageLinkedService",
                    "getDebugInfo": "Failure"
                },
                "outputs": [
                    {
                        "name": "StreamingOutputDataset"
                    }
                ],
                "policy": {
                    "timeout": "01:00:00",
                    "concurrency": 1,
                    "executionPriorityOrder": "NewestFirst",
                    "retry": 1
                },
                "scheduler": {
                    "frequency": "Day",
                    "interval": 1
                },
                "name": "RunHadoopStreamingJob",
                "description": "Run a Hadoop streaming job",
                "linkedServiceName": "HDInsightLinkedService"
            }
        ],
        "start": "2014-01-04T00:00:00Z",
        "end": "2014-01-05T00:00:00Z"
    }
}

Pamiętaj o następujących kwestiach:

  1. Ustaw wartość linkedServiceName na nazwę połączonej usługi wskazującej klaster usługi HDInsight, na którym jest uruchamiane zadanie mapreduce przesyłania strumieniowego.
  2. Ustaw typ działania na HDInsightStreaming.
  3. Dla właściwości maper określ nazwę pliku wykonywalnego mapowania. W tym przykładzie cat.exe jest plikiem wykonywalny maper.
  4. Dla właściwości redukcji określ nazwę pliku wykonywalnego reduktora. W tym przykładzie wc.exe jest plikiem wykonywalny reduktora.
  5. Dla właściwości typu wejściowego określ plik wejściowy (w tym lokalizację) dla mapowania. W przykładzie: : wasb://adfsample@<account name>.blob.core.windows.net/example/data/gutenberg/davinci.txtadfsample jest kontenerem obiektów blob, przykład/data/Gutenberg jest folderem, a davinci.txt jest obiektem blob.
  6. Dla właściwości typu danych wyjściowych określ plik wyjściowy (w tym lokalizację) dla reduktora. Dane wyjściowe zadania przesyłania strumieniowego usługi Hadoop są zapisywane w lokalizacji określonej dla tej właściwości.
  7. W sekcji filePaths określ ścieżki dla plików wykonywalnych mapatora i reduktora. W przykładzie: "adfsample/example/apps/wc.exe", adfsample to kontener obiektów blob, przykład/aplikacje to folder, a wc.exe to plik wykonywalny.
  8. Dla właściwości fileLinkedService określ połączoną usługę Azure Storage, która reprezentuje usługę Azure Storage zawierającą pliki określone w sekcji filePaths.
  9. Dla właściwości argumentów określ argumenty zadania przesyłania strumieniowego.
  10. Właściwość getDebugInfo jest opcjonalnym elementem. Po ustawieniu opcji Niepowodzenie dzienniki są pobierane tylko po awarii. Po ustawieniu opcji Zawsze dzienniki są zawsze pobierane niezależnie od stanu wykonywania.

Uwaga

Jak pokazano w przykładzie, należy określić wyjściowy zestaw danych dla działania przesyłania strumieniowego usługi Hadoop dla właściwości outputs . Ten zestaw danych jest tylko fikcyjnym zestawem danych wymaganym do napędzania harmonogramu potoku. Nie trzeba określać żadnego wejściowego zestawu danych dla działania dla właściwości inputs .

Przykład

Potok w tym przewodniku uruchamia program mapowania przesyłania strumieniowego/redukcji liczby wyrazów w klastrze usługi Azure HDInsight.

Połączone usługi

Połączona usługa Azure Storage

Najpierw utworzysz połączoną usługę, aby połączyć usługę Azure Storage używaną przez klaster usługi Azure HDInsight do fabryki danych Azure. Jeśli skopiujesz/wklejesz następujący kod, nie zapomnij zastąpić nazwy konta i klucza konta nazwą i kluczem usługi Azure Storage.

{
    "name": "StorageLinkedService",
    "properties": {
        "type": "AzureStorage",
        "typeProperties": {
            "connectionString": "DefaultEndpointsProtocol=https;AccountName=<account name>;AccountKey=<account key>"
        }
    }
}

Połączona usługa Azure HDInsight

Następnie utworzysz połączoną usługę, aby połączyć klaster usługi Azure HDInsight z fabryką danych Azure. Jeśli skopiujesz/wklejesz następujący kod, zastąp nazwę klastra usługi HDInsight nazwą klastra usługi HDInsight, a następnie zmień wartości nazwy użytkownika i hasła.

{
    "name": "HDInsightLinkedService",
    "properties": {
        "type": "HDInsight",
        "typeProperties": {
            "clusterUri": "https://<HDInsight cluster name>.azurehdinsight.net",
            "userName": "admin",
            "password": "**********",
            "linkedServiceName": "StorageLinkedService"
        }
    }
}

Zestawy danych

Wyjściowy zestaw danych

Potok w tym przykładzie nie pobiera żadnych danych wejściowych. Należy określić wyjściowy zestaw danych dla działania przesyłania strumieniowego usługi HDInsight. Ten zestaw danych jest tylko fikcyjnym zestawem danych wymaganym do napędzania harmonogramu potoku.

{
    "name": "StreamingOutputDataset",
    "properties": {
        "published": false,
        "type": "AzureBlob",
        "linkedServiceName": "StorageLinkedService",
        "typeProperties": {
            "folderPath": "adftutorial/streamingdata/",
            "format": {
                "type": "TextFormat",
                "columnDelimiter": ","
            },
        },
        "availability": {
            "frequency": "Day",
            "interval": 1
        }
    }
}

Potok

Potok w tym przykładzie ma tylko jedno działanie o typie: HDInsightStreaming.

Klaster usługi HDInsight jest automatycznie wypełniany przykładowymi programami (wc.exe i cat.exe) oraz danymi (davinci.txt). Domyślnie nazwa kontenera używanego przez klaster usługi HDInsight to nazwa samego klastra. Jeśli na przykład nazwa klastra to myhdicluster, nazwa skojarzonego kontenera obiektów blob będzie myhdicluster.

{
    "name": "HadoopStreamingPipeline",
    "properties": {
        "description": "Hadoop Streaming Demo",
        "activities": [
            {
                "type": "HDInsightStreaming",
                "typeProperties": {
                    "mapper": "cat.exe",
                    "reducer": "wc.exe",
                    "input": "wasb://<blobcontainer>@spestore.blob.core.windows.net/example/data/gutenberg/davinci.txt",
                    "output": "wasb://<blobcontainer>@spestore.blob.core.windows.net/example/data/StreamingOutput/wc.txt",
                    "filePaths": [
                        "<blobcontainer>/example/apps/wc.exe",
                        "<blobcontainer>/example/apps/cat.exe"
                    ],
                    "fileLinkedService": "StorageLinkedService"
                },
                "outputs": [
                    {
                        "name": "StreamingOutputDataset"
                    }
                ],
                "policy": {
                    "timeout": "01:00:00",
                    "concurrency": 1,
                    "executionPriorityOrder": "NewestFirst",
                    "retry": 1
                },
                "scheduler": {
                    "frequency": "Day",
                    "interval": 1
                },
                "name": "RunHadoopStreamingJob",
                "description": "Run a Hadoop streaming job",
                "linkedServiceName": "HDInsightLinkedService"
            }
        ],
        "start": "2017-01-03T00:00:00Z",
        "end": "2017-01-04T00:00:00Z"
    }
}

Zobacz też