Поделиться через


Управление Azure Data Lake Analytics с помощью Python

Это важно

Azure Data Lake Analytics вышел из эксплуатации 29 февраля 2024 года. Дополнительные сведения см. в этом объявлении.

Для аналитики данных ваша организация может использовать Azure Synapse Analytics или Microsoft Fabric.

В этой статье описывается, как управлять учетными записями Azure Data Lake Analytics, источниками данных, пользователями и заданиями с помощью Python.

Поддерживаемые версии Python

  • Используйте 64-разрядную версию Python.
  • Вы можете использовать стандартный дистрибутив Python, найденный в Python.org скачивания.
  • Многие разработчики находят удобное использование дистрибутива Anaconda Python.
  • Эта статья была написана с помощью Python версии 3.6 из стандартного дистрибутива Python

Установка пакета SDK для Python для Azure

Установите следующие модули:

  • Модуль azure-mgmt-resource включает другие модули Azure для Active Directory и т. д.
  • Модуль azure-datalake-store включает операции файловой системы Azure Data Lake Store.
  • Модуль azure-mgmt-datalake-store включает операции управления учетными записями Azure Data Lake Store.
  • Модуль azure-mgmt-datalake-analytics включает операции Azure Data Lake Analytics.

Сначала убедитесь, что у вас есть последняя pip версия, выполнив следующую команду:

python -m pip install --upgrade pip

Этот документ был написан с помощью pip version 9.0.1.

Используйте следующие pip команды, чтобы установить модули из командной строки:

pip install azure-identity
pip install azure-mgmt-resource
pip install azure-datalake-store
pip install azure-mgmt-datalake-store
pip install azure-mgmt-datalake-analytics

Создание сценария Python

Вставьте следующий код в скрипт:

# Use this only for Azure AD service-to-service authentication
#from azure.common.credentials import ServicePrincipalCredentials

# Use this only for Azure AD end-user authentication
#from azure.common.credentials import UserPassCredentials

# Required for Azure Identity
from azure.identity import DefaultAzureCredential

# Required for Azure Resource Manager
from azure.mgmt.resource.resources import ResourceManagementClient
from azure.mgmt.resource.resources.models import ResourceGroup

# Required for Azure Data Lake Store account management
from azure.mgmt.datalake.store import DataLakeStoreAccountManagementClient
from azure.mgmt.datalake.store.models import DataLakeStoreAccount

# Required for Azure Data Lake Store filesystem management
from azure.datalake.store import core, lib, multithread

# Required for Azure Data Lake Analytics account management
from azure.mgmt.datalake.analytics.account import DataLakeAnalyticsAccountManagementClient
from azure.mgmt.datalake.analytics.account.models import DataLakeAnalyticsAccount, DataLakeStoreAccountInformation

# Required for Azure Data Lake Analytics job management
from azure.mgmt.datalake.analytics.job import DataLakeAnalyticsJobManagementClient
from azure.mgmt.datalake.analytics.job.models import JobInformation, JobState, USqlJobProperties

# Required for Azure Data Lake Analytics catalog management
from azure.mgmt.datalake.analytics.catalog import DataLakeAnalyticsCatalogManagementClient

# Required for Azure Data Lake Analytics Model
from azure.mgmt.datalake.analytics.account.models import CreateOrUpdateComputePolicyParameters

# Use these as needed for your application
import logging
import getpass
import pprint
import uuid
import time

Запустите этот скрипт, чтобы убедиться, что модули можно импортировать.

Аутентификация

Интерактивная проверка подлинности пользователей с помощью всплывающего окна

Этот метод не поддерживается.

Интерактивная проверка подлинности пользователей с помощью кода устройства

user = input(
    'Enter the user to authenticate with that has permission to subscription: ')
password = getpass.getpass()
credentials = UserPassCredentials(user, password)

Неинтерактивная проверка подлинности с помощью SPI и секрета

# Acquire a credential object for the app identity. When running in the cloud,
# DefaultAzureCredential uses the app's managed identity (MSI) or user-assigned service principal.
# When run locally, DefaultAzureCredential relies on environment variables named
# AZURE_CLIENT_ID, AZURE_CLIENT_SECRET, and AZURE_TENANT_ID.

credentials = DefaultAzureCredential()

Неинтерактивная проверка подлинности с помощью API и сертификата

Этот метод не поддерживается.

Общие переменные скрипта

Эти переменные используются в примерах.

subid = '<Azure Subscription ID>'
rg = '<Azure Resource Group Name>'
location = '<Location>'  # i.e. 'eastus2'
adls = '<Azure Data Lake Store Account Name>'
adla = '<Azure Data Lake Analytics Account Name>'

Создание клиентов

resourceClient = ResourceManagementClient(credentials, subid)
adlaAcctClient = DataLakeAnalyticsAccountManagementClient(credentials, subid)
adlaJobClient = DataLakeAnalyticsJobManagementClient(
    credentials, 'azuredatalakeanalytics.net')

Создание группы ресурсов Azure

armGroupResult = resourceClient.resource_groups.create_or_update(
    rg, ResourceGroup(location=location))

Создание учетной записи Data Lake Analytics

Сначала создайте учетную запись магазина.

adlsAcctResult = adlsAcctClient.account.begin_create(
	rg,
	adls,
	DataLakeStoreAccount(
		location=location)
	)
).wait()

Затем создайте учетную запись ADLA, которая использует это хранилище.

adlaAcctResult = adlaAcctClient.account.create(
    rg,
    adla,
    DataLakeAnalyticsAccount(
        location=location,
        default_data_lake_store_account=adls,
        data_lake_store_accounts=[DataLakeStoreAccountInformation(name=adls)]
    )
).wait()

Отправка задания

script = """
@a  = 
    SELECT * FROM 
        (VALUES
            ("Contoso", 1500.0),
            ("Woodgrove", 2700.0)
        ) AS 
              D( customer, amount );
OUTPUT @a
    TO "/data.csv"
    USING Outputters.Csv();
"""

jobId = str(uuid.uuid4())
jobResult = adlaJobClient.job.create(
    adla,
    jobId,
    JobInformation(
        name='Sample Job',
        type='USql',
        properties=USqlJobProperties(script=script)
    )
)

Дождитесь завершения задания

jobResult = adlaJobClient.job.get(adla, jobId)
while(jobResult.state != JobState.ended):
    print('Job is not yet done, waiting for 3 seconds. Current state: ' +
          jobResult.state.value)
    time.sleep(3)
    jobResult = adlaJobClient.job.get(adla, jobId)

print('Job finished with result: ' + jobResult.result.value)

Список конвейеров и повторений

В зависимости от того, имеют ли задания метаданные конвейера или повторения, можно отобразить конвейеры и повторения.

pipelines = adlaJobClient.pipeline.list(adla)
for p in pipelines:
    print('Pipeline: ' + p.name + ' ' + p.pipelineId)

recurrences = adlaJobClient.recurrence.list(adla)
for r in recurrences:
    print('Recurrence: ' + r.name + ' ' + r.recurrenceId)

Управление политиками вычислений

Объект DataLakeAnalyticsAccountManagementClient предоставляет методы управления политиками вычислений для учетной записи Data Lake Analytics.

Список политик вычислений

Следующий код извлекает список политик вычислений для учетной записи Data Lake Analytics.

policies = adlaAcctClient.compute_policies.list_by_account(rg, adla)
for p in policies:
    print('Name: ' + p.name + 'Type: ' + p.object_type + 'Max AUs / job: ' +
          p.max_degree_of_parallelism_per_job + 'Min priority / job: ' + p.min_priority_per_job)

Создание новой политики вычислений

Следующий код создает новую политику вычислений для учетной записи Data Lake Analytics, задав максимальное число единиц AUS, доступных указанному пользователю, значение 50, а минимальный приоритет задания — 250.

userAadObjectId = "3b097601-4912-4d41-b9d2-78672fc2acde"
newPolicyParams = CreateOrUpdateComputePolicyParameters(
    userAadObjectId, "User", 50, 250)
adlaAcctClient.compute_policies.create_or_update(
    rg, adla, "GaryMcDaniel", newPolicyParams)

Дальнейшие действия