在 Azure Databricks 作业中使用 JAR
Java 存档或 JAR (https://en.wikipedia.org/wiki/JAR_(file_format)) 文件格式基于常用的 ZIP 文件格式,用于将多个 Java 或 Scala 文件聚合为一个。 使用 JAR 任务,可以确保在 Azure Databricks 作业中快速可靠地安装 Java 或 Scala 代码。 本文提供了一个示例,演示如何创建 JAR 以及如何创建一个运行在 JAR 中打包的应用程序的作业。 在此示例中,你将:
- 创建定义示例应用程序的 JAR 项目。
- 将示例文件捆绑到 JAR 中。
- 创建一个作业来运行 JAR。
- 运行作业并查看结果。
开始之前的准备工作
若要完成此示例,需要准备好以下各项:
- 对应于 Java JAR 的 Java 开发工具包 (JDK)。
- 对应于 Scala JAR 的 JDK 和 sbt。
步骤 1:为示例创建本地目录
创建一个本地目录用于保存示例代码和生成的项目,例如 databricks_jar_test
。
步骤 2:创建 JAR
按照以下说明使用 Java 或 Scala 创建 JAR。
创建 Java JAR
在
databricks_jar_test
文件夹中创建名为PrintArgs.java
的文件,其中包含以下内容:import java.util.Arrays; public class PrintArgs { public static void main(String[] args) { System.out.println(Arrays.toString(args)); } }
编译
PrintArgs.java
文件,以创建PrintArgs.class
文件:javac PrintArgs.java
(可选)运行编译的程序:
java PrintArgs Hello World! # [Hello, World!]
在
PrintArgs.java
和PrintArgs.class
文件所在的文件夹中,创建一个名为META-INF
的文件夹。在
META-INF
文件夹中,创建名为MANIFEST.MF
的文件,包含以下内容。 请务必在此文件末尾添加一个换行符:Main-Class: PrintArgs
从
databricks_jar_test
文件夹的根目录创建一个名为PrintArgs.jar
的 JAR:jar cvfm PrintArgs.jar META-INF/MANIFEST.MF *.class
(可选)若要测试它,请从
databricks_jar_test
文件夹的根目录运行 JAR:java -jar PrintArgs.jar Hello World! # [Hello, World!]
注意
如果遇到错误“
no main manifest attribute, in PrintArgs.jar
”,请务必在MANIFEST.MF
文件的末尾添加换行符,然后再次尝试创建并运行 JAR。将
PrintArgs.jar
上传到某个卷。 请参阅将文件上传到 Unity Catalog 卷。
创建 Scala JAR
在
databricks_jar_test
文件夹中创建名为build.sbt
的空文件,在其中添加以下内容:ThisBuild / scalaVersion := "2.12.14" ThisBuild / organization := "com.example" lazy val PrintArgs = (project in file(".")) .settings( name := "PrintArgs" )
在
databricks_jar_test
文件夹中,创建文件夹结构src/main/scala/example
。在
example
文件夹中创建名为PrintArgs.scala
的文件,其中包含以下内容:package example object PrintArgs { def main(args: Array[String]): Unit = { println(args.mkString(", ")) } }
编译程序:
sbt compile
(可选)运行编译的程序:
sbt "run Hello World\!" # Hello, World!
在
databricks_jar_test/project
文件夹中创建名为assembly.sbt
的文件,其中包含以下内容:addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.0.0")
从
databricks_jar_test
文件夹的根目录中,运行assembly
命令,该命令会在target
文件夹下生成 JAR:sbt assembly
(可选)若要测试它,请从
databricks_jar_test
文件夹的根目录运行 JAR:java -jar target/scala-2.12/PrintArgs-assembly-0.1.0-SNAPSHOT.jar Hello World! # Hello, World!
将
PrintArgs-assembly-0.1.0-SNAPSHOT.jar
上传到某个卷。 请参阅将文件上传到 Unity Catalog 卷。
步骤 3. 创建一个 Azure Databricks 作业来运行 JAR
- 转到 Azure Databricks 登陆页并执行以下操作之一:
- 在边栏中,单击“ 工作流”,然后单击“”。
- 在边栏中,单击 “新建”,然后从菜单中选择“作业”。
- 在“任务”选项卡上显示的任务对话框中,将“为作业添加名称...”替换为你的作业名称,例如
JAR example
。 - 对于“任务名称”,请输入任务的名称,例如
java_jar_task
(适用于 Java)或scala_jar_task
(适用于 Scala)。 - 对于“类型”,请选择“JAR”。
- 对于此示例,请输入
PrintArgs
(适用于 Java)或example.PrintArgs
(适用于 Scala)作为“Main 类”。 - 对于“群集”,选择兼容的群集。 请参阅 Java 和 Scala 库支持。
- 对于“依赖库”,请单击“+ 添加”。
- 在已选中“卷”的“添加依赖库”对话框中,将已在上一步上传了 JAR(
PrintArgs.jar
或PrintArgs-assembly-0.1.0-SNAPSHOT.jar
)的位置输入到“卷文件路径”中,或通过筛选或浏览来查找 JAR。 选择该文件夹。 - 单击“添加” 。
- 对于此示例,请输入
["Hello", "World!"]
作为“参数”。 - 单击 添加。
步骤 4:运行作业并查看作业运行详细信息
单击 以运行工作流。 若要查看运行详细信息,请在“已触发的运行”弹出窗口中单击“查看运行”,或者在作业运行视图中单击运行“开始时间”列中的链接。
运行完成后,输出会显示在“输出”面板中,其中包括传递给任务的参数。
JAR 作业的输出大小限制
作业输出(如发送到 stdout 的日志输出)的大小限制为 20MB。 如果总输出更大,将取消运行并将其标记为失败。
若要避免遇到此限制,可以通过将 spark.databricks.driver.disableScalaOutput
Spark 配置设置为 true
来阻止 stdout 从驱动程序返回到 Azure Databricks。 默认情况下,标志值为 false
。 该标志控制 Scala JAR 作业和 Scala 笔记本的单元格输出。 如果启用该标志,Spark 不会将作业执行结果返回给客户端。 该标志不影响写入群集日志文件中的数据。 Databricks 建议仅为 JAR 作业的作业群集设置此标志,因为此标志会禁用笔记本结果。
建议:使用共享的 SparkContext
由于 Azure Databricks 是一个托管服务,因此可能需要进行一些代码更改才能确保 Apache Spark 作业正常运行。 JAR 作业程序必须使用共享 SparkContext
API 来获取 SparkContext
。 由于 Azure Databricks 初始化 SparkContext
,因此调用 new SparkContext()
的程序会失败。 若要获取 SparkContext
,请只使用由 Azure Databricks 创建的共享 SparkContext
:
val goodSparkContext = SparkContext.getOrCreate()
val goodSparkSession = SparkSession.builder().getOrCreate()
在使用共享 SparkContext
时,还应避免使用几种方法。
- 请勿调用
SparkContext.stop()
。 - 请勿在
Main
程序的末尾调用System.exit(0)
或sc.stop()
。 这可能会导致未定义的行为。
建议:使用 try-finally
块进行作业清理
假设有一个由两部分组成的 JAR:
jobBody()
,包含作业的主要部分。jobCleanup()
必须在jobBody()
之后执行,不管该函数是成功还是返回了异常。
例如,jobBody()
创建表,而 jobCleanup()
会删除这些表。
若要确保调用清理方法,安全的方法是在代码中放置一个 try-finally
块:
try {
jobBody()
} finally {
jobCleanup()
}
不应尝试使用 sys.addShutdownHook(jobCleanup)
或以下代码进行清理:
val cleanupThread = new Thread { override def run = jobCleanup() }
Runtime.getRuntime.addShutdownHook(cleanupThread)
由于 Spark 容器的生存期在 Azure Databricks 中的管理方式,shutdown 挂钩的运行并不可靠。
配置 JAR 作业参数
使用 JSON 字符串数组将参数传递给 JAR 作业。 请参阅传递给作业 API 中spark_jar_task
操作 (POST /jobs/create
) 的请求正文中的 spark_jar_task
对象。 若要访问这些参数,请检查传入到 main
函数中的 String
数组。
管理库依赖项
Spark 驱动程序的某些库依赖项不能重写。 如果作业添加了有冲突的库,则以 Spark 驱动程序库依赖项优先。
若要获取驱动程序库依赖项的完整列表,请在附加到使用同一 Spark 版本配置的群集(或包含要检查的驱动程序的群集)的笔记本中运行以下命令:
%sh
ls /databricks/jars
为 JAR 定义库依赖项时,Databricks 建议将 Spark 和 Hadoop 列为 provided
依赖项。 在 Maven 中,添加 Spark 和 Hadoop 作为提供的依赖项:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.3.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>1.2.1</version>
<scope>provided</scope>
</dependency>
在 sbt
中,添加 Spark 和 Hadoop 作为提供的依赖项:
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.3.0" % "provided"
libraryDependencies += "org.apache.hadoop" %% "hadoop-core" % "1.2.1" % "provided"
提示
为依赖项指定正确的 Scala 版本,具体取决于你所运行的版本。
后续步骤
要详细了解如何创建和运行 Azure Databricks 作业,请参阅计划和编排工作流。