Freigeben über


Codebeispiele für Databricks Connect für Scala

Hinweis

Dieser Artikel behandelt Databricks Connect für Databricks Runtime Version 13.3 LTS und höher.

In diesem Artikel finden Sie Codebeispiele, in denen Databricks Connect für Scala verwendet wird. Mit Databricks Connect können Sie beliebte IDEs, Notebookserver und benutzerdefinierte Anwendungen mit Azure Databricks-Clustern verbinden. Weitere Informationen finden Sie unter Was ist Databricks Connect?. Die Python-Version dieses Artikels finden Sie unter Codebeispiele für Databricks Connect für Python.

Hinweis

Bevor Sie beginnen, Databricks Connect zu verwenden, müssen Sie den Databricks Connect-Client einrichten.

Databricks bietet mehrere weitere Beispielanwendungen, die veranschaulichen, wie Databricks Connect verwendet wird. Sehen Sie sich das Repository für Beispielanwendungen für Databricks Connect auf GitHub an, insbesondere:

Sie können auch die folgenden einfacheren Codebeispiele verwenden, um mit Databricks Connect zu experimentieren. In diesen Beispielen wird davon ausgegangen, dass Sie die Standardauthentifizierung für die Einrichtung der Databricks Connect-Clients verwenden.

Dieses einfache Codebeispiel fragt die angegebene Tabelle ab, und zeigt dann die ersten 5 Zeilen der angegebenen Tabelle an. Um eine andere Tabelle zu verwenden, passen Sie den Aufruf auf spark.read.table an.

import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession

object Main {
  def main(args: Array[String]): Unit = {
    val spark = DatabricksSession.builder().getOrCreate()
    val df = spark.read.table("samples.nyctaxi.trips")
    df.limit(5).show()
  }
}

Dieses längere Codebeispiel führt Folgendes aus:

  1. Erstellt einen In-Memory-DataFrame.
  2. Erstellt eine Tabelle mit dem Namen zzz_demo_temps_table innerhalb des default-Schemas. Wenn die Tabelle mit diesem Namen bereits vorhanden ist, wird die Tabelle zuerst gelöscht. Um ein anderes Schema oder eine andere Tabelle zu verwenden, passen Sie die Aufrufe auf spark.sql, temps.write.saveAsTable oder beides an.
  3. Speichert die Inhalte des DataFrames in der Tabelle.
  4. Führt eine SELECT-Abfrage auf dem Inhalt der Tabelle aus.
  5. Zeigt das Ergebnis der Abfrage an.
  6. Löscht die Tabelle.
import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import java.time.LocalDate

object Main {
  def main(args: Array[String]): Unit = {
    val spark = DatabricksSession.builder().getOrCreate()

    // Create a Spark DataFrame consisting of high and low temperatures
    // by airport code and date.
    val schema = StructType(
      Seq(
        StructField("AirportCode", StringType, false),
        StructField("Date", DateType, false),
        StructField("TempHighF", IntegerType, false),
        StructField("TempLowF", IntegerType, false)
      )
    )

    val data = Seq(
      ( "BLI", LocalDate.of(2021, 4, 3), 52, 43 ),
      ( "BLI", LocalDate.of(2021, 4, 2), 50, 38),
      ( "BLI", LocalDate.of(2021, 4, 1), 52, 41),
      ( "PDX", LocalDate.of(2021, 4, 3), 64, 45),
      ( "PDX", LocalDate.of(2021, 4, 2), 61, 41),
      ( "PDX", LocalDate.of(2021, 4, 1), 66, 39),
      ( "SEA", LocalDate.of(2021, 4, 3), 57, 43),
      ( "SEA", LocalDate.of(2021, 4, 2), 54, 39),
      ( "SEA", LocalDate.of(2021, 4, 1), 56, 41)
    )

    val temps = spark.createDataFrame(data).toDF(schema.fieldNames: _*)

    // Create a table on the Databricks cluster and then fill
    // the table with the DataFrame 's contents.
    // If the table already exists from a previous run,
    // delete it first.
    spark.sql("USE default")
    spark.sql("DROP TABLE IF EXISTS zzz_demo_temps_table")
    temps.write.saveAsTable("zzz_demo_temps_table")

    // Query the table on the Databricks cluster, returning rows
    // where the airport code is not BLI and the date is later
    // than 2021-04-01.Group the results and order by high
    // temperature in descending order.
    val df_temps = spark.sql("SELECT * FROM zzz_demo_temps_table " +
      "WHERE AirportCode != 'BLI' AND Date > '2021-04-01' " +
      "GROUP BY AirportCode, Date, TempHighF, TempLowF " +
      "ORDER BY TempHighF DESC")
    df_temps.show()

    // Results:
    // +------------+-----------+---------+--------+
    // | AirportCode|       Date|TempHighF|TempLowF|
    // +------------+-----------+---------+--------+
    // |        PDX | 2021-04-03|      64 |     45 |
    // |        PDX | 2021-04-02|      61 |     41 |
    // |        SEA | 2021-04-03|      57 |     43 |
    // |        SEA | 2021-04-02|      54 |     39 |
    // +------------+-----------+---------+--------+

    // Clean up by deleting the table from the Databricks cluster.
    spark.sql("DROP TABLE zzz_demo_temps_table")
  }
}

Hinweis

Im folgenden Beispiel wird die Verwendung der SparkSession Klasse in Fällen beschrieben, in denen die DatabricksSession Klasse in Databricks Connect nicht verfügbar ist.

In diesem Beispiel wird die angegebene Tabelle abgefragt, und es werden die ersten 5 Zeilen zurückgegeben. In diesem Beispiel wird die SPARK_REMOTE Umgebungsvariable für die Authentifizierung verwendet.

import org.apache.spark.sql.{DataFrame, SparkSession}

object Main {
  def main(args: Array[String]): Unit = {
    getTaxis(getSpark()).show(5)
  }

  private def getSpark(): SparkSession = {
    SparkSession.builder().getOrCreate()
  }

  private def getTaxis(spark: SparkSession): DataFrame = {
    spark.read.table("samples.nyctaxi.trips")
  }
}