Azure Databricks ジョブで Databricks SQL を使用する
Azure Databricks ジョブで SQL タスクの種類を使用すると、クエリ、レガシ、ダッシュボード、アラートなどの Databricks SQL オブジェクトを含むワークフローを作成、スケジュール、操作、監視できます。 たとえば、ワークフローで、データの取り込み、データの準備、Databricks SQL クエリを使用した分析の実行、レガシ ダッシュボードへの結果の表示などを行うことができます。
この記事では、GitHub コントリビューションのメトリックを表示するレガシ ダッシュボードを作成するワークフローの例を示します。 この例では、次の操作を行います。
- Python スクリプトと GitHub REST API を使用して GitHub データを取り込む。
- Delta Live Tables パイプラインを使用して GitHub データを変換する。
- 準備されたデータの分析を実行する Databricks SQL クエリをトリガーする。
- レガシ ダッシュボードに分析を表示する。
始める前に
このチュートリアルを完了するには、次のものが必要です。
- GitHub 個人用アクセス トークン。 このトークンには、リポジトリのアクセス許可が必要です。
- サーバーレス SQL ウェアハウスまたはプロ SQL ウェアハウス。 「SQL ウェアハウスの種類」を参照してください。
- Databricks シークレット スコープ。 シークレット スコープは、GitHub トークンを安全に格納するために使用されます。 「手順 1: GitHub トークンをシークレットに格納する」を参照してください。
手順 1: GitHub トークンをシークレットに格納する
Databricks では、ジョブ内の GitHub 個人用アクセス トークンなどの資格情報をハードコーディングする代わりに、シークレット スコープを使用してシークレットを安全に格納および管理することをお勧めします。 次の Databricks CLI コマンドは、シークレット スコープを作成し、そのスコープ内のシークレットに GitHub トークンを格納する例です。
databricks secrets create-scope <scope-name>
databricks secrets put-secret <scope-name> <token-key> --string-value <token>
<scope-name
を、トークンを格納する Azure Databricks シークレット スコープの名前に置き換えます。<token-key>
を、トークンに割り当てるキーの名前に置き換えます。<token>
を、GitHub 個人用アクセス トークンの値に置き換えます。
手順 2: GitHub データをフェッチするスクリプトを作成する
次の Python スクリプトでは、GitHub REST API を使用して、GitHub リポジトリからコミットとコントリビューションに関するデータをフェッチします。 入力引数は GitHub リポジトリを指定します。 レコードは、別の入力引数で指定された DBFS 内の場所に保存されます。
この例では、DBFS を使用して Python スクリプトを保存しますが、Databricks Git フォルダーまたはワークスペース ファイルを使用して、スクリプトを保存および管理することもできます。
このスクリプトをローカル ディスク上の場所に保存します。
import json import requests import sys api_url = "https://api.github.com" def get_commits(owner, repo, token, path): page = 1 request_url = f"{api_url}/repos/{owner}/{repo}/commits" more = True get_response(request_url, f"{path}/commits", token) def get_contributors(owner, repo, token, path): page = 1 request_url = f"{api_url}/repos/{owner}/{repo}/contributors" more = True get_response(request_url, f"{path}/contributors", token) def get_response(request_url, path, token): page = 1 more = True while more: response = requests.get(request_url, params={'page': page}, headers={'Authorization': "token " + token}) if response.text != "[]": write(path + "/records-" + str(page) + ".json", response.text) page += 1 else: more = False def write(filename, contents): dbutils.fs.put(filename, contents) def main(): args = sys.argv[1:] if len(args) < 6: print("Usage: github-api.py owner repo request output-dir secret-scope secret-key") sys.exit(1) owner = sys.argv[1] repo = sys.argv[2] request = sys.argv[3] output_path = sys.argv[4] secret_scope = sys.argv[5] secret_key = sys.argv[6] token = dbutils.secrets.get(scope=secret_scope, key=secret_key) if (request == "commits"): get_commits(owner, repo, token, output_path) elif (request == "contributors"): get_contributors(owner, repo, token, output_path) if __name__ == "__main__": main()
スクリプトを DBFS にアップロードします。
- Azure Databricks のランディング ページに移動し、サイドバーの [カタログ] をクリックします。
- [DBFS を参照] をクリックします。
- DBFS ファイル ブラウザーで、[アップロード] をクリックします。 [DBFS へのデータのアップロード] ダイアログが表示されます。
- スクリプトを格納するパスを DBFS に入力して [アップロードするファイルのドロップ] をクリックするか、または [クリックして参照] をクリックして Python スクリプトを選択します。
- [Done] をクリックします。
手順 3: GitHub データを処理する Delta Live Tables パイプラインを作成する
このセクションでは、Delta Live Tables パイプラインを作成して、生の GitHub データを Databricks SQL クエリで分析できるテーブルに変換します。 パイプラインを作成するには、次の手順を行います。
サイドバーで、 [新規] をクリックし、メニューから [ノートブック] を選択します。 [ノートブックの作成] ダイアログボックスが表示されます。
[既定の言語] に名前を入力し、[Python] を選択します。 [クラスター] は既定値のままの設定でかまいません。 Delta Live Tables ランタイムでは、パイプラインを実行する前にクラスターを作成します。
Create をクリックしてください。
Python のコード例をコピーし、新しいノートブックに貼り付けます。 ノートブックの 1 つのセルまたは複数のセルにコード例を追加できます。
import dlt from pyspark.sql.functions import * def parse(df): return (df .withColumn("author_date", to_timestamp(col("commit.author.date"))) .withColumn("author_email", col("commit.author.email")) .withColumn("author_name", col("commit.author.name")) .withColumn("comment_count", col("commit.comment_count")) .withColumn("committer_date", to_timestamp(col("commit.committer.date"))) .withColumn("committer_email", col("commit.committer.email")) .withColumn("committer_name", col("commit.committer.name")) .withColumn("message", col("commit.message")) .withColumn("sha", col("commit.tree.sha")) .withColumn("tree_url", col("commit.tree.url")) .withColumn("url", col("commit.url")) .withColumn("verification_payload", col("commit.verification.payload")) .withColumn("verification_reason", col("commit.verification.reason")) .withColumn("verification_signature", col("commit.verification.signature")) .withColumn("verification_verified", col("commit.verification.signature").cast("string")) .drop("commit") ) @dlt.table( comment="Raw GitHub commits" ) def github_commits_raw(): df = spark.read.json(spark.conf.get("commits-path")) return parse(df.select("commit")) @dlt.table( comment="Info on the author of a commit" ) def commits_by_author(): return ( dlt.read("github_commits_raw") .withColumnRenamed("author_date", "date") .withColumnRenamed("author_email", "email") .withColumnRenamed("author_name", "name") .select("sha", "date", "email", "name") ) @dlt.table( comment="GitHub repository contributors" ) def github_contributors_raw(): return( spark.readStream.format("cloudFiles") .option("cloudFiles.format", "json") .load(spark.conf.get("contribs-path")) )
サイドバーで、 [ワークフロー] をクリックし、[Delta Live Tables] タブをクリックし、[パイプラインを作成] をクリックします。
パイプラインに名前 (例:
Transform GitHub data
) を付けます。[ノートブック ライブラリ] フィールドにノートブックへのパスを入力するか、 をクリックしてノートブックを選択します。
[構成の追加] をクリックします。
Key
テキスト ボックスに、「commits-path
」と入力します。Value
テキスト ボックスに、GitHub レコードを書き込む DBFS パスを入力します。 これには任意のパスを選択できます。これは、ワークフローを作成するときに、最初の Python タスクの構成で使用するパスと同じです。もう一度 [構成の追加] をクリックします。
Key
テキスト ボックスに、「contribs-path
」と入力します。Value
テキスト ボックスに、GitHub レコードを書き込む DBFS パスを入力します。 これには任意のパスを選択できます。これは、ワークフローを作成するときに、2 番目の Python タスクの構成で使用するパスと同じです。[ターゲット] フィールドに、ターゲットのデータベース (例:
github_tables
) を入力します。 ターゲットのデータベースを設定すると、出力データがメタストアに発行されます。これは、パイプラインによって生成されたデータを分析するダウンストリーム クエリに必要です。[保存] をクリックします。
手順 4: GitHub データを取り込んで変換するワークフローを作成する
Databricks SQL を使用して GitHub データを分析して視覚化する前に、データを取り込んで準備する必要があります。 これらのタスクを完了するワークフローを作成するには、次の手順を行います。
Azure Databricks ジョブを作成し、最初のタスクを追加する
Azure Databricks のランディング ページに移動し、次のいずれかの操作を行います。
- サイドバーで、 [ワークフロー] をクリックし、 をクリックします。
- サイドバーで、 [新規] をクリックし、メニューから [ジョブ] を選択します。
[タスク] タブに表示されるタスクのダイアログ ボックスで、[ジョブの名前を追加...] をジョブ名 (例:
GitHub analysis workflow
) に置き換えます。[タスク名] にタスクの名前 (例:
get_commits
) を入力します。[種類] で、[Python スクリプト] を選択します。
[ソース] で、[DBFS/ S3] を選択します。
[パス] に、DBFS 内のスクリプトへのパスを入力します。
[パラメーター] に、Python スクリプトの次の引数を入力します。
["<owner>","<repo>","commits","<DBFS-output-dir>","<scope-name>","<github-token-key>"]
<owner>
を、リポジトリ所有者の名前に置き換えます。 たとえば、github.com/databrickslabs/overwatch
リポジトリからレコードをフェッチするには、「databrickslabs
」と入力します。<repo>
を、リポジトリ名 (例:overwatch
) に置き換えます。<DBFS-output-dir>
を、GitHub からフェッチされたレコードを格納する DBFS 内のパスに置き換えます。<scope-name>
を、GitHub トークンを格納するために作成したシークレット スコープの名前に置き換えます。<github-token-key>
を、GitHub トークンに割り当てたキーの名前に置き換えます。
[タスクの保存] をクリックします。
別のタスクを追加する
作成したタスクの下にある をクリックします。
[タスク名] にタスクの名前 (例:
get_contributors
) を入力します。[種類] で、タスクの種類 [Python スクリプト] を選択します。
[ソース] で、[DBFS/ S3] を選択します。
[パス] に、DBFS 内のスクリプトへのパスを入力します。
[パラメーター] に、Python スクリプトの次の引数を入力します。
["<owner>","<repo>","contributors","<DBFS-output-dir>","<scope-name>","<github-token-key>"]
<owner>
を、リポジトリ所有者の名前に置き換えます。 たとえば、github.com/databrickslabs/overwatch
リポジトリからレコードをフェッチするには、「databrickslabs
」と入力します。<repo>
を、リポジトリ名 (例:overwatch
) に置き換えます。<DBFS-output-dir>
を、GitHub からフェッチされたレコードを格納する DBFS 内のパスに置き換えます。<scope-name>
を、GitHub トークンを格納するために作成したシークレット スコープの名前に置き換えます。<github-token-key>
を、GitHub トークンに割り当てたキーの名前に置き換えます。
[タスクの保存] をクリックします。
データを変換するタスクを追加する
- 作成したタスクの下にある をクリックします。
- [タスク名] にタスクの名前 (例:
transform_github_data
) を入力します。 - [種類] で [Delta Live Tables パイプライン] を選択し、タスクの名前を入力します。
- [パイプライン] で、「手順 3: GitHub データを処理する Delta Live Tables パイプラインを作成する」で作成したパイプラインを選択します。
- Create をクリックしてください。
手順 5: データ変換ワークフローを実行する
をクリックしてワークフローを実行します。 実行の詳細を表示するには、[ジョブの実行] ビューの実行の [開始時刻] 列にあるリンクをクリックします。 各タスクをクリックすると、そのタスクの実行の詳細が表示されます。
手順 6: (省略可能) ワークフローの実行が完了した後に出力データを表示するには、次の手順を行います。
- 実行の詳細ビューで、Delta Live Tables タスクをクリックします。
- [タスクの実行の詳細] パネルで、[パイプライン] の下にあるパイプライン名をクリックします。 [パイプラインの詳細] ページが表示されます。
- パイプライン DAG で
commits_by_author
テーブルを選択します。 - [commits_by_author] パネルの [メタストア] の横にあるテーブル名をクリックします。 Catalog Explorer ページが開きます。
Catalog Explorer では、テーブル スキーマ、サンプル データ、およびデータに関するその他の詳細を表示できます。 同じ手順に従って、github_contributors_raw
テーブルのデータを表示します。
手順 7: GitHub データを削除する
実際のアプリケーションでは、データを継続的に取り込んで処理する場合があります。 この例ではデータ セット全体をダウンロードして処理するため、ワークフローを再実行するときにエラーが発生しないように、既にダウンロードした GitHub データを削除する必要があります。 ダウンロードしたデータを削除するには、次の手順を行います。
新しいノートブックを作成し、最初のセルに次のコマンドを入力します。
dbutils.fs.rm("<commits-path", True) dbutils.fs.rm("<contributors-path", True)
<commits-path>
と<contributors-path>
を、Python タスクを作成したときに構成した DBFS パスに置き換えます。をクリックし、[セルの実行] を選択します。
このノートブックをタスクとしてワークフローに追加することもできます。
手順 8: Databricks SQL クエリを作成する
ワークフローを実行し、必要なテーブルを作成したら、準備されたデータを分析するクエリを作成します。 クエリと視覚化の例を作成するには、次の手順を行います。
月別の上位 10 人の共同作成者を表示する
サイドバーで、Databricks ロゴ の下にあるアイコンをクリックし、[SQL] を選択します。
[クエリの作成] をクリックして、Databricks SQL クエリ エディターを開きます。
カタログが [hive_metastore] に設定されていることを確認します。 hive_metastore の横にある [既定値] をクリックし、データベースを、Delta Live Tables パイプラインで設定した [ターゲット] 値に設定します。
[新しいクエリ] タブに次のクエリを入力します。
SELECT date_part('YEAR', date) AS year, date_part('MONTH', date) AS month, name, count(1) FROM commits_by_author WHERE name IN ( SELECT name FROM commits_by_author GROUP BY name ORDER BY count(name) DESC LIMIT 10 ) AND date_part('YEAR', date) >= 2022 GROUP BY name, year, month ORDER BY year, month, name
[新しいクエリ] タブをクリックし、クエリの名前を変更します (例:
Commits by month top 10 contributors
)。既定では、結果はテーブルとして表示されます。 データの視覚化方法を、たとえば横棒グラフを使用するように変更するには、[結果] パネルで、 をクリックし、[編集] をクリックします。
[視覚化の種類] で、[横棒] を選択します。
[X 列] で、[月] を選択します。
[Y 列] で、[カウント (1)] を選択します。
[グループ化] で、[名前] を選択します。
[保存] をクリックします。
上位 20 人の共同作成者を表示する
[+] > [新しいクエリの作成] をクリックし、カタログが hive_metastore に設定されていることを確認します。 hive_metastore の横にある [既定値] をクリックし、データベースを、Delta Live Tables パイプラインで設定した [ターゲット] 値に設定します。
次のクエリを入力します。
SELECT login, cast(contributions AS INTEGER) FROM github_contributors_raw ORDER BY contributions DESC LIMIT 20
[新しいクエリ] タブをクリックし、クエリの名前を変更します (例:
Top 20 contributors
)。視覚化を既定のテーブルから変更するには、[結果] パネルで、 をクリックし、[編集] をクリックします。
[視覚化の種類] で、[横棒] を選択します。
[X 列] で、[ログイン] を選択します。
[Y 列] で、[コントリビューション] を選択します。
[保存] をクリックします。
作成者別の合計コミット数を表示する
[+] > [新しいクエリの作成] をクリックし、カタログが hive_metastore に設定されていることを確認します。 hive_metastore の横にある [既定値] をクリックし、データベースを、Delta Live Tables パイプラインで設定した [ターゲット] 値に設定します。
次のクエリを入力します。
SELECT name, count(1) commits FROM commits_by_author GROUP BY name ORDER BY commits DESC LIMIT 10
[新しいクエリ] タブをクリックし、クエリの名前を変更します (例:
Total commits by author
)。視覚化を既定のテーブルから変更するには、[結果] パネルで、 をクリックし、[編集] をクリックします。
[視覚化の種類] で、[横棒] を選択します。
[X 列] で、[名前] を選択します。
[Y 列] で、[コミット数] を選択します。
[保存] をクリックします。
手順 9: ダッシュボードを作成する
- サイドバーで [ダッシュボード] をクリックします
- [ダッシュボードの作成] をクリックします。
- ダッシュボードの名前 (例:
GitHub analysis
) を入力します。 - 「手順 8: Databricks SQL クエリを作成する」で作成したクエリと視覚化ごとに、[追加] > [視覚化] をクリックし、各視覚化を選択します。
手順 10: SQL タスクをワークフローに追加する
「Azure Databricks ジョブを作成し、最初のタスクを追加する」で作成したワークフローに新しいクエリ タスクを追加するには、「手順 8: Databricks SQL クエリを作成する」で作成したクエリごとに次の操作を行います。
- サイドバーの [ワークフロー] をクリックします。
- [名前] 列で、ジョブ名をクリックします。
- [タスク] タブをクリックします。
- 最後のタスクの下にある をクリックします。
- タスクの名前を入力し、[種類] で、[SQL] を選択し、[SQL タスク] で、[クエリ] を選択します。
- [SQL クエリ] でクエリを選択します。
- [SQL ウェアハウス] で、タスクを実行するサーバーレス SQL ウェアハウスまたは pro SQL ウェアハウスを選択します。
- Create をクリックしてください。
手順 11: ダッシュボード タスクを追加する
- 最後のタスクの下にある をクリックします。
- タスクの名前を入力し、[種類] で、[SQL] を選択し、[SQL タスク] で、[レガシ ダッシュボード] を選択します。
- 「手順 9: ダッシュボードを作成する」で作成したダッシュボードを選択します。
- [SQL ウェアハウス] で、タスクを実行するサーバーレス SQL ウェアハウスまたは pro SQL ウェアハウスを選択します。
- Create をクリックしてください。
手順 12: 完成したワークフローを実行する
ワークフローを実行するために、 をクリックします。 実行の詳細を表示するには、[ジョブの実行] ビューの実行の [開始時刻] 列にあるリンクをクリックします。
手順 13: 結果を表示する
実行が完了したときに結果を表示するには、最終的なダッシュボード タスクをクリックし、右側のパネルの [SQL ダッシュボード] の下にあるダッシュボード名をクリックします。