다음을 통해 공유


Scala용 Databricks Connect의 사용자 정의 함수

메모

이 문서에서는 Databricks Runtime 14.1 이상용 Databricks Connect에 대해 설명합니다.

이 문서에서는 Databricks Connect for Scala를 사용하여 사용자 정의 함수를 실행하는 방법을 설명합니다. Databricks Connect를 사용하면 인기 있는 IDE, Notebook 서버 및 사용자 지정 애플리케이션을 Azure Databricks 클러스터에 연결할 수 있습니다. Python 버전의 이 문서에 대해서는 Python용 Databricks Connect의 사용자 정의 함수을 참조하세요.

메모

Databricks Connect 사용을 시작하기 전에 Databricks Connect 클라이언트를 설정할 필요가 있습니다.

Databricks Runtime 14.1 이상의 경우 Scala용 Databricks Connect는 UDF(사용자 정의 함수) 실행을 지원합니다.

UDF를 실행하려면 UDF에 필요한 컴파일된 클래스 및 JAR을 클러스터에 업로드해야 합니다. addCompiledArtifacts() API를 사용하여 업로드해야 하는 컴파일된 클래스 및 JAR 파일을 지정할 수 있습니다.

메모

클라이언트에서 사용하는 Scala는 Azure Databricks 클러스터의 Scala 버전과 일치해야 합니다. 클러스터의 Scala 버전을 확인하려면 Databricks Runtime 릴리스 노트 버전과 호환성에서 클러스터의 Databricks 런타임 버전에 대한 "시스템 환경" 섹션을 참조하세요.

다음 Scala 프로그램은 열의 값을 제곱하는 간단한 UDF를 설정합니다.

import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

object Main {
  def main(args: Array[String]): Unit = {
    val sourceLocation = getClass.getProtectionDomain.getCodeSource.getLocation.toURI

    val spark = DatabricksSession.builder()
      .addCompiledArtifacts(sourceLocation)
      .getOrCreate()

    def squared(x: Int): Int = x * x

    val squared_udf = udf(squared _)

    spark.range(3)
      .withColumn("squared", squared_udf(col("id")))
      .select("squared")
      .show()
  }
}

위의 예제에서는 UDF가 Main내에 완전히 포함되어 있으므로 컴파일된 Main 아티팩트만 추가됩니다. UDF가 다른 클래스에 분산되거나 외부 라이브러리(예: JAR)를 사용하는 경우 이러한 모든 라이브러리도 포함되어야 합니다.

Spark 세션이 이미 초기화되면 spark.addArtifact() API를 사용하여 추가로 컴파일된 클래스 및 JAR을 업로드할 수 있습니다.

메모

JAR을 업로드할 때 모든 전이적 종속성 JAR을 포함해야 합니다. API는 전이적 종속성의 자동 검색을 수행하지 않습니다.

형식화된 데이터 세트 API

UDF에 대한 이전 섹션에서 설명한 것과 동일한 메커니즘이 형식화된 데이터 세트 API에도 적용됩니다.

형식화된 데이터 세트 API를 사용하면 결과 데이터 세트에 대한 맵, 필터 및 집계와 같은 변환을 실행할 수 있습니다. 또한 Databricks 클러스터의 UDF와 유사하게 실행됩니다.

다음 Scala 애플리케이션은 map() API를 사용하여 결과 열의 숫자를 접두사 문자열로 수정합니다.

import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

object Main {
  def main(args: Array[String]): Unit = {
    val sourceLocation = getClass.getProtectionDomain.getCodeSource.getLocation.toURI

    val spark = DatabricksSession.builder()
      .addCompiledArtifacts(sourceLocation)
      .getOrCreate()

    spark.range(3).map(f => s"row-$f").show()
  }
}

이 예제에서는 map() API를 사용하지만 filter(), mapPartitions()등과 같은 다른 형식의 데이터 세트 API에도 적용됩니다.