在 Azure Databricks 作业中使用 JAR

Java 存档或 JAR (https://en.wikipedia.org/wiki/JAR_(file_format)) 文件格式基于常用的 ZIP 文件格式,用于将多个 Java 或 Scala 文件聚合为一个。 使用 JAR 任务,可以确保在 Azure Databricks 作业中快速可靠地安装 Java 或 Scala 代码。 本文提供了一个示例,演示如何创建 JAR 以及如何创建一个运行在 JAR 中打包的应用程序的作业。 在此示例中,你将:

  • 创建定义示例应用程序的 JAR 项目。
  • 将示例文件捆绑到 JAR 中。
  • 创建一个作业来运行 JAR。
  • 运行作业并查看结果。

开始之前的准备工作

若要完成此示例,需要准备好以下各项:

  • 对应于 Java JAR 的 Java 开发工具包 (JDK)。
  • 对应于 Scala JAR 的 JDK 和 sbt。

步骤 1:为示例创建本地目录

创建一个本地目录用于保存示例代码和生成的项目,例如 databricks_jar_test

步骤 2:创建 JAR

按照以下说明使用 Java 或 Scala 创建 JAR。

创建 Java JAR

  1. databricks_jar_test 文件夹中创建名为 PrintArgs.java 的文件,其中包含以下内容:

    import java.util.Arrays;
    
    public class PrintArgs {
      public static void main(String[] args) {
        System.out.println(Arrays.toString(args));
      }
    }
    
  2. 编译 PrintArgs.java 文件,以创建 PrintArgs.class 文件:

    javac PrintArgs.java
    
  3. (可选)运行编译的程序:

    java PrintArgs Hello World!
    
    # [Hello, World!]
    
  4. PrintArgs.javaPrintArgs.class 文件所在的文件夹中,创建一个名为 META-INF 的文件夹。

  5. META-INF 文件夹中,创建名为 MANIFEST.MF 的文件,包含以下内容。 请务必在此文件末尾添加一个换行符:

    Main-Class: PrintArgs
    
  6. databricks_jar_test 文件夹的根目录创建一个名为 PrintArgs.jar 的 JAR:

    jar cvfm PrintArgs.jar META-INF/MANIFEST.MF *.class
    
  7. (可选)若要测试它,请从 databricks_jar_test 文件夹的根目录运行 JAR:

    java -jar PrintArgs.jar Hello World!
    
    # [Hello, World!]
    

    注意

    如果遇到错误“no main manifest attribute, in PrintArgs.jar”,请务必在 MANIFEST.MF 文件的末尾添加换行符,然后再次尝试创建并运行 JAR。

  8. PrintArgs.jar 上传到某个卷。 请参阅将文件上传到 Unity Catalog 卷

创建 Scala JAR

  1. databricks_jar_test 文件夹中创建名为 build.sbt 的空文件,在其中添加以下内容:

    ThisBuild / scalaVersion := "2.12.14"
    ThisBuild / organization := "com.example"
    
    lazy val PrintArgs = (project in file("."))
      .settings(
        name := "PrintArgs"
      )
    
  2. databricks_jar_test 文件夹中,创建文件夹结构 src/main/scala/example

  3. example 文件夹中创建名为 PrintArgs.scala 的文件,其中包含以下内容:

    package example
    
    object PrintArgs {
      def main(args: Array[String]): Unit = {
        println(args.mkString(", "))
      }
    }
    
  4. 编译程序:

    sbt compile
    
  5. (可选)运行编译的程序:

    sbt "run Hello World\!"
    
    # Hello, World!
    
  6. databricks_jar_test/project 文件夹中创建名为 assembly.sbt 的文件,其中包含以下内容:

    addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.0.0")
    
  7. databricks_jar_test 文件夹的根目录中,运行 assembly 命令,该命令会在 target 文件夹下生成 JAR:

    sbt assembly
    
  8. (可选)若要测试它,请从 databricks_jar_test 文件夹的根目录运行 JAR:

    java -jar target/scala-2.12/PrintArgs-assembly-0.1.0-SNAPSHOT.jar Hello World!
    
    # Hello, World!
    
  9. PrintArgs-assembly-0.1.0-SNAPSHOT.jar 上传到某个卷。 请参阅将文件上传到 Unity Catalog 卷

步骤 3. 创建一个 Azure Databricks 作业来运行 JAR

  1. 转到 Azure Databricks 登陆页并执行以下操作之一:
    • 在边栏中,单击““工作流”图标 工作流”,然后单击“创建作业按钮
    • 在边栏中,单击 新建图标新建”,然后从菜单中选择“作业”。
  2. 在“任务”选项卡上显示的任务对话框中,将“为作业添加名称...”替换为你的作业名称,例如 JAR example
  3. 对于“任务名称”,请输入任务的名称,例如 java_jar_task(适用于 Java)或 scala_jar_task(适用于 Scala)。
  4. 对于“类型”,请选择“JAR”。
  5. 对于此示例,请输入 PrintArgs(适用于 Java)或 example.PrintArgs(适用于 Scala)作为“Main 类”。
  6. 对于“群集”,选择兼容的群集。 请参阅 Java 和 Scala 库支持
  7. 对于“依赖库”,请单击“+ 添加”。
  8. 在已选中“卷”的“添加依赖库”对话框中,将已在上一步上传了 JAR(PrintArgs.jarPrintArgs-assembly-0.1.0-SNAPSHOT.jar)的位置输入到“卷文件路径”中,或通过筛选或浏览来查找 JAR。 选择该文件夹。
  9. 单击“添加” 。
  10. 对于此示例,请输入 ["Hello", "World!"] 作为“参数”。
  11. 单击 添加

步骤 4:运行作业并查看作业运行详细信息

单击 立即运行按钮 以运行工作流。 若要查看运行详细信息,请在“已触发的运行”弹出窗口中单击“查看运行”,或者在作业运行视图中单击运行“开始时间”列中的链接。

运行完成后,输出会显示在“输出”面板中,其中包括传递给任务的参数。

JAR 作业的输出大小限制

作业输出(如发送到 stdout 的日志输出)的大小限制为 20MB。 如果总输出更大,将取消运行并将其标记为失败。

若要避免遇到此限制,可以通过将 spark.databricks.driver.disableScalaOutput Spark 配置设置为 true 来阻止 stdout 从驱动程序返回到 Azure Databricks。 默认情况下,标志值为 false。 该标志控制 Scala JAR 作业和 Scala 笔记本的单元格输出。 如果启用该标志,Spark 不会将作业执行结果返回给客户端。 该标志不影响写入群集日志文件中的数据。 Databricks 建议仅为 JAR 作业的作业群集设置此标志,因为此标志会禁用笔记本结果。

建议:使用共享的 SparkContext

由于 Azure Databricks 是一个托管服务,因此可能需要进行一些代码更改才能确保 Apache Spark 作业正常运行。 JAR 作业程序必须使用共享 SparkContext API 来获取 SparkContext。 由于 Azure Databricks 初始化 SparkContext,因此调用 new SparkContext() 的程序会失败。 若要获取 SparkContext,请只使用由 Azure Databricks 创建的共享 SparkContext

val goodSparkContext = SparkContext.getOrCreate()
val goodSparkSession = SparkSession.builder().getOrCreate()

在使用共享 SparkContext 时,还应避免使用几种方法。

  • 请勿调用 SparkContext.stop()
  • 请勿在 Main 程序的末尾调用 System.exit(0)sc.stop()。 这可能会导致未定义的行为。

建议:使用 try-finally 块进行作业清理

假设有一个由两部分组成的 JAR:

  • jobBody(),包含作业的主要部分。
  • jobCleanup() 必须在 jobBody() 之后执行,不管该函数是成功还是返回了异常。

例如,jobBody() 创建表,而 jobCleanup() 会删除这些表。

若要确保调用清理方法,安全的方法是在代码中放置一个 try-finally 块:

try {
  jobBody()
} finally {
  jobCleanup()
}

不应尝试使用 sys.addShutdownHook(jobCleanup) 或以下代码进行清理:

val cleanupThread = new Thread { override def run = jobCleanup() }
Runtime.getRuntime.addShutdownHook(cleanupThread)

由于 Spark 容器的生存期在 Azure Databricks 中的管理方式,shutdown 挂钩的运行并不可靠。

配置 JAR 作业参数

使用 JSON 字符串数组将参数传递给 JAR 作业。 请参阅传递给作业 API 中spark_jar_task操作 (POST /jobs/create) 的请求正文中的 spark_jar_task 对象。 若要访问这些参数,请检查传入到 main 函数中的 String 数组。

管理库依赖项

Spark 驱动程序的某些库依赖项不能重写。 如果作业添加了有冲突的库,则以 Spark 驱动程序库依赖项优先。

若要获取驱动程序库依赖项的完整列表,请在附加到使用同一 Spark 版本配置的群集(或包含要检查的驱动程序的群集)的笔记本中运行以下命令:

%sh
ls /databricks/jars

为 JAR 定义库依赖项时,Databricks 建议将 Spark 和 Hadoop 列为 provided 依赖项。 在 Maven 中,添加 Spark 和 Hadoop 作为提供的依赖项:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.11</artifactId>
  <version>2.3.0</version>
  <scope>provided</scope>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-core</artifactId>
  <version>1.2.1</version>
  <scope>provided</scope>
</dependency>

sbt 中,添加 Spark 和 Hadoop 作为提供的依赖项:

libraryDependencies += "org.apache.spark" %% "spark-core" % "2.3.0" % "provided"
libraryDependencies += "org.apache.hadoop" %% "hadoop-core" % "1.2.1" % "provided"

提示

为依赖项指定正确的 Scala 版本,具体取决于你所运行的版本。

后续步骤

要详细了解如何创建和运行 Azure Databricks 作业,请参阅计划和编排工作流