Python を使用して Azure Data Lake Analytics を管理する
重要
Azure Data Lake Analytics は 2024 年 2 月 29 日に廃止されました。 このお知らせで詳細を学びましょう。
データ分析の場合、組織は Azure Synapse Analytics または Microsoft Fabric を使用できます。
この記事では、Python を使用して Azure Data Lake Analytics アカウント、データ ソース、ユーザー、ジョブを管理する方法について説明します。
サポートされている Python バージョン
- 64 ビット バージョンの Python を使用します。
- Python.org ダウンロードで見つかった標準の Python ディストリビューション を使用できます。
- 多くの開発者は 、Anaconda Python ディストリビューションを使用すると便利です。
- この記事は、標準の Python ディストリビューションから Python バージョン 3.6 を使用して記述されました
Azure Python SDK をインストールする
次のモジュールをインストールします。
- azure-mgmt-resource モジュールには、Active Directory などの他の Azure モジュールが含まれています。
- azure-datalake-store モジュールには、Azure Data Lake Store ファイルシステム操作が含まれています。
- azure-mgmt-datalake-store モジュールには、Azure Data Lake Store アカウント管理操作が含まれています。
- azure-mgmt-datalake-analytics モジュールには、Azure Data Lake Analytics 操作が含まれています。
まず、次のコマンドを実行して、最新の pip
があることを確認します。
python -m pip install --upgrade pip
このドキュメントは、 pip version 9.0.1
を使用して作成されました。
コマンド ラインからモジュールをインストールするには、次の pip
コマンドを使用します。
pip install azure-identity
pip install azure-mgmt-resource
pip install azure-datalake-store
pip install azure-mgmt-datalake-store
pip install azure-mgmt-datalake-analytics
新しい Python スクリプトを作成する
次のコードをスクリプトに貼り付けます。
# Use this only for Azure AD service-to-service authentication
#from azure.common.credentials import ServicePrincipalCredentials
# Use this only for Azure AD end-user authentication
#from azure.common.credentials import UserPassCredentials
# Required for Azure Identity
from azure.identity import DefaultAzureCredential
# Required for Azure Resource Manager
from azure.mgmt.resource.resources import ResourceManagementClient
from azure.mgmt.resource.resources.models import ResourceGroup
# Required for Azure Data Lake Store account management
from azure.mgmt.datalake.store import DataLakeStoreAccountManagementClient
from azure.mgmt.datalake.store.models import DataLakeStoreAccount
# Required for Azure Data Lake Store filesystem management
from azure.datalake.store import core, lib, multithread
# Required for Azure Data Lake Analytics account management
from azure.mgmt.datalake.analytics.account import DataLakeAnalyticsAccountManagementClient
from azure.mgmt.datalake.analytics.account.models import DataLakeAnalyticsAccount, DataLakeStoreAccountInformation
# Required for Azure Data Lake Analytics job management
from azure.mgmt.datalake.analytics.job import DataLakeAnalyticsJobManagementClient
from azure.mgmt.datalake.analytics.job.models import JobInformation, JobState, USqlJobProperties
# Required for Azure Data Lake Analytics catalog management
from azure.mgmt.datalake.analytics.catalog import DataLakeAnalyticsCatalogManagementClient
# Required for Azure Data Lake Analytics Model
from azure.mgmt.datalake.analytics.account.models import CreateOrUpdateComputePolicyParameters
# Use these as needed for your application
import logging
import getpass
import pprint
import uuid
import time
このスクリプトを実行して、モジュールをインポートできることを確認します。
認証
ポップアップを使用した対話型ユーザー認証
このメソッドはサポートされていません。
デバイス コードを使用した対話型ユーザー認証
user = input(
'Enter the user to authenticate with that has permission to subscription: ')
password = getpass.getpass()
credentials = UserPassCredentials(user, password)
SPI とシークレットを使用した非対話型認証
# Acquire a credential object for the app identity. When running in the cloud,
# DefaultAzureCredential uses the app's managed identity (MSI) or user-assigned service principal.
# When run locally, DefaultAzureCredential relies on environment variables named
# AZURE_CLIENT_ID, AZURE_CLIENT_SECRET, and AZURE_TENANT_ID.
credentials = DefaultAzureCredential()
API と証明書を使用した非対話型認証
このメソッドはサポートされていません。
一般的なスクリプト変数
これらの変数はサンプルで使用されます。
subid = '<Azure Subscription ID>'
rg = '<Azure Resource Group Name>'
location = '<Location>' # i.e. 'eastus2'
adls = '<Azure Data Lake Store Account Name>'
adla = '<Azure Data Lake Analytics Account Name>'
クライアントを作成する
resourceClient = ResourceManagementClient(credentials, subid)
adlaAcctClient = DataLakeAnalyticsAccountManagementClient(credentials, subid)
adlaJobClient = DataLakeAnalyticsJobManagementClient(
credentials, 'azuredatalakeanalytics.net')
Azure リソース グループを作成する
armGroupResult = resourceClient.resource_groups.create_or_update(
rg, ResourceGroup(location=location))
Data Lake Analytics アカウントを作成する
まず、ストア アカウントを作成します。
adlsAcctResult = adlsAcctClient.account.begin_create(
rg,
adls,
DataLakeStoreAccount(
location=location)
)
).wait()
次に、そのストアを使用する ADLA アカウントを作成します。
adlaAcctResult = adlaAcctClient.account.create(
rg,
adla,
DataLakeAnalyticsAccount(
location=location,
default_data_lake_store_account=adls,
data_lake_store_accounts=[DataLakeStoreAccountInformation(name=adls)]
)
).wait()
仕事を送信する
script = """
@a =
SELECT * FROM
(VALUES
("Contoso", 1500.0),
("Woodgrove", 2700.0)
) AS
D( customer, amount );
OUTPUT @a
TO "/data.csv"
USING Outputters.Csv();
"""
jobId = str(uuid.uuid4())
jobResult = adlaJobClient.job.create(
adla,
jobId,
JobInformation(
name='Sample Job',
type='USql',
properties=USqlJobProperties(script=script)
)
)
ジョブが終了するのを待つ
jobResult = adlaJobClient.job.get(adla, jobId)
while(jobResult.state != JobState.ended):
print('Job is not yet done, waiting for 3 seconds. Current state: ' +
jobResult.state.value)
time.sleep(3)
jobResult = adlaJobClient.job.get(adla, jobId)
print('Job finished with result: ' + jobResult.result.value)
パイプラインと繰り返しを一覧表示する
ジョブにパイプラインメタデータと繰り返しメタデータのどちらをアタッチするかに応じて、パイプラインと繰り返しを一覧表示できます。
pipelines = adlaJobClient.pipeline.list(adla)
for p in pipelines:
print('Pipeline: ' + p.name + ' ' + p.pipelineId)
recurrences = adlaJobClient.recurrence.list(adla)
for r in recurrences:
print('Recurrence: ' + r.name + ' ' + r.recurrenceId)
コンピューティング ポリシーの管理
DataLakeAnalyticsAccountManagementClient オブジェクトは、Data Lake Analytics アカウントのコンピューティング ポリシーを管理するためのメソッドを提供します。
コンピューティング ポリシーを一覧表示する
次のコードは、Data Lake Analytics アカウントのコンピューティング ポリシーの一覧を取得します。
policies = adlaAcctClient.compute_policies.list_by_account(rg, adla)
for p in policies:
print('Name: ' + p.name + 'Type: ' + p.object_type + 'Max AUs / job: ' +
p.max_degree_of_parallelism_per_job + 'Min priority / job: ' + p.min_priority_per_job)
新しいコンピューティング ポリシーを作成する
次のコードでは、Data Lake Analytics アカウントの新しいコンピューティング ポリシーを作成し、指定したユーザーが使用できる最大 AU を 50 に、最小ジョブ優先度を 250 に設定します。
userAadObjectId = "3b097601-4912-4d41-b9d2-78672fc2acde"
newPolicyParams = CreateOrUpdateComputePolicyParameters(
userAadObjectId, "User", 50, 250)
adlaAcctClient.compute_policies.create_or_update(
rg, adla, "GaryMcDaniel", newPolicyParams)
次のステップ
- 他のツールを使用して同じチュートリアルを表示するには、ページの上部にあるタブ セレクターを選択します。
- U-SQL の詳細については、「Azure Data Lake Analytics U-SQL 言語の概要」を参照してください。
- 管理タスクについては、Azure portal を使用した Azure Data Lake Analytics の管理に関するページを参照してください。