適用於 Scala 的 Databricks Connect 程式代碼範例
注意
本文涵蓋 Databricks Runtime 13.3 LTS 和更新版本適用的 Databricks Connect。
本文提供使用 Databricks Connect for Scala 的程式代碼範例。 Databricks Connect 可讓您將熱門的 IDE、Notebook 伺服器和自定義應用程式連線到 Azure Databricks 叢集。 請參閱什麼是 Databricks Connect?。 如需本文的 Python 版本,請參閱 Databricks Connect for Python 的程式碼範例。
注意
開始使用 Databricks Connect 之前,您必須先 設定 Databricks Connect 用戶端。
Databricks 提供數個額外的範例應用程式,示範如何使用 Databricks Connect。 請參閱 GitHub 中 Databricks Connect 存放庫的範例應用程式,特別是:
您也可以使用下列更簡單的程式代碼範例來實驗 Databricks Connect。 這些範例假設您使用 Databricks Connect 用戶端設定的預設驗證。
這個簡單的程式代碼範例會查詢指定的數據表,然後顯示指定的數據表的前 5 個數據列。 若要使用不同的數據表,請調整 對 spark.read.table
的呼叫。
import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
object Main {
def main(args: Array[String]): Unit = {
val spark = DatabricksSession.builder().getOrCreate()
val df = spark.read.table("samples.nyctaxi.trips")
df.limit(5).show()
}
}
這個較長的程式代碼範例會執行下列動作:
- 建立記憶體內部 DataFrame。
- 使用架構內
default
的名稱zzz_demo_temps_table
建立數據表。 如果已有這個名稱的數據表存在,則會先刪除數據表。 若要使用不同的架構或數據表,請調整對spark.sql
、temps.write.saveAsTable
或兩者的呼叫。 - 將 DataFrame 的內容儲存至數據表。
SELECT
在數據表的內容上執行查詢。- 顯示查詢的結果。
- 刪除資料表。
import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import java.time.LocalDate
object Main {
def main(args: Array[String]): Unit = {
val spark = DatabricksSession.builder().getOrCreate()
// Create a Spark DataFrame consisting of high and low temperatures
// by airport code and date.
val schema = StructType(
Seq(
StructField("AirportCode", StringType, false),
StructField("Date", DateType, false),
StructField("TempHighF", IntegerType, false),
StructField("TempLowF", IntegerType, false)
)
)
val data = Seq(
( "BLI", LocalDate.of(2021, 4, 3), 52, 43 ),
( "BLI", LocalDate.of(2021, 4, 2), 50, 38),
( "BLI", LocalDate.of(2021, 4, 1), 52, 41),
( "PDX", LocalDate.of(2021, 4, 3), 64, 45),
( "PDX", LocalDate.of(2021, 4, 2), 61, 41),
( "PDX", LocalDate.of(2021, 4, 1), 66, 39),
( "SEA", LocalDate.of(2021, 4, 3), 57, 43),
( "SEA", LocalDate.of(2021, 4, 2), 54, 39),
( "SEA", LocalDate.of(2021, 4, 1), 56, 41)
)
val temps = spark.createDataFrame(data).toDF(schema.fieldNames: _*)
// Create a table on the Databricks cluster and then fill
// the table with the DataFrame 's contents.
// If the table already exists from a previous run,
// delete it first.
spark.sql("USE default")
spark.sql("DROP TABLE IF EXISTS zzz_demo_temps_table")
temps.write.saveAsTable("zzz_demo_temps_table")
// Query the table on the Databricks cluster, returning rows
// where the airport code is not BLI and the date is later
// than 2021-04-01.Group the results and order by high
// temperature in descending order.
val df_temps = spark.sql("SELECT * FROM zzz_demo_temps_table " +
"WHERE AirportCode != 'BLI' AND Date > '2021-04-01' " +
"GROUP BY AirportCode, Date, TempHighF, TempLowF " +
"ORDER BY TempHighF DESC")
df_temps.show()
// Results:
// +------------+-----------+---------+--------+
// | AirportCode| Date|TempHighF|TempLowF|
// +------------+-----------+---------+--------+
// | PDX | 2021-04-03| 64 | 45 |
// | PDX | 2021-04-02| 61 | 41 |
// | SEA | 2021-04-03| 57 | 43 |
// | SEA | 2021-04-02| 54 | 39 |
// +------------+-----------+---------+--------+
// Clean up by deleting the table from the Databricks cluster.
spark.sql("DROP TABLE zzz_demo_temps_table")
}
}
注意
下列範例說明如何在 Databricks Connect 中的類別無法使用的情況下,使用 SparkSession
類別 DatabricksSession
。
這個範例會查詢指定的數據表,並傳回前 5 個數據列。 此範例會 SPARK_REMOTE
使用環境變數進行驗證。
import org.apache.spark.sql.{DataFrame, SparkSession}
object Main {
def main(args: Array[String]): Unit = {
getTaxis(getSpark()).show(5)
}
private def getSpark(): SparkSession = {
SparkSession.builder().getOrCreate()
}
private def getTaxis(spark: SparkSession): DataFrame = {
spark.read.table("samples.nyctaxi.trips")
}
}