使用 Python 管理 Azure Data Lake Analytics
重要
Azure Data Lake Analytics于 2024 年 2 月 29 日停用。 通过此公告了解更多信息。
对于数据分析,组织可以使用 Azure Synapse Analytics 或 Microsoft Fabric。
本文介绍了如何使用 Python 管理 Azure Data Lake Analytics 帐户、数据源、用户和作业。
支持的 Python 版本
- 使用 64 位版本的 Python。
- 可以使用位于 Python.org 下载上的标准 Python 分发版。
- 许多开发人员发现使用 Anaconda Python 分发版会十分方便。
- 本文是使用来自标准 Python 分发版的 Python 版本 3.6 编写的
安装 Azure Python SDK
安装以下模块:
- azure-mgmt-resource 模块包括 Active Directory 等的其他 Azure 模块。
- azure-datalake-store 模块包含 Azure Data Lake Store 文件系统操作。
- azure-mgmt-datalake-store 模块包括 Azure Data Lake Store 帐户管理操作。
- azure-mgmt-datalake-analytics 模块包含 Azure Data Lake Analytics 操作。
首先,通过运行以下命令来确保具有最新 pip
:
python -m pip install --upgrade pip
本文档是使用 pip version 9.0.1
编写的。
从命令行使用以下 pip
命令安装模块:
pip install azure-identity
pip install azure-mgmt-resource
pip install azure-datalake-store
pip install azure-mgmt-datalake-store
pip install azure-mgmt-datalake-analytics
创建新 Python 脚本
将以下代码粘贴到脚本中:
# Use this only for Azure AD service-to-service authentication
#from azure.common.credentials import ServicePrincipalCredentials
# Use this only for Azure AD end-user authentication
#from azure.common.credentials import UserPassCredentials
# Required for Azure Identity
from azure.identity import DefaultAzureCredential
# Required for Azure Resource Manager
from azure.mgmt.resource.resources import ResourceManagementClient
from azure.mgmt.resource.resources.models import ResourceGroup
# Required for Azure Data Lake Store account management
from azure.mgmt.datalake.store import DataLakeStoreAccountManagementClient
from azure.mgmt.datalake.store.models import DataLakeStoreAccount
# Required for Azure Data Lake Store filesystem management
from azure.datalake.store import core, lib, multithread
# Required for Azure Data Lake Analytics account management
from azure.mgmt.datalake.analytics.account import DataLakeAnalyticsAccountManagementClient
from azure.mgmt.datalake.analytics.account.models import DataLakeAnalyticsAccount, DataLakeStoreAccountInformation
# Required for Azure Data Lake Analytics job management
from azure.mgmt.datalake.analytics.job import DataLakeAnalyticsJobManagementClient
from azure.mgmt.datalake.analytics.job.models import JobInformation, JobState, USqlJobProperties
# Required for Azure Data Lake Analytics catalog management
from azure.mgmt.datalake.analytics.catalog import DataLakeAnalyticsCatalogManagementClient
# Required for Azure Data Lake Analytics Model
from azure.mgmt.datalake.analytics.account.models import CreateOrUpdateComputePolicyParameters
# Use these as needed for your application
import logging
import getpass
import pprint
import uuid
import time
运行此脚本以验证是否可以导入模块。
身份验证
使用弹出窗口的交互用户身份验证
不支持此方法。
使用设备代码的交互用户身份验证
user = input(
'Enter the user to authenticate with that has permission to subscription: ')
password = getpass.getpass()
credentials = UserPassCredentials(user, password)
使用 SPI 和机密的非交互身份验证
# Acquire a credential object for the app identity. When running in the cloud,
# DefaultAzureCredential uses the app's managed identity (MSI) or user-assigned service principal.
# When run locally, DefaultAzureCredential relies on environment variables named
# AZURE_CLIENT_ID, AZURE_CLIENT_SECRET, and AZURE_TENANT_ID.
credentials = DefaultAzureCredential()
使用 API 和证书的非交互身份验证
不支持此方法。
常见脚本变量
示例中使用了这些变量。
subid = '<Azure Subscription ID>'
rg = '<Azure Resource Group Name>'
location = '<Location>' # i.e. 'eastus2'
adls = '<Azure Data Lake Store Account Name>'
adla = '<Azure Data Lake Analytics Account Name>'
创建客户端
resourceClient = ResourceManagementClient(credentials, subid)
adlaAcctClient = DataLakeAnalyticsAccountManagementClient(credentials, subid)
adlaJobClient = DataLakeAnalyticsJobManagementClient(
credentials, 'azuredatalakeanalytics.net')
创建 Azure 资源组
armGroupResult = resourceClient.resource_groups.create_or_update(
rg, ResourceGroup(location=location))
创建 Data Lake Analytics 帐户
首先创建存储帐户。
adlsAcctResult = adlsAcctClient.account.begin_create(
rg,
adls,
DataLakeStoreAccount(
location=location)
)
).wait()
然后创建使用该存储的 ADLA 帐户。
adlaAcctResult = adlaAcctClient.account.create(
rg,
adla,
DataLakeAnalyticsAccount(
location=location,
default_data_lake_store_account=adls,
data_lake_store_accounts=[DataLakeStoreAccountInformation(name=adls)]
)
).wait()
提交作业
script = """
@a =
SELECT * FROM
(VALUES
("Contoso", 1500.0),
("Woodgrove", 2700.0)
) AS
D( customer, amount );
OUTPUT @a
TO "/data.csv"
USING Outputters.Csv();
"""
jobId = str(uuid.uuid4())
jobResult = adlaJobClient.job.create(
adla,
jobId,
JobInformation(
name='Sample Job',
type='USql',
properties=USqlJobProperties(script=script)
)
)
等待作业结束
jobResult = adlaJobClient.job.get(adla, jobId)
while(jobResult.state != JobState.ended):
print('Job is not yet done, waiting for 3 seconds. Current state: ' +
jobResult.state.value)
time.sleep(3)
jobResult = adlaJobClient.job.get(adla, jobId)
print('Job finished with result: ' + jobResult.result.value)
列出管道和重复周期
根据作业是附加了管道还是重复周期元数据,可以列出管道和重复周期。
pipelines = adlaJobClient.pipeline.list(adla)
for p in pipelines:
print('Pipeline: ' + p.name + ' ' + p.pipelineId)
recurrences = adlaJobClient.recurrence.list(adla)
for r in recurrences:
print('Recurrence: ' + r.name + ' ' + r.recurrenceId)
管理计算策略
DataLakeAnalyticsAccountManagementClient 对象提供用于为 Data Lake Analytics 帐户管理计算策略的方法。
列出计算策略
下面的代码检索 Data Lake Analytics 帐户的计算策略列表。
policies = adlaAcctClient.compute_policies.list_by_account(rg, adla)
for p in policies:
print('Name: ' + p.name + 'Type: ' + p.object_type + 'Max AUs / job: ' +
p.max_degree_of_parallelism_per_job + 'Min priority / job: ' + p.min_priority_per_job)
创建新计算策略
下面的代码为 Data Lake Analytics 帐户创建新的计算策略,将对指定用户可用的最大 AU 设置为 50,并将最小作业优先级设置为 250。
userAadObjectId = "3b097601-4912-4d41-b9d2-78672fc2acde"
newPolicyParams = CreateOrUpdateComputePolicyParameters(
userAadObjectId, "User", 50, 250)
adlaAcctClient.compute_policies.create_or_update(
rg, adla, "GaryMcDaniel", newPolicyParams)
后续步骤
- 若要使用其他工具查看相同的教程,请选择页面顶部的选项卡选择器。
- 若要了解 U-SQL,请参阅 Azure Data Lake Analytics U-SQL 语言入门。
- 有关管理任务,请参阅使用 Azure 门户管理 Azure Data Lake Analytics。