Gestire Azure Data Lake Analytics con Python
Importante
Azure Data Lake Analytics ritirato il 29 febbraio 2024. Altre informazioni con questo annuncio.
Per l'analisi dei dati, l'organizzazione può usare Azure Synapse Analytics o Microsoft Fabric.
Questo articolo descrive come gestire utenti, processi, origini dati e account Azure Data Lake Analytics tramite Python.
Versioni di Python supportate
- Usare una versione a 64 bit di Python.
- È possibile usare la distribuzione Python standard disponibile in Python.org download.
- Molti sviluppatori hanno la comodità di usare la distribuzione python di Anaconda.
- Questo articolo si basa su Python versione 3.6 della distribuzione di Python standard
Installare Azure Python SDK
Installare i moduli seguenti:
- Il modulo azure-mgmt-resource include altri moduli di Azure per Active Directory e così via.
- Il modulo azure-datalake-store include le operazioni di file system di Azure Data Lake Store.
- Il modulo azure-mgmt-datalake-store include le operazioni di gestione degli account di Azure Data Lake Store.
- Il modulo azure-mgmt-datalake-analytics include le operazioni di Azure Data Lake Analytics.
Assicurarsi prima di tutto di avere la versione più recente di pip
usando il comando seguente:
python -m pip install --upgrade pip
Questo documento si basa su pip version 9.0.1
.
Per installare i moduli dalla riga di comando, usare i comandi pip
seguenti:
pip install azure-identity
pip install azure-mgmt-resource
pip install azure-datalake-store
pip install azure-mgmt-datalake-store
pip install azure-mgmt-datalake-analytics
Creare un nuovo script Python
Incollare il codice seguente nello script:
# Use this only for Azure AD service-to-service authentication
#from azure.common.credentials import ServicePrincipalCredentials
# Use this only for Azure AD end-user authentication
#from azure.common.credentials import UserPassCredentials
# Required for Azure Identity
from azure.identity import DefaultAzureCredential
# Required for Azure Resource Manager
from azure.mgmt.resource.resources import ResourceManagementClient
from azure.mgmt.resource.resources.models import ResourceGroup
# Required for Azure Data Lake Store account management
from azure.mgmt.datalake.store import DataLakeStoreAccountManagementClient
from azure.mgmt.datalake.store.models import DataLakeStoreAccount
# Required for Azure Data Lake Store filesystem management
from azure.datalake.store import core, lib, multithread
# Required for Azure Data Lake Analytics account management
from azure.mgmt.datalake.analytics.account import DataLakeAnalyticsAccountManagementClient
from azure.mgmt.datalake.analytics.account.models import DataLakeAnalyticsAccount, DataLakeStoreAccountInformation
# Required for Azure Data Lake Analytics job management
from azure.mgmt.datalake.analytics.job import DataLakeAnalyticsJobManagementClient
from azure.mgmt.datalake.analytics.job.models import JobInformation, JobState, USqlJobProperties
# Required for Azure Data Lake Analytics catalog management
from azure.mgmt.datalake.analytics.catalog import DataLakeAnalyticsCatalogManagementClient
# Required for Azure Data Lake Analytics Model
from azure.mgmt.datalake.analytics.account.models import CreateOrUpdateComputePolicyParameters
# Use these as needed for your application
import logging
import getpass
import pprint
import uuid
import time
Eseguire questo script per verificare che i moduli possano essere importati.
Authentication
Autenticazione utente interattiva con un popup
Questo metodo non è supportato.
Autenticazione utente interattiva con un codice di dispositivo
user = input(
'Enter the user to authenticate with that has permission to subscription: ')
password = getpass.getpass()
credentials = UserPassCredentials(user, password)
Autenticazione non interattiva con una SPI e un segreto
# Acquire a credential object for the app identity. When running in the cloud,
# DefaultAzureCredential uses the app's managed identity (MSI) or user-assigned service principal.
# When run locally, DefaultAzureCredential relies on environment variables named
# AZURE_CLIENT_ID, AZURE_CLIENT_SECRET, and AZURE_TENANT_ID.
credentials = DefaultAzureCredential()
Autenticazione non interattiva con un'API e un certificato
Questo metodo non è supportato.
Variabili dello script comuni
Negli esempi sono usate queste variabili.
subid = '<Azure Subscription ID>'
rg = '<Azure Resource Group Name>'
location = '<Location>' # i.e. 'eastus2'
adls = '<Azure Data Lake Store Account Name>'
adla = '<Azure Data Lake Analytics Account Name>'
Creare i client
resourceClient = ResourceManagementClient(credentials, subid)
adlaAcctClient = DataLakeAnalyticsAccountManagementClient(credentials, subid)
adlaJobClient = DataLakeAnalyticsJobManagementClient(
credentials, 'azuredatalakeanalytics.net')
Creare un gruppo di risorse di Azure
armGroupResult = resourceClient.resource_groups.create_or_update(
rg, ResourceGroup(location=location))
Creare un account di Analisi Data Lake
Creare prima un account di archiviazione.
adlsAcctResult = adlsAcctClient.account.begin_create(
rg,
adls,
DataLakeStoreAccount(
location=location)
)
).wait()
Creare quindi un account ADLA che usa tale archivio.
adlaAcctResult = adlaAcctClient.account.create(
rg,
adla,
DataLakeAnalyticsAccount(
location=location,
default_data_lake_store_account=adls,
data_lake_store_accounts=[DataLakeStoreAccountInformation(name=adls)]
)
).wait()
Inviare un processo
script = """
@a =
SELECT * FROM
(VALUES
("Contoso", 1500.0),
("Woodgrove", 2700.0)
) AS
D( customer, amount );
OUTPUT @a
TO "/data.csv"
USING Outputters.Csv();
"""
jobId = str(uuid.uuid4())
jobResult = adlaJobClient.job.create(
adla,
jobId,
JobInformation(
name='Sample Job',
type='USql',
properties=USqlJobProperties(script=script)
)
)
Attendere la fine di un processo
jobResult = adlaJobClient.job.get(adla, jobId)
while(jobResult.state != JobState.ended):
print('Job is not yet done, waiting for 3 seconds. Current state: ' +
jobResult.state.value)
time.sleep(3)
jobResult = adlaJobClient.job.get(adla, jobId)
print('Job finished with result: ' + jobResult.result.value)
Elencare pipeline e ricorrenze
A seconda se i processi hanno pipeline o metadati associati, è possibile elencare le pipeline e le ricorrenze.
pipelines = adlaJobClient.pipeline.list(adla)
for p in pipelines:
print('Pipeline: ' + p.name + ' ' + p.pipelineId)
recurrences = adlaJobClient.recurrence.list(adla)
for r in recurrences:
print('Recurrence: ' + r.name + ' ' + r.recurrenceId)
Gestire i criteri di calcolo
L'oggetto DataLakeAnalyticsAccountManagementClient offre metodi per gestire i criteri di calcolo per un account Data Lake Analytics.
Elencare i criteri di calcolo
Il codice seguente recupera un elenco di criteri di calcolo per un account Data Lake Analytics.
policies = adlaAcctClient.compute_policies.list_by_account(rg, adla)
for p in policies:
print('Name: ' + p.name + 'Type: ' + p.object_type + 'Max AUs / job: ' +
p.max_degree_of_parallelism_per_job + 'Min priority / job: ' + p.min_priority_per_job)
Creare un nuovo criterio di calcolo
Il codice seguente crea un nuovo criterio di calcolo per un account Data Lake Analytics, impostando il massimo di unità di analisi disponibile per l'utente specificato su 50 e la priorità minima del processo su 250.
userAadObjectId = "3b097601-4912-4d41-b9d2-78672fc2acde"
newPolicyParams = CreateOrUpdateComputePolicyParameters(
userAadObjectId, "User", 50, 250)
adlaAcctClient.compute_policies.create_or_update(
rg, adla, "GaryMcDaniel", newPolicyParams)
Passaggi successivi
- Per visualizzare la stessa esercitazione usando altri strumenti, selezionare i selettore di schede nella parte superiore della pagina.
- Per informazioni su U-SQL, vedere Introduzione al linguaggio U-SQL di Azure Data Lake Analytics.
- Per le attività di gestione, vedere Gestire Data Lake Analytics di Azure usando portale di Azure.