适用于 Python 的 Databricks Connect 的高级用法

注意

本文介绍适用于 Databricks Runtime 14.0 及更高版本的 Databricks Connect。

本文介绍的主题超出了 Databricks Connect 的基本设置。

配置 Spark Connect 连接字符串

除了使用“配置与群集的连接”中所述的选项连接到群集之外,更高级的选项是使用 Spark Connect 连接字符串进行连接。 可在 remote 函数中传递该字符串,也可设置 SPARK_REMOTE 环境变量。

注意

只能使用 Databricks 个人访问令牌身份验证通过 Spark Connect 连接字符串进行连接。

使用 remote 函数设置连接字符串:

# Set the Spark Connect connection string in DatabricksSession.builder.remote.
from databricks.connect import DatabricksSession

workspace_instance_name = retrieve_workspace_instance_name()
token                   = retrieve_token()
cluster_id              = retrieve_cluster_id()

spark = DatabricksSession.builder.remote(
   f"sc://{workspace_instance_name}:443/;token={token};x-databricks-cluster-id={cluster_id}"
).getOrCreate()

或者,设置 SPARK_REMOTE 环境变量:

sc://<workspace-instance-name>:443/;token=<access-token-value>;x-databricks-cluster-id=<cluster-id>

然后如下所示初始化 DatabricksSession 类:

from databricks.connect import DatabricksSession

spark = DatabricksSession.builder.getOrCreate()

Pyspark shell

适用于 Python 的 Databricks Connect 附带一个 pyspark 二进制文件,该二级制文件是配置为使用 Databricks Connect 的 PySpark REPL (Spark shell)。

在没有其他参数的情况下启动时,shell 会从环境(例如, DATABRICKS_ 环境变量或 DEFAULT 配置文件)中选取默认凭据,以连接到 Azure Databricks 群集。 有关配置连接的信息,请参阅 Databricks Connect 的计算配置。

  1. 若要启动 Spark shell 并将其连接到正在运行的群集,请从已激活的 Python 虚拟环境运行以下命令之一:

    pyspark
    

    此时会显示 Spark shell,例如:

    Python 3.10 ...
    [Clang ...] on darwin
    Type "help", "copyright", "credits" or "license" for more information.
    Welcome to
         ____              __
        / __/__  ___ _____/ /__
       _\ \/ _ \/ _ `/ __/  '_/
      /__ / .__/\_,_/_/ /_/\_\   version 13.x.dev0
         /_/
    
    Using Python version 3.10 ...
    Client connected to the Spark Connect server at sc://...:.../;token=...;x-databricks-cluster-id=...
    SparkSession available as 'spark'.
    >>>
    

    启动 shell 后,该 spark 对象可在 Databricks 群集上运行 Apache Spark 命令。 运行简单的 PySpark 命令,例如 spark.range(1,10).show()。 如果未出现错误,则表示连接成功。

  2. 有关如何配合使用 Spark shell 和 Python 在计算上运行命令的信息,请参阅使用 Spark Shell 进行交互式分析

    使用内置的 spark 变量表示正在运行的群集上的 SparkSession,例如:

    >>> df = spark.read.table("samples.nyctaxi.trips")
    >>> df.show(5)
    +--------------------+---------------------+-------------+-----------+----------+-----------+
    |tpep_pickup_datetime|tpep_dropoff_datetime|trip_distance|fare_amount|pickup_zip|dropoff_zip|
    +--------------------+---------------------+-------------+-----------+----------+-----------+
    | 2016-02-14 16:52:13|  2016-02-14 17:16:04|         4.94|       19.0|     10282|      10171|
    | 2016-02-04 18:44:19|  2016-02-04 18:46:00|         0.28|        3.5|     10110|      10110|
    | 2016-02-17 17:13:57|  2016-02-17 17:17:55|          0.7|        5.0|     10103|      10023|
    | 2016-02-18 10:36:07|  2016-02-18 10:41:45|          0.8|        6.0|     10022|      10017|
    | 2016-02-22 14:14:41|  2016-02-22 14:31:52|         4.51|       17.0|     10110|      10282|
    +--------------------+---------------------+-------------+-----------+----------+-----------+
    only showing top 5 rows
    

    所有 Python 代码都在本地运行,而涉及 DataFrame 操作的 PySpark 代码在远程 Azure Databricks 工作区中的群集上运行,运行响应发送回给本地调用方。

  3. 若要停止 Spark shell,请按 Ctrl + dCtrl + z,或者运行命令 quit()exit()

其他 HTTP 标头

Databricks Connect 通过 HTTP/2 使用 gRPC 与 Databricks 群集通信。

某些高级用户可能选择在客户端和 Azure Databricks 群集之间安装代理服务,以便更好地控制来自其客户端的请求。

在某些情况下,代理可能需要 HTTP 请求中的自定义标头。

headers() 方法可用于将自定义标头添加到其 HTTP 请求。

spark = DatabricksSession.builder.header('x-custom-header', 'value').getOrCreate()

Certificates

如果群集依赖于自定义 SSL/TLS 证书来解析 Azure Databricks 工作区完全限定的域名 (FQDN),则必须在本地开发计算机上设置环境变量 GRPC_DEFAULT_SSL_ROOTS_FILE_PATH。 此环境变量必须设置为群集上已安装证书的完整路径。

例如,在 Python 代码中设置此环境变量,如下所示:

import os

os.environ["GRPC_DEFAULT_SSL_ROOTS_FILE_PATH"] = "/etc/ssl/certs/ca-bundle.crt"

有关设置环境变量的其他方式,请参阅操作系统的文档。

日志记录和调试日志

适用于 Python 的 Databricks Connect 使用标准 Python 日志记录生成日志。

日志将发送到标准错误流 (stderr),默认情况下,仅发出 WARN 级别和更高级别的日志

设置环境变量 SPARK_CONNECT_LOG_LEVEL=debug 将修改此默认值,并打印 DEBUG 级别及更高级别的所有日志消息。