Usar funções definidas pelo usuário (UDF) do Python com o Apache Hive e o Apache Pig no HDInsight
Saiba como usar funções definidas pelo usuário (UDF) Python com o Apache Hive e o Apache Pig no Apache Hadoop no Azure HDInsight.
Python no HDInsight
Python2.7
é instalado por padrão no HDInsight 3.0 e posterior. O Apache Hive pode ser usado com esta versão do Python para processamento de fluxo. O processamento de fluxo usa STDOUT e STDIN para passar dados entre o Hive e o UDF.
O HDInsight também inclui o Jython, que é uma implementação Python escrita em Java. O Jython é executado diretamente na Java Virtual Machine e não usa streaming. Jython é o interpretador Python recomendado ao usar Python com Pig.
Pré-requisitos
- Um cluster Hadoop no HDInsight. Consulte Introdução ao HDInsight no Linux.
- Um cliente SSH. Para obter mais informações, veja Ligar ao HDInsight (Apache Hadoop) através de SSH.
- O esquema de URI para o armazenamento primário de clusters. Isso seria
wasb://
para o Armazenamento do Azure,abfs://
para o Azure Data Lake Storage Gen2 ou adl:// para o Azure Data Lake Storage Gen1. Se a transferência segura estiver habilitada para o Armazenamento do Azure, o URI será wasbs://. Consulte também, transferência segura. - Possível alteração na configuração de armazenamento. Consulte Configuração de armazenamento se estiver usando o tipo
BlobStorage
de conta de armazenamento . - Opcional. Se estiver planejando usar o PowerShell, você precisará do módulo AZ instalado.
Nota
A conta de armazenamento usada neste artigo foi o Armazenamento do Azure com transferência segura habilitada e, portanto, wasbs
é usada em todo o artigo.
Configuração do armazenamento
Nenhuma ação será necessária se a conta de armazenamento usada for do tipo Storage (general purpose v1)
ou StorageV2 (general purpose v2)
. O processo neste artigo produz saída para pelo menos /tezstaging
. Uma configuração hadoop padrão contém /tezstaging
na fs.azure.page.blob.dir
variável de configuração em core-site.xml
for service HDFS
. Essa configuração faz com que a saída para o diretório seja blobs de página, que não são suportados para o tipo BlobStorage
de conta de armazenamento. Para usar BlobStorage
neste artigo, remova /tezstaging
da fs.azure.page.blob.dir
variável de configuração. A configuração pode ser acessada a partir da interface do usuário do Ambari. Caso contrário, receberá a mensagem de erro: Page blob is not supported for this account type.
Aviso
As etapas deste documento partem das seguintes suposições:
- Você cria os scripts Python em seu ambiente de desenvolvimento local.
- Carregue os scripts para o HDInsight usando o
scp
comando ou o script do PowerShell fornecido.
Se você quiser usar o Azure Cloud Shell (bash) para trabalhar com o HDInsight, deve:
- Crie os scripts dentro do ambiente de shell de nuvem.
- Use
scp
para carregar os arquivos do shell da nuvem para o HDInsight. - Use
ssh
a partir do shell da nuvem para se conectar ao HDInsight e executar os exemplos.
Apache Hive UDF
Python pode ser usado como um UDF do Hive através da instrução HiveQL TRANSFORM
. Por exemplo, o HiveQL a seguir invoca o hiveudf.py
arquivo armazenado na conta padrão do Armazenamento do Azure para o cluster.
add file wasbs:///hiveudf.py;
SELECT TRANSFORM (clientid, devicemake, devicemodel)
USING 'python hiveudf.py' AS
(clientid string, phoneLabel string, phoneHash string)
FROM hivesampletable
ORDER BY clientid LIMIT 50;
Veja o que este exemplo faz:
- A
add file
instrução no início do arquivo adiciona ohiveudf.py
arquivo ao cache distribuído, para que seja acessível por todos os nós no cluster. - A
SELECT TRANSFORM ... USING
instrução seleciona dados dohivesampletable
. Ele também passa os valores clientid, devicemake e devicemodel para ohiveudf.py
script. - A
AS
cláusula descreve os campos retornados dehiveudf.py
.
Criar ficheiro
Em seu ambiente de desenvolvimento, crie um arquivo de texto chamado hiveudf.py
. Use o seguinte código como o conteúdo do arquivo:
#!/usr/bin/env python
import sys
import string
import hashlib
while True:
line = sys.stdin.readline()
if not line:
break
line = string.strip(line, "\n ")
clientid, devicemake, devicemodel = string.split(line, "\t")
phone_label = devicemake + ' ' + devicemodel
print "\t".join([clientid, phone_label, hashlib.md5(phone_label).hexdigest()])
Este script executa as seguintes ações:
- Lê uma linha de dados de STDIN.
- O caractere de nova linha à direita é removido usando
string.strip(line, "\n ")
. - Ao fazer o processamento de fluxo, uma única linha contém todos os valores com um caractere de tabulação entre cada valor. Assim,
string.split(line, "\t")
pode ser usado para dividir a entrada em cada guia, retornando apenas os campos. - Quando o processamento estiver concluído, a saída deve ser gravada no STDOUT como uma única linha, com uma guia entre cada campo. Por exemplo,
print "\t".join([clientid, phone_label, hashlib.md5(phone_label).hexdigest()])
. - O
while
loop se repete até que nãoline
seja lido.
A saída do script é uma concatenação dos valores de entrada de devicemake
e devicemodel
, bem como um hash do valor concatenado.
Carregar arquivo (shell)
O comando a seguir substitui sshuser
pelo nome de usuário real, se diferente. Substitua mycluster
pelo nome real do cluster. Verifique se o diretório de trabalho está onde o arquivo está localizado.
Use
scp
para copiar os arquivos para o cluster HDInsight. Edite e digite o comando:scp hiveudf.py sshuser@mycluster-ssh.azurehdinsight.net:
Use SSH para se conectar ao cluster. Edite e digite o comando:
ssh sshuser@mycluster-ssh.azurehdinsight.net
Na sessão SSH, adicione os arquivos Python carregados anteriormente ao armazenamento do cluster.
hdfs dfs -put hiveudf.py /hiveudf.py
Usar Hive UDF (shell)
Para se conectar ao Hive, use o seguinte comando da sua sessão SSH aberta:
beeline -u 'jdbc:hive2://headnodehost:10001/;transportMode=http'
Este comando inicia o cliente Beeline.
Digite a seguinte consulta no
0: jdbc:hive2://headnodehost:10001/>
prompt:add file wasbs:///hiveudf.py; SELECT TRANSFORM (clientid, devicemake, devicemodel) USING 'python hiveudf.py' AS (clientid string, phoneLabel string, phoneHash string) FROM hivesampletable ORDER BY clientid LIMIT 50;
Assim que a última linha for inserida, o trabalho deve ser iniciado. Quando o trabalho for concluído, ele retornará uma saída semelhante ao exemplo a seguir:
100041 RIM 9650 d476f3687700442549a83fac4560c51c 100041 RIM 9650 d476f3687700442549a83fac4560c51c 100042 Apple iPhone 4.2.x 375ad9a0ddc4351536804f1d5d0ea9b9 100042 Apple iPhone 4.2.x 375ad9a0ddc4351536804f1d5d0ea9b9 100042 Apple iPhone 4.2.x 375ad9a0ddc4351536804f1d5d0ea9b9
Para sair do Beeline, digite o seguinte comando:
!q
Carregar arquivo (PowerShell)
O PowerShell também pode ser usado para executar remotamente consultas do Hive. Verifique se o diretório de trabalho está onde hiveudf.py
está localizado. Use o seguinte script do PowerShell para executar uma consulta do Hive que usa o hiveudf.py
script:
# Login to your Azure subscription
# Is there an active Azure subscription?
$sub = Get-AzSubscription -ErrorAction SilentlyContinue
if(-not($sub))
{
Connect-AzAccount
}
# If you have multiple subscriptions, set the one to use
# Select-AzSubscription -SubscriptionId "<SUBSCRIPTIONID>"
# Revise file path as needed
$pathToStreamingFile = ".\hiveudf.py"
# Get cluster info
$clusterName = Read-Host -Prompt "Enter the HDInsight cluster name"
$clusterInfo = Get-AzHDInsightCluster -ClusterName $clusterName
$resourceGroup = $clusterInfo.ResourceGroup
$storageAccountName=$clusterInfo.DefaultStorageAccount.split('.')[0]
$container=$clusterInfo.DefaultStorageContainer
$storageAccountKey=(Get-AzStorageAccountKey `
-ResourceGroupName $resourceGroup `
-Name $storageAccountName)[0].Value
# Create an Azure Storage context
$context = New-AzStorageContext `
-StorageAccountName $storageAccountName `
-StorageAccountKey $storageAccountKey
# Upload local files to an Azure Storage blob
Set-AzStorageBlobContent `
-File $pathToStreamingFile `
-Blob "hiveudf.py" `
-Container $container `
-Context $context
Nota
Para obter mais informações sobre como carregar arquivos, consulte o documento Carregar dados para trabalhos do Apache Hadoop no HDInsight .
Usar UDF do Hive
# Script should stop on failures
$ErrorActionPreference = "Stop"
# Login to your Azure subscription
# Is there an active Azure subscription?
$sub = Get-AzSubscription -ErrorAction SilentlyContinue
if(-not($sub))
{
Connect-AzAccount
}
# If you have multiple subscriptions, set the one to use
# Select-AzSubscription -SubscriptionId "<SUBSCRIPTIONID>"
# Get cluster info
$clusterName = Read-Host -Prompt "Enter the HDInsight cluster name"
$creds=Get-Credential -UserName "admin" -Message "Enter the login for the cluster"
$HiveQuery = "add file wasbs:///hiveudf.py;" +
"SELECT TRANSFORM (clientid, devicemake, devicemodel) " +
"USING 'python hiveudf.py' AS " +
"(clientid string, phoneLabel string, phoneHash string) " +
"FROM hivesampletable " +
"ORDER BY clientid LIMIT 50;"
# Create Hive job object
$jobDefinition = New-AzHDInsightHiveJobDefinition `
-Query $HiveQuery
# For status bar updates
$activity="Hive query"
# Progress bar (optional)
Write-Progress -Activity $activity -Status "Starting query..."
# Start defined Azure HDInsight job on specified cluster.
$job = Start-AzHDInsightJob `
-ClusterName $clusterName `
-JobDefinition $jobDefinition `
-HttpCredential $creds
# Progress bar (optional)
Write-Progress -Activity $activity -Status "Waiting on query to complete..."
# Wait for completion or failure of specified job
Wait-AzHDInsightJob `
-JobId $job.JobId `
-ClusterName $clusterName `
-HttpCredential $creds
# Uncomment the following to see stderr output
<#
Get-AzHDInsightJobOutput `
-Clustername $clusterName `
-JobId $job.JobId `
-HttpCredential $creds `
-DisplayOutputType StandardError
#>
# Progress bar (optional)
Write-Progress -Activity $activity -Status "Retrieving output..."
# Gets the log output
Get-AzHDInsightJobOutput `
-Clustername $clusterName `
-JobId $job.JobId `
-HttpCredential $creds
A saída para o trabalho do Hive deve ser semelhante ao exemplo a seguir:
100041 RIM 9650 d476f3687700442549a83fac4560c51c
100041 RIM 9650 d476f3687700442549a83fac4560c51c
100042 Apple iPhone 4.2.x 375ad9a0ddc4351536804f1d5d0ea9b9
100042 Apple iPhone 4.2.x 375ad9a0ddc4351536804f1d5d0ea9b9
100042 Apple iPhone 4.2.x 375ad9a0ddc4351536804f1d5d0ea9b9
Apache Porco UDF
Um script Python pode ser usado como um UDF do Pig através da GENERATE
instrução. Você pode executar o script usando Jython ou C Python.
- Jython é executado na JVM e pode ser chamado nativamente a partir do Pig.
- C Python é um processo externo, portanto, os dados do Pig na JVM são enviados para o script em execução em um processo Python. A saída do script Python é enviada de volta para o Pig.
Para especificar o interpretador Python, use register
ao fazer referência ao script Python. Os exemplos a seguir registram scripts com o Pig como myfuncs
:
- Para usar Jython:
register '/path/to/pigudf.py' using jython as myfuncs;
- Para usar C Python:
register '/path/to/pigudf.py' using streaming_python as myfuncs;
Importante
Ao usar o Jython, o caminho para o arquivo pig_jython pode ser um caminho local ou um caminho WASBS://. No entanto, ao usar C Python, você deve fazer referência a um arquivo no sistema de arquivos local do nó que você está usando para enviar o trabalho do Pig.
Uma vez passado o registro, o Pig Latin para este exemplo é o mesmo para ambos:
LOGS = LOAD 'wasbs:///example/data/sample.log' as (LINE:chararray);
LOG = FILTER LOGS by LINE is not null;
DETAILS = FOREACH LOG GENERATE myfuncs.create_structure(LINE);
DUMP DETAILS;
Veja o que este exemplo faz:
- A primeira linha carrega o arquivo
sample.log
de dados de exemplo emLOGS
. Também define cada registo como umchararray
ficheiro . - A próxima linha filtra quaisquer valores nulos, armazenando o resultado da operação em
LOG
. - Em seguida, ele itera sobre os registros e
LOG
usaGENERATE
para invocar ocreate_structure
método contido no script Python/Jython carregado comomyfuncs
.LINE
é usado para passar o registro atual para a função. - Finalmente, as saídas são despejadas para STDOUT usando o
DUMP
comando. Este comando exibe os resultados após a conclusão da operação.
Criar ficheiro
Em seu ambiente de desenvolvimento, crie um arquivo de texto chamado pigudf.py
. Use o seguinte código como o conteúdo do arquivo:
# Uncomment the following if using C Python
#from pig_util import outputSchema
@outputSchema("log: {(date:chararray, time:chararray, classname:chararray, level:chararray, detail:chararray)}")
def create_structure(input):
if (input.startswith('java.lang.Exception')):
input = input[21:len(input)] + ' - java.lang.Exception'
date, time, classname, level, detail = input.split(' ', 4)
return date, time, classname, level, detail
No exemplo do Pig Latin, a LINE
entrada é definida como um chararray porque não há um esquema consistente para a entrada. O script Python transforma os dados em um esquema consistente para saída.
A
@outputSchema
instrução define o formato dos dados que são retornados ao Pig. Neste caso, é um saco de dados, que é um tipo de dados Pig. O saco contém os seguintes campos, todos os quais são chararray (strings):- data - a data em que a entrada de log foi criada
- time - a hora em que a entrada de log foi criada
- classname - o nome da classe para a qual a entrada foi criada
- nível - o nível de log
- detalhe - detalhes detalhados para a entrada de log
Em seguida, o define a função para a qual o
def create_structure(input)
Pig passa itens de linha.Os dados de exemplo,
sample.log
, geralmente estão em conformidade com a data, hora, nome da classe, nível e esquema de detalhes. No entanto, ele contém algumas linhas que começam com*java.lang.Exception*
. Essas linhas devem ser modificadas para corresponder ao esquema. Aif
instrução verifica esses dados e, em seguida, massageia os dados de entrada para mover a*java.lang.Exception*
cadeia de caracteres até o final, alinhando os dados com o esquema de saída esperado.Em seguida, o
split
comando é usado para dividir os dados nos primeiros quatro caracteres de espaço. A saída é atribuída emdate
,time
,classname
,level
, edetail
.Finalmente, os valores são devolvidos ao Pig.
Quando os dados são retornados ao Pig, eles têm um esquema consistente, conforme definido na @outputSchema
instrução.
Carregar arquivo (shell)
Nos comandos abaixo, substitua sshuser
pelo nome de usuário real, se diferente. Substitua mycluster
pelo nome real do cluster. Verifique se o diretório de trabalho está onde o arquivo está localizado.
Use
scp
para copiar os arquivos para o cluster HDInsight. Edite e digite o comando:scp pigudf.py sshuser@mycluster-ssh.azurehdinsight.net:
Use SSH para se conectar ao cluster. Edite e digite o comando:
ssh sshuser@mycluster-ssh.azurehdinsight.net
Na sessão SSH, adicione os arquivos Python carregados anteriormente ao armazenamento do cluster.
hdfs dfs -put pigudf.py /pigudf.py
Use Pig UDF (shell)
Para se conectar ao pig, use o seguinte comando da sua sessão SSH aberta:
pig
Insira as seguintes instruções no
grunt>
prompt:Register wasbs:///pigudf.py using jython as myfuncs; LOGS = LOAD 'wasbs:///example/data/sample.log' as (LINE:chararray); LOG = FILTER LOGS by LINE is not null; DETAILS = foreach LOG generate myfuncs.create_structure(LINE); DUMP DETAILS;
Depois de inserir a seguinte linha, o trabalho deve ser iniciado. Quando o trabalho for concluído, ele retornará uma saída semelhante aos seguintes dados:
((2012-02-03,20:11:56,SampleClass5,[TRACE],verbose detail for id 990982084)) ((2012-02-03,20:11:56,SampleClass7,[TRACE],verbose detail for id 1560323914)) ((2012-02-03,20:11:56,SampleClass8,[DEBUG],detail for id 2083681507)) ((2012-02-03,20:11:56,SampleClass3,[TRACE],verbose detail for id 1718828806)) ((2012-02-03,20:11:56,SampleClass3,[INFO],everything normal for id 530537821))
Use
quit
para sair do shell Grunt e, em seguida, use o seguinte para editar o arquivo pigudf.py no sistema de arquivos local:nano pigudf.py
Uma vez no editor, descomente a seguinte linha removendo o
#
caractere do início da linha:#from pig_util import outputSchema
Esta linha modifica o script Python para trabalhar com C Python em vez de Jython. Depois que a alteração for feita, use Ctrl+X para sair do editor. Selecione Y e, em seguida , Enter para salvar as alterações.
Use o
pig
comando para iniciar o shell novamente. Quando estiver nogrunt>
prompt, use o seguinte para executar o script Python usando o interpretador C Python.Register 'pigudf.py' using streaming_python as myfuncs; LOGS = LOAD 'wasbs:///example/data/sample.log' as (LINE:chararray); LOG = FILTER LOGS by LINE is not null; DETAILS = foreach LOG generate myfuncs.create_structure(LINE); DUMP DETAILS;
Quando esse trabalho for concluído, você verá a mesma saída de quando executou o script anteriormente usando o Jython.
Carregar arquivo (PowerShell)
O PowerShell também pode ser usado para executar remotamente consultas do Hive. Verifique se o diretório de trabalho está onde pigudf.py
está localizado. Use o seguinte script do PowerShell para executar uma consulta do Hive que usa o pigudf.py
script:
# Login to your Azure subscription
# Is there an active Azure subscription?
$sub = Get-AzSubscription -ErrorAction SilentlyContinue
if(-not($sub))
{
Connect-AzAccount
}
# If you have multiple subscriptions, set the one to use
# Select-AzSubscription -SubscriptionId "<SUBSCRIPTIONID>"
# Revise file path as needed
$pathToJythonFile = ".\pigudf.py"
# Get cluster info
$clusterName = Read-Host -Prompt "Enter the HDInsight cluster name"
$clusterInfo = Get-AzHDInsightCluster -ClusterName $clusterName
$resourceGroup = $clusterInfo.ResourceGroup
$storageAccountName=$clusterInfo.DefaultStorageAccount.split('.')[0]
$container=$clusterInfo.DefaultStorageContainer
$storageAccountKey=(Get-AzStorageAccountKey `
-ResourceGroupName $resourceGroup `
-Name $storageAccountName)[0].Value
# Create an Azure Storage context
$context = New-AzStorageContext `
-StorageAccountName $storageAccountName `
-StorageAccountKey $storageAccountKey
# Upload local files to an Azure Storage blob
Set-AzStorageBlobContent `
-File $pathToJythonFile `
-Blob "pigudf.py" `
-Container $container `
-Context $context
Usar UDF do Pig (PowerShell)
Nota
Ao enviar remotamente um trabalho usando o PowerShell, não é possível usar o C Python como interpretador.
O PowerShell também pode ser usado para executar trabalhos do Pig Latin. Para executar um trabalho Pig Latin que usa o pigudf.py
script, use o seguinte script do PowerShell:
# Script should stop on failures
$ErrorActionPreference = "Stop"
# Login to your Azure subscription
# Is there an active Azure subscription?
$sub = Get-AzSubscription -ErrorAction SilentlyContinue
if(-not($sub))
{
Connect-AzAccount
}
# Get cluster info
$clusterName = Read-Host -Prompt "Enter the HDInsight cluster name"
$creds=Get-Credential -UserName "admin" -Message "Enter the login for the cluster"
$PigQuery = "Register wasbs:///pigudf.py using jython as myfuncs;" +
"LOGS = LOAD 'wasbs:///example/data/sample.log' as (LINE:chararray);" +
"LOG = FILTER LOGS by LINE is not null;" +
"DETAILS = foreach LOG generate myfuncs.create_structure(LINE);" +
"DUMP DETAILS;"
# Create Pig job object
$jobDefinition = New-AzHDInsightPigJobDefinition -Query $PigQuery
# For status bar updates
$activity="Pig job"
# Progress bar (optional)
Write-Progress -Activity $activity -Status "Starting job..."
# Start defined Azure HDInsight job on specified cluster.
$job = Start-AzHDInsightJob `
-ClusterName $clusterName `
-JobDefinition $jobDefinition `
-HttpCredential $creds
# Progress bar (optional)
Write-Progress -Activity $activity -Status "Waiting for the Pig job to complete..."
# Wait for completion or failure of specified job
Wait-AzHDInsightJob `
-Job $job.JobId `
-ClusterName $clusterName `
-HttpCredential $creds
# Uncomment the following to see stderr output
<#
Get-AzHDInsightJobOutput `
-Clustername $clusterName `
-JobId $job.JobId `
-HttpCredential $creds `
-DisplayOutputType StandardError
#>
# Progress bar (optional)
Write-Progress -Activity $activity "Retrieving output..."
# Gets the log output
Get-AzHDInsightJobOutput `
-Clustername $clusterName `
-JobId $job.JobId `
-HttpCredential $creds
A saída para o trabalho Pig deve ser semelhante aos seguintes dados:
((2012-02-03,20:11:56,SampleClass5,[TRACE],verbose detail for id 990982084))
((2012-02-03,20:11:56,SampleClass7,[TRACE],verbose detail for id 1560323914))
((2012-02-03,20:11:56,SampleClass8,[DEBUG],detail for id 2083681507))
((2012-02-03,20:11:56,SampleClass3,[TRACE],verbose detail for id 1718828806))
((2012-02-03,20:11:56,SampleClass3,[INFO],everything normal for id 530537821))
Resolução de Problemas
Erros ao executar trabalhos
Ao executar o trabalho de hive, você pode encontrar um erro semelhante ao seguinte texto:
Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: [Error 20001]: An error occurred while reading or writing to your custom script. It may have crashed with an error.
Esse problema pode ser causado pelas terminações de linha no arquivo Python. Muitos editores do Windows usam CRLF como terminação de linha, mas os aplicativos Linux geralmente esperam LF.
Você pode usar as seguintes instruções do PowerShell para remover os caracteres CR antes de carregar o arquivo no HDInsight:
Write-Progress -Activity $activity -Status "Waiting for the Pig job to complete..."
# Wait for completion or failure of specified job
Scripts do PowerShell
Ambos os scripts PowerShell de exemplo usados para executar os exemplos contêm uma linha comentada que exibe a saída de erro para o trabalho. Se você não estiver vendo a saída esperada para o trabalho, descomente a linha a seguir e veja se as informações de erro indicam um problema.
$activity="Pig job"
# Progress bar (optional)
Write-Progress -Activity $activity -Status "Starting job..."
As informações de erro (STDERR) e o resultado do trabalho (STDOUT) também são registrados no armazenamento do HDInsight.
Para este trabalho... | Observe esses arquivos no contêiner de blob |
---|---|
Ramo de registo | /HivePython/stderr /HivePython/stdout |
Pig | /PigPython/stderr /PigPython/stdout |
Próximos passos
Se você precisar carregar módulos Python que não são fornecidos por padrão, consulte Como implantar um módulo no Azure HDInsight.
Para obter outras maneiras de usar o Pig, Hive e aprender sobre como usar o MapReduce, consulte os seguintes documentos: