适用于 Python 的 Databricks Connect 的代码示例

注意

本文介绍适用于 Databricks Runtime 13.3 LTS 及更高版本的 Databricks Connect。

本文提供使用适用于 Python 的 Databricks Connect 的代码示例。 Databricks Connect 使你能够将常用 IDE、笔记本服务器和自定义应用程序连接到 Azure Databricks 群集。 请参阅什么是 Databricks Partner Connect?。 有关本文的 Scala 版本,请参阅适用于 Scala 的 Databricks Connect 代码示例

注意

在开始使用 Databricks Connect 之前,必须先设置 Databricks Connect 客户端

Databricks 额外提供了几个示例应用,以演示如何使用 Databricks Connect。 请参阅 GitHub 中的 Databricks Connect 示例应用程序存储库,具体如下:

也可以使用以下更简单的代码示例来体验 Databricks Connect。 这些示例假定你对 Databricks Connect 客户端设置使用默认身份验证。

此简单代码示例将查询指定的表,然后显示指定的表的前 5 行。 若要使用其他表,请调整对 spark.read.table 的调用。

from databricks.connect import DatabricksSession

spark = DatabricksSession.builder.getOrCreate()

df = spark.read.table("samples.nyctaxi.trips")
df.show(5)

此较长代码示例执行以下操作:

  1. 创建内存中数据帧。
  2. default 架构中创建名为 zzz_demo_temps_table 的表。 如果已存在同名的表,则先删除该表。 若要使用其他架构或表,请调整对 spark.sql 和/或 temps.write.saveAsTable 的调用。
  3. 将数据帧的内容保存到表中。
  4. 对表的内容运行 SELECT 查询。
  5. 显示查询的结果。
  6. 删除表。
from databricks.connect import DatabricksSession
from pyspark.sql.types import *
from datetime import date

spark = DatabricksSession.builder.getOrCreate()

# Create a Spark DataFrame consisting of high and low temperatures
# by airport code and date.
schema = StructType([
  StructField('AirportCode', StringType(), False),
  StructField('Date', DateType(), False),
  StructField('TempHighF', IntegerType(), False),
  StructField('TempLowF', IntegerType(), False)
])

data = [
  [ 'BLI', date(2021, 4, 3), 52, 43],
  [ 'BLI', date(2021, 4, 2), 50, 38],
  [ 'BLI', date(2021, 4, 1), 52, 41],
  [ 'PDX', date(2021, 4, 3), 64, 45],
  [ 'PDX', date(2021, 4, 2), 61, 41],
  [ 'PDX', date(2021, 4, 1), 66, 39],
  [ 'SEA', date(2021, 4, 3), 57, 43],
  [ 'SEA', date(2021, 4, 2), 54, 39],
  [ 'SEA', date(2021, 4, 1), 56, 41]
]

temps = spark.createDataFrame(data, schema)

# Create a table on the Databricks cluster and then fill
# the table with the DataFrame's contents.
# If the table already exists from a previous run,
# delete it first.
spark.sql('USE default')
spark.sql('DROP TABLE IF EXISTS zzz_demo_temps_table')
temps.write.saveAsTable('zzz_demo_temps_table')

# Query the table on the Databricks cluster, returning rows
# where the airport code is not BLI and the date is later
# than 2021-04-01. Group the results and order by high
# temperature in descending order.
df_temps = spark.sql("SELECT * FROM zzz_demo_temps_table " \
  "WHERE AirportCode != 'BLI' AND Date > '2021-04-01' " \
  "GROUP BY AirportCode, Date, TempHighF, TempLowF " \
  "ORDER BY TempHighF DESC")
df_temps.show()

# Results:
#
# +-----------+----------+---------+--------+
# |AirportCode|      Date|TempHighF|TempLowF|
# +-----------+----------+---------+--------+
# |        PDX|2021-04-03|       64|      45|
# |        PDX|2021-04-02|       61|      41|
# |        SEA|2021-04-03|       57|      43|
# |        SEA|2021-04-02|       54|      39|
# +-----------+----------+---------+--------+

# Clean up by deleting the table from the Databricks cluster.
spark.sql('DROP TABLE zzz_demo_temps_table')

注意

以下示例介绍如何在 DatabricksSession 类不可用的环境中编写可在 Databricks Connect for Databricks Runtime 13.3 LTS 及更高版本之间移植的代码。

以下示例使用 DatabricksSession 类(如果 DatabricksSession 类不可用,则使用 SparkSession 类)来查询指定的表并返回前 5 行。 此示例使用 SPARK_REMOTE 环境变量进行身份验证。

from pyspark.sql import SparkSession, DataFrame

def get_spark() -> SparkSession:
  try:
    from databricks.connect import DatabricksSession
    return DatabricksSession.builder.getOrCreate()
  except ImportError:
    return SparkSession.builder.getOrCreate()

def get_taxis(spark: SparkSession) -> DataFrame:
  return spark.read.table("samples.nyctaxi.trips")

get_taxis(get_spark()).show(5)