次の方法で共有


JDBC を使用したデータベースのクエリ

Azure Databricks では、JDBC を使用した外部データベースへの接続がサポートされています。 この記事では、Python、SQL、Scala の例でこれらの接続を構成および使用するための基本的な構文について説明します。

重要

この記事で説明する構成は試験段階です。 実験用の機能は現状のまま提供されていて、カスタマー テクニカル サポートを通じて Databricks でサポートされているわけではありません。 クエリフェデレーションの完全なサポートを得るには、代わりにレイクハウス フェデレーション を使用する必要があります。これにより、Azure Databricks ユーザーは Unity Catalog 構文とデータ ガバナンス ツールを利用できます。

Partner Connect は、多くの外部データ ソースとデータを同期するための最適化された統合を提供します。 「Databricks Partner Connect とは」を参照してください。

重要

この記事の例には、JDBC URL のユーザー名とパスワードは含まれていません。 Databricks では、シークレットを使用してデータベース資格情報を格納することをお勧めします。 次に例を示します。

Python

username = dbutils.secrets.get(scope = "jdbc", key = "username")
password = dbutils.secrets.get(scope = "jdbc", key = "password")

Scala

val username = dbutils.secrets.get(scope = "jdbc", key = "username")
val password = dbutils.secrets.get(scope = "jdbc", key = "password")

SQL で Databricks シークレットを参照するには、クラスターの初期化中に Spark 構成プロパティを構成する必要があります。

シークレット管理の完全な例については、「 Tutorial: Databricks シークレットの作成と使用を参照してください。

JDBC を使用したデータの読み取り

JDBC を使用してデータを読み取るには、いくつかの設定を構成する必要があります。 データベースごとに <jdbc-url> の異なる形式が使用されていることに注意してください。

Python

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
)

Scala

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .load()

Spark は、データベース テーブルからスキーマを自動的に読み取り、その型を Spark SQL に戻します。

Python

employees_table.printSchema

SQL

DESCRIBE employees_table_vw

Scala

employees_table.printSchema

この JDBC テーブルに対してクエリを実行できます。

Python

display(employees_table.select("age", "salary").groupBy("age").avg("salary"))

SQL

SELECT age, avg(salary) as salary
FROM employees_table_vw
GROUP BY age

Scala

display(employees_table.select("age", "salary").groupBy("age").avg("salary"))

JDBC を使用したデータの書き込み

JDBC を使用してテーブルにデータを保存するときには、読み取りのときと同様の構成を使用します。 次の例を参照してください。

Python

(employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()
)

SQL

CREATE TABLE new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
) AS
SELECT * FROM employees_table_vw

Scala

employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()

既定の動作では、新しいテーブルの作成を試行し、同じ名前のテーブルが既に存在する場合にはエラーをスローします。

次の構文を使用して、既存のテーブルにデータを追加できます。

Python

(employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("append")
  .save()
)

SQL

CREATE TABLE IF NOT EXISTS new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
);

INSERT INTO new_employees_table
SELECT * FROM employees_table_vw;

Scala

employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("append")
  .save()

次の構文を使用して、既存のテーブルのデータを上書きできます。

Python

(employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("overwrite")
  .save()
)

SQL

CREATE OR REPLACE TABLE new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
) AS
SELECT * FROM employees_table_vw;

Scala

employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("overwrite")
  .save()

JDBC クエリの並列処理を制御する

既定では、JDBC ドライバーは 1 つのスレッドのみを使用してソース データベースに対してクエリを実行します。 読み取りのパフォーマンスを向上させるには、Azure Databricks がデータベースに対して同時に実行するクエリの数を制御するためのオプションをいくつか指定する必要があります。 小規模なクラスターの場合、numPartitions オプションをクラスター内の Executor コアの数と同じに設定すると、すべてのノードがデータのクエリを並列で実行できます。

警告

大規模なクラスターで numPartitions を大きい値に設定すると、同時クエリが多すぎるとサービスが過負荷状態になる可能性があるため、リモート データベースのパフォーマンスが低下する可能性があります。 これは、アプリケーション データベースでは特に問題になります。 これを 50 より大きい値に設定する場合は注意してください。

注意

ソース データベースで計算された partitionColumn のインデックスを持つ列を選択して、クエリを高速化します。

次のコード例は、8 つのコアを持つクラスターの並列処理を構成する方法を示しています。

Python

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  # a column that can be used that has a uniformly distributed range of values that can be used for parallelization
  .option("partitionColumn", "<partition-key>")
  # lowest value to pull data for with the partitionColumn
  .option("lowerBound", "<min-value>")
  # max value to pull data for with the partitionColumn
  .option("upperBound", "<max-value>")
  # number of partitions to distribute the data into. Do not set this very large (~hundreds)
  .option("numPartitions", 8)
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>',
  partitionColumn "<partition-key>",
  lowerBound "<min-value>",
  upperBound "<max-value>",
  numPartitions 8
)

Scala

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  // a column that can be used that has a uniformly distributed range of values that can be used for parallelization
  .option("partitionColumn", "<partition-key>")
  // lowest value to pull data for with the partitionColumn
  .option("lowerBound", "<min-value>")
  // max value to pull data for with the partitionColumn
  .option("upperBound", "<max-value>")
  // number of partitions to distribute the data into. Do not set this very large (~hundreds)
  .option("numPartitions", 8)
  .load()

注意

Azure Databricks では、JDBC を構成するためのすべての Apache Spark オプションがサポートされています。

JDBC を使用してデータベースに書き込む場合、Apache Spark はメモリ内のパーティションの数を使用して並列処理を制御します。 書き込み前にデータを再パーティション化して並列処理を制御できます。 リモート データベースの過負荷状態を回避するために、大規模なクラスター上のパーティションの数が多くならないようにします。 次の例では、書き込む前に 8 つのパーティションに再パーティション化する方法を示します。

Python

(employees_table.repartition(8)
  .write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()
)

SQL

CREATE TABLE new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
) AS
SELECT /*+ REPARTITION(8) */ * FROM employees_table_vw

Scala

employees_table.repartition(8)
  .write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()

クエリをデータベース エンジンにプッシュ ダウンする

クエリ全体をデータベースにプッシュ ダウンし、結果だけを返すことができます。 パラメーター table は、読み取る JDBC テーブルを識別します。 SQL クエリ FROM 句の中の有効なものを使用できます。

Python

pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", pushdown_query)
  .option("user", "<username>")
  .option("password", "<password>")
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "(select * from employees where emp_no < 10008) as emp_alias",
  user '<username>',
  password '<password>'
)

Scala

val pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", pushdown_query)
  .option("user", "<username>")
  .option("password", "<password>")
  .load()

クエリごとにフェッチされる行数を制御する

ドライバーには、リモート JDBC データベースから一度にフェッチされる行の数を制御する fetchSize パラメーターがあります。

Setting 結果
低すぎる 多くのラウンドトリップのために待機時間が長くなる (クエリごとに返される行は数行)
高すぎる メモリ不足エラー (1 つのクエリで返されるデータが多すぎる)

最適な値はワークロードに依存します。 考慮事項は次のとおりです。

  • クエリによって返される列の数はいくつですか?
  • どのようなデータ型が返されますか?
  • 各列の文字列を返すのにかかる時間はどれくらいですか?

システムの既定値が非常に小さいために調整するとメリットが得られる場合があります。 たとえば、Oracle の fetchSize の既定値は 10 です。 これを 100 に増やすと、実行する必要があるクエリの合計数が 10 分の 1 に減ります。 JDBC の結果はネットワーク トラフィックであるため、非常に大きな数は避けるべきですが、多くのデータセットでは最適な値が数千になる可能性があります。

次の例のように、fetchSize オプションを使用します。

Python

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .option("fetchSize", "100")
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'.
  fetchSize 100
)

Scala

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .option("fetchSize", "100")
  .load()