次の方法で共有


Databricks Runtime 12.2 LTS 以前用の Databricks Connect

Note

Databricks Connect では、代わりに Databricks Runtime 13.0 以降の Databricks Connect を使うことをお勧めします。

Databricks では、Databricks Runtime 12.2 LTS 以前の Databricks Connect の新機能は計画していません。

Databricks Connect を使用すると、Visual Studio Code や PyCharm などの一般的な IDE、ノートブック サーバー、その他のカスタム アプリケーションを Azure Databricks クラスターに接続できます。

この記事では、Databricks Connect のしくみ、Databricks Connect を開始する手順、Databricks Connect を使用する際に発生する可能性のある問題のトラブルシューティング方法、および Databricks Connect を使用した実行と Azure Databricks ノートブックでの実行の違いについて説明します。

概要

Databricks Connect は Databricks Runtime 用のクライアントライブラリです。 これにより、Spark API を使用してジョブを作成し、ローカルの Spark セッションではなく、Azure Databricks クラスターでリモート実行することができます。

たとえば、Databricks Connect を使用して DataFrame コマンド spark.read.format(...).load(...).groupBy(...).agg(...).show() を実行すると、リモート クラスターで実行するために Azure Databricks で実行されている Spark サーバーに、コマンドの論理表現が送信されます。

Databricks Connect では、次のことができます。

  • Python、R、Scala、Java で開発されたどのアプリケーションからでも大規模な Spark ジョブを実行できます。 import pysparkrequire(SparkR)import org.apache.spark が可能な場所なら、アプリケーションから直接 Spark ジョブを実行できるようになりました。IDE プラグインをインストールしたり、Spark 送信スクリプトを使用したりする必要はありません。
  • リモートクラスターを使用している場合でも、IDE でコードをステップ実行してデバッグします。
  • ライブラリの開発時にすばやく反復処理します。 Databricks Connect で Python または Java ライブラリの依存関係を変更した後にクラスターを再起動する必要はありません。各クライアントセッションはクラスター内で相互に分離されているためです。
  • 作業結果を失うことなく、アイドル状態のクラスターをシャットダウンします。 クライアントアプリケーションはクラスターから切り離されているため、クラスターの再起動またはアップグレードの影響を受けません。再起動やアップグレードは通常、ノートブックで定義されているすべての変数、RDD、およびデータフレームオブジェクトが失われる原因となります。

Note

SQL クエリを使用した python 開発の場合、Databricks では、Databricks Connect ではなくPython 用 Databricks SQL コネクタを使用することをお勧めします。 Python 用 Databricks SQL コネクタは、Databricks Connect よりも設定が簡単です。 また、ジョブがリモートのコンピューティングリソース上で実行されている間、Databricks Connect はローカル コンピューター上で実行されているジョブの解析および計画を行います。 これにより、特にランタイムエラーのデバッグが困難になる可能性があります。 Python 用 Databricks SQL コネクタは、リモートのコンピューティングリソースに SQL クエリを直接送信し、結果をフェッチします。

要件

このセクションには、Databricks Connect の要件を示します。

  • 以下のバージョンの Azure Databricks Runtime のみサポートされています。

    • Databricks Runtime 12.2 LTS ML、Databricks Runtime 12.2 LTS
    • Databricks Runtime 11.3 LTS ML、Databricks Runtime 11.3 LTS
    • Databricks Runtime 10.4 LTS ML、Databricks Runtime 10.4 LTS
    • Databricks Runtime 9.1 LTS ML、Databricks Runtime 9.1 LTS
    • Databricks Runtime 7.3 LTS
  • 開発用コンピューターに Python 3 をインストールする必要があり、クライアントの Python インストールのマイナー バージョンは、Azure Databricks クラスターのマイナー Python バージョンと同じである必要があります。 次の表は、各 Databricks Runtime に合わせてインストールされる Python バージョンを示しています。

    Databricks Runtime のバージョン Python バージョン
    12.2 LTS ML、12.2 LTS 3.9
    11.3 LTS ML、11.3 LTS 3.9
    10.4 LTS ML、10.4 LTS 3.8
    9.1 LTS ML、9.1 LTS 3.8
    7.3 LTS 3.7

    Databricks では、Databricks Connect で使用する Python バージョンごとに Python "仮想環境" をアクティブにすることを強く推奨しています。 Python 仮想環境は、正しいバージョンの Python と Databricks Connect を一緒に使用していることを確認するのに役立ちます。 これにより、関連する技術的な問題の解決に費やす時間を短縮できます。

    たとえば、開発マシンで venv を使用していて、クラスターが Python 3.9 を実行している場合、そのバージョンの venv 環境を作成する必要があります。 次のコマンド例は、Python 3.9 を使用して venv 環境をアクティブにするためのスクリプトを生成します。このコマンドは次に、現在の作業ディレクトリ内の .venv という名前の隠しフォルダー内にそれらのスクリプトを配置します。

    # Linux and macOS
    python3.9 -m venv ./.venv
    
    # Windows
    python3.9 -m venv .\.venv
    

    これらのスクリプトを使用してこの venv 環境をアクティブにするには、venvs のしくみに関する記事 (英語) を参照してください。

    もう 1 つの例として、開発マシンで Conda を使用していて、クラスターが Python 3.9 を実行している場合、そのバージョンの Conda 環境を作成する必要があります。次に例を示します。

    conda create --name dbconnect python=3.9
    

    この環境名で Conda 環境をアクティブにするには、conda activate dbconnect を実行します。

  • Databricks Connect のメジャーとマイナーのパッケージ バージョンは、常に Databricks Runtime のバージョンと一致している必要があります。 Databricks では、常に、Azure Databricks Runtime のバージョンと一致する、最新の Databricks Connect のパッケージを使用することをお勧めします。 たとえば、Databricks Runtime 12.2 LTS クラスターを使用するときは、databricks-connect==12.2.* パッケージも使用する必要があります。

    Note

    利用可能な Databricks Connect リリースとメンテナンスの更新プログラムの一覧については、 「Databricks Connect のリリースノート」を参照してください。

  • Java Runtime Environment (JRE) 8. クライアントは OpenJDK 8 JRE でテストされています。 クライアントは、Java 11 をサポートしていません。

Note

Windows で、Databricks Connect が winutils.exe を見つけられないというエラーが表示される場合は、「Windows で winutils.exe が見つからない」を参照してください。

クライアントをセットアップする

次の手順を実行して、Databricks Connect のローカル クライアントを設定します。

注意

ローカルの Databricks Connect クライアントの設定を開始する前に、Databricks Connect の要件を満たす必要があります。

手順 1: Databricks Connect クライアントをインストールする

  1. 仮想環境がアクティブな状態で、uninstall コマンドを実行して PySpark をアンインストールします (既にインストールされている場合)。 これは、databricks-connect パッケージが PySpark と競合するために必要です。 詳細については、「PySpark インストールの競合」を参照してください。 PySpark が既にインストールされているかどうかを調べるには、show コマンドを実行します。

    # Is PySpark already installed?
    pip3 show pyspark
    
    # Uninstall PySpark
    pip3 uninstall pyspark
    
  2. 仮想環境がアクティブな状態のままで、install コマンドを実行して Databricks Connect クライアントをインストールします。 --upgrade オプションを使用して、既存のクライアント インストールを指定したバージョンにアップグレードします。

    pip3 install --upgrade "databricks-connect==12.2.*"  # Or X.Y.* to match your cluster version.
    

    注意

    Databricks では、最新のパッケージがインストールされるように、databricks-connect=X.Y ではなく databricks-connect==X.Y.* を指定する "ドットとアスタリスク" の表記を追加することをお勧めします。

手順 2: 接続プロパティの構成

  1. 次の構成プロパティを収集します。

  2. 次のように接続を構成します。

    CLI、SQL 構成、環境変数を使用できます。 構成メソッドの優先順位は高い方から、SQL 構成キー、CLI、環境変数の順になります。

    • CLI

      1. databricks-connect を実行します。

        databricks-connect configure
        

        ライセンスは次のように表示されます。

        Copyright (2018) Databricks, Inc.
        
        This library (the "Software") may not be used except in connection with the
        Licensee's use of the Databricks Platform Services pursuant to an Agreement
          ...
        
      2. ライセンスに同意し、構成値を指定します。 Databricks HostDatabricks Tokenには、手順 1 で確認したワークスペース URL と個人用アクセス トークンを入力します。

        Do you accept the above agreement? [y/N] y
        Set new config values (leave input empty to accept default):
        Databricks Host [no current value, must start with https://]: <databricks-url>
        Databricks Token [no current value]: <databricks-token>
        Cluster ID (e.g., 0921-001415-jelly628) [no current value]: <cluster-id>
        Org ID (Azure-only, see ?o=orgId in URL) [0]: <org-id>
        Port [15001]: <port>
        

        Microsoft Entra ID トークンが長すぎますというメッセージが表示された場合は、 Databricks Tokenフィールドを空のままにして、~/.databricks-connect にトークンを手動で入力できます。

    • SQL 構成または環境変数。 次の表は、手順 1 で確認した構成プロパティに対応する SQL 構成キーと環境変数を示しています。 SQL 構成キーを設定するには、sql("set config=value") を使用します。 (例: sql("set spark.databricks.service.clusterId=0304-201045-abcdefgh"))。

      パラメーター SQL 構成キー 環境変数の名前
      Databricks Host spark.databricks.service.address DATABRICKS_ADDRESS
      Databricks Token spark.databricks.service.token DATABRICKS_API_TOKEN
      クラスター ID spark.databricks.service.clusterId DATABRICKS_CLUSTER_ID
      組織 ID spark.databricks.service.orgId DATABRICKS_ORG_ID
      Port spark.databricks.service.port DATABRICKS_PORT
  3. 仮想環境がまだアクティブ化された状態で、次のように Azure Databricks への接続をテストします。

    databricks-connect test
    

    構成したクラスターが実行されていない場合、テストはクラスターを開始します。クラスターは、構成された自動終了時刻まで実行されたままとなります。 出力は次のようになります。

    * PySpark is installed at /.../.../pyspark
    * Checking java version
    java version "1.8..."
    Java(TM) SE Runtime Environment (build 1.8...)
    Java HotSpot(TM) 64-Bit Server VM (build 25..., mixed mode)
    * Testing scala command
    ../../.. ..:..:.. WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    ../../.. ..:..:.. WARN MetricsSystem: Using default name SparkStatusTracker for source because neither spark.metrics.namespace nor spark.app.id is set.
    ../../.. ..:..:.. WARN SparkServiceRPCClient: Now tracking server state for 5ab..., invalidating prev state
    ../../.. ..:..:.. WARN SparkServiceRPCClient: Syncing 129 files (176036 bytes) took 3003 ms
    Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /___/ .__/\_,_/_/ /_/\_\   version 2...
          /_/
    
    Using Scala version 2.... (Java HotSpot(TM) 64-Bit Server VM, Java 1.8...)
    Type in expressions to have them evaluated.
    Type :help for more information.
    
    scala> spark.range(100).reduce(_ + _)
    Spark context Web UI available at https://...
    Spark context available as 'sc' (master = local[*], app id = local-...).
    Spark session available as 'spark'.
    View job details at <databricks-url>/?o=0#/setting/clusters/<cluster-id>/sparkUi
    View job details at <databricks-url>?o=0#/setting/clusters/<cluster-id>/sparkUi
    res0: Long = 4950
    
    scala> :quit
    
    * Testing python command
    ../../.. ..:..:.. WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    ../../.. ..:..:.. WARN MetricsSystem: Using default name SparkStatusTracker for source because neither spark.metrics.namespace nor spark.app.id is set.
    ../../.. ..:..:.. WARN SparkServiceRPCClient: Now tracking server state for 5ab.., invalidating prev state
    View job details at <databricks-url>/?o=0#/setting/clusters/<cluster-id>/sparkUi
    
  4. 接続関連のエラーが表示されない場合 (WARN メッセージは問題ありません)、正常に接続されています。

Databricks Connect を使用する

このセクションでは、Databricks Connect に対してクライアントを使用するために使用する IDE またはノートブック サーバーの構成方法を説明します。

このセクションの内容は次のとおりです。

JupyterLab

Note

Databricks Connect の使用を開始する前に、要件を満たし、Databricks Connect のクライアントを設定する必要があります。

JupyterLab と Python で Databricks Connect を使用するには、次の手順に従います。

  1. JupyterLab をインストールするには、Python 仮想環境がアクティブな状態で、ターミナルまたはコマンド プロンプトから次のコマンドを実行します。

    pip3 install jupyterlab
    
  2. Web ブラウザーで JupyterLab を起動するには、アクティブな Python 仮想環境から次のコマンドを実行します。

    jupyter lab
    

    JupyterLab が Web ブラウザーに表示されない場合は、localhost または 127.0.0.1 で始まる URL を仮想環境からコピーし、Web ブラウザーのアドレス バーに入力します。

  3. JupyterLab で、メインメニューの [File]>[New]>[Notebook] をクリックし、[Python 3 (ipykernel)] を選択し、[Select] をクリックして新しいノートブックを作成します。

  4. ノートブックの最初のセルに、コード例または独自のコードを入力します。 独自のコードを使用する場合は、コード例に示すように、少なくとも SparkSession.builder.getOrCreate() のインスタンスを 1 つインスタンス化する必要があります。

  5. ノートブックを実行するには、[実行] > [すべてのセルを実行] をクリックします。

  6. ノートブックをデバッグするには、ノートブックのツール バーの [Python 3 (ipykernel)] の横にあるバグ (デバッガーを有効にする) アイコンをクリックします。 1 つ以上のブレークポイントを設定し、[実行] > [すべてのセルを実行] をクリックします。

  7. JupyterLab をシャットダウンするには、[ファイル] > [シャットダウン] をクリックします。 JupyterLab プロセスがターミナルまたはコマンド プロンプトで引き続き実行されている場合は、Ctrl + c を押してから、y を入力して確定してこのプロセスを停止します。

具体的なデバッグ手順については、「Debugger」 (デバッガー) を参照してください。

クラシック Jupyter Notebook

Note

Databricks Connect の使用を開始する前に、要件を満たし、Databricks Connect のクライアントを設定する必要があります。

Databricks Connect の構成スクリプトが、パッケージをプロジェクト構成に自動的に追加します。 Python カーネルで開始するには、以下を実行します。

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

SQL クエリの実行および視覚化のために %sql の短縮形を有効にするには、以下のスニペットを使用します。

from IPython.core.magic import line_magic, line_cell_magic, Magics, magics_class

@magics_class
class DatabricksConnectMagics(Magics):

   @line_cell_magic
   def sql(self, line, cell=None):
       if cell and line:
           raise ValueError("Line must be empty for cell magic", line)
       try:
           from autovizwidget.widget.utils import display_dataframe
       except ImportError:
           print("Please run `pip install autovizwidget` to enable the visualization widget.")
           display_dataframe = lambda x: x
       return display_dataframe(self.get_spark().sql(cell or line).toPandas())

   def get_spark(self):
       user_ns = get_ipython().user_ns
       if "spark" in user_ns:
           return user_ns["spark"]
       else:
           from pyspark.sql import SparkSession
           user_ns["spark"] = SparkSession.builder.getOrCreate()
           return user_ns["spark"]

ip = get_ipython()
ip.register_magics(DatabricksConnectMagics)

Visual Studio Code

Note

Databricks Connect の使用を開始する前に、要件を満たし、Databricks Connect のクライアントを設定する必要があります。

Visual Studio Code で Databricks Connect を使用するには、次のようにします。

  1. Python 拡張機能がインストールされていることを確認します。

  2. コマンド パレットを開きます (macOS では Command + Shift + P、Windows/Linux では Ctrl + Shift + P)。

  3. Python インタープリターを選択します。 [コード] > [基本設定] > [設定] に移動し、[Python 設定] を選択します。

  4. databricks-connect get-jar-dir を実行します。

  5. コマンドから返されたディレクトリを、python.venvPath にある[User Settings JSON](ユーザー設定 JSON) に追加します。 これを Python 構成に追加する必要があります。

  6. リンターを無効にします。 右側の [...] をクリックし、JSON 設定を編集します。 変更された設定は次のとおりです。

    VS Code の構成

  7. 仮想環境を使用して実行する場合は、VS Code で Python 用に開発する手法が推奨されます。コマンド パレットで、select python interpreter と入力し、クラスターの Python バージョンに一致する環境を指定します。

    Python インタープリターの選択

    たとえば、クラスターが Python 3.9 の場合、開発環境は Python 3.9 である必要があります。

    Python バージョン

PyCharm

Note

Databricks Connect の使用を開始する前に、要件を満たし、Databricks Connect のクライアントを設定する必要があります。

Databricks Connect の構成スクリプトが、パッケージをプロジェクト構成に自動的に追加します。

Python 3 クラスター

  1. PyCharm プロジェクトを作成する場合、[Existing Interpreter](既存のインタープリター) を選択します。 ドロップダウン メニューから、作成した Conda 環境を選択します (「要件」を参照)。

    インタープリターの選択

  2. [実行]>[構成の編集]に移動します。

  3. 環境変数として PYSPARK_PYTHON=python3 を追加します。

    Python 3 クラスター構成

SparkR と RStudio Desktop

Note

Databricks Connect の使用を開始する前に、要件を満たし、Databricks Connect のクライアントを設定する必要があります。

SparkR と RStudio Desktop で Databricks Connect を使用するには、次のようにします。

  1. オープンソース Spark ディストリビューションをダウンロードして開発マシンに展開します。 Azure Databricks クラスター (Hadoop 2.7) のものと同じバージョンを選択します。

  2. databricks-connect get-jar-dir を実行します。 このコマンドは、/usr/local/lib/python3.5/dist-packages/pyspark/jars のようなパスを返します。 JAR ディレクトリ ファイル パスの 1 つ上のファイル パスをコピーします。たとえば、/usr/local/lib/python3.5/dist-packages/pyspark のように。これが SPARK_HOME ディレクトリです。

  3. R スクリプトの一番上に追加して、Spark ライブラリ パスと Spark ホームを構成します。 手順 1 でオープン ソースの Spark パッケージを展開したディレクトリに、<spark-lib-path> を設定します。 手順 2 の Databricks Connect ディレクトリに <spark-home-path> を設定します。

    # Point to the OSS package path, e.g., /path/to/.../spark-2.4.0-bin-hadoop2.7
    library(SparkR, lib.loc = .libPaths(c(file.path('<spark-lib-path>', 'R', 'lib'), .libPaths())))
    
    # Point to the Databricks Connect PySpark installation, e.g., /path/to/.../pyspark
    Sys.setenv(SPARK_HOME = "<spark-home-path>")
    
  4. Spark セッションを開始し、SparkR コマンドの実行を開始します。

    sparkR.session()
    
    df <- as.DataFrame(faithful)
    head(df)
    
    df1 <- dapply(df, function(x) { x }, schema(df))
    collect(df1)
    

sparklyr と RStudio Desktop

Note

Databricks Connect の使用を開始する前に、要件を満たし、Databricks Connect のクライアントを設定する必要があります。

重要

この機能はパブリック プレビュー段階にあります。

Databricks Connect を使用して、ローカルで開発した sparklyr に依存するコードをコピーし、Azure Databricks ノートブックまたは Azure Databricks ワークスペースにホストされている RStudio Server で、最小限の変更、またはコード変更なしに実行できます。

このセクションの内容は次のとおりです。

必要条件

  • sparklyr 1.2 以上。
  • 一致するバージョンの Databricks Connect を使用する Databricks Runtime 7.3 LTS 以上。

sparklyr のインストール、構成、使用

  1. RStudio Desktop で、CRAN から sparklyr 1.2 以上をインストールするか、GitHub から最新のマスター バージョンをインストールします。

    # Install from CRAN
    install.packages("sparklyr")
    
    # Or install the latest master version from GitHub
    install.packages("devtools")
    devtools::install_github("sparklyr/sparklyr")
    
  2. 正しいバージョンの Databricks Connect をインストールした Python 環境をアクティブ化し、ターミナルで次のコマンドを実行して <spark-home-path> を取得します。

    databricks-connect get-spark-home
    
  3. Spark セッションを開始し、sparklyr コマンドの実行を開始します。

    library(sparklyr)
    sc <- spark_connect(method = "databricks", spark_home = "<spark-home-path>")
    
    iris_tbl <- copy_to(sc, iris, overwrite = TRUE)
    
    library(dplyr)
    src_tbls(sc)
    
    iris_tbl %>% count
    
  4. 接続を閉じます。

    spark_disconnect(sc)
    

リソース

詳しくは、sparklyr GitHub の「説明ファイル」をご覧ください。

コード例については、「sparklyr」を参照してください。

sparklyr と RStudio Desktop の制限事項

次の機能はサポートされていません。

  • sparklyr ストリーミング API
  • sparklyr ML API
  • broom API
  • csv_file シリアル化モード
  • spark-submit

IntelliJ (Scala または Java)

Note

Databricks Connect の使用を開始する前に、要件を満たし、Databricks Connect のクライアントを設定する必要があります。

IntelliJ (Scala または Java) で Databricks Connect を使用するには、次のようにします。

  1. databricks-connect get-jar-dir を実行します。

  2. コマンドから返されたディレクトリに依存関係を指定します。 [ファイル]>[プロジェクトの構造]>[モジュール]>[依存関係]>[+]>[JARs or Directories](JAR またはディレクトリ)に移動します。

    IntelliJ JAR

    競合を回避するために、クラスパスから他の Spark インストールを削除することを強く推奨します。 これが不可能な場合は、追加する JAR がクラスパスの前に来るよう確認してください。 特に、インストールされている他のバージョンの Spark より前である必要があります (そうでない場合、他の Spark バージョンのいずれかを使用してローカルで実行するか、ClassDefNotFoundError をスローします)。

  3. IntelliJ のブレークアウト オプションの設定を確認します。 既定値は [All] で、デバッグ用にブレークポイントを設定するとネットワーク タイムアウトが生します。 バックグラウンド ネットワーク スレッドが停止しないようにするには、Thread に設定します。

    IntelliJ スレッド

Eclipse を使用した PyDev

Note

Databricks Connect の使用を開始する前に、要件を満たし、Databricks Connect のクライアントを設定する必要があります。

Eclipse を使用して Databricks Connect と PyDev を使用するには、次のようにします。

  1. Eclipse を起動します。
  2. [File]>[New]>[Project]>[PyDev]>[PyDev Project] の順にクリックし、[Next] をクリックして、プロジェクトを作成します。
  3. プロジェクト名を指定します。
  4. [Project contents] で、Python 仮想環境へのパスを指定します。
  5. [Please configure an interpreter before proceeding] をクリックします。
  6. [Manual config]をクリックします。
  7. [New]>[Browse for python/pypy exe] をクリックします。
  8. 仮想環境から参照される Python インタープリターへの完全なパスを参照して選択し、[Open] をクリックします。
  9. [Select interpreter] ダイアログで、[OK] をクリックします。
  10. [Selection needed] ダイアログで、[OK] をクリックします。
  11. [Preferences] ダイアログで、[Apply and Close] をクリックします。
  12. [PyDev Project] ダイアログで、[Finish] をクリックします。
  13. [Open Perspective] をクリックします。
  14. コード例または独自のコードを含む Python コード (.py) ファイルをプロジェクトに追加します。 独自のコードを使用する場合は、コード例に示すように、少なくとも SparkSession.builder.getOrCreate() のインスタンスを 1 つインスタンス化する必要があります。
  15. Python コード ファイルを開いた状態で、実行中にコードを一時停止するブレークポイントを設定します。
  16. [実行] > [実行] または [実行] > [デバッグ] をクリックします。

具体的な実行とデバッグの手順については、プログラムの実行に関する記事 (英語) を参照してください。

Eclipse

Note

Databricks Connect の使用を開始する前に、要件を満たし、Databricks Connect のクライアントを設定する必要があります。

Databricks Connect と Eclipse を使用するには、次のようにします。

  1. databricks-connect get-jar-dir を実行します。

  2. 外部の JAR 構成をコマンドから返されたディレクトリに指定します。 [プロジェクト メニュー]>[プロパティ]>[Java Build Path]>[ライブラリ]>[Add External Jar](外部の JAR を追加) に移動します。

    Eclipse の外部の JAR 構成

    競合を回避するために、クラスパスから他の Spark インストールを削除することを強く推奨します。 これが不可能な場合は、追加する JAR がクラスパスの前に来るよう確認してください。 特に、インストールされている他のバージョンの Spark より前である必要があります (そうでない場合、他の Spark バージョンのいずれかを使用してローカルで実行するか、ClassDefNotFoundError をスローします)。

    Eclipse の Spark 構成

SBT

Note

Databricks Connect の使用を開始する前に、要件を満たし、Databricks Connect のクライアントを設定する必要があります。

SBT で Databricks Connect を使用するには、通常の Spark ライブラリの依存関係ではなく、Databricks Connect JAR にリンクする build.sbt ファイルを構成する必要があります。 次のビルド ファイルの例の unmanagedBase ディレクティブで、これを行っています。これは、com.example.Test メイン オブジェクトを持つ Scala アプリを想定しています。

build.sbt

name := "hello-world"
version := "1.0"
scalaVersion := "2.11.6"
// this should be set to the path returned by ``databricks-connect get-jar-dir``
unmanagedBase := new java.io.File("/usr/local/lib/python2.7/dist-packages/pyspark/jars")
mainClass := Some("com.example.Test")

Spark シェル

Note

Databricks Connect の使用を開始する前に、要件を満たし、Databricks Connect のクライアントを設定する必要があります。

Spark シェルと Python または Scala で Databricks Connect を使用するには、次の手順に従います。

  1. 仮想環境がアクティブ化された状態で、「クライアントをセットアップする」で databricks-connect test コマンドが正常に実行されたことを確認します。

  2. 仮想環境がアクティブ化された状態で、Spark シェルを起動します。 Python の場合は、pyspark コマンドを実行します。 Scala の場合は、spark-shell コマンドを実行します。

    # For Python:
    pyspark
    
    # For Scala:
    spark-shell
    
  3. Spark シェルが表示されます。Python の例:

    Python 3... (v3...)
    [Clang 6... (clang-6...)] on darwin
    Type "help", "copyright", "credits" or "license" for more information.
    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    ../../.. ..:..:.. WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Welcome to
           ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /__ / .__/\_,_/_/ /_/\_\   version 3....
          /_/
    
    Using Python version 3... (v3...)
    Spark context Web UI available at http://...:...
    Spark context available as 'sc' (master = local[*], app id = local-...).
    SparkSession available as 'spark'.
    >>>
    

    Scala の場合:

    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    ../../.. ..:..:.. WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Spark context Web UI available at http://...
    Spark context available as 'sc' (master = local[*], app id = local-...).
    Spark session available as 'spark'.
    Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /___/ .__/\_,_/_/ /_/\_\   version 3...
          /_/
    
    Using Scala version 2... (OpenJDK 64-Bit Server VM, Java 1.8...)
    Type in expressions to have them evaluated.
    Type :help for more information.
    
    scala>
    
  4. Spark シェルと Python または Scala を使用してクラスターでコマンドを実行する方法については、「Interactive Analysis with the Spark Shell」を参照してください。

    組み込みの spark 変数を使用して、実行中のクラスターの SparkSession を表します。Python の例:

    >>> df = spark.read.table("samples.nyctaxi.trips")
    >>> df.show(5)
    +--------------------+---------------------+-------------+-----------+----------+-----------+
    |tpep_pickup_datetime|tpep_dropoff_datetime|trip_distance|fare_amount|pickup_zip|dropoff_zip|
    +--------------------+---------------------+-------------+-----------+----------+-----------+
    | 2016-02-14 16:52:13|  2016-02-14 17:16:04|         4.94|       19.0|     10282|      10171|
    | 2016-02-04 18:44:19|  2016-02-04 18:46:00|         0.28|        3.5|     10110|      10110|
    | 2016-02-17 17:13:57|  2016-02-17 17:17:55|          0.7|        5.0|     10103|      10023|
    | 2016-02-18 10:36:07|  2016-02-18 10:41:45|          0.8|        6.0|     10022|      10017|
    | 2016-02-22 14:14:41|  2016-02-22 14:31:52|         4.51|       17.0|     10110|      10282|
    +--------------------+---------------------+-------------+-----------+----------+-----------+
    only showing top 5 rows
    

    Scala の場合:

    >>> val df = spark.read.table("samples.nyctaxi.trips")
    >>> df.show(5)
    +--------------------+---------------------+-------------+-----------+----------+-----------+
    |tpep_pickup_datetime|tpep_dropoff_datetime|trip_distance|fare_amount|pickup_zip|dropoff_zip|
    +--------------------+---------------------+-------------+-----------+----------+-----------+
    | 2016-02-14 16:52:13|  2016-02-14 17:16:04|         4.94|       19.0|     10282|      10171|
    | 2016-02-04 18:44:19|  2016-02-04 18:46:00|         0.28|        3.5|     10110|      10110|
    | 2016-02-17 17:13:57|  2016-02-17 17:17:55|          0.7|        5.0|     10103|      10023|
    | 2016-02-18 10:36:07|  2016-02-18 10:41:45|          0.8|        6.0|     10022|      10017|
    | 2016-02-22 14:14:41|  2016-02-22 14:31:52|         4.51|       17.0|     10110|      10282|
    +--------------------+---------------------+-------------+-----------+----------+-----------+
    only showing top 5 rows
    
  5. Spark シェルを停止するには、Ctrl + d または Ctrl + z を押すか、quit() または exit() コマンド、あるいは Python の場合は :q、Scala の場合は :quit を実行します。

コード例

この単純なコード例は、指定されたテーブルに対してクエリを実行して、指定されたテーブルの最初の 5 行を表示します。 異なるテーブルを使用するには、spark.read.table への呼び出しを調整します。

from pyspark.sql.session import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.read.table("samples.nyctaxi.trips")
df.show(5)

次の長いコード例の実行内容は次のとおりです。

  1. メモリ内 DataFrame を作成します。
  2. default スキーマ内に zzz_demo_temps_table という名前のテーブルを作成します。 この名前のテーブルが既に存在する場合は、まずテーブルが削除されます。 異なるスキーマまたはテーブルを使用するには、spark.sqltemps.write.saveAsTable、または両方への呼び出しを調整します。
  3. DataFrame の内容をテーブルに保存します。
  4. テーブルの内容に対して SELECT クエリを実行します。
  5. クエリの結果を表示します。
  6. テーブルを削除します。

Python

from pyspark.sql import SparkSession
from pyspark.sql.types import *
from datetime import date

spark = SparkSession.builder.appName('temps-demo').getOrCreate()

# Create a Spark DataFrame consisting of high and low temperatures
# by airport code and date.
schema = StructType([
    StructField('AirportCode', StringType(), False),
    StructField('Date', DateType(), False),
    StructField('TempHighF', IntegerType(), False),
    StructField('TempLowF', IntegerType(), False)
])

data = [
    [ 'BLI', date(2021, 4, 3), 52, 43],
    [ 'BLI', date(2021, 4, 2), 50, 38],
    [ 'BLI', date(2021, 4, 1), 52, 41],
    [ 'PDX', date(2021, 4, 3), 64, 45],
    [ 'PDX', date(2021, 4, 2), 61, 41],
    [ 'PDX', date(2021, 4, 1), 66, 39],
    [ 'SEA', date(2021, 4, 3), 57, 43],
    [ 'SEA', date(2021, 4, 2), 54, 39],
    [ 'SEA', date(2021, 4, 1), 56, 41]
]

temps = spark.createDataFrame(data, schema)

# Create a table on the Databricks cluster and then fill
# the table with the DataFrame's contents.
# If the table already exists from a previous run,
# delete it first.
spark.sql('USE default')
spark.sql('DROP TABLE IF EXISTS zzz_demo_temps_table')
temps.write.saveAsTable('zzz_demo_temps_table')

# Query the table on the Databricks cluster, returning rows
# where the airport code is not BLI and the date is later
# than 2021-04-01. Group the results and order by high
# temperature in descending order.
df_temps = spark.sql("SELECT * FROM zzz_demo_temps_table " \
    "WHERE AirportCode != 'BLI' AND Date > '2021-04-01' " \
    "GROUP BY AirportCode, Date, TempHighF, TempLowF " \
    "ORDER BY TempHighF DESC")
df_temps.show()

# Results:
#
# +-----------+----------+---------+--------+
# |AirportCode|      Date|TempHighF|TempLowF|
# +-----------+----------+---------+--------+
# |        PDX|2021-04-03|       64|      45|
# |        PDX|2021-04-02|       61|      41|
# |        SEA|2021-04-03|       57|      43|
# |        SEA|2021-04-02|       54|      39|
# +-----------+----------+---------+--------+

# Clean up by deleting the table from the Databricks cluster.
spark.sql('DROP TABLE zzz_demo_temps_table')

Scala

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import java.sql.Date

object Demo {
  def main(args: Array[String]) {
      val spark = SparkSession.builder.master("local").getOrCreate()

      // Create a Spark DataFrame consisting of high and low temperatures
      // by airport code and date.
      val schema = StructType(Array(
        StructField("AirportCode", StringType, false),
        StructField("Date", DateType, false),
        StructField("TempHighF", IntegerType, false),
        StructField("TempLowF", IntegerType, false)
      ))

      val data = List(
        Row("BLI", Date.valueOf("2021-04-03"), 52, 43),
        Row("BLI", Date.valueOf("2021-04-02"), 50, 38),
        Row("BLI", Date.valueOf("2021-04-01"), 52, 41),
        Row("PDX", Date.valueOf("2021-04-03"), 64, 45),
        Row("PDX", Date.valueOf("2021-04-02"), 61, 41),
        Row("PDX", Date.valueOf("2021-04-01"), 66, 39),
        Row("SEA", Date.valueOf("2021-04-03"), 57, 43),
        Row("SEA", Date.valueOf("2021-04-02"), 54, 39),
        Row("SEA", Date.valueOf("2021-04-01"), 56, 41)
      )

      val rdd = spark.sparkContext.makeRDD(data)
      val temps = spark.createDataFrame(rdd, schema)

      // Create a table on the Databricks cluster and then fill
      // the table with the DataFrame's contents.
      // If the table already exists from a previous run,
      // delete it first.
      spark.sql("USE default")
      spark.sql("DROP TABLE IF EXISTS zzz_demo_temps_table")
      temps.write.saveAsTable("zzz_demo_temps_table")

      // Query the table on the Databricks cluster, returning rows
      // where the airport code is not BLI and the date is later
      // than 2021-04-01. Group the results and order by high
      // temperature in descending order.
      val df_temps = spark.sql("SELECT * FROM zzz_demo_temps_table " +
        "WHERE AirportCode != 'BLI' AND Date > '2021-04-01' " +
        "GROUP BY AirportCode, Date, TempHighF, TempLowF " +
        "ORDER BY TempHighF DESC")
      df_temps.show()

      // Results:
      //
      // +-----------+----------+---------+--------+
      // |AirportCode|      Date|TempHighF|TempLowF|
      // +-----------+----------+---------+--------+
      // |        PDX|2021-04-03|       64|      45|
      // |        PDX|2021-04-02|       61|      41|
      // |        SEA|2021-04-03|       57|      43|
      // |        SEA|2021-04-02|       54|      39|
      // +-----------+----------+---------+--------+

      // Clean up by deleting the table from the Databricks cluster.
      spark.sql("DROP TABLE zzz_demo_temps_table")
  }
}

Java

import java.util.ArrayList;
import java.util.List;
import java.sql.Date;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.Dataset;

public class App {
    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession
            .builder()
            .appName("Temps Demo")
            .config("spark.master", "local")
            .getOrCreate();

        // Create a Spark DataFrame consisting of high and low temperatures
        // by airport code and date.
        StructType schema = new StructType(new StructField[] {
            new StructField("AirportCode", DataTypes.StringType, false, Metadata.empty()),
            new StructField("Date", DataTypes.DateType, false, Metadata.empty()),
            new StructField("TempHighF", DataTypes.IntegerType, false, Metadata.empty()),
            new StructField("TempLowF", DataTypes.IntegerType, false, Metadata.empty()),
        });

        List<Row> dataList = new ArrayList<Row>();
        dataList.add(RowFactory.create("BLI", Date.valueOf("2021-04-03"), 52, 43));
        dataList.add(RowFactory.create("BLI", Date.valueOf("2021-04-02"), 50, 38));
        dataList.add(RowFactory.create("BLI", Date.valueOf("2021-04-01"), 52, 41));
        dataList.add(RowFactory.create("PDX", Date.valueOf("2021-04-03"), 64, 45));
        dataList.add(RowFactory.create("PDX", Date.valueOf("2021-04-02"), 61, 41));
        dataList.add(RowFactory.create("PDX", Date.valueOf("2021-04-01"), 66, 39));
        dataList.add(RowFactory.create("SEA", Date.valueOf("2021-04-03"), 57, 43));
        dataList.add(RowFactory.create("SEA", Date.valueOf("2021-04-02"), 54, 39));
        dataList.add(RowFactory.create("SEA", Date.valueOf("2021-04-01"), 56, 41));

        Dataset<Row> temps = spark.createDataFrame(dataList, schema);

        // Create a table on the Databricks cluster and then fill
        // the table with the DataFrame's contents.
        // If the table already exists from a previous run,
        // delete it first.
        spark.sql("USE default");
        spark.sql("DROP TABLE IF EXISTS zzz_demo_temps_table");
        temps.write().saveAsTable("zzz_demo_temps_table");

        // Query the table on the Databricks cluster, returning rows
        // where the airport code is not BLI and the date is later
        // than 2021-04-01. Group the results and order by high
        // temperature in descending order.
        Dataset<Row> df_temps = spark.sql("SELECT * FROM zzz_demo_temps_table " +
            "WHERE AirportCode != 'BLI' AND Date > '2021-04-01' " +
            "GROUP BY AirportCode, Date, TempHighF, TempLowF " +
            "ORDER BY TempHighF DESC");
        df_temps.show();

        // Results:
        //
        // +-----------+----------+---------+--------+
        // |AirportCode|      Date|TempHighF|TempLowF|
        // +-----------+----------+---------+--------+
        // |        PDX|2021-04-03|       64|      45|
        // |        PDX|2021-04-02|       61|      41|
        // |        SEA|2021-04-03|       57|      43|
        // |        SEA|2021-04-02|       54|      39|
        // +-----------+----------+---------+--------+

        // Clean up by deleting the table from the Databricks cluster.
        spark.sql("DROP TABLE zzz_demo_temps_table");
    }
}

依存関係の動作

通常、メイン クラスまたは Python ファイルには、他の依存関係の JAR やファイルがあります。 sparkContext.addJar("path-to-the-jar")sparkContext.addPyFile("path-to-the-file") を呼び出すことで、このような依存関係の JAR やファイルを追加することができます。 addPyFile() インターフェイスを使用して、Egg ファイルと zip ファイルを追加することもできます。 IDE でコードを実行するたびに、依存関係の JAR やファイルがクラスターにインストールされます。

Python

from lib import Foo
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

sc = spark.sparkContext
#sc.setLogLevel("INFO")

print("Testing simple count")
print(spark.range(100).count())

print("Testing addPyFile isolation")
sc.addPyFile("lib.py")
print(sc.parallelize(range(10)).map(lambda i: Foo(2)).collect())

class Foo(object):
  def __init__(self, x):
    self.x = x

Python + Java UDFs

from pyspark.sql import SparkSession
from pyspark.sql.column import _to_java_column, _to_seq, Column

## In this example, udf.jar contains compiled Java / Scala UDFs:
#package com.example
#
#import org.apache.spark.sql._
#import org.apache.spark.sql.expressions._
#import org.apache.spark.sql.functions.udf
#
#object Test {
#  val plusOne: UserDefinedFunction = udf((i: Long) => i + 1)
#}

spark = SparkSession.builder \
  .config("spark.jars", "/path/to/udf.jar") \
  .getOrCreate()
sc = spark.sparkContext

def plus_one_udf(col):
  f = sc._jvm.com.example.Test.plusOne()
  return Column(f.apply(_to_seq(sc, [col], _to_java_column)))

sc._jsc.addJar("/path/to/udf.jar")
spark.range(100).withColumn("plusOne", plus_one_udf("id")).show()

Scala

package com.example

import org.apache.spark.sql.SparkSession

case class Foo(x: String)

object Test {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      ...
      .getOrCreate();
    spark.sparkContext.setLogLevel("INFO")

    println("Running simple show query...")
    spark.read.format("parquet").load("/tmp/x").show()

    println("Running simple UDF query...")
    spark.sparkContext.addJar("./target/scala-2.11/hello-world_2.11-1.0.jar")
    spark.udf.register("f", (x: Int) => x + 1)
    spark.range(10).selectExpr("f(id)").show()

    println("Running custom objects query...")
    val objs = spark.sparkContext.parallelize(Seq(Foo("bye"), Foo("hi"))).collect()
    println(objs.toSeq)
  }
}

Databricks ユーティリティにアクセスする

このセクションでは、Databricks Connect を使用して Databricks ユーティリティにアクセスする方法について説明します。

Databricks Utilities (dbutils) リファレンス モジュールの dbutils.fsdbutils.secrets のユーティリティを使用できます。 サポートされるコマンドは dbutils.fs.cpdbutils.fs.headdbutils.fs.lsdbutils.fs.mkdirsdbutils.fs.mvdbutils.fs.putdbutils.fs.rmdbutils.secrets.getdbutils.secrets.getBytesdbutils.secrets.listdbutils.secrets.listScopes です。 ファイル システム ユーティリティ (dbutils.fs) を参照、または dbutils.fs.help() を実行します。および、シークレット ユーティリティ (dbutils.secrets) を参照、または dbutils.secrets.help() を実行します。

Python

from pyspark.sql import SparkSession
from pyspark.dbutils import DBUtils

spark = SparkSession.builder.getOrCreate()

dbutils = DBUtils(spark)
print(dbutils.fs.ls("dbfs:/"))
print(dbutils.secrets.listScopes())

Databricks Runtime 7.3 LTS 以上を使用する場合に、ローカルと Azure Databricks クラスターの両方で動作する方法で DBUtils モジュールにアクセスするには、次のように get_dbutils() を使用します。

def get_dbutils(spark):
  from pyspark.dbutils import DBUtils
  return DBUtils(spark)

それ以外の場合は、次のように get_dbutils() を使用します。

def get_dbutils(spark):
  if spark.conf.get("spark.databricks.service.client.enabled") == "true":
    from pyspark.dbutils import DBUtils
    return DBUtils(spark)
  else:
    import IPython
    return IPython.get_ipython().user_ns["dbutils"]

Scala

val dbutils = com.databricks.service.DBUtils
println(dbutils.fs.ls("dbfs:/"))
println(dbutils.secrets.listScopes())

ローカルおよびリモートのファイル システム間でのファイルのコピー

dbutils.fs を使用して、クライアントおよびリモートのファイル システム間でファイルをコピーできます。 スキーム file:/ は、クライアント上のローカル ファイル システムを参照します。

from pyspark.dbutils import DBUtils
dbutils = DBUtils(spark)

dbutils.fs.cp('file:/home/user/data.csv', 'dbfs:/uploads')
dbutils.fs.cp('dbfs:/output/results.csv', 'file:/home/user/downloads/')

この方法で転送できる最大ファイル サイズは 250 MB です。

[dbutils.secrets.get] を有効にします

セキュリティの制限により、dbutils.secrets.get を呼び出す機能は既定では無効になっています。 この機能をワークスペースで有効にするには、Azure Databricks サポートにお問い合わせください。

Hadoop 構成を設定する

クライアントでは、spark.conf.set API を使用して Hadoop 構成を設定できます。これは、SQL と DataFrame の操作に適用されます。 sparkContext に設定された Hadoop 構成は、クラスター構成に設定するか、ノートブックを使用する必要があります。 これは、sparkContext に設定された構成はユーザー セッションに関連付けられるのではなく、クラスター全体に適用されるからです。

トラブルシューティング

databricks-connect test を実行して、接続の問題を確認します。 このセクションでは、Databricks Connect で発生する可能性がある一般的な問題とその解決方法について説明します。

このセクションの内容は次のとおりです。

Python のバージョンが一致しない

ローカルで使用している Python バージョンのマイナー リリースが、少なくともクラスター上のバージョンと同じ (たとえば、3.9.163.9.15 なら OK で、3.93.8 ではダメ) ことを確認します。

複数の Python バージョンがローカルにインストールされている場合は、PYSPARK_PYTHON 環境変数 (たとえば、PYSPARK_PYTHON=python3) を設定して、Databricks Connect で適切なバージョンが使用されていることを確認します。

サーバーが有効ではない

クラスターで Spark サーバーが spark.databricks.service.server.enabled true で有効になっているか確認します。 有効であれば、ドライバー ログに次の行が表示されます。

../../.. ..:..:.. INFO SparkConfUtils$: Set spark config:
spark.databricks.service.server.enabled -> true
...
../../.. ..:..:.. INFO SparkContext: Loading Spark Service RPC Server
../../.. ..:..:.. INFO SparkServiceRPCServer:
Starting Spark Service RPC Server
../../.. ..:..:.. INFO Server: jetty-9...
../../.. ..:..:.. INFO AbstractConnector: Started ServerConnector@6a6c7f42
{HTTP/1.1,[http/1.1]}{0.0.0.0:15001}
../../.. ..:..:.. INFO Server: Started @5879ms

競合している PySpark インストール

databricks-connect パッケージが PySpark と競合している。 両方をインストールすると、Python で Spark コンテキストを初期化するときにエラーが発生します。 これは、"ストリームの破損" エラーや "クラスが見つかりません" エラーなど、いくつかのパターンで現れる可能性があります。 Python 環境に PySpark がインストールされている場合は、Databricks Connect をインストールする前に PySpark をアンインストールしておくようにしてください。 PySpark をアンインストールした後、Databricks Connect パッケージをすべて再インストールしてください。

pip3 uninstall pyspark
pip3 uninstall databricks-connect
pip3 install --upgrade "databricks-connect==12.2.*"  # or X.Y.* to match your specific cluster version.

SPARK_HOME の 競合

以前にマシンで Spark を使用していた場合は、Databricks Connect Spark ではなく、他のバージョンの Spark のいずれかを使用するように IDE が構成されることがあります。 これは、"ストリームの破損" エラーや "クラスが見つかりません" エラーなど、いくつかのパターンで現れる可能性があります。 SPARK_HOME 環境変数の値を確認すると、使用されている Spark のバージョンがわかります。

Python

import os
print(os.environ['SPARK_HOME'])

Scala

println(sys.env.get("SPARK_HOME"))

Java

System.out.println(System.getenv("SPARK_HOME"));

解決方法

SPARK_HOME がクライアント内のバージョン以外のバージョンの Spark に設定されている場合、SPARK_HOME 変数の設定を解除して、もう一度やり直す必要があります。

IDE 環境変数の設定、.bashrc.zshrc.bash_profile のファイル、および他の環境変数が設定されている可能性がある場所を確認します。 古い状態を消去するには IDE を終了して再起動しなければならない可能性が高く、問題が解決しない場合は、さらに新しいプロジェクトを作成する必要がある場合もあります。

SPARK_HOME を新しい値に設定する必要はありません。設定を解除すれば十分です。

バイナリの PATH エントリの競合または欠落

spark-shell のようなコマンドが、Databricks Connect のバイナリではなく、以前にインストールされた他のバイナリを実行するよう、PATH が構成されている場合があります。 これにより、databricks-connect testが失敗する可能性があります。 Databricks Connect のバイナリを優先とするか、以前にインストールしたバイナリを削除する必要があります。

spark-shell のようなコマンドを実行できない場合、 PATH が pip3 install によって自動的に設定されていない可能性があり、インストール bin ディレクトリを PATH に手動で追加する必要があります。 これが設定されていない場合でも、IDE で Databricks Connect を使用できます。 ただし、databricks-connect test コマンドは動作しません。

クラスターのシリアル化設定の競合

databricks-connect test の実行中に "ストリームが破損しています" というエラーが表示される場合は、クラスターのシリアル化構成に互換性がないことが原因である可能性があります。 たとえば、spark.io.compression.codec 構成の設定によって、この問題が発生する場合があります。 この問題を解決するには、クラスター設定からこれらの構成を削除するか、Databricks Connect クライアントで構成を設定してください。

Windows で winutils.exe が見つからない

Databricks Connect を Windows で使用していて、以下が表示される場合

ERROR Shell: Failed to locate the winutils binary in the hadoop binary path
java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.

Windows で Hadoop パスを構成する手順に従ってください。

Windows 上のファイル名、ディレクトリ名、ボリューム ラベル構文が正しくない

Windows と Databricks Connect を使用していて、次のように表示される場合

The filename, directory name, or volume label syntax is incorrect.

Java または Databricks Connect がスペースを含むパスを持つディレクトリにインストールされています。 これに対処するには、スペースのないディレクトリ パスに をインストールするか、短縮名形式を使用してパスを構成します。

Microsoft Entra ID トークンを使用した認証

Note

次の情報は、Databricks Connect バージョン 7.3.5 から 12.2.x にのみ適用されます。

Databricks Runtime 13.3 LTS 以降用の Databricks Connect は、現在 Microsoft Entra ID トークンをサポートしていません。

Databricks Connect バージョン 7.3.5 から 12.2.x を使用するときは、個人用アクセス トークンではなく Microsoft Entra ID トークンを使用して認証できます。 Microsoft Entra ID トークンは有効期間は限られています。 Microsoft Entra ID トークンが期限切れの場合、Databricks Connect は Invalid Token エラーで失敗となります。

Databricks Connect バージョン 7.3.5 から 12.2.x では、実行中の Databricks Connect アプリケーションに Microsoft Entra ID トークンを提供できます。 アプリケーションは、新しいアクセス トークンを取得し、それを spark.databricks.service.token SQL 構成キーに設定する必要があります。

Python

spark.conf.set("spark.databricks.service.token", new_aad_token)

Scala

spark.conf.set("spark.databricks.service.token", newAADToken)

トークンを更新すると、アプリケーションは同じ SparkSession、およびそのセッションのコンテキストで作成されたオブジェクトと状態を引き続き使用できます。 断続的なエラーを回避するために、Databricks では、古いトークンの有効期限が切れる前に新しいトークンを提供することをお勧めしています。

Microsoft Entra ID トークンの有効期間を延長して、アプリケーションの実行中、維持することができます。 これを行うには、使用した Microsoft Entra ID 認証アプリケーションに、適切な長さの有効期間のTokenLifetimePolicyをアタッチして、アクセストークンを取得します。

Note

Microsoft Entra ID パススルーは 2 つのトークンを使用します。前に説明した Databricks Connect バージョン 7.3.5 から 12.2.x で構成する Microsoft Entra ID アクセス トークンと、Databricks が要求を処理するときに Databricks が生成する特定のリソースのための ADLS パススルー トークンです。 Microsoft Entra ID トークンの有効期間ポリシーを使用して、ADLS パススルー トークンの有効期間を延長することはできません。 1 時間以上かかるコマンドをクラスターに送信した場合、1 時間経過した後にコマンドが ADLS リソースにアクセスすると、失敗します。

制限事項

  • Unity Catalog

  • 構造化ストリーム。

  • リモートクラスター上の Spark ジョブの一部ではない任意のコードの実行。

  • デルタ テーブル操作用の Native Scala、Python、R API (DeltaTable.forPath など) はサポートされていません。 ただし、Delta Lake 操作の SQL API ( spark.sql(...) ) と Spark API (spark.read.load など) はどちらもサポートされています。

  • COPY INTO 機能。

  • サーバーのカタログの一部である SQL 関数、Python または Scala の UDF の使用。 ただし、ローカルに導入されている Scala と Python UDF は機能します。

  • Apache Zeppelin 0.7.x 以下。

  • テーブル アクセス制御によるクラスターへの接続。

  • プロセス分離が有効になっているクラスターへの接続 (つまり、spark.databricks.pyspark.enableProcessIsolationtrue に設定されている場合)。

  • Delta CLONE SQL コマンド。

  • グローバル一時ビュー。

  • Koalaspyspark.pandas

  • CREATE TABLE table AS SELECT ... SQL コマンドは常に機能するとは限りません。 代わりに spark.sql("SELECT ...").write.saveAsTable("table") を使用してください。

  • Microsoft Entra ID 資格情報のパススルーは Databricks Runtime 7.3 LTS 以降を実行している標準クラスターでのみサポートされており、サービスプリンシパル認証と互換性がありません。

  • 次の Databricks Utilities (dbutils) リファレンス: