Использование определяемых пользователем функций Python с Apache Hive и Apache Pig в HDInsight
Узнайте, как использовать определяемые пользователем функции (UDF) Python с Apache Hive и Apache Pig в Apache Hadoop на кластерах Azure HDInsight.
Python в HDInsight
Python2.7
устанавливается по умолчанию в HDInsight 3.0 и более поздних версиях. Apache Hive можно использовать с этой версией Python для потоковой обработки. При этом для передачи данных между Hive и определяемой пользователем функцией используется STDOUT и STDIN.
В состав HDInsight также входят Jython, который представляет собой реализацию Python, написанную на Java. Jython выполняется непосредственно на виртуальной машине Java и не использует потоковую передачу. Jython является рекомендуемым интерпретатором Python при использовании Python с Pig.
Необходимые компоненты
- Кластер Hadoop в HDInsight. Ознакомьтесь со статьей Краткое руководство. Использование Apache Hadoop и Apache Hive в Azure HDInsight с шаблоном Resource Manager.
- Клиент SSH. Дополнительные сведения см. в руководстве по подключению к HDInsight (Apache Hadoop) с помощью SSH.
- Схема универсального кода ресурса (URI) для основного хранилища кластеров.
wasb://
для службы хранилища Azure,abfs://
для Azure Data Lake Storage 2-го поколения или adl:// для Azure Data Lake Storage 1-го поколения. Если для службы хранилища Azure включено безопасное перемещение, URI будет таким: wasbs://. См. также сведения о безопасной передаче. - Возможное изменение конфигурации хранилища. Если используется учетная запись хранения типа
BlobStorage
, см. раздел Конфигурация хранилища. - Необязательно. Если вы планируете использовать PowerShell, необходимо установить модуль AZ.
Примечание.
Учетная запись хранения, используемая в этой статье, относится к службе хранилища Azure с включенным безопасным перемещением, поэтому здесь везде используется wasbs
.
Конфигурация хранилища
Если используемая учетная запись хранения относится к типу Storage (general purpose v1)
или StorageV2 (general purpose v2)
, ничего делать не нужно. Процесс, приведенный в этой статье, создает по крайней мере /tezstaging
выходные данные. Конфигурация hadoop по умолчанию содержится /tezstaging
в переменной fs.azure.page.blob.dir
core-site.xml
конфигурации для службы HDFS
. Эта конфигурация приводит к выводу в каталог страничных BLOB-объектов, которые не поддерживаются для типа BlobStorage
учетной записи хранения. Чтобы использовать BlobStorage
для процесса, описанного в этой статье, удалите /tezstaging
из переменной конфигурации fs.azure.page.blob.dir
. Для доступа к конфигурации используйте пользовательский интерфейс Ambari. В противном случае вы получите сообщение об ошибке: Page blob is not supported for this account type.
Предупреждение
Шаги в этом документе основаны на следующих предположениях:
- Вы создаете скрипты Python в локальной среде разработки.
- Вы отправляете скрипты в HDInsight, используя команду
scp
или предоставленный скрипт PowerShell.
Если для работы с HDInsight вы хотите использовать Azure Cloud Shell (bash), вам необходимо:
- Создать скрипты в среде Cloud Shell.
- Использовать
scp
для отправки файлов из Cloud Shell в HDInsight. - Использовать
ssh
из Cloud Shell для подключения к HDInsight и выполнения примеров.
Определяемая пользователем функция Apache Hive
Скрипт Python можно использовать в качестве определяемой пользователем функции из Hive через HiveQL с помощью инструкции TRANSFORM
. Например, следующий запрос HiveQL вызывает файл hiveudf.py
, хранящийся в учетной записи хранения Azure по умолчанию для кластера.
add file wasbs:///hiveudf.py;
SELECT TRANSFORM (clientid, devicemake, devicemodel)
USING 'python hiveudf.py' AS
(clientid string, phoneLabel string, phoneHash string)
FROM hivesampletable
ORDER BY clientid LIMIT 50;
Вот что делает данный пример:
- Инструкция
add file
в начале файла добавляет файлhiveudf.py
в распределенный кэш, и он становится доступен всем узлам кластера. - Инструкция
SELECT TRANSFORM ... USING
выбирает данные изhivesampletable
. Она также передает параметры clientid, devicemake и devicemodel в скриптhiveudf.py
. - Предложение
AS
описывает поля, возвращаемые изhiveudf.py
.
Создать файл
В среде разработки создайте текстовый файл с именем hiveudf.py
. Используйте следующий код в качестве содержимого файла:
#!/usr/bin/env python
import sys
import string
import hashlib
while True:
line = sys.stdin.readline()
if not line:
break
line = string.strip(line, "\n ")
clientid, devicemake, devicemodel = string.split(line, "\t")
phone_label = devicemake + ' ' + devicemodel
print "\t".join([clientid, phone_label, hashlib.md5(phone_label).hexdigest()])
Сценарий выполняет следующие действия:
- Считывается строка данных из STDIN.
- Стоящий в конце знак новой строки удаляется с помощью
string.strip(line, "\n ")
. - При обработке потока в одной строке будут содержаться все значения, разделенные символом табуляции. Поэтому можно использовать
string.split(line, "\t")
для разделения входящих данных при каждой табуляции, возвращая лишь поля. - По завершении обработки результат должен быть записан в поток STDOUT в виде одной строки, с разделенными символами табуляции полями. Например,
print "\t".join([clientid, phone_label, hashlib.md5(phone_label).hexdigest()])
. - Цикл
while
повторяется до тех пор, пока считываетсяline
.
Выходные данные скрипта представляют собой объединенные входные значения для devicemake
и devicemodel
, а также хэш для объединенного значения.
Отправка файла (оболочка)
Следующая команда заменяет sshuser
фактическое имя пользователя, если отличается. Вместо mycluster
укажите реальное имя кластера. Рабочей должна быть папка, в которой находится файл.
Используйте
scp
для копирования файлов в кластер HDInsight. Измените и введите команду:scp hiveudf.py sshuser@mycluster-ssh.azurehdinsight.net:
Используйте SSH, чтобы подключиться к кластеру. Измените и введите команду:
ssh sshuser@mycluster-ssh.azurehdinsight.net
В сеансе SSH добавьте отправленные ранее файлы Python в хранилище для кластера.
hdfs dfs -put hiveudf.py /hiveudf.py
Использование Hive UDF (оболочка)
Чтобы подключиться к Hive, в открытом сеансе SSH введите следующую команду:
beeline -u 'jdbc:hive2://headnodehost:10001/;transportMode=http'
Эта команда запускает клиент Beeline.
Введите следующий запрос
0: jdbc:hive2://headnodehost:10001/>
в командной строке:add file wasbs:///hiveudf.py; SELECT TRANSFORM (clientid, devicemake, devicemodel) USING 'python hiveudf.py' AS (clientid string, phoneLabel string, phoneHash string) FROM hivesampletable ORDER BY clientid LIMIT 50;
После ввода последней строки задание должно начинаться. По завершении задания эта команда возвращает выходные данные следующего вида:
100041 RIM 9650 d476f3687700442549a83fac4560c51c 100041 RIM 9650 d476f3687700442549a83fac4560c51c 100042 Apple iPhone 4.2.x 375ad9a0ddc4351536804f1d5d0ea9b9 100042 Apple iPhone 4.2.x 375ad9a0ddc4351536804f1d5d0ea9b9 100042 Apple iPhone 4.2.x 375ad9a0ddc4351536804f1d5d0ea9b9
Чтобы выйти из Beeline, введите следующую команду:
!q
Отправка файла (PowerShell)
PowerShell также можно использовать для удаленного запуска запросов на использование Hive. Рабочей должна быть папка, в которой находится hiveudf.py
. Чтобы выполнить запрос Hive, использующий скрипт hiveudf.py
, примените следующий скрипт PowerShell:
# Login to your Azure subscription
# Is there an active Azure subscription?
$sub = Get-AzSubscription -ErrorAction SilentlyContinue
if(-not($sub))
{
Connect-AzAccount
}
# If you have multiple subscriptions, set the one to use
# Select-AzSubscription -SubscriptionId "<SUBSCRIPTIONID>"
# Revise file path as needed
$pathToStreamingFile = ".\hiveudf.py"
# Get cluster info
$clusterName = Read-Host -Prompt "Enter the HDInsight cluster name"
$clusterInfo = Get-AzHDInsightCluster -ClusterName $clusterName
$resourceGroup = $clusterInfo.ResourceGroup
$storageAccountName=$clusterInfo.DefaultStorageAccount.split('.')[0]
$container=$clusterInfo.DefaultStorageContainer
$storageAccountKey=(Get-AzStorageAccountKey `
-ResourceGroupName $resourceGroup `
-Name $storageAccountName)[0].Value
# Create an Azure Storage context
$context = New-AzStorageContext `
-StorageAccountName $storageAccountName `
-StorageAccountKey $storageAccountKey
# Upload local files to an Azure Storage blob
Set-AzStorageBlobContent `
-File $pathToStreamingFile `
-Blob "hiveudf.py" `
-Container $container `
-Context $context
Примечание.
Дополнительные сведения об отправке файлов см. в статье Отправка данных для заданий Apache Hadoop в HDInsight.
Использование Hive UDF
# Script should stop on failures
$ErrorActionPreference = "Stop"
# Login to your Azure subscription
# Is there an active Azure subscription?
$sub = Get-AzSubscription -ErrorAction SilentlyContinue
if(-not($sub))
{
Connect-AzAccount
}
# If you have multiple subscriptions, set the one to use
# Select-AzSubscription -SubscriptionId "<SUBSCRIPTIONID>"
# Get cluster info
$clusterName = Read-Host -Prompt "Enter the HDInsight cluster name"
$creds=Get-Credential -UserName "admin" -Message "Enter the login for the cluster"
$HiveQuery = "add file wasbs:///hiveudf.py;" +
"SELECT TRANSFORM (clientid, devicemake, devicemodel) " +
"USING 'python hiveudf.py' AS " +
"(clientid string, phoneLabel string, phoneHash string) " +
"FROM hivesampletable " +
"ORDER BY clientid LIMIT 50;"
# Create Hive job object
$jobDefinition = New-AzHDInsightHiveJobDefinition `
-Query $HiveQuery
# For status bar updates
$activity="Hive query"
# Progress bar (optional)
Write-Progress -Activity $activity -Status "Starting query..."
# Start defined Azure HDInsight job on specified cluster.
$job = Start-AzHDInsightJob `
-ClusterName $clusterName `
-JobDefinition $jobDefinition `
-HttpCredential $creds
# Progress bar (optional)
Write-Progress -Activity $activity -Status "Waiting on query to complete..."
# Wait for completion or failure of specified job
Wait-AzHDInsightJob `
-JobId $job.JobId `
-ClusterName $clusterName `
-HttpCredential $creds
# Uncomment the following to see stderr output
<#
Get-AzHDInsightJobOutput `
-Clustername $clusterName `
-JobId $job.JobId `
-HttpCredential $creds `
-DisplayOutputType StandardError
#>
# Progress bar (optional)
Write-Progress -Activity $activity -Status "Retrieving output..."
# Gets the log output
Get-AzHDInsightJobOutput `
-Clustername $clusterName `
-JobId $job.JobId `
-HttpCredential $creds
Результат выполнения задания Hive должен выглядеть аналогично следующему примеру:
100041 RIM 9650 d476f3687700442549a83fac4560c51c
100041 RIM 9650 d476f3687700442549a83fac4560c51c
100042 Apple iPhone 4.2.x 375ad9a0ddc4351536804f1d5d0ea9b9
100042 Apple iPhone 4.2.x 375ad9a0ddc4351536804f1d5d0ea9b9
100042 Apple iPhone 4.2.x 375ad9a0ddc4351536804f1d5d0ea9b9
Определяемая пользователем функция Apache Pig
Скрипт Python можно использовать в виде определяемой пользователем функции из Pig с использованием инструкции GENERATE
. Вы можете запустить скрипт с помощью Jython или CPython.
- Jython работает на виртуальной машине Java и изначально может вызываться из Pig.
- CPython является внешним процессом, поэтому данные из Pig на JVM отправляются в скрипт, выполняющийся в процессе Python. Выходные данные скрипта Python отправляются обратно в Pig.
Чтобы указать интерпретатор Python, используйте register
при указании ссылки на скрипт Python. Следующие примеры регистрируют скрипты с Pig в качестве myfuncs
:
- Для использования Jython:
register '/path/to/pigudf.py' using jython as myfuncs;
- Для использования CPython:
register '/path/to/pigudf.py' using streaming_python as myfuncs;
Внимание
При использовании Jython путь к файлу pig_jython может быть локальным или указан как WASBS://. Но при использовании CPython необходимо указать ссылку на файл в локальной файловой системе узла, который используется для отправки задания Pig.
После регистрации язык Pig Latin будет одинаковым для обоих примеров:
LOGS = LOAD 'wasbs:///example/data/sample.log' as (LINE:chararray);
LOG = FILTER LOGS by LINE is not null;
DETAILS = FOREACH LOG GENERATE myfuncs.create_structure(LINE);
DUMP DETAILS;
Вот что делает данный пример:
- Первая строка загружает образец файла данных
sample.log
вLOGS
. Она также определяет каждую запись как массив символовchararray
. - Следующая строка отфильтровывает все пустые значения, сохраняя результат работы в
LOG
. - Затем выполняется итерация по записям в
LOG
и используется инструкцияGENERATE
для вызова методаcreate_structure
, содержащегося в скрипте Python или Jython, загруженном какmyfuncs
.LINE
используется для передачи текущей записи в функцию. - Наконец, выходные данные сбрасываются в поток STDOUT командой
DUMP
. После завершения операции эта команда выведет результат.
Создать файл
В среде разработки создайте текстовый файл с именем pigudf.py
. Используйте следующий код в качестве содержимого файла:
# Uncomment the following if using C Python
#from pig_util import outputSchema
@outputSchema("log: {(date:chararray, time:chararray, classname:chararray, level:chararray, detail:chararray)}")
def create_structure(input):
if (input.startswith('java.lang.Exception')):
input = input[21:len(input)] + ' - java.lang.Exception'
date, time, classname, level, detail = input.split(' ', 4)
return date, time, classname, level, detail
В примере с Pig Latin входные данные LINE
определены как массив знаков, т. к. согласованной схемы входных данных нет. Скрипт Python выполняет преобразование данных в согласованную схему на выходе.
Инструкция
@outputSchema
задает формат данных, в котором они возвращаются в Pig. В данном случае это data bag, являющийся типом данных Pig. Корзина содержит следующие поля, все они имеют тип "Массив строк" (строки):- date — дата создания записи журнала;
- date — время создания записи журнала;
- classname — имя класса, для которого создана запись;
- level — уровень журналирования;
- detail — подробная информация о записи журнала.
Затем
def create_structure(input)
определяет функцию, в которую Pig отправляет строковые элементы.Данные для примера,
sample.log
, в основном соответствуют схеме даты, времени, имени класса, уровня и подробной информации. Однако он содержит несколько строк, начинающихся с*java.lang.Exception*
. Эти строки должны быть изменены в соответствии со схемой. Инструкцияif
проверяет на наличие таких строк, затем обрабатывает входные данные, переставляя строку*java.lang.Exception*
в конец, формируя данные в соответствии с ожидаемой схемой.Затем команда
split
используется для разделения данных по первым четырем символам пробела. Выходным данным присваиваются значенияdate
,time
,classname
,level
иdetail
.И результаты возвращаются в Pig.
Когда данные возвращаются в Pig, они имеют согласованную схему, определенную инструкцией @outputSchema
.
Отправка файла (оболочка)
В командах ниже вместо sshuser
укажите реальное имя пользователя, если оно отличается. Вместо mycluster
укажите реальное имя кластера. Рабочей должна быть папка, в которой находится файл.
Используйте
scp
для копирования файлов в кластер HDInsight. Измените и введите команду:scp pigudf.py sshuser@mycluster-ssh.azurehdinsight.net:
Используйте SSH, чтобы подключиться к кластеру. Измените и введите команду:
ssh sshuser@mycluster-ssh.azurehdinsight.net
В сеансе SSH добавьте отправленные ранее файлы Python в хранилище для кластера.
hdfs dfs -put pigudf.py /pigudf.py
Использование Pig UDF (оболочка)
Чтобы подключиться к Pig, в открытом сеансе SSH введите следующую команду:
pig
В окне запроса
grunt>
введите следующие операторы:Register wasbs:///pigudf.py using jython as myfuncs; LOGS = LOAD 'wasbs:///example/data/sample.log' as (LINE:chararray); LOG = FILTER LOGS by LINE is not null; DETAILS = foreach LOG generate myfuncs.create_structure(LINE); DUMP DETAILS;
После ввода следующей строки задание должно начинаться. По завершении задания эта команда возвращает выходные данные следующего вида:
((2012-02-03,20:11:56,SampleClass5,[TRACE],verbose detail for id 990982084)) ((2012-02-03,20:11:56,SampleClass7,[TRACE],verbose detail for id 1560323914)) ((2012-02-03,20:11:56,SampleClass8,[DEBUG],detail for id 2083681507)) ((2012-02-03,20:11:56,SampleClass3,[TRACE],verbose detail for id 1718828806)) ((2012-02-03,20:11:56,SampleClass3,[INFO],everything normal for id 530537821))
Используйте
quit
для выхода из оболочки Grunt, а затем следующую команду для изменения файла pigudf.py в локальной файловой системе:nano pigudf.py
Войдите в редактор и раскомментируйте следующую строку, удалив символ
#
в начале строки.#from pig_util import outputSchema
Эта строка изменяет сценарий Python для работы с C Python вместо Jython. Закончив вносить изменения, нажмите клавиши CTRL+X, чтобы выйти из редактора. Выберите Y (Да) и нажмите клавишу ВВОД, чтобы сохранить изменения.
Используйте команду
pig
, чтобы снова запустить оболочку. При появлении запросаgrunt>
введите следующие инструкции, чтобы запустить сценарий Python с помощью интерпретатора CPython.Register 'pigudf.py' using streaming_python as myfuncs; LOGS = LOAD 'wasbs:///example/data/sample.log' as (LINE:chararray); LOG = FILTER LOGS by LINE is not null; DETAILS = foreach LOG generate myfuncs.create_structure(LINE); DUMP DETAILS;
Когда это задание будет выполнено, вы увидите такой же результат, как при запуске сценария с помощью Jython.
Отправка файла (PowerShell)
PowerShell также можно использовать для удаленного запуска запросов на использование Hive. Рабочей должна быть папка, в которой находится pigudf.py
. Чтобы выполнить запрос Hive, использующий скрипт pigudf.py
, примените следующий скрипт PowerShell:
# Login to your Azure subscription
# Is there an active Azure subscription?
$sub = Get-AzSubscription -ErrorAction SilentlyContinue
if(-not($sub))
{
Connect-AzAccount
}
# If you have multiple subscriptions, set the one to use
# Select-AzSubscription -SubscriptionId "<SUBSCRIPTIONID>"
# Revise file path as needed
$pathToJythonFile = ".\pigudf.py"
# Get cluster info
$clusterName = Read-Host -Prompt "Enter the HDInsight cluster name"
$clusterInfo = Get-AzHDInsightCluster -ClusterName $clusterName
$resourceGroup = $clusterInfo.ResourceGroup
$storageAccountName=$clusterInfo.DefaultStorageAccount.split('.')[0]
$container=$clusterInfo.DefaultStorageContainer
$storageAccountKey=(Get-AzStorageAccountKey `
-ResourceGroupName $resourceGroup `
-Name $storageAccountName)[0].Value
# Create an Azure Storage context
$context = New-AzStorageContext `
-StorageAccountName $storageAccountName `
-StorageAccountKey $storageAccountKey
# Upload local files to an Azure Storage blob
Set-AzStorageBlobContent `
-File $pathToJythonFile `
-Blob "pigudf.py" `
-Container $container `
-Context $context
Использование Pig UDF (PowerShell)
Примечание.
При удаленной отправке задания с помощью PowerShell нельзя использовать CPython в качестве интерпретатора.
PowerShell также можно использовать для запуска заданий Pig Latin. Чтобы выполнить задание Pig Latin, использующее скрипт pigudf.py
, примените следующий скрипт PowerShell:
# Script should stop on failures
$ErrorActionPreference = "Stop"
# Login to your Azure subscription
# Is there an active Azure subscription?
$sub = Get-AzSubscription -ErrorAction SilentlyContinue
if(-not($sub))
{
Connect-AzAccount
}
# Get cluster info
$clusterName = Read-Host -Prompt "Enter the HDInsight cluster name"
$creds=Get-Credential -UserName "admin" -Message "Enter the login for the cluster"
$PigQuery = "Register wasbs:///pigudf.py using jython as myfuncs;" +
"LOGS = LOAD 'wasbs:///example/data/sample.log' as (LINE:chararray);" +
"LOG = FILTER LOGS by LINE is not null;" +
"DETAILS = foreach LOG generate myfuncs.create_structure(LINE);" +
"DUMP DETAILS;"
# Create Pig job object
$jobDefinition = New-AzHDInsightPigJobDefinition -Query $PigQuery
# For status bar updates
$activity="Pig job"
# Progress bar (optional)
Write-Progress -Activity $activity -Status "Starting job..."
# Start defined Azure HDInsight job on specified cluster.
$job = Start-AzHDInsightJob `
-ClusterName $clusterName `
-JobDefinition $jobDefinition `
-HttpCredential $creds
# Progress bar (optional)
Write-Progress -Activity $activity -Status "Waiting for the Pig job to complete..."
# Wait for completion or failure of specified job
Wait-AzHDInsightJob `
-Job $job.JobId `
-ClusterName $clusterName `
-HttpCredential $creds
# Uncomment the following to see stderr output
<#
Get-AzHDInsightJobOutput `
-Clustername $clusterName `
-JobId $job.JobId `
-HttpCredential $creds `
-DisplayOutputType StandardError
#>
# Progress bar (optional)
Write-Progress -Activity $activity "Retrieving output..."
# Gets the log output
Get-AzHDInsightJobOutput `
-Clustername $clusterName `
-JobId $job.JobId `
-HttpCredential $creds
Результат выполнения задания Pig должен выглядеть аналогично следующим данным:
((2012-02-03,20:11:56,SampleClass5,[TRACE],verbose detail for id 990982084))
((2012-02-03,20:11:56,SampleClass7,[TRACE],verbose detail for id 1560323914))
((2012-02-03,20:11:56,SampleClass8,[DEBUG],detail for id 2083681507))
((2012-02-03,20:11:56,SampleClass3,[TRACE],verbose detail for id 1718828806))
((2012-02-03,20:11:56,SampleClass3,[INFO],everything normal for id 530537821))
Устранение неполадок
Ошибки при выполнении заданий
При выполнении задания hive может возникнуть ошибка, аналогичная приведенной ниже:
Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: [Error 20001]: An error occurred while reading or writing to your custom script. It may have crashed with an error.
Эта проблема может быть вызвана символами окончания строк в файле Python. Многие редакторы Windows по умолчанию используют символы CRLF, но в приложениях Linux обычно ожидается использование символа LF.
Вы можете использовать следующие команды PowerShell для удаления символов CR перед передачей файла в HDInsight:
Write-Progress -Activity $activity -Status "Waiting for the Pig job to complete..."
# Wait for completion or failure of specified job
Скрипты PowerShell
Оба примера скриптов PowerShell, используемых для запуска примеров, содержат закомментированную строку, которая отображает вывод ошибок для задания. Если вы не видите ожидаемых результатов задания, раскомментируйте следующую строку и посмотрите, нет ли в описании ошибки информации о проблеме.
$activity="Pig job"
# Progress bar (optional)
Write-Progress -Activity $activity -Status "Starting job..."
Сведения об ошибках (STDERR) и результат выполнения задания (STDOUT) также записываются в хранилище HDInsight.
Для данного задания... | Смотрите эти файлы в контейнере |
---|---|
Куст | /HivePython/stderr /HivePython/stdout |
Pig, | /PigPython/stderr /PigPython/stdout |
Следующие шаги
Если вам нужно загрузить модули Python, которые не поставляются по умолчанию, см. статью How to deploy a Python module to Windows Azure HDInsight (Как развернуть модуль Python в Windows Azure HDInsight).
Сведения о других способах использования Pig и Hive и дополнительную информацию об использовании MapReduce см. в следующих документах: