Compartir vía


Ejemplos de código para Databricks Connect para Scala

Nota:

Este artículo describe Databricks Connect para Databricks Runtime 13.3 LTS y versiones posteriores.

En este artículo se proporcionan ejemplos de código que usan Databricks Connect para Scala. Databricks Connect le permite conectar los clústeres de Azure Databricks a entornos de desarrollo integrado populares, servidores de cuadernos y otras aplicaciones personalizadas. Consulte ¿Qué es Databricks Connect?. Para obtener la versión de Python de este artículo, consulte Ejemplos de código para Databricks Connect para Python.

Nota:

Antes de empezar a usar Databricks Connect, debe configurar el cliente de Databricks Connect.

Databricks proporciona varias aplicaciones de ejemplo que muestran cómo usar Databricks Connect. Vea el repositorio de aplicaciones de ejemplo para Databricks Connect en GitHub, específicamente:

También puede usar los siguientes ejemplos de código más simples para experimentar con Databricks Connect. En estos ejemplos se supone que usa la autenticación predeterminada para la configuración de cliente de Databricks Connect.

Este ejemplo de código simple consulta la tabla especificada y, a continuación, muestra las cinco primeras filas de la tabla especificada. Para usar otra tabla, ajuste la llamada a spark.read.table.

import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession

object Main {
  def main(args: Array[String]): Unit = {
    val spark = DatabricksSession.builder().getOrCreate()
    val df = spark.read.table("samples.nyctaxi.trips")
    df.limit(5).show()
  }
}

Este ejemplo de código más largo hace lo siguiente:

  1. Crea un DataFrame en memoria.
  2. Crea una tabla con el nombre zzz_demo_temps_table dentro del esquema default. Si la tabla con este nombre ya existe, primero se elimina la tabla. Para usar un esquema o tabla diferente, ajuste las llamadas a spark.sql, temps.write.saveAsTable o ambas.
  3. Guarda el contenido de DataFrame en la tabla.
  4. Ejecuta una consulta SELECT en el contenido de la tabla.
  5. Muestra el resultado de la consulta.
  6. Elimina la tabla.
import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import java.time.LocalDate

object Main {
  def main(args: Array[String]): Unit = {
    val spark = DatabricksSession.builder().getOrCreate()

    // Create a Spark DataFrame consisting of high and low temperatures
    // by airport code and date.
    val schema = StructType(
      Seq(
        StructField("AirportCode", StringType, false),
        StructField("Date", DateType, false),
        StructField("TempHighF", IntegerType, false),
        StructField("TempLowF", IntegerType, false)
      )
    )

    val data = Seq(
      ( "BLI", LocalDate.of(2021, 4, 3), 52, 43 ),
      ( "BLI", LocalDate.of(2021, 4, 2), 50, 38),
      ( "BLI", LocalDate.of(2021, 4, 1), 52, 41),
      ( "PDX", LocalDate.of(2021, 4, 3), 64, 45),
      ( "PDX", LocalDate.of(2021, 4, 2), 61, 41),
      ( "PDX", LocalDate.of(2021, 4, 1), 66, 39),
      ( "SEA", LocalDate.of(2021, 4, 3), 57, 43),
      ( "SEA", LocalDate.of(2021, 4, 2), 54, 39),
      ( "SEA", LocalDate.of(2021, 4, 1), 56, 41)
    )

    val temps = spark.createDataFrame(data).toDF(schema.fieldNames: _*)

    // Create a table on the Databricks cluster and then fill
    // the table with the DataFrame 's contents.
    // If the table already exists from a previous run,
    // delete it first.
    spark.sql("USE default")
    spark.sql("DROP TABLE IF EXISTS zzz_demo_temps_table")
    temps.write.saveAsTable("zzz_demo_temps_table")

    // Query the table on the Databricks cluster, returning rows
    // where the airport code is not BLI and the date is later
    // than 2021-04-01.Group the results and order by high
    // temperature in descending order.
    val df_temps = spark.sql("SELECT * FROM zzz_demo_temps_table " +
      "WHERE AirportCode != 'BLI' AND Date > '2021-04-01' " +
      "GROUP BY AirportCode, Date, TempHighF, TempLowF " +
      "ORDER BY TempHighF DESC")
    df_temps.show()

    // Results:
    // +------------+-----------+---------+--------+
    // | AirportCode|       Date|TempHighF|TempLowF|
    // +------------+-----------+---------+--------+
    // |        PDX | 2021-04-03|      64 |     45 |
    // |        PDX | 2021-04-02|      61 |     41 |
    // |        SEA | 2021-04-03|      57 |     43 |
    // |        SEA | 2021-04-02|      54 |     39 |
    // +------------+-----------+---------+--------+

    // Clean up by deleting the table from the Databricks cluster.
    spark.sql("DROP TABLE zzz_demo_temps_table")
  }
}

Nota:

En el ejemplo siguiente, se describe cómo usar la clase SparkSession en los casos en los que la clase DatabricksSession de Databricks Connect no esté disponible.

En este ejemplo se consulta la tabla especificada y se devuelven las primeras 5 filas. Este ejemplo usa la variable de entorno SPARK_REMOTE para la autenticación.

import org.apache.spark.sql.{DataFrame, SparkSession}

object Main {
  def main(args: Array[String]): Unit = {
    getTaxis(getSpark()).show(5)
  }

  private def getSpark(): SparkSession = {
    SparkSession.builder().getOrCreate()
  }

  private def getTaxis(spark: SparkSession): DataFrame = {
    spark.read.table("samples.nyctaxi.trips")
  }
}