다음을 통해 공유


Scala용 Databricks Connect에 대한 코드 예제

참고 항목

이 문서에서는 Databricks Runtime 13.3 LTS 이상에 대한 Databricks Connect에 대해 설명합니다.

이 문서에서는 Scala용 Databricks Connect를 사용하는 코드 예제를 제공합니다. Databricks Connect를 사용하면 인기 있는 IDE, Notebook 서버 및 사용자 지정 애플리케이션을 Azure Databricks 클러스터에 연결할 수 있습니다. Databricks Connect란?을 참조하세요. 이 문서의 Python 버전은 Python용 Databricks Connect에 대한 코드 예제를 참조 하세요.

참고 항목

Databricks Connect 사용을 시작하기 전에 Databricks Connect 클라이언트를 하세요.

Databricks는 Databricks Connect를 사용하는 방법을 보여 주는 몇 가지 추가 예제 애플리케이션을 제공합니다. 특히 GitHub에서 Databricks Connect 리포지토리에 대한 예제 애플리케이션을 참조하세요.

다음 간단한 코드 예제를 사용하여 Databricks Connect를 실험할 수도 있습니다. 이러한 예제에서는 Databricks Connect 클라이언트 설정에 기본 인증을 사용하고 있다고 가정합니다.

이 간단한 코드 예제에서는 지정된 table 쿼리한 다음 지정된 table처음 5개 행을 보여 줍니다. 다른 table를 사용하려면 spark.read.table호출을 조정합니다.

import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession

object Main {
  def main(args: Array[String]): Unit = {
    val spark = DatabricksSession.builder().getOrCreate()
    val df = spark.read.table("samples.nyctaxi.trips")
    df.limit(5).show()
  }
}

이 긴 코드 예제는 다음을 수행합니다.

  1. 메모리 내 데이터 프레임을 만듭니다.
  2. default schema내에 이름이 zzz_demo_temps_table인 table을 만듭니다. 이 이름의 table 이미 있는 경우 table 먼저 삭제됩니다. 다른 schema 또는 table을 사용하려면, spark.sql, temps.write.saveAsTable, 또는 둘 다에 대한 호출을 조정할 수 있습니다.
  3. DataFrame의 내용을 table에 저장합니다.
  4. table내용에 대해 SELECT 쿼리를 실행합니다.
  5. 쿼리의 결과를 표시합니다.
  6. table삭제합니다.
import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import java.time.LocalDate

object Main {
  def main(args: Array[String]): Unit = {
    val spark = DatabricksSession.builder().getOrCreate()

    // Create a Spark DataFrame consisting of high and low temperatures
    // by airport code and date.
    val schema = StructType(
      Seq(
        StructField("AirportCode", StringType, false),
        StructField("Date", DateType, false),
        StructField("TempHighF", IntegerType, false),
        StructField("TempLowF", IntegerType, false)
      )
    )

    val data = Seq(
      ( "BLI", LocalDate.of(2021, 4, 3), 52, 43 ),
      ( "BLI", LocalDate.of(2021, 4, 2), 50, 38),
      ( "BLI", LocalDate.of(2021, 4, 1), 52, 41),
      ( "PDX", LocalDate.of(2021, 4, 3), 64, 45),
      ( "PDX", LocalDate.of(2021, 4, 2), 61, 41),
      ( "PDX", LocalDate.of(2021, 4, 1), 66, 39),
      ( "SEA", LocalDate.of(2021, 4, 3), 57, 43),
      ( "SEA", LocalDate.of(2021, 4, 2), 54, 39),
      ( "SEA", LocalDate.of(2021, 4, 1), 56, 41)
    )

    val temps = spark.createDataFrame(data).toDF(schema.fieldNames: _*)

    // Create a table on the Databricks cluster and then fill
    // the table with the DataFrame 's contents.
    // If the table already exists from a previous run,
    // delete it first.
    spark.sql("USE default")
    spark.sql("DROP TABLE IF EXISTS zzz_demo_temps_table")
    temps.write.saveAsTable("zzz_demo_temps_table")

    // Query the table on the Databricks cluster, returning rows
    // where the airport code is not BLI and the date is later
    // than 2021-04-01.Group the results and order by high
    // temperature in descending order.
    val df_temps = spark.sql("SELECT * FROM zzz_demo_temps_table " +
      "WHERE AirportCode != 'BLI' AND Date > '2021-04-01' " +
      "GROUP BY AirportCode, Date, TempHighF, TempLowF " +
      "ORDER BY TempHighF DESC")
    df_temps.show()

    // Results:
    // +------------+-----------+---------+--------+
    // | AirportCode|       Date|TempHighF|TempLowF|
    // +------------+-----------+---------+--------+
    // |        PDX | 2021-04-03|      64 |     45 |
    // |        PDX | 2021-04-02|      61 |     41 |
    // |        SEA | 2021-04-03|      57 |     43 |
    // |        SEA | 2021-04-02|      54 |     39 |
    // +------------+-----------+---------+--------+

    // Clean up by deleting the table from the Databricks cluster.
    spark.sql("DROP TABLE zzz_demo_temps_table")
  }
}

참고 항목

다음 예제에서는 Databricks Connect의 DatabricksSession 클래스를 사용할 수 없는 where 경우 SparkSession 클래스를 사용하는 방법을 설명합니다.

다음은 지정한 table 쿼리하고 처음 5개 행을 반환하는 예제입니다. 이 예제에서는 인증에 SPARK_REMOTE 환경 변수를 사용합니다.

import org.apache.spark.sql.{DataFrame, SparkSession}

object Main {
  def main(args: Array[String]): Unit = {
    getTaxis(getSpark()).show(5)
  }

  private def getSpark(): SparkSession = {
    SparkSession.builder().getOrCreate()
  }

  private def getTaxis(spark: SparkSession): DataFrame = {
    spark.read.table("samples.nyctaxi.trips")
  }
}