次の方法で共有


Azure Databricks ジョブで Databricks SQL を使用する

Azure Databricks ジョブで SQL タスクの種類を使用すると、クエリ、レガシ、ダッシュボード、アラートなどの Databricks SQL オブジェクトを含むワークフローを作成、スケジュール、操作、監視できます。 たとえば、ワークフローで、データの取り込み、データの準備、Databricks SQL クエリを使用した分析の実行、レガシ ダッシュボードへの結果の表示などを行うことができます。

この記事では、GitHub コントリビューションのメトリックを表示するレガシ ダッシュボードを作成するワークフローの例を示します。 この例では、次の操作を行います。

  • Python スクリプトと GitHub REST API を使用して GitHub データを取り込む。
  • Delta Live Tables パイプラインを使用して GitHub データを変換する。
  • 準備されたデータの分析を実行する Databricks SQL クエリをトリガーする。
  • レガシ ダッシュボードに分析を表示する。

GitHub 分析ダッシュボード

始める前に

このチュートリアルを完了するには、次のものが必要です。

手順 1: GitHub トークンをシークレットに格納する

Databricks では、ジョブ内の GitHub 個人用アクセス トークンなどの資格情報をハードコーディングする代わりに、シークレット スコープを使用してシークレットを安全に格納および管理することをお勧めします。 次の Databricks CLI コマンドは、シークレット スコープを作成し、そのスコープ内のシークレットに GitHub トークンを格納する例です。

databricks secrets create-scope <scope-name>
databricks secrets put-secret <scope-name> <token-key> --string-value <token>
  • <scope-name を、トークンを格納する Azure Databricks シークレット スコープの名前に置き換えます。
  • <token-key> を、トークンに割り当てるキーの名前に置き換えます。
  • <token> を、GitHub 個人用アクセス トークンの値に置き換えます。

手順 2: GitHub データをフェッチするスクリプトを作成する

次の Python スクリプトでは、GitHub REST API を使用して、GitHub リポジトリからコミットとコントリビューションに関するデータをフェッチします。 入力引数は GitHub リポジトリを指定します。 レコードは、別の入力引数で指定された DBFS 内の場所に保存されます。

この例では、DBFS を使用して Python スクリプトを保存しますが、Databricks Git フォルダーまたはワークスペース ファイルを使用して、スクリプトを保存および管理することもできます。

  • このスクリプトをローカル ディスク上の場所に保存します。

    import json
    import requests
    import sys
    
    api_url = "https://api.github.com"
    
    def get_commits(owner, repo, token, path):
      page = 1
      request_url =  f"{api_url}/repos/{owner}/{repo}/commits"
      more = True
    
      get_response(request_url, f"{path}/commits", token)
    
    def get_contributors(owner, repo, token, path):
      page = 1
      request_url =  f"{api_url}/repos/{owner}/{repo}/contributors"
      more = True
    
      get_response(request_url, f"{path}/contributors", token)
    
    def get_response(request_url, path, token):
      page = 1
      more = True
    
      while more:
        response = requests.get(request_url, params={'page': page}, headers={'Authorization': "token " + token})
        if response.text != "[]":
          write(path + "/records-" + str(page) + ".json", response.text)
          page += 1
        else:
          more = False
    
    def write(filename, contents):
      dbutils.fs.put(filename, contents)
    
    def main():
      args = sys.argv[1:]
      if len(args) < 6:
        print("Usage: github-api.py owner repo request output-dir secret-scope secret-key")
        sys.exit(1)
    
      owner = sys.argv[1]
      repo = sys.argv[2]
      request = sys.argv[3]
      output_path = sys.argv[4]
      secret_scope = sys.argv[5]
      secret_key = sys.argv[6]
    
      token = dbutils.secrets.get(scope=secret_scope, key=secret_key)
    
      if (request == "commits"):
        get_commits(owner, repo, token, output_path)
      elif (request == "contributors"):
        get_contributors(owner, repo, token, output_path)
    
    if __name__ == "__main__":
        main()
    
  • スクリプトを DBFS にアップロードします。

    1. Azure Databricks のランディング ページに移動し、サイドバーの カタログ アイコン [カタログ] をクリックします。
    2. [DBFS を参照] をクリックします。
    3. DBFS ファイル ブラウザーで、[アップロード] をクリックします。 [DBFS へのデータのアップロード] ダイアログが表示されます。
    4. スクリプトを格納するパスを DBFS に入力して [アップロードするファイルのドロップ] をクリックするか、または [クリックして参照] をクリックして Python スクリプトを選択します。
    5. [Done] をクリックします。

手順 3: GitHub データを処理する Delta Live Tables パイプラインを作成する

このセクションでは、Delta Live Tables パイプラインを作成して、生の GitHub データを Databricks SQL クエリで分析できるテーブルに変換します。 パイプラインを作成するには、次の手順を行います。

  1. サイドバーで、新規アイコン [新規] をクリックし、メニューから [ノートブック] を選択します。 [ノートブックの作成] ダイアログボックスが表示されます。

  2. [既定の言語] に名前を入力し、[Python] を選択します。 [クラスター] は既定値のままの設定でかまいません。 Delta Live Tables ランタイムでは、パイプラインを実行する前にクラスターを作成します。

  3. Create をクリックしてください。

  4. Python のコード例をコピーし、新しいノートブックに貼り付けます。 ノートブックの 1 つのセルまたは複数のセルにコード例を追加できます。

    import dlt
    from pyspark.sql.functions import *
    
    def parse(df):
       return (df
         .withColumn("author_date", to_timestamp(col("commit.author.date")))
         .withColumn("author_email", col("commit.author.email"))
         .withColumn("author_name", col("commit.author.name"))
         .withColumn("comment_count", col("commit.comment_count"))
         .withColumn("committer_date", to_timestamp(col("commit.committer.date")))
         .withColumn("committer_email", col("commit.committer.email"))
         .withColumn("committer_name", col("commit.committer.name"))
         .withColumn("message", col("commit.message"))
         .withColumn("sha", col("commit.tree.sha"))
         .withColumn("tree_url", col("commit.tree.url"))
         .withColumn("url", col("commit.url"))
         .withColumn("verification_payload", col("commit.verification.payload"))
         .withColumn("verification_reason", col("commit.verification.reason"))
         .withColumn("verification_signature", col("commit.verification.signature"))
         .withColumn("verification_verified", col("commit.verification.signature").cast("string"))
         .drop("commit")
       )
    
    @dlt.table(
       comment="Raw GitHub commits"
    )
    def github_commits_raw():
      df = spark.read.json(spark.conf.get("commits-path"))
      return parse(df.select("commit"))
    
    @dlt.table(
      comment="Info on the author of a commit"
    )
    def commits_by_author():
      return (
        dlt.read("github_commits_raw")
          .withColumnRenamed("author_date", "date")
          .withColumnRenamed("author_email", "email")
          .withColumnRenamed("author_name", "name")
          .select("sha", "date", "email", "name")
      )
    
    @dlt.table(
      comment="GitHub repository contributors"
    )
    def github_contributors_raw():
      return(
        spark.readStream.format("cloudFiles")
          .option("cloudFiles.format", "json")
          .load(spark.conf.get("contribs-path"))
      )
    
  5. サイドバーで、[ワークフロー] アイコン [ワークフロー] をクリックし、[Delta Live Tables] タブをクリックし、[パイプラインを作成] をクリックします。

  6. パイプラインに名前 (例: Transform GitHub data) を付けます。

  7. [ノートブック ライブラリ] フィールドにノートブックへのパスを入力するか、ファイル ピッカー アイコン をクリックしてノートブックを選択します。

  8. [構成の追加] をクリックします。 Key テキスト ボックスに、「commits-path」と入力します。 Value テキスト ボックスに、GitHub レコードを書き込む DBFS パスを入力します。 これには任意のパスを選択できます。これは、ワークフローを作成するときに、最初の Python タスクの構成で使用するパスと同じです。

  9. もう一度 [構成の追加] をクリックします。 Key テキスト ボックスに、「contribs-path」と入力します。 Value テキスト ボックスに、GitHub レコードを書き込む DBFS パスを入力します。 これには任意のパスを選択できます。これは、ワークフローを作成するときに、2 番目の Python タスクの構成で使用するパスと同じです。

  10. [ターゲット] フィールドに、ターゲットのデータベース (例: github_tables) を入力します。 ターゲットのデータベースを設定すると、出力データがメタストアに発行されます。これは、パイプラインによって生成されたデータを分析するダウンストリーム クエリに必要です。

  11. [保存] をクリックします。

手順 4: GitHub データを取り込んで変換するワークフローを作成する

Databricks SQL を使用して GitHub データを分析して視覚化する前に、データを取り込んで準備する必要があります。 これらのタスクを完了するワークフローを作成するには、次の手順を行います。

Azure Databricks ジョブを作成し、最初のタスクを追加する

  1. Azure Databricks のランディング ページに移動し、次のいずれかの操作を行います。

    • サイドバーで、[ワークフロー] アイコン [ワークフロー] をクリックし、[ジョブの作成] ボタン をクリックします。
    • サイドバーで、[新規] アイコン [新規] をクリックし、メニューから [ジョブ] を選択します。
  2. [タスク] タブに表示されるタスクのダイアログ ボックスで、[ジョブの名前を追加...] をジョブ名 (例: GitHub analysis workflow) に置き換えます。

  3. [タスク名] にタスクの名前 (例: get_commits) を入力します。

  4. [種類] で、[Python スクリプト] を選択します。

  5. [ソース] で、[DBFS/ S3] を選択します。

  6. [パス] に、DBFS 内のスクリプトへのパスを入力します。

  7. [パラメーター] に、Python スクリプトの次の引数を入力します。

    ["<owner>","<repo>","commits","<DBFS-output-dir>","<scope-name>","<github-token-key>"]

    • <owner> を、リポジトリ所有者の名前に置き換えます。 たとえば、github.com/databrickslabs/overwatch リポジトリからレコードをフェッチするには、「databrickslabs」と入力します。
    • <repo> を、リポジトリ名 (例: overwatch) に置き換えます。
    • <DBFS-output-dir> を、GitHub からフェッチされたレコードを格納する DBFS 内のパスに置き換えます。
    • <scope-name> を、GitHub トークンを格納するために作成したシークレット スコープの名前に置き換えます。
    • <github-token-key> を、GitHub トークンに割り当てたキーの名前に置き換えます。
  8. [タスクの保存] をクリックします。

別のタスクを追加する

  1. 作成したタスクの下にある [タスクの追加] ボタン をクリックします。

  2. [タスク名] にタスクの名前 (例: get_contributors) を入力します。

  3. [種類] で、タスクの種類 [Python スクリプト] を選択します。

  4. [ソース] で、[DBFS/ S3] を選択します。

  5. [パス] に、DBFS 内のスクリプトへのパスを入力します。

  6. [パラメーター] に、Python スクリプトの次の引数を入力します。

    ["<owner>","<repo>","contributors","<DBFS-output-dir>","<scope-name>","<github-token-key>"]

    • <owner> を、リポジトリ所有者の名前に置き換えます。 たとえば、github.com/databrickslabs/overwatch リポジトリからレコードをフェッチするには、「databrickslabs」と入力します。
    • <repo> を、リポジトリ名 (例: overwatch) に置き換えます。
    • <DBFS-output-dir> を、GitHub からフェッチされたレコードを格納する DBFS 内のパスに置き換えます。
    • <scope-name> を、GitHub トークンを格納するために作成したシークレット スコープの名前に置き換えます。
    • <github-token-key> を、GitHub トークンに割り当てたキーの名前に置き換えます。
  7. [タスクの保存] をクリックします。

データを変換するタスクを追加する

  1. 作成したタスクの下にある [タスクの追加] ボタン をクリックします。
  2. [タスク名] にタスクの名前 (例: transform_github_data) を入力します。
  3. [種類][Delta Live Tables パイプライン] を選択し、タスクの名前を入力します。
  4. [パイプライン] で、「手順 3: GitHub データを処理する Delta Live Tables パイプラインを作成する」で作成したパイプラインを選択します。
  5. Create をクリックしてください。

手順 5: データ変換ワークフローを実行する

[今すぐ実行] ボタン をクリックしてワークフローを実行します。 実行の詳細を表示するには、[ジョブの実行] ビューの実行の [開始時刻] 列にあるリンクをクリックします。 各タスクをクリックすると、そのタスクの実行の詳細が表示されます。

手順 6: (省略可能) ワークフローの実行が完了した後に出力データを表示するには、次の手順を行います。

  1. 実行の詳細ビューで、Delta Live Tables タスクをクリックします。
  2. [タスクの実行の詳細] パネルで、[パイプライン] の下にあるパイプライン名をクリックします。 [パイプラインの詳細] ページが表示されます。
  3. パイプライン DAG で commits_by_author テーブルを選択します。
  4. [commits_by_author] パネルの [メタストア] の横にあるテーブル名をクリックします。 Catalog Explorer ページが開きます。

Catalog Explorer では、テーブル スキーマ、サンプル データ、およびデータに関するその他の詳細を表示できます。 同じ手順に従って、github_contributors_raw テーブルのデータを表示します。

手順 7: GitHub データを削除する

実際のアプリケーションでは、データを継続的に取り込んで処理する場合があります。 この例ではデータ セット全体をダウンロードして処理するため、ワークフローを再実行するときにエラーが発生しないように、既にダウンロードした GitHub データを削除する必要があります。 ダウンロードしたデータを削除するには、次の手順を行います。

  1. 新しいノートブックを作成し、最初のセルに次のコマンドを入力します。

    dbutils.fs.rm("<commits-path", True)
    dbutils.fs.rm("<contributors-path", True)
    

    <commits-path><contributors-path> を、Python タスクを作成したときに構成した DBFS パスに置き換えます。

  2. 実行メニュー をクリックし、[セルの実行] を選択します。

このノートブックをタスクとしてワークフローに追加することもできます。

手順 8: Databricks SQL クエリを作成する

ワークフローを実行し、必要なテーブルを作成したら、準備されたデータを分析するクエリを作成します。 クエリと視覚化の例を作成するには、次の手順を行います。

月別の上位 10 人の共同作成者を表示する

  1. サイドバーで、Databricks ロゴ Databricks ロゴ の下にあるアイコンをクリックし、[SQL] を選択します。

  2. [クエリの作成] をクリックして、Databricks SQL クエリ エディターを開きます。

  3. カタログが [hive_metastore] に設定されていることを確認します。 hive_metastore の横にある [既定値] をクリックし、データベースを、Delta Live Tables パイプラインで設定した [ターゲット] 値に設定します。

  4. [新しいクエリ] タブに次のクエリを入力します。

    SELECT
      date_part('YEAR', date) AS year,
      date_part('MONTH', date) AS month,
      name,
      count(1)
    FROM
      commits_by_author
    WHERE
      name IN (
        SELECT
          name
        FROM
          commits_by_author
        GROUP BY
          name
        ORDER BY
          count(name) DESC
        LIMIT 10
      )
      AND
        date_part('YEAR', date) >= 2022
    GROUP BY
      name, year, month
    ORDER BY
      year, month, name
    
  5. [新しいクエリ] タブをクリックし、クエリの名前を変更します (例: Commits by month top 10 contributors)。

  6. 既定では、結果はテーブルとして表示されます。 データの視覚化方法を、たとえば横棒グラフを使用するように変更するには、[結果] パネルで、ケバブ メニュー をクリックし、[編集] をクリックします。

  7. [視覚化の種類] で、[横棒] を選択します。

  8. [X 列] で、[月] を選択します。

  9. [Y 列] で、[カウント (1)] を選択します。

  10. [グループ化] で、[名前] を選択します。

  11. [保存] をクリックします。

上位 20 人の共同作成者を表示する

  1. [+] > [新しいクエリの作成] をクリックし、カタログが hive_metastore に設定されていることを確認します。 hive_metastore の横にある [既定値] をクリックし、データベースを、Delta Live Tables パイプラインで設定した [ターゲット] 値に設定します。

  2. 次のクエリを入力します。

    SELECT
      login,
      cast(contributions AS INTEGER)
    FROM
      github_contributors_raw
    ORDER BY
      contributions DESC
    LIMIT 20
    
  3. [新しいクエリ] タブをクリックし、クエリの名前を変更します (例: Top 20 contributors)。

  4. 視覚化を既定のテーブルから変更するには、[結果] パネルで、ケバブ メニュー をクリックし、[編集] をクリックします。

  5. [視覚化の種類] で、[横棒] を選択します。

  6. [X 列] で、[ログイン] を選択します。

  7. [Y 列] で、[コントリビューション] を選択します。

  8. [保存] をクリックします。

作成者別の合計コミット数を表示する

  1. [+] > [新しいクエリの作成] をクリックし、カタログが hive_metastore に設定されていることを確認します。 hive_metastore の横にある [既定値] をクリックし、データベースを、Delta Live Tables パイプラインで設定した [ターゲット] 値に設定します。

  2. 次のクエリを入力します。

    SELECT
      name,
      count(1) commits
    FROM
      commits_by_author
    GROUP BY
      name
    ORDER BY
      commits DESC
    LIMIT 10
    
  3. [新しいクエリ] タブをクリックし、クエリの名前を変更します (例: Total commits by author)。

  4. 視覚化を既定のテーブルから変更するには、[結果] パネルで、ケバブ メニュー をクリックし、[編集] をクリックします。

  5. [視覚化の種類] で、[横棒] を選択します。

  6. [X 列] で、[名前] を選択します。

  7. [Y 列] で、[コミット数] を選択します。

  8. [保存] をクリックします。

手順 9: ダッシュボードを作成する

  1. サイドバーで ダッシュボード アイコン [ダッシュボード] をクリックします
  2. [ダッシュボードの作成] をクリックします。
  3. ダッシュボードの名前 (例: GitHub analysis) を入力します。
  4. 手順 8: Databricks SQL クエリを作成する」で作成したクエリと視覚化ごとに、[追加] > [視覚化] をクリックし、各視覚化を選択します。

手順 10: SQL タスクをワークフローに追加する

Azure Databricks ジョブを作成し、最初のタスクを追加する」で作成したワークフローに新しいクエリ タスクを追加するには、「手順 8: Databricks SQL クエリを作成する」で作成したクエリごとに次の操作を行います。

  1. サイドバーの [ワークフロー] アイコン [ワークフロー] をクリックします。
  2. [名前] 列で、ジョブ名をクリックします。
  3. [タスク] タブをクリックします。
  4. 最後のタスクの下にある [タスクの追加] ボタン をクリックします。
  5. タスクの名前を入力し、[種類] で、[SQL] を選択し、[SQL タスク] で、[クエリ] を選択します。
  6. [SQL クエリ] でクエリを選択します。
  7. [SQL ウェアハウス] で、タスクを実行するサーバーレス SQL ウェアハウスまたは pro SQL ウェアハウスを選択します。
  8. Create をクリックしてください。

手順 11: ダッシュボード タスクを追加する

  1. 最後のタスクの下にある [タスクの追加] ボタン をクリックします。
  2. タスクの名前を入力し、[種類] で、[SQL] を選択し、[SQL タスク] で、[レガシ ダッシュボード] を選択します。
  3. 手順 9: ダッシュボードを作成する」で作成したダッシュボードを選択します。
  4. [SQL ウェアハウス] で、タスクを実行するサーバーレス SQL ウェアハウスまたは pro SQL ウェアハウスを選択します。
  5. Create をクリックしてください。

手順 12: 完成したワークフローを実行する

ワークフローを実行するために、[今すぐ実行] ボタン をクリックします。 実行の詳細を表示するには、[ジョブの実行] ビューの実行の [開始時刻] 列にあるリンクをクリックします。

手順 13: 結果を表示する

実行が完了したときに結果を表示するには、最終的なダッシュボード タスクをクリックし、右側のパネルの [SQL ダッシュボード] の下にあるダッシュボード名をクリックします。