Azure Data Factory で Spark アクティビティを使用してクラウドのデータを変換する

適用対象: Azure Data Factory Azure Synapse Analytics

ヒント

企業向けのオールインワン分析ソリューション、Microsoft Fabric の Data Factory をお試しください。 Microsoft Fabric は、データ移動からデータ サイエンス、リアルタイム分析、ビジネス インテリジェンス、レポートまで、あらゆるものをカバーしています。 無料で新しい試用版を開始する方法について説明します。

このチュートリアルでは、Azure Portal を使用して Azure Data Factory パイプラインを作成します。 このパイプラインは、Spark アクティビティとオンデマンドの Azure HDInsight のリンクされたサービスを使用して、データを変換します。

このチュートリアルでは、以下の手順を実行します。

  • データ ファクトリを作成します。
  • Spark アクティビティを使用するパイプラインを作成します。
  • パイプラインの実行をトリガーする。
  • パイプラインの実行を監視します。

Azure サブスクリプションをお持ちでない場合は、開始する前に 無料アカウント を作成してください。

前提条件

Note

Azure を操作するには、Azure Az PowerShell モジュールを使用することをお勧めします。 作業を開始するには、Azure PowerShell のインストールに関する記事を参照してください。 Az PowerShell モジュールに移行する方法については、「AzureRM から Az への Azure PowerShell の移行」を参照してください。

  • Azure ストレージ アカウント。 Python スクリプトと入力ファイルを作成し、Azure Storage にアップロードします。 Spark プログラムからの出力は、このストレージ アカウントに格納されます。 オンデマンドの Spark クラスターでは、同じストレージ アカウントがプライマリ ストレージとして使用されます。

Note

HDInsight は、Standard レベルで、汎用ストレージ アカウントのみをサポートしています。 アカウントが Premium または BLOB のみのストレージ アカウントでないことを確認してください。

Python スクリプトを BLOB ストレージ アカウントにアップロードする

  1. 次の内容が含まれた、WordCount_Spark.py という名前の Python ファイルを作成します。

    import sys
    from operator import add
    
    from pyspark.sql import SparkSession
    
    def main():
        spark = SparkSession\
            .builder\
            .appName("PythonWordCount")\
            .getOrCreate()
    
        lines = spark.read.text("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/inputfiles/minecraftstory.txt").rdd.map(lambda r: r[0])
        counts = lines.flatMap(lambda x: x.split(' ')) \
            .map(lambda x: (x, 1)) \
            .reduceByKey(add)
        counts.saveAsTextFile("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/outputfiles/wordcount")
    
        spark.stop()
    
    if __name__ == "__main__":
        main()
    
  2. <storageAccountName> を Azure ストレージ アカウントの名前に置き換えます。 その後、ファイルを保存します。

  3. Azure BLOB ストレージで、adftutorial という名前のコンテナーを作成します (存在しない場合)。

  4. spark という名前のフォルダーを作成します。

  5. spark フォルダーの下に、script という名前のサブフォルダーを作成します。

  6. WordCount_Spark.py ファイルを script サブフォルダーにアップロードします。

入力ファイルをアップロードする

  1. minecraftstory.txt という名前のファイルを作成し、任意のテキストを入力しておきます。 このテキストの単語数が Spark プログラムによってカウントされます。
  2. spark フォルダーの下に、inputfiles という名前のサブフォルダーを作成します。
  3. inputfiles サブフォルダーに minecraftstory.txt ファイルをアップロードします。

Data Factory の作成

使うデータ ファクトリがまだない場合は、記事「クイック スタート: Azure portal を使用してデータ ファクトリを作成する」の手順に従って作成します。

リンクされたサービスを作成します

このセクションでは、2 つのリンクされたサービスを作成します。

  • Azure ストレージ アカウントをデータ ファクトリにリンクする、Azure Storage のリンクされたサービス。 このストレージは、オンデマンドの HDInsight クラスターによって使用されます。 ここには、実行される Spark スクリプトも含まれています。
  • オンデマンドの HDInsight のリンクされたサービス。 Azure Data Factory によって自動的に HDInsight クラスターが作成され、Spark プログラムが実行されます。 HDInsight クラスターは、事前に構成された時間だけアイドル状態になったら削除されます。

Azure Storage のリンクされたサービスを作成する

  1. ホーム ページの左側のパネルで [管理] タブに切り替えます。

    Screenshot that shows the Manage tab.

  2. ウィンドウの下部にある [接続] を選択して、 [+ 新規] を選択します。

    Buttons for creating a new connection

  3. [New Linked Service](新しいリンクされたサービス) ウィンドウで、[データ ストア]>[Azure Blob Storage] を選択し、[続行] を選択します。

    Selecting the "Azure Blob Storage" tile

  4. [ストレージ アカウント名] で一覧から名前を選択し、[保存] を選択します。

    Box for specifying the storage account name

オンデマンドの HDInsight のリンクされたサービスを作成する

  1. [+ 新規] ボタンをもう一度選択して、別のリンクされたサービスを作成します。

  2. [New Linked Service](新しいリンクされたサービス) ウィンドウで、[コンピューティング]>[Azure HDInsight] の順に選択し、[続行] を選択します。

    Selecting the "Azure HDInsight" tile

  3. [New Linked Service](新しいリンクされたサービス) ウィンドウで、次の手順を完了します。

    a. [名前] に「AzureHDInsightLinkedService」と入力します。

    b. [Type](タイプ)[On-demand HDInsight](オンデマンド HDInsight) が選択されていることを確認します。

    c. [Azure Storage Linked Service](Azure Storage のリンクされたサービス)[AzureBlobStorage1] を選択します。 このリンクされたサービスは、前の手順で作成したものです。 別の名前を使用した場合は、ここで適切な名前を指定します。

    d. [クラスターの種類][spark] を選択します。

    e. [サービス プリンシパル ID] に、HDInsight クラスターを作成するアクセス許可を備えたサービス プリンシパルの ID を入力します。

    このサービス プリンシパルは、サブスクリプションまたはクラスターが作成されるリソース グループの共同作成者ロールのメンバーである必要があります。 詳細については、「Microsoft Entra アプリケーションとサービス プリンシパルの作成」を参照してください。 [サービス プリンシパル ID] は "アプリケーション ID" に、 [サービス プリンシパル キー] は "クライアント シークレット" の値に相当します。

    f. [サービス プリンシパル キー] に、キーを入力します。

    g. [リソース グループ] に、データ ファクトリの作成時に使用したのと同じリソース グループを選択します。 Spark クラスターは、このリソース グループに作成されます。

    h. [OS の種類] を展開します。

    i. [クラスター ユーザー名] に名前を入力します。

    j. そのユーザーのクラスター パスワードを入力します。

    k. [完了] を選択します。

    HDInsight linked service settings

Note

Azure HDInsight には、サポートされる各 Azure リージョンで使用できるコアの合計数に制限があります。 オンデマンドの HDInsight のリンクされたサービスの場合、HDInsight クラスターは、プライマリ ストレージとして使用されるのと同じ Azure ストレージの場所に作成されます。 クラスターを正常に作成するための十分なコア クォータがあることを確認します。 詳細については、「Hadoop、Spark、Kafka などの HDInsight クラスターをセットアップする」を参照してください。

パイプラインを作成する

  1. + (正符号) ボタンを選択し、メニューの [パイプライン] を選択します。

    Buttons for creating a new pipeline

  2. [アクティビティ] ツールボックスで [HDInsight] を展開します。 [アクティビティ] ツールボックスからパイプライン デザイナー画面に Spark アクティビティをドラッグします。

    Dragging the Spark activity

  3. 下部の Spark アクティビティ ウィンドウのプロパティで、次の手順を完了します。

    a. [HDI Cluster](HDI クラスター) タブに切り替えます。

    b. (前の手順で作成した) AzureHDInsightLinkedService を選択します。

    Specifying the HDInsight linked service

  4. [Script/Jar](スクリプト/Jar) タブに切り替えて、次の手順を実行します。

    a. [Job Linked Service](ジョブのリンクされたサービス)[AzureBlobStorage1] を選択します。

    b. [ストレージを参照] を選択します。

    Specifying the Spark script on the "Script/Jar" tab

    c. adftutorial/spark/script フォルダーに移動します。WordCount_Spark.py を選択し、[完了] を選択します。

  5. パイプラインを検証するために、ツール バーの [検証] ボタンを選択します。 >> (右矢印) ボタンを選択して、検証ウィンドウを閉じます。

    "Validate" button

  6. [すべて公開] を選択します。 Data Factory UI により、エンティティ (リンクされたサービスとパイプライン) が Azure Data Factory サービスに公開されます。

    "Publish All" button

パイプラインの実行をトリガーする

ツール バーの [トリガーの追加] を選択し、 [Trigger Now](今すぐトリガー) を選択します。

"Trigger" and "Trigger Now" buttons

パイプラインの実行を監視します

  1. [監視] タブに切り替えます。パイプラインが実行されていることを確認します。 Spark クラスターの作成には約 20 分かかります。

  2. [最新の情報に更新] を定期的にクリックして、パイプラインの実行の状態を確認します。

    Tab for monitoring pipeline runs, with "Refresh" button

  3. パイプラインの実行に関連付けられているアクティビティの実行を表示するために、 [アクション] 列の [View Activity Runs](アクティビティの実行の表示) を選択します。

    Pipeline run status

    上部の [All Pipeline Runs](すべてのパイプラインの実行) リンクを選択すると、パイプラインの実行ビューに戻ることができます。

    "Activity Runs" view

出力を検証する

adftutorial コンテナーの spark/otuputfiles/wordcount フォルダーに出力ファイルが作成されていることを確認します。

Location of the output file

このファイルには、入力テキスト ファイルに含まれている単語と、各単語がファイル内に出現する回数が含まれています。 次に例を示します。

(u'This', 1)
(u'a', 1)
(u'is', 1)
(u'test', 1)
(u'file', 1)

このサンプルのパイプラインでは、Spark アクティビティとオンデマンドの HDInsight のリンクされたサービスを使用して、データを変換します。 以下の方法を学習しました。

  • データ ファクトリを作成します。
  • Spark アクティビティを使用するパイプラインを作成します。
  • パイプラインの実行をトリガーする。
  • パイプラインの実行を監視します。

仮想ネットワークにある Azure HDInsight クラスター上で Hive スクリプトを実行してデータを変換する方法については、次のチュートリアルに進んでください。