Condividi tramite


Esempi di codice per Databricks Connect per Scala

Nota

Questo articolo illustra Databricks Connect per Databricks Runtime 13.3 LTS e versioni successive.

Questo articolo fornisce esempi di codice che usano Databricks Connect per Scala. Databricks Connect consente di connettere gli IDE, i server notebook e le applicazioni personalizzate più diffusi ai cluster Azure Databricks. Consultare Cos’è Databricks Connect?. Per la versione Python di questo articolo, vedere Esempi di codice per Databricks Connect per Python.

Nota

Prima di iniziare a usare Databricks Connect, è necessario configurare il client Databricks Connect.

Databricks offre diverse applicazioni di esempio aggiuntive che illustrano come usare Databricks Connect. Vedere le applicazioni di esempio per il repository Databricks Connect in GitHub, in particolare:

È anche possibile usare gli esempi di codice più semplici seguenti per sperimentare con Databricks Connect. Questi esempi presuppongono che si stia usando l'autenticazione predefinita per la configurazione del client Databricks Connect.

Questo semplice esempio di codice esegue una query sulla tabella specificata e quindi mostra le prime 5 righe della tabella specificata. Per usare una tabella diversa, modificare la chiamata a spark.read.table.

import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession

object Main {
  def main(args: Array[String]): Unit = {
    val spark = DatabricksSession.builder().getOrCreate()
    val df = spark.read.table("samples.nyctaxi.trips")
    df.limit(5).show()
  }
}

Questo esempio di codice più lungo esegue le operazioni seguenti:

  1. Crea un dataframe in memoria.
  2. Crea una tabella con il nome zzz_demo_temps_table all'interno dello schema default. Se la tabella con questo nome esiste già, la tabella viene eliminata per prima. Per usare uno schema o una tabella diversa, modificare le chiamate a spark.sql, temps.write.saveAsTableo entrambe.
  3. Salva il contenuto del dataframe nella tabella.
  4. Esegue una query SELECT sul contenuto della tabella.
  5. Mostra il risultato della query.
  6. Elimina la tabella.
import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import java.time.LocalDate

object Main {
  def main(args: Array[String]): Unit = {
    val spark = DatabricksSession.builder().getOrCreate()

    // Create a Spark DataFrame consisting of high and low temperatures
    // by airport code and date.
    val schema = StructType(
      Seq(
        StructField("AirportCode", StringType, false),
        StructField("Date", DateType, false),
        StructField("TempHighF", IntegerType, false),
        StructField("TempLowF", IntegerType, false)
      )
    )

    val data = Seq(
      ( "BLI", LocalDate.of(2021, 4, 3), 52, 43 ),
      ( "BLI", LocalDate.of(2021, 4, 2), 50, 38),
      ( "BLI", LocalDate.of(2021, 4, 1), 52, 41),
      ( "PDX", LocalDate.of(2021, 4, 3), 64, 45),
      ( "PDX", LocalDate.of(2021, 4, 2), 61, 41),
      ( "PDX", LocalDate.of(2021, 4, 1), 66, 39),
      ( "SEA", LocalDate.of(2021, 4, 3), 57, 43),
      ( "SEA", LocalDate.of(2021, 4, 2), 54, 39),
      ( "SEA", LocalDate.of(2021, 4, 1), 56, 41)
    )

    val temps = spark.createDataFrame(data).toDF(schema.fieldNames: _*)

    // Create a table on the Databricks cluster and then fill
    // the table with the DataFrame 's contents.
    // If the table already exists from a previous run,
    // delete it first.
    spark.sql("USE default")
    spark.sql("DROP TABLE IF EXISTS zzz_demo_temps_table")
    temps.write.saveAsTable("zzz_demo_temps_table")

    // Query the table on the Databricks cluster, returning rows
    // where the airport code is not BLI and the date is later
    // than 2021-04-01.Group the results and order by high
    // temperature in descending order.
    val df_temps = spark.sql("SELECT * FROM zzz_demo_temps_table " +
      "WHERE AirportCode != 'BLI' AND Date > '2021-04-01' " +
      "GROUP BY AirportCode, Date, TempHighF, TempLowF " +
      "ORDER BY TempHighF DESC")
    df_temps.show()

    // Results:
    // +------------+-----------+---------+--------+
    // | AirportCode|       Date|TempHighF|TempLowF|
    // +------------+-----------+---------+--------+
    // |        PDX | 2021-04-03|      64 |     45 |
    // |        PDX | 2021-04-02|      61 |     41 |
    // |        SEA | 2021-04-03|      57 |     43 |
    // |        SEA | 2021-04-02|      54 |     39 |
    // +------------+-----------+---------+--------+

    // Clean up by deleting the table from the Databricks cluster.
    spark.sql("DROP TABLE zzz_demo_temps_table")
  }
}

Nota

Nell'esempio seguente viene descritto come usare la classe SparkSession nei casi in cui la classe DatabricksSession in Databricks Connect non è disponibile.

Questo esempio esegue una query sulla tabella specificata e restituisce le prime 5 righe. In questo esempio viene usata la SPARK_REMOTE variabile di ambiente per l'autenticazione.

import org.apache.spark.sql.{DataFrame, SparkSession}

object Main {
  def main(args: Array[String]): Unit = {
    getTaxis(getSpark()).show(5)
  }

  private def getSpark(): SparkSession = {
    SparkSession.builder().getOrCreate()
  }

  private def getTaxis(spark: SparkSession): DataFrame = {
    spark.read.table("samples.nyctaxi.trips")
  }
}