Compartilhar via


Gerenciar o Azure Data Lake Analytics usando o Python

Importante

O Azure Data Lake Analytics desativado em 29 de fevereiro de 2024. Saiba mais nesse comunicado.

Para análise de dados, sua organização pode usar o Azure Synapse Analytics ou o Microsoft Fabric.

Este artigo descreve como gerenciar contas, fontes de dados, usuários e trabalhos do Azure Data Lake Analytics usando o Python.

Versões do Python compatíveis

  • Use uma versão de 64 bits do Python.
  • Você pode usar a distribuição padrão do Python encontrada nos downloads do Python.org.
  • Muitos desenvolvedores consideram conveniente usar a Distribuição do Anaconda Python.
  • Este artigo foi escrito usando o Python versão 3.6 da distribuição padrão do Python

Instalar o SDK do Python do Azure

Instale os seguintes módulos:

  • O módulo azure-mgmt-resource inclui outros módulos do Azure para o Active Directory, etc.
  • O módulo azure-datalake-store inclui as operações de sistema de arquivos do Azure Data Lake Store.
  • O módulo azure-mgmt-datalake-store inclui as operações de gerenciamento de contas do Azure Data Lake Store.
  • O módulo azure-mgmt-datalake-analytics inclui as operações do Azure Data Lake Analytics.

Primeiro, verifique se você tem a versão mais recente do pip, executando o seguinte comando:

python -m pip install --upgrade pip

Este documento foi escrito usando pip version 9.0.1.

Use os seguintes comandos pip para instalar os módulos de linha de comando:

pip install azure-identity
pip install azure-mgmt-resource
pip install azure-datalake-store
pip install azure-mgmt-datalake-store
pip install azure-mgmt-datalake-analytics

Criar um novo script do Python

Cole o seguinte código no script:

# Use this only for Azure AD service-to-service authentication
#from azure.common.credentials import ServicePrincipalCredentials

# Use this only for Azure AD end-user authentication
#from azure.common.credentials import UserPassCredentials

# Required for Azure Identity
from azure.identity import DefaultAzureCredential

# Required for Azure Resource Manager
from azure.mgmt.resource.resources import ResourceManagementClient
from azure.mgmt.resource.resources.models import ResourceGroup

# Required for Azure Data Lake Store account management
from azure.mgmt.datalake.store import DataLakeStoreAccountManagementClient
from azure.mgmt.datalake.store.models import DataLakeStoreAccount

# Required for Azure Data Lake Store filesystem management
from azure.datalake.store import core, lib, multithread

# Required for Azure Data Lake Analytics account management
from azure.mgmt.datalake.analytics.account import DataLakeAnalyticsAccountManagementClient
from azure.mgmt.datalake.analytics.account.models import DataLakeAnalyticsAccount, DataLakeStoreAccountInformation

# Required for Azure Data Lake Analytics job management
from azure.mgmt.datalake.analytics.job import DataLakeAnalyticsJobManagementClient
from azure.mgmt.datalake.analytics.job.models import JobInformation, JobState, USqlJobProperties

# Required for Azure Data Lake Analytics catalog management
from azure.mgmt.datalake.analytics.catalog import DataLakeAnalyticsCatalogManagementClient

# Required for Azure Data Lake Analytics Model
from azure.mgmt.datalake.analytics.account.models import CreateOrUpdateComputePolicyParameters

# Use these as needed for your application
import logging
import getpass
import pprint
import uuid
import time

Execute este script para verificar se os módulos podem ser importados.

Autenticação

Autenticação de usuário interativo com um pop-up

Não há suporte para esse método.

Autenticação de usuário interativo com um código de dispositivo

user = input(
    'Enter the user to authenticate with that has permission to subscription: ')
password = getpass.getpass()
credentials = UserPassCredentials(user, password)

Autenticação não interativa com SPI e um segredo

# Acquire a credential object for the app identity. When running in the cloud,
# DefaultAzureCredential uses the app's managed identity (MSI) or user-assigned service principal.
# When run locally, DefaultAzureCredential relies on environment variables named
# AZURE_CLIENT_ID, AZURE_CLIENT_SECRET, and AZURE_TENANT_ID.

credentials = DefaultAzureCredential()

Autenticação não interativa com API e um certificado

Não há suporte para esse método.

Variáveis do script comum

Essas variáveis são usadas nos exemplos.

subid = '<Azure Subscription ID>'
rg = '<Azure Resource Group Name>'
location = '<Location>'  # i.e. 'eastus2'
adls = '<Azure Data Lake Store Account Name>'
adla = '<Azure Data Lake Analytics Account Name>'

Criar os clientes

resourceClient = ResourceManagementClient(credentials, subid)
adlaAcctClient = DataLakeAnalyticsAccountManagementClient(credentials, subid)
adlaJobClient = DataLakeAnalyticsJobManagementClient(
    credentials, 'azuredatalakeanalytics.net')

Criar um Grupo de Recursos do Azure

armGroupResult = resourceClient.resource_groups.create_or_update(
    rg, ResourceGroup(location=location))

Criar conta da Análise Data Lake

Primeiro, crie uma conta de repositório.

adlsAcctResult = adlsAcctClient.account.begin_create(
	rg,
	adls,
	DataLakeStoreAccount(
		location=location)
	)
).wait()

Em seguida, crie uma conta do ADLA que usará o repositório.

adlaAcctResult = adlaAcctClient.account.create(
    rg,
    adla,
    DataLakeAnalyticsAccount(
        location=location,
        default_data_lake_store_account=adls,
        data_lake_store_accounts=[DataLakeStoreAccountInformation(name=adls)]
    )
).wait()

Enviar um trabalho

script = """
@a  = 
    SELECT * FROM 
        (VALUES
            ("Contoso", 1500.0),
            ("Woodgrove", 2700.0)
        ) AS 
              D( customer, amount );
OUTPUT @a
    TO "/data.csv"
    USING Outputters.Csv();
"""

jobId = str(uuid.uuid4())
jobResult = adlaJobClient.job.create(
    adla,
    jobId,
    JobInformation(
        name='Sample Job',
        type='USql',
        properties=USqlJobProperties(script=script)
    )
)

Aguarde até que um trabalho seja encerrado

jobResult = adlaJobClient.job.get(adla, jobId)
while(jobResult.state != JobState.ended):
    print('Job is not yet done, waiting for 3 seconds. Current state: ' +
          jobResult.state.value)
    time.sleep(3)
    jobResult = adlaJobClient.job.get(adla, jobId)

print('Job finished with result: ' + jobResult.result.value)

Listar pipelines e recorrências

Dependendo se os trabalhos tiverem metadados de pipeline ou de recorrência anexados, você poderá listar os pipelines e as recorrências.

pipelines = adlaJobClient.pipeline.list(adla)
for p in pipelines:
    print('Pipeline: ' + p.name + ' ' + p.pipelineId)

recurrences = adlaJobClient.recurrence.list(adla)
for r in recurrences:
    print('Recurrence: ' + r.name + ' ' + r.recurrenceId)

Gerenciar políticas de computação

O objeto DataLakeAnalyticsAccountManagementClient fornece métodos para gerenciar as políticas de computação para uma conta do Data Lake Analytics.

Listar políticas de computação

O código a seguir recupera uma lista de políticas de computação para uma conta do Data Lake Analytics.

policies = adlaAcctClient.compute_policies.list_by_account(rg, adla)
for p in policies:
    print('Name: ' + p.name + 'Type: ' + p.object_type + 'Max AUs / job: ' +
          p.max_degree_of_parallelism_per_job + 'Min priority / job: ' + p.min_priority_per_job)

Criar uma nova política de computação

O código a seguir cria uma nova política de computação para uma conta do Data Lake Analytics, definindo a quantidade máxima de AUs disponíveis para o usuário especificado como 50 e a prioridade mínima de trabalho como 250.

userAadObjectId = "3b097601-4912-4d41-b9d2-78672fc2acde"
newPolicyParams = CreateOrUpdateComputePolicyParameters(
    userAadObjectId, "User", 50, 250)
adlaAcctClient.compute_policies.create_or_update(
    rg, adla, "GaryMcDaniel", newPolicyParams)

Próximas etapas