Azure Databricks ジョブで JAR を使用する

Java アーカイブまたは [JAR](https://en.wikipedia.org/wiki/JAR_(file_format)) ファイル形式は、一般的な ZIP ファイル形式に基づいており、多数の Java または Scala ファイルを 1 つに集約するために使用されます。 JAR タスクを使用すると、Azure Databricks ジョブで Java または Scala コードを迅速かつ確実にインストールできます。 この記事では、JAR と、JAR にパッケージ化されたアプリケーションを実行するジョブを作成する例を示します。 この例では、次の操作を行います。

  • サンプル アプリケーションを定義する JAR プロジェクトを作成します。
  • サンプル ファイルを JAR にバンドルします。
  • JAR を実行するジョブを作成します。
  • ジョブを実行し、結果を表示します。

開始する前に

この例を実行するためには次が必要です。

  • Java JAR の場合、Java Development Kit (JDK)。
  • Scala JAR の場合、JDK と sbt。

手順 1: 例のローカル ディレクトリを作成する

サンプル コードと生成された成果物を保持するローカル ディレクトリを作成します (例: databricks_jar_test)。

手順 2: JAR を作成する

Java または Scala を使用して JAR を作成するには、次の手順を実行します。

Java JAR を作成する

  1. databricks_jar_test フォルダーで、PrintArgs.java という名前のファイルを次のコンテンツで作成します。

    import java.util.Arrays;
    
    public class PrintArgs {
      public static void main(String[] args) {
        System.out.println(Arrays.toString(args));
      }
    }
    
  2. PrintArgs.java ファイルをコンパイルすると、それにより PrintArgs.class ファイルが作成されます。

    javac PrintArgs.java
    
  3. (省略可能) コンパイルされたプログラムを実行します。

    java PrintArgs Hello World!
    
    # [Hello, World!]
    
  4. PrintArgs.java および PrintArgs.class ファイルと同じフォルダーに、META-INF という名前のフォルダーを作成します。

  5. META-INF フォルダーで、MANIFEST.MF という名前のファイルを次の内容で作成します。 このファイルの末尾には、改行を追加してください。

    Main-Class: PrintArgs
    
  6. databricks_jar_test フォルダーのルートから、PrintArgs.jar という名前の JAR を作成します。

    jar cvfm PrintArgs.jar META-INF/MANIFEST.MF *.class
    
  7. (省略可能) それをテストするには、databricks_jar_test フォルダーのルートから JAR を実行します。

    java -jar PrintArgs.jar Hello World!
    
    # [Hello, World!]
    

    Note

    エラー no main manifest attribute, in PrintArgs.jar が発生した場合は、MANIFEST.MF ファイルの末尾に改行を追加してから、JAR の作成と実行をもう一度試してください。

  8. ボリュームに PrintArgs.jar をアップロードします。 「Unity Catalog ボリュームにファイルをアップロードする」を参照してください。

Scala JAR を作成する

  1. databricks_jar_test フォルダーから、build.sbt という名前の空のファイルを次のコンテンツで作成します。

    ThisBuild / scalaVersion := "2.12.14"
    ThisBuild / organization := "com.example"
    
    lazy val PrintArgs = (project in file("."))
      .settings(
        name := "PrintArgs"
      )
    
  2. databricks_jar_test フォルダーから 、フォルダー構造 src/main/scala/example を作成します。

  3. example フォルダーで、PrintArgs.scala という名前のファイルを次のコンテンツで作成します。

    package example
    
    object PrintArgs {
      def main(args: Array[String]): Unit = {
        println(args.mkString(", "))
      }
    }
    
  4. プログラムをコンパイルします。

    sbt compile
    
  5. (省略可能) コンパイルされたプログラムを実行します。

    sbt "run Hello World\!"
    
    # Hello, World!
    
  6. databricks_jar_test/project フォルダーで、assembly.sbt という名前のファイルを次のコンテンツで作成します。

    addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.0.0")
    
  7. databricks_jar_test フォルダーのルートから、assembly コマンドを実行します。このコマンドを実行すると、 target フォルダーの下に JAR が生成されます。

    sbt assembly
    
  8. (省略可能) それをテストするには、databricks_jar_test フォルダーのルートから JAR を実行します。

    java -jar target/scala-2.12/PrintArgs-assembly-0.1.0-SNAPSHOT.jar Hello World!
    
    # Hello, World!
    
  9. ボリュームに PrintArgs-assembly-0.1.0-SNAPSHOT.jar をアップロードします。 「Unity Catalog ボリュームにファイルをアップロードする」を参照してください。

手順 3. JAR を実行する Azure Databricks ジョブを作成する

  1. Azure Databricks のランディング ページに移動し、次のいずれかの操作を行います。
    • サイドバーで、[ジョブ] アイコン[ワークフロー] をクリックし、[ジョブの作成] ボタン をクリックします。
    • サイドバーで、[新規] アイコン[新規] をクリックし、メニューから [ジョブ] を選択します。
  2. [タスク] タブに表示されるタスクのダイアログ ボックスで、[ジョブの名前を追加...] をジョブ名 (例: JAR example) に置き換えます。
  3. [タスク名] に、タスクの名前を入力します (たとえば、Java の場合は「java_jar_task」、または Scala の場合は「scala_jar_task」)。
  4. [種類] で、[JAR] を選択します。
  5. [Main クラス] には、この例では Java の場合に「PrintArgs」または Scala の場合に「example.PrintArgs」と入力します。
  6. [クラスター] で互換性のあるクラスターを選択します。 「Java ライブラリと Scala ライブラリのサポート」を参照してください。
  7. [依存ライブラリ] で、[+ 追加] をクリックします。
  8. [依存ライブラリの追加] ダイアログで、[ボリューム] が選択されている状態で、前の手順で JAR (PrintArgs.jar または PrintArgs-assembly-0.1.0-SNAPSHOT.jar) をアップロードした場所を [ボリューム ファイル パス] に入力するか、フィルター処理または参照して JAR を見つけます。 それを選択します。
  9. 追加をクリックします。
  10. [パラメーター] に、この例では「["Hello", "World!"]」と入力します。
  11. [追加] をクリックします。

手順 4: ジョブを実行し、ジョブの実行の詳細を表示する

[今すぐ実行] ボタン をクリックしてワークフローを実行します。 実行の詳細を表示するには、[トリガーされた実行] ポップアップで [実行の表示] をクリックするか、ジョブ実行ビューで実行の [開始時刻] 列のリンクをクリックします。

実行が完了すると、[出力] パネルに、タスクに渡された引数を含む出力が表示されます。

JAR ジョブの出力サイズの制限

ジョブ出力 (stdout に出力されるログ出力など) には、20 MB のサイズ制限が適用されます。 出力の合計サイズがこれより大きい場合は実行が取り消され、失敗としてマークされます。

この制限が発生しないようにするために、Spark 構成 spark.databricks.driver.disableScalaOutputtrue に設定して、ドライバーから Azure Databricks に stdout が返されないようにすることができます。 既定では、このフラグの値は false です。 このフラグは、Scala の JAR ジョブと Scala のノートブックのセル出力を制御します。 フラグが有効になっている場合、ジョブの実行結果が Spark からクライアントに返されません。 フラグは、クラスターのログ ファイルに書き込まれるデータには影響を与えません。 このフラグを設定すると、ノートブックの結果が無効になるため、JAR ジョブのジョブ クラスターに対してのみ設定することをお勧めします。

推奨事項: 共有を使用する SparkContext

Azure Databricks はマネージド サービスなので、Apache Spark ジョブが正しく実行されるように、一部のコード変更が必要な場合があります。 JAR ジョブ プログラムでは、共有 SparkContext API を使用して SparkContext を取得する必要があります。 Azure Databricks によって SparkContext が初期化されるため、new SparkContext() を呼び出すプログラムは失敗します。 SparkContext を取得するには、Azure Databricks によって作成された共有 SparkContext のみを使用します。

val goodSparkContext = SparkContext.getOrCreate()
val goodSparkSession = SparkSession.builder().getOrCreate()

共有 SparkContext を使用するときに避けるべきいくつかの方法もあります。

  • SparkContext.stop() を呼び出さないでください。
  • Main プログラムの最後で System.exit(0) または sc.stop() を呼び出さないでください。 これにより、未定義の動作が発生する可能性があります。

推奨事項: ジョブのクリーンに try-finally ブロックを使用する

次の 2 つの部分で構成される JAR について考えます。

  • jobBody(): ジョブの主要な部分が含まれます。
  • jobCleanup(): その関数が成功したか例外を返したかにかかわらず、jobBody() の後に実行する必要があります。

たとえば、jobBody() はテーブルを作成し、jobCleanup() はそれらのテーブルを削除します。

クリーンアップ メソッドを確実に呼び出す安全な方法は、コードに try-finally ブロックを配置することです。

try {
  jobBody()
} finally {
  jobCleanup()
}

sys.addShutdownHook(jobCleanup) または次のコードを使用してクリーンアップをしようと "しないで" ください。

val cleanupThread = new Thread { override def run = jobCleanup() }
Runtime.getRuntime.addShutdownHook(cleanupThread)

Spark コンテナーの有効期間が Azure Databricks で管理される方法が原因で、シャットダウン フックが確実に実行されません。

JAR ジョブ パラメーターを構成する

JSON 文字列配列を使用して JAR ジョブにパラメーターを渡します。 Jobs API の新しいジョブの作成操作 (POST /jobs/create) に渡される要求本文の spark_jar_task オブジェクトを参照してください。 これらのパラメーターにアクセスするには、main 関数に渡された String 配列を調べます。

ライブラリの依存関係を管理する

Spark ドライバーには、オーバーライドできない特定のライブラリの依存関係があります。 ジョブによって競合するライブラリが追加される場合は、Spark ドライバー ライブラリの依存関係が優先されます。

ドライバー ライブラリの依存関係の完全なリストを取得するには、同じ Spark バージョンを使用して構成されたクラスター (または調査するドライバーを含むクラスター) に接続されているノートブック内で次のコマンドを実行します。

%sh
ls /databricks/jars

JAR のライブラリの依存関係を定義する場合、Spark と Hadoop を provided 依存関係として一覧表示することをお勧めします。 Maven で、指定された依存関係として Spark と Hadoop を追加します。

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.11</artifactId>
  <version>2.3.0</version>
  <scope>provided</scope>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-core</artifactId>
  <version>1.2.1</version>
  <scope>provided</scope>
</dependency>

sbt で、指定された依存関係として Spark と Hadoop を追加します。

libraryDependencies += "org.apache.spark" %% "spark-core" % "2.3.0" % "provided"
libraryDependencies += "org.apache.hadoop" %% "hadoop-core" % "1.2.1" % "provided"

ヒント

実行しているバージョンに基づいて、依存関係に適切な Scala バージョンを指定します。

次のステップ

Azure Databricks ジョブの作成と実行の詳細については、「Azure Databricks ジョブを作成および実行する」を参照してください。