Administración de Azure Data Lake Analytics con Python
Importante
Azure Data Lake Analytics retiró el 29 de febrero de 2024. Más información sobre este anuncio.
Para el análisis de datos, su organización puede usar Azure Synapse Analytics o Microsoft Fabric.
En este artículo se describe cómo administrar cuentas, orígenes de datos, usuarios y trabajos de Azure Data Lake Analytics mediante Python.
Versiones de Python compatibles
- Use una versión de Python de 64 bits.
- Puede usar la distribución estándar de Python que encontrará en la sección de descargas de Python.org .
- Muchos desarrolladores consideran conveniente usar la distribución de Python Anaconda .
- Este artículo se escribió para la versión 3.6 de Python con la distribución de Python estándar
Instalación del SDK de Python de Azure
Instale los siguientes módulos:
- El módulo azure-mgmt-resource incluye otros módulos de Azure para Active Directory, etc.
- El módulo azure-datalake-store incluye las operaciones de sistema de archivos de Azure Data Lake Store.
- El módulo azure-datalake-store incluye las operaciones de administración de cuentas de Azure Data Lake Store.
- El módulo azure-mgmt-datalake-analytics incluye las operaciones de Azure Data Lake Analytics.
En primer lugar, asegúrese de que dispone del último pip
; para ello, ejecute el comando siguiente:
python -m pip install --upgrade pip
Este documento se ha escrito con pip version 9.0.1
.
Use el comando pip
siguiente para instalar los módulos desde la línea de comandos:
pip install azure-identity
pip install azure-mgmt-resource
pip install azure-datalake-store
pip install azure-mgmt-datalake-store
pip install azure-mgmt-datalake-analytics
Creación de un nuevo script de Python
Pegue el código siguiente en el script:
# Use this only for Azure AD service-to-service authentication
#from azure.common.credentials import ServicePrincipalCredentials
# Use this only for Azure AD end-user authentication
#from azure.common.credentials import UserPassCredentials
# Required for Azure Identity
from azure.identity import DefaultAzureCredential
# Required for Azure Resource Manager
from azure.mgmt.resource.resources import ResourceManagementClient
from azure.mgmt.resource.resources.models import ResourceGroup
# Required for Azure Data Lake Store account management
from azure.mgmt.datalake.store import DataLakeStoreAccountManagementClient
from azure.mgmt.datalake.store.models import DataLakeStoreAccount
# Required for Azure Data Lake Store filesystem management
from azure.datalake.store import core, lib, multithread
# Required for Azure Data Lake Analytics account management
from azure.mgmt.datalake.analytics.account import DataLakeAnalyticsAccountManagementClient
from azure.mgmt.datalake.analytics.account.models import DataLakeAnalyticsAccount, DataLakeStoreAccountInformation
# Required for Azure Data Lake Analytics job management
from azure.mgmt.datalake.analytics.job import DataLakeAnalyticsJobManagementClient
from azure.mgmt.datalake.analytics.job.models import JobInformation, JobState, USqlJobProperties
# Required for Azure Data Lake Analytics catalog management
from azure.mgmt.datalake.analytics.catalog import DataLakeAnalyticsCatalogManagementClient
# Required for Azure Data Lake Analytics Model
from azure.mgmt.datalake.analytics.account.models import CreateOrUpdateComputePolicyParameters
# Use these as needed for your application
import logging
import getpass
import pprint
import uuid
import time
Ejecute este script para comprobar que se pueden importar los módulos.
Authentication
Autenticación interactiva de usuarios con elemento emergente
Este método no se admite.
Autenticación interactiva de usuarios con código de dispositivo
user = input(
'Enter the user to authenticate with that has permission to subscription: ')
password = getpass.getpass()
credentials = UserPassCredentials(user, password)
Autenticación no interactiva con SPI y secreto
# Acquire a credential object for the app identity. When running in the cloud,
# DefaultAzureCredential uses the app's managed identity (MSI) or user-assigned service principal.
# When run locally, DefaultAzureCredential relies on environment variables named
# AZURE_CLIENT_ID, AZURE_CLIENT_SECRET, and AZURE_TENANT_ID.
credentials = DefaultAzureCredential()
Autenticación no interactiva con API y certificado
Este método no se admite.
Variables de script comunes
Estas variables se usan en los ejemplos.
subid = '<Azure Subscription ID>'
rg = '<Azure Resource Group Name>'
location = '<Location>' # i.e. 'eastus2'
adls = '<Azure Data Lake Store Account Name>'
adla = '<Azure Data Lake Analytics Account Name>'
Creación de los clientes
resourceClient = ResourceManagementClient(credentials, subid)
adlaAcctClient = DataLakeAnalyticsAccountManagementClient(credentials, subid)
adlaJobClient = DataLakeAnalyticsJobManagementClient(
credentials, 'azuredatalakeanalytics.net')
Creación de un grupo de recursos de Azure
armGroupResult = resourceClient.resource_groups.create_or_update(
rg, ResourceGroup(location=location))
Creación de una cuenta de Análisis de Data Lake
En primer lugar, cree una cuenta de almacenamiento.
adlsAcctResult = adlsAcctClient.account.begin_create(
rg,
adls,
DataLakeStoreAccount(
location=location)
)
).wait()
A continuación, cree una cuenta de ADLA que utilice ese almacén.
adlaAcctResult = adlaAcctClient.account.create(
rg,
adla,
DataLakeAnalyticsAccount(
location=location,
default_data_lake_store_account=adls,
data_lake_store_accounts=[DataLakeStoreAccountInformation(name=adls)]
)
).wait()
Enviar un trabajo
script = """
@a =
SELECT * FROM
(VALUES
("Contoso", 1500.0),
("Woodgrove", 2700.0)
) AS
D( customer, amount );
OUTPUT @a
TO "/data.csv"
USING Outputters.Csv();
"""
jobId = str(uuid.uuid4())
jobResult = adlaJobClient.job.create(
adla,
jobId,
JobInformation(
name='Sample Job',
type='USql',
properties=USqlJobProperties(script=script)
)
)
Esperar a que finalice un trabajo
jobResult = adlaJobClient.job.get(adla, jobId)
while(jobResult.state != JobState.ended):
print('Job is not yet done, waiting for 3 seconds. Current state: ' +
jobResult.state.value)
time.sleep(3)
jobResult = adlaJobClient.job.get(adla, jobId)
print('Job finished with result: ' + jobResult.result.value)
Enumerar las canalizaciones y las repeticiones
Dependiendo de si los trabajos tienen metadatos adjuntos de canalización o repetición, puede enumerar las canalizaciones y las repeticiones.
pipelines = adlaJobClient.pipeline.list(adla)
for p in pipelines:
print('Pipeline: ' + p.name + ' ' + p.pipelineId)
recurrences = adlaJobClient.recurrence.list(adla)
for r in recurrences:
print('Recurrence: ' + r.name + ' ' + r.recurrenceId)
Administración de nodos directivas de proceso
El objeto de DataLakeAnalyticsAccountManagementClient proporciona métodos para administrar las directivas de proceso de una cuenta de Data Lake Analytics.
Enumeración de directivas de proceso
El código siguiente recupera una lista de directivas de proceso de una cuenta de Data Lake Analytics.
policies = adlaAcctClient.compute_policies.list_by_account(rg, adla)
for p in policies:
print('Name: ' + p.name + 'Type: ' + p.object_type + 'Max AUs / job: ' +
p.max_degree_of_parallelism_per_job + 'Min priority / job: ' + p.min_priority_per_job)
Creación de una nueva directiva de proceso
El siguiente código crea una nueva directiva de cálculo para una cuenta de análisis de Data Lake y establece que el número máximo de AU disponibles para el usuario especificado en 50 y la prioridad del trabajo mínimo en 250.
userAadObjectId = "3b097601-4912-4d41-b9d2-78672fc2acde"
newPolicyParams = CreateOrUpdateComputePolicyParameters(
userAadObjectId, "User", 50, 250)
adlaAcctClient.compute_policies.create_or_update(
rg, adla, "GaryMcDaniel", newPolicyParams)
Pasos siguientes
- Para ver el mismo tutorial con otras herramientas, seleccione los selectores de pestañas en la parte superior de la página.
- Para obtener más información sobre U-SQL, consulte Introducción al lenguaje U-SQL de Análisis de Azure Data Lake.
- Para conocer las tareas de administración, consulte Administración de Azure Data Lake Analytics mediante el Azure Portal.