Databricks Runtime 12.2 LTS 以前用の Databricks Connect
Note
Databricks Connect では、代わりに Databricks Runtime 13.0 以降の Databricks Connect を使うことをお勧めします。
Databricks では、Databricks Runtime 12.2 LTS 以前の Databricks Connect の新機能は計画していません。
Databricks Connect を使用すると、Visual Studio Code や PyCharm などの一般的な IDE、ノートブック サーバー、その他のカスタム アプリケーションを Azure Databricks クラスターに接続できます。
この記事では、Databricks Connect のしくみ、Databricks Connect を開始する手順、Databricks Connect を使用する際に発生する可能性のある問題のトラブルシューティング方法、および Databricks Connect を使用した実行と Azure Databricks ノートブックでの実行の違いについて説明します。
概要
Databricks Connect は Databricks Runtime 用のクライアントライブラリです。 これにより、Spark API を使用してジョブを作成し、ローカルの Spark セッションではなく、Azure Databricks クラスターでリモート実行することができます。
たとえば、Databricks Connect を使用して DataFrame コマンド spark.read.format(...).load(...).groupBy(...).agg(...).show()
を実行すると、リモート クラスターで実行するために Azure Databricks で実行されている Spark サーバーに、コマンドの論理表現が送信されます。
Databricks Connect では、次のことができます。
- Python、R、Scala、Java で開発されたどのアプリケーションからでも大規模な Spark ジョブを実行できます。
import pyspark
、require(SparkR)
、import org.apache.spark
が可能な場所なら、アプリケーションから直接 Spark ジョブを実行できるようになりました。IDE プラグインをインストールしたり、Spark 送信スクリプトを使用したりする必要はありません。 - リモートクラスターを使用している場合でも、IDE でコードをステップ実行してデバッグします。
- ライブラリの開発時にすばやく反復処理します。 Databricks Connect で Python または Java ライブラリの依存関係を変更した後にクラスターを再起動する必要はありません。各クライアントセッションはクラスター内で相互に分離されているためです。
- 作業結果を失うことなく、アイドル状態のクラスターをシャットダウンします。 クライアントアプリケーションはクラスターから切り離されているため、クラスターの再起動またはアップグレードの影響を受けません。再起動やアップグレードは通常、ノートブックで定義されているすべての変数、RDD、およびデータフレームオブジェクトが失われる原因となります。
Note
SQL クエリを使用した python 開発の場合、Databricks では、Databricks Connect ではなくPython 用 Databricks SQL コネクタを使用することをお勧めします。 Python 用 Databricks SQL コネクタは、Databricks Connect よりも設定が簡単です。 また、ジョブがリモートのコンピューティングリソース上で実行されている間、Databricks Connect はローカル コンピューター上で実行されているジョブの解析および計画を行います。 これにより、特にランタイムエラーのデバッグが困難になる可能性があります。 Python 用 Databricks SQL コネクタは、リモートのコンピューティングリソースに SQL クエリを直接送信し、結果をフェッチします。
要件
このセクションには、Databricks Connect の要件を示します。
以下のバージョンの Azure Databricks Runtime のみサポートされています。
- Databricks Runtime 12.2 LTS ML、Databricks Runtime 12.2 LTS
- Databricks Runtime 11.3 LTS ML、Databricks Runtime 11.3 LTS
- Databricks Runtime 10.4 LTS ML、Databricks Runtime 10.4 LTS
- Databricks Runtime 9.1 LTS ML、Databricks Runtime 9.1 LTS
- Databricks Runtime 7.3 LTS
開発用コンピューターに Python 3 をインストールする必要があり、クライアントの Python インストールのマイナー バージョンは、Azure Databricks クラスターのマイナー Python バージョンと同じである必要があります。 次の表は、各 Databricks Runtime に合わせてインストールされる Python バージョンを示しています。
Databricks Runtime のバージョン Python バージョン 12.2 LTS ML、12.2 LTS 3.9 11.3 LTS ML、11.3 LTS 3.9 10.4 LTS ML、10.4 LTS 3.8 9.1 LTS ML、9.1 LTS 3.8 7.3 LTS 3.7 Databricks では、Databricks Connect で使用する Python バージョンごとに Python "仮想環境" をアクティブにすることを強く推奨しています。 Python 仮想環境は、正しいバージョンの Python と Databricks Connect を一緒に使用していることを確認するのに役立ちます。 これにより、関連する技術的な問題の解決に費やす時間を短縮できます。
たとえば、開発マシンで venv を使用していて、クラスターが Python 3.9 を実行している場合、そのバージョンの
venv
環境を作成する必要があります。 次のコマンド例は、Python 3.9 を使用してvenv
環境をアクティブにするためのスクリプトを生成します。このコマンドは次に、現在の作業ディレクトリ内の.venv
という名前の隠しフォルダー内にそれらのスクリプトを配置します。# Linux and macOS python3.9 -m venv ./.venv # Windows python3.9 -m venv .\.venv
これらのスクリプトを使用してこの
venv
環境をアクティブにするには、venvs のしくみに関する記事 (英語) を参照してください。もう 1 つの例として、開発マシンで Conda を使用していて、クラスターが Python 3.9 を実行している場合、そのバージョンの Conda 環境を作成する必要があります。次に例を示します。
conda create --name dbconnect python=3.9
この環境名で Conda 環境をアクティブにするには、
conda activate dbconnect
を実行します。Databricks Connect のメジャーとマイナーのパッケージ バージョンは、常に Databricks Runtime のバージョンと一致している必要があります。 Databricks では、常に、Azure Databricks Runtime のバージョンと一致する、最新の Databricks Connect のパッケージを使用することをお勧めします。 たとえば、Databricks Runtime 12.2 LTS クラスターを使用するときは、
databricks-connect==12.2.*
パッケージも使用する必要があります。Note
利用可能な Databricks Connect リリースとメンテナンスの更新プログラムの一覧については、 「Databricks Connect のリリースノート」を参照してください。
Java Runtime Environment (JRE) 8. クライアントは OpenJDK 8 JRE でテストされています。 クライアントは、Java 11 をサポートしていません。
Note
Windows で、Databricks Connect が winutils.exe
を見つけられないというエラーが表示される場合は、「Windows で winutils.exe が見つからない」を参照してください。
クライアントをセットアップする
次の手順を実行して、Databricks Connect のローカル クライアントを設定します。
注意
ローカルの Databricks Connect クライアントの設定を開始する前に、Databricks Connect の要件を満たす必要があります。
手順 1: Databricks Connect クライアントをインストールする
仮想環境がアクティブな状態で、
uninstall
コマンドを実行して PySpark をアンインストールします (既にインストールされている場合)。 これは、databricks-connect
パッケージが PySpark と競合するために必要です。 詳細については、「PySpark インストールの競合」を参照してください。 PySpark が既にインストールされているかどうかを調べるには、show
コマンドを実行します。# Is PySpark already installed? pip3 show pyspark # Uninstall PySpark pip3 uninstall pyspark
仮想環境がアクティブな状態のままで、
install
コマンドを実行して Databricks Connect クライアントをインストールします。--upgrade
オプションを使用して、既存のクライアント インストールを指定したバージョンにアップグレードします。pip3 install --upgrade "databricks-connect==12.2.*" # Or X.Y.* to match your cluster version.
注意
Databricks では、最新のパッケージがインストールされるように、
databricks-connect=X.Y
ではなくdatabricks-connect==X.Y.*
を指定する "ドットとアスタリスク" の表記を追加することをお勧めします。
手順 2: 接続プロパティの構成
次の構成プロパティを収集します。
Azure Databricks のワークスペースごとの URL。 これは、
https://
の後にクラスターのサーバー ホスト名の値を付けたものと同じです。「Azure Databricks コンピューティング リソースの接続の詳細を取得する」を参照してください。Azure Databricks 個人用アクセス トークンまたは Microsoft Entra ID (旧称 Azure Active Directory) トークン。
- Azure Data Lake Storage (ADLS) 資格情報のパススルーの場合は、Microsoft Entra ID トークンを使用する必要があります。 Microsoft Entra ID 資格情報のパススルーは Databricks Runtime 7.3 LTS 以降を実行している標準クラスターでのみサポートされており、サービスプリンシパル認証と互換性がありません。
- Microsoft Entra ID トークンを使用した認証の詳細については、「Microsoft Entra ID トークンを使用した認証」を参照してください。
クラスターの ID。 クラスター ID は URL から取得できます。 ここでの、クラスター ID は
1108-201635-xxxxxxxx
です。 「クラスター URL と ID」も参照してください。ワークスペースの一意の組織 ID。 「ワークスペース オブジェクトの識別子を取得する」を参照してください。
Databricks Connect の接続先のクラスター上のポート。 既定のポートは
15001
です。 クラスターが別のポートを使用するように構成されている場合 (Azure Databricks についての前の指示で指定されていた8787
など)、構成されているポート番号を使用します。
次のように接続を構成します。
CLI、SQL 構成、環境変数を使用できます。 構成メソッドの優先順位は高い方から、SQL 構成キー、CLI、環境変数の順になります。
CLI
databricks-connect
を実行します。databricks-connect configure
ライセンスは次のように表示されます。
Copyright (2018) Databricks, Inc. This library (the "Software") may not be used except in connection with the Licensee's use of the Databricks Platform Services pursuant to an Agreement ...
ライセンスに同意し、構成値を指定します。 Databricks HostとDatabricks Tokenには、手順 1 で確認したワークスペース URL と個人用アクセス トークンを入力します。
Do you accept the above agreement? [y/N] y Set new config values (leave input empty to accept default): Databricks Host [no current value, must start with https://]: <databricks-url> Databricks Token [no current value]: <databricks-token> Cluster ID (e.g., 0921-001415-jelly628) [no current value]: <cluster-id> Org ID (Azure-only, see ?o=orgId in URL) [0]: <org-id> Port [15001]: <port>
Microsoft Entra ID トークンが長すぎますというメッセージが表示された場合は、 Databricks Tokenフィールドを空のままにして、
~/.databricks-connect
にトークンを手動で入力できます。
SQL 構成または環境変数。 次の表は、手順 1 で確認した構成プロパティに対応する SQL 構成キーと環境変数を示しています。 SQL 構成キーを設定するには、
sql("set config=value")
を使用します。 (例:sql("set spark.databricks.service.clusterId=0304-201045-abcdefgh")
)。パラメーター SQL 構成キー 環境変数の名前 Databricks Host spark.databricks.service.address DATABRICKS_ADDRESS Databricks Token spark.databricks.service.token DATABRICKS_API_TOKEN クラスター ID spark.databricks.service.clusterId DATABRICKS_CLUSTER_ID 組織 ID spark.databricks.service.orgId DATABRICKS_ORG_ID Port spark.databricks.service.port DATABRICKS_PORT
仮想環境がまだアクティブ化された状態で、次のように Azure Databricks への接続をテストします。
databricks-connect test
構成したクラスターが実行されていない場合、テストはクラスターを開始します。クラスターは、構成された自動終了時刻まで実行されたままとなります。 出力は次のようになります。
* PySpark is installed at /.../.../pyspark * Checking java version java version "1.8..." Java(TM) SE Runtime Environment (build 1.8...) Java HotSpot(TM) 64-Bit Server VM (build 25..., mixed mode) * Testing scala command ../../.. ..:..:.. WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). ../../.. ..:..:.. WARN MetricsSystem: Using default name SparkStatusTracker for source because neither spark.metrics.namespace nor spark.app.id is set. ../../.. ..:..:.. WARN SparkServiceRPCClient: Now tracking server state for 5ab..., invalidating prev state ../../.. ..:..:.. WARN SparkServiceRPCClient: Syncing 129 files (176036 bytes) took 3003 ms Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 2... /_/ Using Scala version 2.... (Java HotSpot(TM) 64-Bit Server VM, Java 1.8...) Type in expressions to have them evaluated. Type :help for more information. scala> spark.range(100).reduce(_ + _) Spark context Web UI available at https://... Spark context available as 'sc' (master = local[*], app id = local-...). Spark session available as 'spark'. View job details at <databricks-url>/?o=0#/setting/clusters/<cluster-id>/sparkUi View job details at <databricks-url>?o=0#/setting/clusters/<cluster-id>/sparkUi res0: Long = 4950 scala> :quit * Testing python command ../../.. ..:..:.. WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). ../../.. ..:..:.. WARN MetricsSystem: Using default name SparkStatusTracker for source because neither spark.metrics.namespace nor spark.app.id is set. ../../.. ..:..:.. WARN SparkServiceRPCClient: Now tracking server state for 5ab.., invalidating prev state View job details at <databricks-url>/?o=0#/setting/clusters/<cluster-id>/sparkUi
接続関連のエラーが表示されない場合 (
WARN
メッセージは問題ありません)、正常に接続されています。
Databricks Connect を使用する
このセクションでは、Databricks Connect に対してクライアントを使用するために使用する IDE またはノートブック サーバーの構成方法を説明します。
このセクションの内容は次のとおりです。
- JupyterLab
- クラシック Jupyter Notebook
- PyCharm
- SparkR と RStudio Desktop
- sparklyr と RStudio Desktop
- IntelliJ (Scala または Java)
- Eclipse を使用した PyDev
- Eclipse
- SBT
- Spark シェル
JupyterLab
JupyterLab と Python で Databricks Connect を使用するには、次の手順に従います。
JupyterLab をインストールするには、Python 仮想環境がアクティブな状態で、ターミナルまたはコマンド プロンプトから次のコマンドを実行します。
pip3 install jupyterlab
Web ブラウザーで JupyterLab を起動するには、アクティブな Python 仮想環境から次のコマンドを実行します。
jupyter lab
JupyterLab が Web ブラウザーに表示されない場合は、
localhost
または127.0.0.1
で始まる URL を仮想環境からコピーし、Web ブラウザーのアドレス バーに入力します。JupyterLab で、メインメニューの [File]>[New]>[Notebook] をクリックし、[Python 3 (ipykernel)] を選択し、[Select] をクリックして新しいノートブックを作成します。
ノートブックの最初のセルに、コード例または独自のコードを入力します。 独自のコードを使用する場合は、コード例に示すように、少なくとも
SparkSession.builder.getOrCreate()
のインスタンスを 1 つインスタンス化する必要があります。ノートブックを実行するには、[実行] > [すべてのセルを実行] をクリックします。
ノートブックをデバッグするには、ノートブックのツール バーの [Python 3 (ipykernel)] の横にあるバグ (デバッガーを有効にする) アイコンをクリックします。 1 つ以上のブレークポイントを設定し、[実行] > [すべてのセルを実行] をクリックします。
JupyterLab をシャットダウンするには、[ファイル] > [シャットダウン] をクリックします。 JupyterLab プロセスがターミナルまたはコマンド プロンプトで引き続き実行されている場合は、
Ctrl + c
を押してから、y
を入力して確定してこのプロセスを停止します。
具体的なデバッグ手順については、「Debugger」 (デバッガー) を参照してください。
クラシック Jupyter Notebook
Databricks Connect の構成スクリプトが、パッケージをプロジェクト構成に自動的に追加します。 Python カーネルで開始するには、以下を実行します。
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
SQL クエリの実行および視覚化のために %sql
の短縮形を有効にするには、以下のスニペットを使用します。
from IPython.core.magic import line_magic, line_cell_magic, Magics, magics_class
@magics_class
class DatabricksConnectMagics(Magics):
@line_cell_magic
def sql(self, line, cell=None):
if cell and line:
raise ValueError("Line must be empty for cell magic", line)
try:
from autovizwidget.widget.utils import display_dataframe
except ImportError:
print("Please run `pip install autovizwidget` to enable the visualization widget.")
display_dataframe = lambda x: x
return display_dataframe(self.get_spark().sql(cell or line).toPandas())
def get_spark(self):
user_ns = get_ipython().user_ns
if "spark" in user_ns:
return user_ns["spark"]
else:
from pyspark.sql import SparkSession
user_ns["spark"] = SparkSession.builder.getOrCreate()
return user_ns["spark"]
ip = get_ipython()
ip.register_magics(DatabricksConnectMagics)
Visual Studio Code
Visual Studio Code で Databricks Connect を使用するには、次のようにします。
Python 拡張機能がインストールされていることを確認します。
コマンド パレットを開きます (macOS では Command + Shift + P、Windows/Linux では Ctrl + Shift + P)。
Python インタープリターを選択します。 [コード] > [基本設定] > [設定] に移動し、[Python 設定] を選択します。
databricks-connect get-jar-dir
を実行します。コマンドから返されたディレクトリを、
python.venvPath
にある[User Settings JSON](ユーザー設定 JSON) に追加します。 これを Python 構成に追加する必要があります。リンターを無効にします。 右側の [...] をクリックし、JSON 設定を編集します。 変更された設定は次のとおりです。
仮想環境を使用して実行する場合は、VS Code で Python 用に開発する手法が推奨されます。コマンド パレットで、
select python interpreter
と入力し、クラスターの Python バージョンに一致する環境を指定します。たとえば、クラスターが Python 3.9 の場合、開発環境は Python 3.9 である必要があります。
PyCharm
Databricks Connect の構成スクリプトが、パッケージをプロジェクト構成に自動的に追加します。
Python 3 クラスター
PyCharm プロジェクトを作成する場合、[Existing Interpreter](既存のインタープリター) を選択します。 ドロップダウン メニューから、作成した Conda 環境を選択します (「要件」を参照)。
[実行]>[構成の編集]に移動します。
環境変数として
PYSPARK_PYTHON=python3
を追加します。
SparkR と RStudio Desktop
SparkR と RStudio Desktop で Databricks Connect を使用するには、次のようにします。
オープンソース Spark ディストリビューションをダウンロードして開発マシンに展開します。 Azure Databricks クラスター (Hadoop 2.7) のものと同じバージョンを選択します。
databricks-connect get-jar-dir
を実行します。 このコマンドは、/usr/local/lib/python3.5/dist-packages/pyspark/jars
のようなパスを返します。 JAR ディレクトリ ファイル パスの 1 つ上のファイル パスをコピーします。たとえば、/usr/local/lib/python3.5/dist-packages/pyspark
のように。これがSPARK_HOME
ディレクトリです。R スクリプトの一番上に追加して、Spark ライブラリ パスと Spark ホームを構成します。 手順 1 でオープン ソースの Spark パッケージを展開したディレクトリに、
<spark-lib-path>
を設定します。 手順 2 の Databricks Connect ディレクトリに<spark-home-path>
を設定します。# Point to the OSS package path, e.g., /path/to/.../spark-2.4.0-bin-hadoop2.7 library(SparkR, lib.loc = .libPaths(c(file.path('<spark-lib-path>', 'R', 'lib'), .libPaths()))) # Point to the Databricks Connect PySpark installation, e.g., /path/to/.../pyspark Sys.setenv(SPARK_HOME = "<spark-home-path>")
Spark セッションを開始し、SparkR コマンドの実行を開始します。
sparkR.session() df <- as.DataFrame(faithful) head(df) df1 <- dapply(df, function(x) { x }, schema(df)) collect(df1)
sparklyr と RStudio Desktop
重要
この機能はパブリック プレビュー段階にあります。
Databricks Connect を使用して、ローカルで開発した sparklyr に依存するコードをコピーし、Azure Databricks ノートブックまたは Azure Databricks ワークスペースにホストされている RStudio Server で、最小限の変更、またはコード変更なしに実行できます。
このセクションの内容は次のとおりです。
必要条件
- sparklyr 1.2 以上。
- 一致するバージョンの Databricks Connect を使用する Databricks Runtime 7.3 LTS 以上。
sparklyr のインストール、構成、使用
RStudio Desktop で、CRAN から sparklyr 1.2 以上をインストールするか、GitHub から最新のマスター バージョンをインストールします。
# Install from CRAN install.packages("sparklyr") # Or install the latest master version from GitHub install.packages("devtools") devtools::install_github("sparklyr/sparklyr")
正しいバージョンの Databricks Connect をインストールした Python 環境をアクティブ化し、ターミナルで次のコマンドを実行して
<spark-home-path>
を取得します。databricks-connect get-spark-home
Spark セッションを開始し、sparklyr コマンドの実行を開始します。
library(sparklyr) sc <- spark_connect(method = "databricks", spark_home = "<spark-home-path>") iris_tbl <- copy_to(sc, iris, overwrite = TRUE) library(dplyr) src_tbls(sc) iris_tbl %>% count
接続を閉じます。
spark_disconnect(sc)
リソース
詳しくは、sparklyr GitHub の「説明ファイル」をご覧ください。
コード例については、「sparklyr」を参照してください。
sparklyr と RStudio Desktop の制限事項
次の機能はサポートされていません。
- sparklyr ストリーミング API
- sparklyr ML API
- broom API
- csv_file シリアル化モード
- spark-submit
IntelliJ (Scala または Java)
IntelliJ (Scala または Java) で Databricks Connect を使用するには、次のようにします。
databricks-connect get-jar-dir
を実行します。コマンドから返されたディレクトリに依存関係を指定します。 [ファイル]>[プロジェクトの構造]>[モジュール]>[依存関係]>[+]>[JARs or Directories](JAR またはディレクトリ)に移動します。
競合を回避するために、クラスパスから他の Spark インストールを削除することを強く推奨します。 これが不可能な場合は、追加する JAR がクラスパスの前に来るよう確認してください。 特に、インストールされている他のバージョンの Spark より前である必要があります (そうでない場合、他の Spark バージョンのいずれかを使用してローカルで実行するか、
ClassDefNotFoundError
をスローします)。IntelliJ のブレークアウト オプションの設定を確認します。 既定値は [All] で、デバッグ用にブレークポイントを設定するとネットワーク タイムアウトが生します。 バックグラウンド ネットワーク スレッドが停止しないようにするには、Thread に設定します。
Eclipse を使用した PyDev
Eclipse を使用して Databricks Connect と PyDev を使用するには、次のようにします。
- Eclipse を起動します。
- [File]>[New]>[Project]>[PyDev]>[PyDev Project] の順にクリックし、[Next] をクリックして、プロジェクトを作成します。
- プロジェクト名を指定します。
- [Project contents] で、Python 仮想環境へのパスを指定します。
- [Please configure an interpreter before proceeding] をクリックします。
- [Manual config]をクリックします。
- [New]>[Browse for python/pypy exe] をクリックします。
- 仮想環境から参照される Python インタープリターへの完全なパスを参照して選択し、[Open] をクリックします。
- [Select interpreter] ダイアログで、[OK] をクリックします。
- [Selection needed] ダイアログで、[OK] をクリックします。
- [Preferences] ダイアログで、[Apply and Close] をクリックします。
- [PyDev Project] ダイアログで、[Finish] をクリックします。
- [Open Perspective] をクリックします。
- コード例または独自のコードを含む Python コード (
.py
) ファイルをプロジェクトに追加します。 独自のコードを使用する場合は、コード例に示すように、少なくともSparkSession.builder.getOrCreate()
のインスタンスを 1 つインスタンス化する必要があります。 - Python コード ファイルを開いた状態で、実行中にコードを一時停止するブレークポイントを設定します。
- [実行] > [実行] または [実行] > [デバッグ] をクリックします。
具体的な実行とデバッグの手順については、プログラムの実行に関する記事 (英語) を参照してください。
Eclipse
Databricks Connect と Eclipse を使用するには、次のようにします。
databricks-connect get-jar-dir
を実行します。外部の JAR 構成をコマンドから返されたディレクトリに指定します。 [プロジェクト メニュー]>[プロパティ]>[Java Build Path]>[ライブラリ]>[Add External Jar](外部の JAR を追加) に移動します。
競合を回避するために、クラスパスから他の Spark インストールを削除することを強く推奨します。 これが不可能な場合は、追加する JAR がクラスパスの前に来るよう確認してください。 特に、インストールされている他のバージョンの Spark より前である必要があります (そうでない場合、他の Spark バージョンのいずれかを使用してローカルで実行するか、
ClassDefNotFoundError
をスローします)。
SBT
SBT で Databricks Connect を使用するには、通常の Spark ライブラリの依存関係ではなく、Databricks Connect JAR にリンクする build.sbt
ファイルを構成する必要があります。 次のビルド ファイルの例の unmanagedBase
ディレクティブで、これを行っています。これは、com.example.Test
メイン オブジェクトを持つ Scala アプリを想定しています。
build.sbt
name := "hello-world"
version := "1.0"
scalaVersion := "2.11.6"
// this should be set to the path returned by ``databricks-connect get-jar-dir``
unmanagedBase := new java.io.File("/usr/local/lib/python2.7/dist-packages/pyspark/jars")
mainClass := Some("com.example.Test")
Spark シェル
Spark シェルと Python または Scala で Databricks Connect を使用するには、次の手順に従います。
仮想環境がアクティブ化された状態で、「クライアントをセットアップする」で
databricks-connect test
コマンドが正常に実行されたことを確認します。仮想環境がアクティブ化された状態で、Spark シェルを起動します。 Python の場合は、
pyspark
コマンドを実行します。 Scala の場合は、spark-shell
コマンドを実行します。# For Python: pyspark
# For Scala: spark-shell
Spark シェルが表示されます。Python の例:
Python 3... (v3...) [Clang 6... (clang-6...)] on darwin Type "help", "copyright", "credits" or "license" for more information. Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). ../../.. ..:..:.. WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /__ / .__/\_,_/_/ /_/\_\ version 3.... /_/ Using Python version 3... (v3...) Spark context Web UI available at http://...:... Spark context available as 'sc' (master = local[*], app id = local-...). SparkSession available as 'spark'. >>>
Scala の場合:
Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). ../../.. ..:..:.. WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Spark context Web UI available at http://... Spark context available as 'sc' (master = local[*], app id = local-...). Spark session available as 'spark'. Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 3... /_/ Using Scala version 2... (OpenJDK 64-Bit Server VM, Java 1.8...) Type in expressions to have them evaluated. Type :help for more information. scala>
Spark シェルと Python または Scala を使用してクラスターでコマンドを実行する方法については、「Interactive Analysis with the Spark Shell」を参照してください。
組み込みの
spark
変数を使用して、実行中のクラスターのSparkSession
を表します。Python の例:>>> df = spark.read.table("samples.nyctaxi.trips") >>> df.show(5) +--------------------+---------------------+-------------+-----------+----------+-----------+ |tpep_pickup_datetime|tpep_dropoff_datetime|trip_distance|fare_amount|pickup_zip|dropoff_zip| +--------------------+---------------------+-------------+-----------+----------+-----------+ | 2016-02-14 16:52:13| 2016-02-14 17:16:04| 4.94| 19.0| 10282| 10171| | 2016-02-04 18:44:19| 2016-02-04 18:46:00| 0.28| 3.5| 10110| 10110| | 2016-02-17 17:13:57| 2016-02-17 17:17:55| 0.7| 5.0| 10103| 10023| | 2016-02-18 10:36:07| 2016-02-18 10:41:45| 0.8| 6.0| 10022| 10017| | 2016-02-22 14:14:41| 2016-02-22 14:31:52| 4.51| 17.0| 10110| 10282| +--------------------+---------------------+-------------+-----------+----------+-----------+ only showing top 5 rows
Scala の場合:
>>> val df = spark.read.table("samples.nyctaxi.trips") >>> df.show(5) +--------------------+---------------------+-------------+-----------+----------+-----------+ |tpep_pickup_datetime|tpep_dropoff_datetime|trip_distance|fare_amount|pickup_zip|dropoff_zip| +--------------------+---------------------+-------------+-----------+----------+-----------+ | 2016-02-14 16:52:13| 2016-02-14 17:16:04| 4.94| 19.0| 10282| 10171| | 2016-02-04 18:44:19| 2016-02-04 18:46:00| 0.28| 3.5| 10110| 10110| | 2016-02-17 17:13:57| 2016-02-17 17:17:55| 0.7| 5.0| 10103| 10023| | 2016-02-18 10:36:07| 2016-02-18 10:41:45| 0.8| 6.0| 10022| 10017| | 2016-02-22 14:14:41| 2016-02-22 14:31:52| 4.51| 17.0| 10110| 10282| +--------------------+---------------------+-------------+-----------+----------+-----------+ only showing top 5 rows
Spark シェルを停止するには、
Ctrl + d
またはCtrl + z
を押すか、quit()
またはexit()
コマンド、あるいは Python の場合は:q
、Scala の場合は:quit
を実行します。
コード例
この単純なコード例は、指定されたテーブルに対してクエリを実行して、指定されたテーブルの最初の 5 行を表示します。 異なるテーブルを使用するには、spark.read.table
への呼び出しを調整します。
from pyspark.sql.session import SparkSession
spark = SparkSession.builder.getOrCreate()
df = spark.read.table("samples.nyctaxi.trips")
df.show(5)
次の長いコード例の実行内容は次のとおりです。
- メモリ内 DataFrame を作成します。
default
スキーマ内にzzz_demo_temps_table
という名前のテーブルを作成します。 この名前のテーブルが既に存在する場合は、まずテーブルが削除されます。 異なるスキーマまたはテーブルを使用するには、spark.sql
、temps.write.saveAsTable
、または両方への呼び出しを調整します。- DataFrame の内容をテーブルに保存します。
- テーブルの内容に対して
SELECT
クエリを実行します。 - クエリの結果を表示します。
- テーブルを削除します。
Python
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from datetime import date
spark = SparkSession.builder.appName('temps-demo').getOrCreate()
# Create a Spark DataFrame consisting of high and low temperatures
# by airport code and date.
schema = StructType([
StructField('AirportCode', StringType(), False),
StructField('Date', DateType(), False),
StructField('TempHighF', IntegerType(), False),
StructField('TempLowF', IntegerType(), False)
])
data = [
[ 'BLI', date(2021, 4, 3), 52, 43],
[ 'BLI', date(2021, 4, 2), 50, 38],
[ 'BLI', date(2021, 4, 1), 52, 41],
[ 'PDX', date(2021, 4, 3), 64, 45],
[ 'PDX', date(2021, 4, 2), 61, 41],
[ 'PDX', date(2021, 4, 1), 66, 39],
[ 'SEA', date(2021, 4, 3), 57, 43],
[ 'SEA', date(2021, 4, 2), 54, 39],
[ 'SEA', date(2021, 4, 1), 56, 41]
]
temps = spark.createDataFrame(data, schema)
# Create a table on the Databricks cluster and then fill
# the table with the DataFrame's contents.
# If the table already exists from a previous run,
# delete it first.
spark.sql('USE default')
spark.sql('DROP TABLE IF EXISTS zzz_demo_temps_table')
temps.write.saveAsTable('zzz_demo_temps_table')
# Query the table on the Databricks cluster, returning rows
# where the airport code is not BLI and the date is later
# than 2021-04-01. Group the results and order by high
# temperature in descending order.
df_temps = spark.sql("SELECT * FROM zzz_demo_temps_table " \
"WHERE AirportCode != 'BLI' AND Date > '2021-04-01' " \
"GROUP BY AirportCode, Date, TempHighF, TempLowF " \
"ORDER BY TempHighF DESC")
df_temps.show()
# Results:
#
# +-----------+----------+---------+--------+
# |AirportCode| Date|TempHighF|TempLowF|
# +-----------+----------+---------+--------+
# | PDX|2021-04-03| 64| 45|
# | PDX|2021-04-02| 61| 41|
# | SEA|2021-04-03| 57| 43|
# | SEA|2021-04-02| 54| 39|
# +-----------+----------+---------+--------+
# Clean up by deleting the table from the Databricks cluster.
spark.sql('DROP TABLE zzz_demo_temps_table')
Scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import java.sql.Date
object Demo {
def main(args: Array[String]) {
val spark = SparkSession.builder.master("local").getOrCreate()
// Create a Spark DataFrame consisting of high and low temperatures
// by airport code and date.
val schema = StructType(Array(
StructField("AirportCode", StringType, false),
StructField("Date", DateType, false),
StructField("TempHighF", IntegerType, false),
StructField("TempLowF", IntegerType, false)
))
val data = List(
Row("BLI", Date.valueOf("2021-04-03"), 52, 43),
Row("BLI", Date.valueOf("2021-04-02"), 50, 38),
Row("BLI", Date.valueOf("2021-04-01"), 52, 41),
Row("PDX", Date.valueOf("2021-04-03"), 64, 45),
Row("PDX", Date.valueOf("2021-04-02"), 61, 41),
Row("PDX", Date.valueOf("2021-04-01"), 66, 39),
Row("SEA", Date.valueOf("2021-04-03"), 57, 43),
Row("SEA", Date.valueOf("2021-04-02"), 54, 39),
Row("SEA", Date.valueOf("2021-04-01"), 56, 41)
)
val rdd = spark.sparkContext.makeRDD(data)
val temps = spark.createDataFrame(rdd, schema)
// Create a table on the Databricks cluster and then fill
// the table with the DataFrame's contents.
// If the table already exists from a previous run,
// delete it first.
spark.sql("USE default")
spark.sql("DROP TABLE IF EXISTS zzz_demo_temps_table")
temps.write.saveAsTable("zzz_demo_temps_table")
// Query the table on the Databricks cluster, returning rows
// where the airport code is not BLI and the date is later
// than 2021-04-01. Group the results and order by high
// temperature in descending order.
val df_temps = spark.sql("SELECT * FROM zzz_demo_temps_table " +
"WHERE AirportCode != 'BLI' AND Date > '2021-04-01' " +
"GROUP BY AirportCode, Date, TempHighF, TempLowF " +
"ORDER BY TempHighF DESC")
df_temps.show()
// Results:
//
// +-----------+----------+---------+--------+
// |AirportCode| Date|TempHighF|TempLowF|
// +-----------+----------+---------+--------+
// | PDX|2021-04-03| 64| 45|
// | PDX|2021-04-02| 61| 41|
// | SEA|2021-04-03| 57| 43|
// | SEA|2021-04-02| 54| 39|
// +-----------+----------+---------+--------+
// Clean up by deleting the table from the Databricks cluster.
spark.sql("DROP TABLE zzz_demo_temps_table")
}
}
Java
import java.util.ArrayList;
import java.util.List;
import java.sql.Date;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.Dataset;
public class App {
public static void main(String[] args) throws Exception {
SparkSession spark = SparkSession
.builder()
.appName("Temps Demo")
.config("spark.master", "local")
.getOrCreate();
// Create a Spark DataFrame consisting of high and low temperatures
// by airport code and date.
StructType schema = new StructType(new StructField[] {
new StructField("AirportCode", DataTypes.StringType, false, Metadata.empty()),
new StructField("Date", DataTypes.DateType, false, Metadata.empty()),
new StructField("TempHighF", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("TempLowF", DataTypes.IntegerType, false, Metadata.empty()),
});
List<Row> dataList = new ArrayList<Row>();
dataList.add(RowFactory.create("BLI", Date.valueOf("2021-04-03"), 52, 43));
dataList.add(RowFactory.create("BLI", Date.valueOf("2021-04-02"), 50, 38));
dataList.add(RowFactory.create("BLI", Date.valueOf("2021-04-01"), 52, 41));
dataList.add(RowFactory.create("PDX", Date.valueOf("2021-04-03"), 64, 45));
dataList.add(RowFactory.create("PDX", Date.valueOf("2021-04-02"), 61, 41));
dataList.add(RowFactory.create("PDX", Date.valueOf("2021-04-01"), 66, 39));
dataList.add(RowFactory.create("SEA", Date.valueOf("2021-04-03"), 57, 43));
dataList.add(RowFactory.create("SEA", Date.valueOf("2021-04-02"), 54, 39));
dataList.add(RowFactory.create("SEA", Date.valueOf("2021-04-01"), 56, 41));
Dataset<Row> temps = spark.createDataFrame(dataList, schema);
// Create a table on the Databricks cluster and then fill
// the table with the DataFrame's contents.
// If the table already exists from a previous run,
// delete it first.
spark.sql("USE default");
spark.sql("DROP TABLE IF EXISTS zzz_demo_temps_table");
temps.write().saveAsTable("zzz_demo_temps_table");
// Query the table on the Databricks cluster, returning rows
// where the airport code is not BLI and the date is later
// than 2021-04-01. Group the results and order by high
// temperature in descending order.
Dataset<Row> df_temps = spark.sql("SELECT * FROM zzz_demo_temps_table " +
"WHERE AirportCode != 'BLI' AND Date > '2021-04-01' " +
"GROUP BY AirportCode, Date, TempHighF, TempLowF " +
"ORDER BY TempHighF DESC");
df_temps.show();
// Results:
//
// +-----------+----------+---------+--------+
// |AirportCode| Date|TempHighF|TempLowF|
// +-----------+----------+---------+--------+
// | PDX|2021-04-03| 64| 45|
// | PDX|2021-04-02| 61| 41|
// | SEA|2021-04-03| 57| 43|
// | SEA|2021-04-02| 54| 39|
// +-----------+----------+---------+--------+
// Clean up by deleting the table from the Databricks cluster.
spark.sql("DROP TABLE zzz_demo_temps_table");
}
}
依存関係の動作
通常、メイン クラスまたは Python ファイルには、他の依存関係の JAR やファイルがあります。 sparkContext.addJar("path-to-the-jar")
や sparkContext.addPyFile("path-to-the-file")
を呼び出すことで、このような依存関係の JAR やファイルを追加することができます。 addPyFile()
インターフェイスを使用して、Egg ファイルと zip ファイルを追加することもできます。 IDE でコードを実行するたびに、依存関係の JAR やファイルがクラスターにインストールされます。
Python
from lib import Foo
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
#sc.setLogLevel("INFO")
print("Testing simple count")
print(spark.range(100).count())
print("Testing addPyFile isolation")
sc.addPyFile("lib.py")
print(sc.parallelize(range(10)).map(lambda i: Foo(2)).collect())
class Foo(object):
def __init__(self, x):
self.x = x
Python + Java UDFs
from pyspark.sql import SparkSession
from pyspark.sql.column import _to_java_column, _to_seq, Column
## In this example, udf.jar contains compiled Java / Scala UDFs:
#package com.example
#
#import org.apache.spark.sql._
#import org.apache.spark.sql.expressions._
#import org.apache.spark.sql.functions.udf
#
#object Test {
# val plusOne: UserDefinedFunction = udf((i: Long) => i + 1)
#}
spark = SparkSession.builder \
.config("spark.jars", "/path/to/udf.jar") \
.getOrCreate()
sc = spark.sparkContext
def plus_one_udf(col):
f = sc._jvm.com.example.Test.plusOne()
return Column(f.apply(_to_seq(sc, [col], _to_java_column)))
sc._jsc.addJar("/path/to/udf.jar")
spark.range(100).withColumn("plusOne", plus_one_udf("id")).show()
Scala
package com.example
import org.apache.spark.sql.SparkSession
case class Foo(x: String)
object Test {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
...
.getOrCreate();
spark.sparkContext.setLogLevel("INFO")
println("Running simple show query...")
spark.read.format("parquet").load("/tmp/x").show()
println("Running simple UDF query...")
spark.sparkContext.addJar("./target/scala-2.11/hello-world_2.11-1.0.jar")
spark.udf.register("f", (x: Int) => x + 1)
spark.range(10).selectExpr("f(id)").show()
println("Running custom objects query...")
val objs = spark.sparkContext.parallelize(Seq(Foo("bye"), Foo("hi"))).collect()
println(objs.toSeq)
}
}
Databricks ユーティリティにアクセスする
このセクションでは、Databricks Connect を使用して Databricks ユーティリティにアクセスする方法について説明します。
Databricks Utilities (dbutils) リファレンス モジュールの dbutils.fs
と dbutils.secrets
のユーティリティを使用できます。
サポートされるコマンドは dbutils.fs.cp
、dbutils.fs.head
、dbutils.fs.ls
、dbutils.fs.mkdirs
、dbutils.fs.mv
、dbutils.fs.put
、dbutils.fs.rm
、dbutils.secrets.get
、dbutils.secrets.getBytes
、dbutils.secrets.list
、dbutils.secrets.listScopes
です。
ファイル システム ユーティリティ (dbutils.fs) を参照、または dbutils.fs.help()
を実行します。および、シークレット ユーティリティ (dbutils.secrets) を参照、または dbutils.secrets.help()
を実行します。
Python
from pyspark.sql import SparkSession
from pyspark.dbutils import DBUtils
spark = SparkSession.builder.getOrCreate()
dbutils = DBUtils(spark)
print(dbutils.fs.ls("dbfs:/"))
print(dbutils.secrets.listScopes())
Databricks Runtime 7.3 LTS 以上を使用する場合に、ローカルと Azure Databricks クラスターの両方で動作する方法で DBUtils モジュールにアクセスするには、次のように get_dbutils()
を使用します。
def get_dbutils(spark):
from pyspark.dbutils import DBUtils
return DBUtils(spark)
それ以外の場合は、次のように get_dbutils()
を使用します。
def get_dbutils(spark):
if spark.conf.get("spark.databricks.service.client.enabled") == "true":
from pyspark.dbutils import DBUtils
return DBUtils(spark)
else:
import IPython
return IPython.get_ipython().user_ns["dbutils"]
Scala
val dbutils = com.databricks.service.DBUtils
println(dbutils.fs.ls("dbfs:/"))
println(dbutils.secrets.listScopes())
ローカルおよびリモートのファイル システム間でのファイルのコピー
dbutils.fs
を使用して、クライアントおよびリモートのファイル システム間でファイルをコピーできます。 スキーム file:/
は、クライアント上のローカル ファイル システムを参照します。
from pyspark.dbutils import DBUtils
dbutils = DBUtils(spark)
dbutils.fs.cp('file:/home/user/data.csv', 'dbfs:/uploads')
dbutils.fs.cp('dbfs:/output/results.csv', 'file:/home/user/downloads/')
この方法で転送できる最大ファイル サイズは 250 MB です。
[dbutils.secrets.get
] を有効にします
セキュリティの制限により、dbutils.secrets.get
を呼び出す機能は既定では無効になっています。 この機能をワークスペースで有効にするには、Azure Databricks サポートにお問い合わせください。
Hadoop 構成を設定する
クライアントでは、spark.conf.set
API を使用して Hadoop 構成を設定できます。これは、SQL と DataFrame の操作に適用されます。 sparkContext
に設定された Hadoop 構成は、クラスター構成に設定するか、ノートブックを使用する必要があります。 これは、sparkContext
に設定された構成はユーザー セッションに関連付けられるのではなく、クラスター全体に適用されるからです。
トラブルシューティング
databricks-connect test
を実行して、接続の問題を確認します。 このセクションでは、Databricks Connect で発生する可能性がある一般的な問題とその解決方法について説明します。
このセクションの内容は次のとおりです。
- Python のバージョンが一致しない
- サーバーが有効ではない
- 競合している PySpark インストール
- 競合している
SPARK_HOME
- バイナリの
PATH
エントリの競合または欠落 - クラスターのシリアル化設定の競合
- Windows で
winutils.exe
が見つからない - Windows 上のファイル名、ディレクトリ名、ボリューム ラベル構文が正しくない
Python のバージョンが一致しない
ローカルで使用している Python バージョンのマイナー リリースが、少なくともクラスター上のバージョンと同じ (たとえば、3.9.16
と 3.9.15
なら OK で、3.9
と 3.8
ではダメ) ことを確認します。
複数の Python バージョンがローカルにインストールされている場合は、PYSPARK_PYTHON
環境変数 (たとえば、PYSPARK_PYTHON=python3
) を設定して、Databricks Connect で適切なバージョンが使用されていることを確認します。
サーバーが有効ではない
クラスターで Spark サーバーが spark.databricks.service.server.enabled true
で有効になっているか確認します。 有効であれば、ドライバー ログに次の行が表示されます。
../../.. ..:..:.. INFO SparkConfUtils$: Set spark config:
spark.databricks.service.server.enabled -> true
...
../../.. ..:..:.. INFO SparkContext: Loading Spark Service RPC Server
../../.. ..:..:.. INFO SparkServiceRPCServer:
Starting Spark Service RPC Server
../../.. ..:..:.. INFO Server: jetty-9...
../../.. ..:..:.. INFO AbstractConnector: Started ServerConnector@6a6c7f42
{HTTP/1.1,[http/1.1]}{0.0.0.0:15001}
../../.. ..:..:.. INFO Server: Started @5879ms
競合している PySpark インストール
databricks-connect
パッケージが PySpark と競合している。 両方をインストールすると、Python で Spark コンテキストを初期化するときにエラーが発生します。 これは、"ストリームの破損" エラーや "クラスが見つかりません" エラーなど、いくつかのパターンで現れる可能性があります。 Python 環境に PySpark がインストールされている場合は、Databricks Connect をインストールする前に PySpark をアンインストールしておくようにしてください。 PySpark をアンインストールした後、Databricks Connect パッケージをすべて再インストールしてください。
pip3 uninstall pyspark
pip3 uninstall databricks-connect
pip3 install --upgrade "databricks-connect==12.2.*" # or X.Y.* to match your specific cluster version.
SPARK_HOME
の 競合
以前にマシンで Spark を使用していた場合は、Databricks Connect Spark ではなく、他のバージョンの Spark のいずれかを使用するように IDE が構成されることがあります。 これは、"ストリームの破損" エラーや "クラスが見つかりません" エラーなど、いくつかのパターンで現れる可能性があります。 SPARK_HOME
環境変数の値を確認すると、使用されている Spark のバージョンがわかります。
Python
import os
print(os.environ['SPARK_HOME'])
Scala
println(sys.env.get("SPARK_HOME"))
Java
System.out.println(System.getenv("SPARK_HOME"));
解決方法
SPARK_HOME
がクライアント内のバージョン以外のバージョンの Spark に設定されている場合、SPARK_HOME
変数の設定を解除して、もう一度やり直す必要があります。
IDE 環境変数の設定、.bashrc
、.zshrc
、.bash_profile
のファイル、および他の環境変数が設定されている可能性がある場所を確認します。 古い状態を消去するには IDE を終了して再起動しなければならない可能性が高く、問題が解決しない場合は、さらに新しいプロジェクトを作成する必要がある場合もあります。
SPARK_HOME
を新しい値に設定する必要はありません。設定を解除すれば十分です。
バイナリの PATH
エントリの競合または欠落
spark-shell
のようなコマンドが、Databricks Connect のバイナリではなく、以前にインストールされた他のバイナリを実行するよう、PATH が構成されている場合があります。 これにより、databricks-connect test
が失敗する可能性があります。 Databricks Connect のバイナリを優先とするか、以前にインストールしたバイナリを削除する必要があります。
spark-shell
のようなコマンドを実行できない場合、 PATH が pip3 install
によって自動的に設定されていない可能性があり、インストール bin
ディレクトリを PATH に手動で追加する必要があります。 これが設定されていない場合でも、IDE で Databricks Connect を使用できます。 ただし、databricks-connect test
コマンドは動作しません。
クラスターのシリアル化設定の競合
databricks-connect test
の実行中に "ストリームが破損しています" というエラーが表示される場合は、クラスターのシリアル化構成に互換性がないことが原因である可能性があります。 たとえば、spark.io.compression.codec
構成の設定によって、この問題が発生する場合があります。 この問題を解決するには、クラスター設定からこれらの構成を削除するか、Databricks Connect クライアントで構成を設定してください。
Windows で winutils.exe
が見つからない
Databricks Connect を Windows で使用していて、以下が表示される場合
ERROR Shell: Failed to locate the winutils binary in the hadoop binary path
java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.
Windows で Hadoop パスを構成する手順に従ってください。
Windows 上のファイル名、ディレクトリ名、ボリューム ラベル構文が正しくない
Windows と Databricks Connect を使用していて、次のように表示される場合
The filename, directory name, or volume label syntax is incorrect.
Java または Databricks Connect がスペースを含むパスを持つディレクトリにインストールされています。 これに対処するには、スペースのないディレクトリ パスに をインストールするか、短縮名形式を使用してパスを構成します。
Microsoft Entra ID トークンを使用した認証
Note
次の情報は、Databricks Connect バージョン 7.3.5 から 12.2.x にのみ適用されます。
Databricks Runtime 13.3 LTS 以降用の Databricks Connect は、現在 Microsoft Entra ID トークンをサポートしていません。
Databricks Connect バージョン 7.3.5 から 12.2.x を使用するときは、個人用アクセス トークンではなく Microsoft Entra ID トークンを使用して認証できます。 Microsoft Entra ID トークンは有効期間は限られています。 Microsoft Entra ID トークンが期限切れの場合、Databricks Connect は Invalid Token
エラーで失敗となります。
Databricks Connect バージョン 7.3.5 から 12.2.x では、実行中の Databricks Connect アプリケーションに Microsoft Entra ID トークンを提供できます。 アプリケーションは、新しいアクセス トークンを取得し、それを spark.databricks.service.token
SQL 構成キーに設定する必要があります。
Python
spark.conf.set("spark.databricks.service.token", new_aad_token)
Scala
spark.conf.set("spark.databricks.service.token", newAADToken)
トークンを更新すると、アプリケーションは同じ SparkSession
、およびそのセッションのコンテキストで作成されたオブジェクトと状態を引き続き使用できます。 断続的なエラーを回避するために、Databricks では、古いトークンの有効期限が切れる前に新しいトークンを提供することをお勧めしています。
Microsoft Entra ID トークンの有効期間を延長して、アプリケーションの実行中、維持することができます。 これを行うには、使用した Microsoft Entra ID 認証アプリケーションに、適切な長さの有効期間のTokenLifetimePolicyをアタッチして、アクセストークンを取得します。
Note
Microsoft Entra ID パススルーは 2 つのトークンを使用します。前に説明した Databricks Connect バージョン 7.3.5 から 12.2.x で構成する Microsoft Entra ID アクセス トークンと、Databricks が要求を処理するときに Databricks が生成する特定のリソースのための ADLS パススルー トークンです。 Microsoft Entra ID トークンの有効期間ポリシーを使用して、ADLS パススルー トークンの有効期間を延長することはできません。 1 時間以上かかるコマンドをクラスターに送信した場合、1 時間経過した後にコマンドが ADLS リソースにアクセスすると、失敗します。
制限事項
構造化ストリーム。
リモートクラスター上の Spark ジョブの一部ではない任意のコードの実行。
デルタ テーブル操作用の Native Scala、Python、R API (
DeltaTable.forPath
など) はサポートされていません。 ただし、Delta Lake 操作の SQL API (spark.sql(...)
) と Spark API (spark.read.load
など) はどちらもサポートされています。COPY INTO 機能。
サーバーのカタログの一部である SQL 関数、Python または Scala の UDF の使用。 ただし、ローカルに導入されている Scala と Python UDF は機能します。
Apache Zeppelin 0.7.x 以下。
テーブル アクセス制御によるクラスターへの接続。
プロセス分離が有効になっているクラスターへの接続 (つまり、
spark.databricks.pyspark.enableProcessIsolation
がtrue
に設定されている場合)。Delta
CLONE
SQL コマンド。グローバル一時ビュー。
Koalas と
pyspark.pandas
。CREATE TABLE table AS SELECT ...
SQL コマンドは常に機能するとは限りません。 代わりにspark.sql("SELECT ...").write.saveAsTable("table")
を使用してください。Microsoft Entra ID 資格情報のパススルーは Databricks Runtime 7.3 LTS 以降を実行している標準クラスターでのみサポートされており、サービスプリンシパル認証と互換性がありません。