Azure Databricks ジョブで JAR を使用する
Java アーカイブまたは [JAR](https://en.wikipedia.org/wiki/JAR_(file_format)) ファイル形式は、一般的な ZIP ファイル形式に基づいており、多数の Java または Scala ファイルを 1 つに集約するために使用されます。 JAR タスクを使用すると、Azure Databricks ジョブで Java または Scala コードを迅速かつ確実にインストールできます。 この記事では、JAR と、JAR にパッケージ化されたアプリケーションを実行するジョブを作成する例を示します。 この例では、次の操作を行います。
- サンプル アプリケーションを定義する JAR プロジェクトを作成します。
- サンプル ファイルを JAR にバンドルします。
- JAR を実行するジョブを作成します。
- ジョブを実行し、結果を表示します。
始める前に
この例を実行するためには次が必要です。
- Java JAR の場合、Java Development Kit (JDK)。
- Scala JAR の場合、JDK と sbt。
手順 1: 例のローカル ディレクトリを作成する
サンプル コードと生成された成果物を保持するローカル ディレクトリを作成します (例: databricks_jar_test
)。
手順 2: JAR を作成する
Java または Scala を使用して JAR を作成するには、次の手順を実行します。
Java JAR を作成する
databricks_jar_test
フォルダーで、PrintArgs.java
という名前のファイルを次のコンテンツで作成します。import java.util.Arrays; public class PrintArgs { public static void main(String[] args) { System.out.println(Arrays.toString(args)); } }
PrintArgs.java
ファイルをコンパイルすると、それによりPrintArgs.class
ファイルが作成されます。javac PrintArgs.java
(省略可能) コンパイルされたプログラムを実行します。
java PrintArgs Hello World! # [Hello, World!]
PrintArgs.java
およびPrintArgs.class
ファイルと同じフォルダーに、META-INF
という名前のフォルダーを作成します。META-INF
フォルダーで、MANIFEST.MF
という名前のファイルを次の内容で作成します。 このファイルの末尾には、改行を追加してください。Main-Class: PrintArgs
databricks_jar_test
フォルダーのルートから、PrintArgs.jar
という名前の JAR を作成します。jar cvfm PrintArgs.jar META-INF/MANIFEST.MF *.class
(省略可能) それをテストするには、
databricks_jar_test
フォルダーのルートから JAR を実行します。java -jar PrintArgs.jar Hello World! # [Hello, World!]
Note
エラー
no main manifest attribute, in PrintArgs.jar
が発生した場合は、MANIFEST.MF
ファイルの末尾に改行を追加してから、JAR の作成と実行をもう一度試してください。ボリュームに
PrintArgs.jar
をアップロードします。 「Unity Catalog ボリュームにファイルをアップロードする」を参照してください。
Scala JAR を作成する
databricks_jar_test
フォルダーから、build.sbt
という名前の空のファイルを次のコンテンツで作成します。ThisBuild / scalaVersion := "2.12.14" ThisBuild / organization := "com.example" lazy val PrintArgs = (project in file(".")) .settings( name := "PrintArgs" )
databricks_jar_test
フォルダーから 、フォルダー構造src/main/scala/example
を作成します。example
フォルダーで、PrintArgs.scala
という名前のファイルを次のコンテンツで作成します。package example object PrintArgs { def main(args: Array[String]): Unit = { println(args.mkString(", ")) } }
プログラムをコンパイルします。
sbt compile
(省略可能) コンパイルされたプログラムを実行します。
sbt "run Hello World\!" # Hello, World!
databricks_jar_test/project
フォルダーで、assembly.sbt
という名前のファイルを次のコンテンツで作成します。addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.0.0")
databricks_jar_test
フォルダーのルートから、assembly
コマンドを実行します。このコマンドを実行すると、target
フォルダーの下に JAR が生成されます。sbt assembly
(省略可能) それをテストするには、
databricks_jar_test
フォルダーのルートから JAR を実行します。java -jar target/scala-2.12/PrintArgs-assembly-0.1.0-SNAPSHOT.jar Hello World! # Hello, World!
ボリュームに
PrintArgs-assembly-0.1.0-SNAPSHOT.jar
をアップロードします。 「Unity Catalog ボリュームにファイルをアップロードする」を参照してください。
手順 3. JAR を実行する Azure Databricks ジョブを作成する
- Azure Databricks のランディング ページに移動し、次のいずれかの操作を行います。
- サイドバーで、 [ワークフロー] をクリックし、 をクリックします。
- サイドバーで、 [新規] をクリックし、メニューから [ジョブ] を選択します。
- [タスク] タブに表示されるタスクのダイアログ ボックスで、[ジョブの名前を追加...] をジョブ名 (例:
JAR example
) に置き換えます。 - [タスク名] に、タスクの名前を入力します (たとえば、Java の場合は「
java_jar_task
」、または Scala の場合は「scala_jar_task
」)。 - [種類] で、[JAR] を選択します。
- [Main クラス] には、この例では Java の場合に「
PrintArgs
」または Scala の場合に「example.PrintArgs
」と入力します。 - [クラスター] で互換性のあるクラスターを選択します。 「Java ライブラリと Scala ライブラリのサポート」を参照してください。
- [依存ライブラリ] で、[+ 追加] をクリックします。
- [依存ライブラリの追加] ダイアログで、[ボリューム] が選択されている状態で、前の手順で JAR (
PrintArgs.jar
またはPrintArgs-assembly-0.1.0-SNAPSHOT.jar
) をアップロードした場所を [ボリューム ファイル パス] に入力するか、フィルター処理または参照して JAR を見つけます。 それを選択します。 - 追加をクリックします。
- [パラメーター] に、この例では「
["Hello", "World!"]
」と入力します。 - [追加] をクリックします。
手順 4: ジョブを実行し、ジョブの実行の詳細を表示する
をクリックしてワークフローを実行します。 実行の詳細を表示するには、[トリガーされた実行] ポップアップで [実行の表示] をクリックするか、ジョブ実行ビューで実行の [開始時刻] 列のリンクをクリックします。
実行が完了すると、[出力] パネルに、タスクに渡された引数を含む出力が表示されます。
JAR ジョブの出力サイズの制限
ジョブ出力 (stdout に出力されるログ出力など) には、20 MB のサイズ制限が適用されます。 出力の合計サイズがこれより大きい場合は実行が取り消され、失敗としてマークされます。
この制限が発生しないようにするために、Spark 構成 spark.databricks.driver.disableScalaOutput
を true
に設定して、ドライバーから Azure Databricks に stdout が返されないようにすることができます。 既定では、このフラグの値は false
です。 このフラグは、Scala の JAR ジョブと Scala のノートブックのセル出力を制御します。 フラグが有効になっている場合、ジョブの実行結果が Spark からクライアントに返されません。 フラグは、クラスターのログ ファイルに書き込まれるデータには影響を与えません。 このフラグを設定すると、ノートブックの結果が無効になるため、JAR ジョブのジョブ クラスターに対してのみ設定することをお勧めします。
推奨事項: 共有を使用する SparkContext
Azure Databricks はマネージド サービスなので、Apache Spark ジョブが正しく実行されるように、一部のコード変更が必要な場合があります。 JAR ジョブ プログラムでは、共有 SparkContext
API を使用して SparkContext
を取得する必要があります。 Azure Databricks によって SparkContext
が初期化されるため、new SparkContext()
を呼び出すプログラムは失敗します。 SparkContext
を取得するには、Azure Databricks によって作成された共有 SparkContext
のみを使用します。
val goodSparkContext = SparkContext.getOrCreate()
val goodSparkSession = SparkSession.builder().getOrCreate()
共有 SparkContext
を使用するときに避けるべきいくつかの方法もあります。
SparkContext.stop()
を呼び出さないでください。Main
プログラムの最後でSystem.exit(0)
またはsc.stop()
を呼び出さないでください。 これにより、未定義の動作が発生する可能性があります。
推奨事項: ジョブのクリーンに try-finally
ブロックを使用する
次の 2 つの部分で構成される JAR について考えます。
jobBody()
: ジョブの主要な部分が含まれます。jobCleanup()
: その関数が成功したか例外を返したかにかかわらず、jobBody()
の後に実行する必要があります。
たとえば、jobBody()
はテーブルを作成し、jobCleanup()
はそれらのテーブルを削除します。
クリーンアップ メソッドを確実に呼び出す安全な方法は、コードに try-finally
ブロックを配置することです。
try {
jobBody()
} finally {
jobCleanup()
}
sys.addShutdownHook(jobCleanup)
または次のコードを使用してクリーンアップをしようと "しないで" ください。
val cleanupThread = new Thread { override def run = jobCleanup() }
Runtime.getRuntime.addShutdownHook(cleanupThread)
Spark コンテナーの有効期間が Azure Databricks で管理される方法が原因で、シャットダウン フックが確実に実行されません。
JAR ジョブ パラメーターを構成する
JSON 文字列配列を使用して JAR ジョブにパラメーターを渡します。 Jobs API の新しいジョブの作成操作 (POST /jobs/create
) に渡される要求本文の spark_jar_task
オブジェクトを参照してください。 これらのパラメーターにアクセスするには、main
関数に渡された String
配列を調べます。
ライブラリの依存関係を管理する
Spark ドライバーには、オーバーライドできない特定のライブラリの依存関係があります。 ジョブによって競合するライブラリが追加される場合は、Spark ドライバー ライブラリの依存関係が優先されます。
ドライバー ライブラリの依存関係の完全なリストを取得するには、同じ Spark バージョンを使用して構成されたクラスター (または調査するドライバーを含むクラスター) に接続されているノートブック内で次のコマンドを実行します。
%sh
ls /databricks/jars
JAR のライブラリの依存関係を定義する場合、Spark と Hadoop を provided
依存関係として一覧表示することをお勧めします。 Maven で、指定された依存関係として Spark と Hadoop を追加します。
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.3.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>1.2.1</version>
<scope>provided</scope>
</dependency>
sbt
で、指定された依存関係として Spark と Hadoop を追加します。
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.3.0" % "provided"
libraryDependencies += "org.apache.hadoop" %% "hadoop-core" % "1.2.1" % "provided"
ヒント
実行しているバージョンに基づいて、依存関係に適切な Scala バージョンを指定します。
次のステップ
Azure Databricks ジョブの作成と実行の詳細については、「ワークフローのスケジューリングとオーケストレーション」をご覧ください。