Správa azure Data Lake Analytics pomocí Pythonu
Důležité
Azure Data Lake Analytics vyřazena 29. února 2024. Další informace najdete v tomto oznámení.
K analýze dat může vaše organizace použít Azure Synapse Analytics nebo Microsoft Fabric.
Tento článek popisuje, jak spravovat účty Azure Data Lake Analytics, zdroje dat, uživatele a úlohy pomocí Pythonu.
Podporované verze Pythonu
- Použijte 64bitovou verzi Pythonu.
- Můžete použít standardní distribuci Pythonu, kterou najdete na Python.org stažení.
- Pro mnoho vývojářů je vhodné používat distribuci Pythonu Anaconda.
- Tento článek byl napsán pomocí Pythonu verze 3.6 ze standardní distribuce Pythonu.
Instalace sady Azure Python SDK
Nainstalujte následující moduly:
- Modul azure-mgmt-resource zahrnuje další moduly Azure pro Active Directory atd.
- Modul azure-datalake-store zahrnuje operace systému souborů Azure Data Lake Store.
- Modul azure-mgmt-datalake-store zahrnuje operace správy účtu Azure Data Lake Store.
- Modul azure-mgmt-datalake-analytics zahrnuje operace Azure Data Lake Analytics.
Nejprve spusťte následující příkaz a ujistěte se, že máte nejnovější pip
verzi:
python -m pip install --upgrade pip
Tento dokument byl napsán pomocí .pip version 9.0.1
K instalaci modulů z příkazového řádku použijte následující pip
příkazy:
pip install azure-identity
pip install azure-mgmt-resource
pip install azure-datalake-store
pip install azure-mgmt-datalake-store
pip install azure-mgmt-datalake-analytics
Vytvoření nového skriptu Pythonu
Do skriptu vložte následující kód:
# Use this only for Azure AD service-to-service authentication
#from azure.common.credentials import ServicePrincipalCredentials
# Use this only for Azure AD end-user authentication
#from azure.common.credentials import UserPassCredentials
# Required for Azure Identity
from azure.identity import DefaultAzureCredential
# Required for Azure Resource Manager
from azure.mgmt.resource.resources import ResourceManagementClient
from azure.mgmt.resource.resources.models import ResourceGroup
# Required for Azure Data Lake Store account management
from azure.mgmt.datalake.store import DataLakeStoreAccountManagementClient
from azure.mgmt.datalake.store.models import DataLakeStoreAccount
# Required for Azure Data Lake Store filesystem management
from azure.datalake.store import core, lib, multithread
# Required for Azure Data Lake Analytics account management
from azure.mgmt.datalake.analytics.account import DataLakeAnalyticsAccountManagementClient
from azure.mgmt.datalake.analytics.account.models import DataLakeAnalyticsAccount, DataLakeStoreAccountInformation
# Required for Azure Data Lake Analytics job management
from azure.mgmt.datalake.analytics.job import DataLakeAnalyticsJobManagementClient
from azure.mgmt.datalake.analytics.job.models import JobInformation, JobState, USqlJobProperties
# Required for Azure Data Lake Analytics catalog management
from azure.mgmt.datalake.analytics.catalog import DataLakeAnalyticsCatalogManagementClient
# Required for Azure Data Lake Analytics Model
from azure.mgmt.datalake.analytics.account.models import CreateOrUpdateComputePolicyParameters
# Use these as needed for your application
import logging
import getpass
import pprint
import uuid
import time
Spuštěním tohoto skriptu ověřte, že moduly lze importovat.
Authentication
Interaktivní ověřování uživatelů pomocí automaticky otevíraných oken
Tato metoda se nepodporuje.
Interaktivní ověřování uživatelů pomocí kódu zařízení
user = input(
'Enter the user to authenticate with that has permission to subscription: ')
password = getpass.getpass()
credentials = UserPassCredentials(user, password)
Neinteraktivní ověřování s využitím SPI a tajného klíče
# Acquire a credential object for the app identity. When running in the cloud,
# DefaultAzureCredential uses the app's managed identity (MSI) or user-assigned service principal.
# When run locally, DefaultAzureCredential relies on environment variables named
# AZURE_CLIENT_ID, AZURE_CLIENT_SECRET, and AZURE_TENANT_ID.
credentials = DefaultAzureCredential()
Neinteraktivní ověřování pomocí rozhraní API a certifikátu
Tato metoda se nepodporuje.
Běžné proměnné skriptu
Tyto proměnné se používají v ukázkách.
subid = '<Azure Subscription ID>'
rg = '<Azure Resource Group Name>'
location = '<Location>' # i.e. 'eastus2'
adls = '<Azure Data Lake Store Account Name>'
adla = '<Azure Data Lake Analytics Account Name>'
Vytvoření klientů
resourceClient = ResourceManagementClient(credentials, subid)
adlaAcctClient = DataLakeAnalyticsAccountManagementClient(credentials, subid)
adlaJobClient = DataLakeAnalyticsJobManagementClient(
credentials, 'azuredatalakeanalytics.net')
Vytvoření skupiny prostředků Azure
armGroupResult = resourceClient.resource_groups.create_or_update(
rg, ResourceGroup(location=location))
Vytvoření účtu Data Lake Analytics
Nejprve vytvořte účet obchodu.
adlsAcctResult = adlsAcctClient.account.begin_create(
rg,
adls,
DataLakeStoreAccount(
location=location)
)
).wait()
Pak vytvořte účet ADLA, který používá toto úložiště.
adlaAcctResult = adlaAcctClient.account.create(
rg,
adla,
DataLakeAnalyticsAccount(
location=location,
default_data_lake_store_account=adls,
data_lake_store_accounts=[DataLakeStoreAccountInformation(name=adls)]
)
).wait()
Odeslání úlohy
script = """
@a =
SELECT * FROM
(VALUES
("Contoso", 1500.0),
("Woodgrove", 2700.0)
) AS
D( customer, amount );
OUTPUT @a
TO "/data.csv"
USING Outputters.Csv();
"""
jobId = str(uuid.uuid4())
jobResult = adlaJobClient.job.create(
adla,
jobId,
JobInformation(
name='Sample Job',
type='USql',
properties=USqlJobProperties(script=script)
)
)
Čekání na ukončení úlohy
jobResult = adlaJobClient.job.get(adla, jobId)
while(jobResult.state != JobState.ended):
print('Job is not yet done, waiting for 3 seconds. Current state: ' +
jobResult.state.value)
time.sleep(3)
jobResult = adlaJobClient.job.get(adla, jobId)
print('Job finished with result: ' + jobResult.result.value)
Výpis kanálů a opakování
V závislosti na tom, jestli vaše úlohy mají připojená metadata kanálu nebo opakování, můžete zobrazit seznam kanálů a opakování.
pipelines = adlaJobClient.pipeline.list(adla)
for p in pipelines:
print('Pipeline: ' + p.name + ' ' + p.pipelineId)
recurrences = adlaJobClient.recurrence.list(adla)
for r in recurrences:
print('Recurrence: ' + r.name + ' ' + r.recurrenceId)
Správa zásad výpočetních prostředků
Objekt DataLakeAnalyticsAccountManagementClient poskytuje metody pro správu zásad výpočetních prostředků pro účet Data Lake Analytics.
Výpis zásad výpočetních prostředků
Následující kód načte seznam zásad výpočetních prostředků pro účet Data Lake Analytics.
policies = adlaAcctClient.compute_policies.list_by_account(rg, adla)
for p in policies:
print('Name: ' + p.name + 'Type: ' + p.object_type + 'Max AUs / job: ' +
p.max_degree_of_parallelism_per_job + 'Min priority / job: ' + p.min_priority_per_job)
Vytvoření nové zásady výpočetních prostředků
Následující kód vytvoří novou výpočetní zásadu pro účet Data Lake Analytics a nastaví maximální počet jednotek AU dostupných pro zadaného uživatele na 50 a minimální prioritu úlohy na 250.
userAadObjectId = "3b097601-4912-4d41-b9d2-78672fc2acde"
newPolicyParams = CreateOrUpdateComputePolicyParameters(
userAadObjectId, "User", 50, 250)
adlaAcctClient.compute_policies.create_or_update(
rg, adla, "GaryMcDaniel", newPolicyParams)
Další kroky
- Pokud chcete zobrazit stejný kurz pomocí jiných nástrojů, vyberte selektory karet v horní části stránky.
- Pokud se chcete naučit jazyk U-SQL, informace najdete v tématu Začínáme s jazykem U-SQL Azure Data Lake Analytics.
- Informace o úlohách správy najdete v tématu Správa azure Data Lake Analytics pomocí Azure Portal.