Partilhar via


Usar funções definidas pelo usuário (UDF) do Python com o Apache Hive e o Apache Pig no HDInsight

Saiba como usar funções definidas pelo usuário (UDF) Python com o Apache Hive e o Apache Pig no Apache Hadoop no Azure HDInsight.

Python no HDInsight

Python2.7 é instalado por padrão no HDInsight 3.0 e posterior. O Apache Hive pode ser usado com esta versão do Python para processamento de fluxo. O processamento de fluxo usa STDOUT e STDIN para passar dados entre o Hive e o UDF.

O HDInsight também inclui o Jython, que é uma implementação Python escrita em Java. O Jython é executado diretamente na Java Virtual Machine e não usa streaming. Jython é o interpretador Python recomendado ao usar Python com Pig.

Pré-requisitos

  • Um cluster Hadoop no HDInsight. Consulte Introdução ao HDInsight no Linux.
  • Um cliente SSH. Para obter mais informações, veja Ligar ao HDInsight (Apache Hadoop) através de SSH.
  • O esquema de URI para o armazenamento primário de clusters. Isso seria wasb:// para o Armazenamento do Azure, abfs:// para o Azure Data Lake Storage Gen2 ou adl:// para o Azure Data Lake Storage Gen1. Se a transferência segura estiver habilitada para o Armazenamento do Azure, o URI será wasbs://. Consulte também, transferência segura.
  • Possível alteração na configuração de armazenamento. Consulte Configuração de armazenamento se estiver usando o tipo BlobStoragede conta de armazenamento .
  • Opcional. Se estiver planejando usar o PowerShell, você precisará do módulo AZ instalado.

Nota

A conta de armazenamento usada neste artigo foi o Armazenamento do Azure com transferência segura habilitada e, portanto, wasbs é usada em todo o artigo.

Configuração do armazenamento

Nenhuma ação será necessária se a conta de armazenamento usada for do tipo Storage (general purpose v1) ou StorageV2 (general purpose v2). O processo neste artigo produz saída para pelo menos /tezstaging. Uma configuração hadoop padrão contém /tezstaging na fs.azure.page.blob.dir variável de configuração em core-site.xml for service HDFS. Essa configuração faz com que a saída para o diretório seja blobs de página, que não são suportados para o tipo BlobStoragede conta de armazenamento. Para usar BlobStorage neste artigo, remova /tezstaging da fs.azure.page.blob.dir variável de configuração. A configuração pode ser acessada a partir da interface do usuário do Ambari. Caso contrário, receberá a mensagem de erro: Page blob is not supported for this account type.

Aviso

As etapas deste documento partem das seguintes suposições:

  • Você cria os scripts Python em seu ambiente de desenvolvimento local.
  • Carregue os scripts para o HDInsight usando o scp comando ou o script do PowerShell fornecido.

Se você quiser usar o Azure Cloud Shell (bash) para trabalhar com o HDInsight, deve:

  • Crie os scripts dentro do ambiente de shell de nuvem.
  • Use scp para carregar os arquivos do shell da nuvem para o HDInsight.
  • Use ssh a partir do shell da nuvem para se conectar ao HDInsight e executar os exemplos.

Apache Hive UDF

Python pode ser usado como um UDF do Hive através da instrução HiveQL TRANSFORM . Por exemplo, o HiveQL a seguir invoca o hiveudf.py arquivo armazenado na conta padrão do Armazenamento do Azure para o cluster.

add file wasbs:///hiveudf.py;

SELECT TRANSFORM (clientid, devicemake, devicemodel)
    USING 'python hiveudf.py' AS
    (clientid string, phoneLabel string, phoneHash string)
FROM hivesampletable
ORDER BY clientid LIMIT 50;

Veja o que este exemplo faz:

  1. A add file instrução no início do arquivo adiciona o hiveudf.py arquivo ao cache distribuído, para que seja acessível por todos os nós no cluster.
  2. A SELECT TRANSFORM ... USING instrução seleciona dados do hivesampletable. Ele também passa os valores clientid, devicemake e devicemodel para o hiveudf.py script.
  3. A AS cláusula descreve os campos retornados de hiveudf.py.

Criar ficheiro

Em seu ambiente de desenvolvimento, crie um arquivo de texto chamado hiveudf.py. Use o seguinte código como o conteúdo do arquivo:

#!/usr/bin/env python
import sys
import string
import hashlib

while True:
    line = sys.stdin.readline()
    if not line:
        break

    line = string.strip(line, "\n ")
    clientid, devicemake, devicemodel = string.split(line, "\t")
    phone_label = devicemake + ' ' + devicemodel
    print "\t".join([clientid, phone_label, hashlib.md5(phone_label).hexdigest()])

Este script executa as seguintes ações:

  1. Lê uma linha de dados de STDIN.
  2. O caractere de nova linha à direita é removido usando string.strip(line, "\n ").
  3. Ao fazer o processamento de fluxo, uma única linha contém todos os valores com um caractere de tabulação entre cada valor. Assim, string.split(line, "\t") pode ser usado para dividir a entrada em cada guia, retornando apenas os campos.
  4. Quando o processamento estiver concluído, a saída deve ser gravada no STDOUT como uma única linha, com uma guia entre cada campo. Por exemplo, print "\t".join([clientid, phone_label, hashlib.md5(phone_label).hexdigest()]).
  5. O while loop se repete até que não line seja lido.

A saída do script é uma concatenação dos valores de entrada de devicemake e devicemodel, bem como um hash do valor concatenado.

Carregar arquivo (shell)

O comando a seguir substitui sshuser pelo nome de usuário real, se diferente. Substitua mycluster pelo nome real do cluster. Verifique se o diretório de trabalho está onde o arquivo está localizado.

  1. Use scp para copiar os arquivos para o cluster HDInsight. Edite e digite o comando:

    scp hiveudf.py sshuser@mycluster-ssh.azurehdinsight.net:
    
  2. Use SSH para se conectar ao cluster. Edite e digite o comando:

    ssh sshuser@mycluster-ssh.azurehdinsight.net
    
  3. Na sessão SSH, adicione os arquivos Python carregados anteriormente ao armazenamento do cluster.

    hdfs dfs -put hiveudf.py /hiveudf.py
    

Usar Hive UDF (shell)

  1. Para se conectar ao Hive, use o seguinte comando da sua sessão SSH aberta:

    beeline -u 'jdbc:hive2://headnodehost:10001/;transportMode=http'
    

    Este comando inicia o cliente Beeline.

  2. Digite a seguinte consulta no 0: jdbc:hive2://headnodehost:10001/> prompt:

    add file wasbs:///hiveudf.py;
    SELECT TRANSFORM (clientid, devicemake, devicemodel)
        USING 'python hiveudf.py' AS
        (clientid string, phoneLabel string, phoneHash string)
    FROM hivesampletable
    ORDER BY clientid LIMIT 50;
    
  3. Assim que a última linha for inserida, o trabalho deve ser iniciado. Quando o trabalho for concluído, ele retornará uma saída semelhante ao exemplo a seguir:

    100041    RIM 9650    d476f3687700442549a83fac4560c51c
    100041    RIM 9650    d476f3687700442549a83fac4560c51c
    100042    Apple iPhone 4.2.x    375ad9a0ddc4351536804f1d5d0ea9b9
    100042    Apple iPhone 4.2.x    375ad9a0ddc4351536804f1d5d0ea9b9
    100042    Apple iPhone 4.2.x    375ad9a0ddc4351536804f1d5d0ea9b9
    
  4. Para sair do Beeline, digite o seguinte comando:

    !q
    

Carregar arquivo (PowerShell)

O PowerShell também pode ser usado para executar remotamente consultas do Hive. Verifique se o diretório de trabalho está onde hiveudf.py está localizado. Use o seguinte script do PowerShell para executar uma consulta do Hive que usa o hiveudf.py script:

# Login to your Azure subscription
# Is there an active Azure subscription?
$sub = Get-AzSubscription -ErrorAction SilentlyContinue
if(-not($sub))
{
    Connect-AzAccount
}

# If you have multiple subscriptions, set the one to use
# Select-AzSubscription -SubscriptionId "<SUBSCRIPTIONID>"

# Revise file path as needed
$pathToStreamingFile = ".\hiveudf.py"

# Get cluster info
$clusterName = Read-Host -Prompt "Enter the HDInsight cluster name"
$clusterInfo = Get-AzHDInsightCluster -ClusterName $clusterName
$resourceGroup = $clusterInfo.ResourceGroup
$storageAccountName=$clusterInfo.DefaultStorageAccount.split('.')[0]
$container=$clusterInfo.DefaultStorageContainer
$storageAccountKey=(Get-AzStorageAccountKey `
   -ResourceGroupName $resourceGroup `
   -Name $storageAccountName)[0].Value

# Create an Azure Storage context
$context = New-AzStorageContext `
    -StorageAccountName $storageAccountName `
    -StorageAccountKey $storageAccountKey

# Upload local files to an Azure Storage blob
Set-AzStorageBlobContent `
    -File $pathToStreamingFile `
    -Blob "hiveudf.py" `
    -Container $container `
    -Context $context

Nota

Para obter mais informações sobre como carregar arquivos, consulte o documento Carregar dados para trabalhos do Apache Hadoop no HDInsight .

Usar UDF do Hive

# Script should stop on failures
$ErrorActionPreference = "Stop"

# Login to your Azure subscription
# Is there an active Azure subscription?
$sub = Get-AzSubscription -ErrorAction SilentlyContinue
if(-not($sub))
{
    Connect-AzAccount
}

# If you have multiple subscriptions, set the one to use
# Select-AzSubscription -SubscriptionId "<SUBSCRIPTIONID>"

# Get cluster info
$clusterName = Read-Host -Prompt "Enter the HDInsight cluster name"
$creds=Get-Credential -UserName "admin" -Message "Enter the login for the cluster"

$HiveQuery = "add file wasbs:///hiveudf.py;" +
                "SELECT TRANSFORM (clientid, devicemake, devicemodel) " +
                "USING 'python hiveudf.py' AS " +
                "(clientid string, phoneLabel string, phoneHash string) " +
                "FROM hivesampletable " +
                "ORDER BY clientid LIMIT 50;"

# Create Hive job object
$jobDefinition = New-AzHDInsightHiveJobDefinition `
    -Query $HiveQuery

# For status bar updates
$activity="Hive query"

# Progress bar (optional)
Write-Progress -Activity $activity -Status "Starting query..."

# Start defined Azure HDInsight job on specified cluster.
$job = Start-AzHDInsightJob `
    -ClusterName $clusterName `
    -JobDefinition $jobDefinition `
    -HttpCredential $creds

# Progress bar (optional)
Write-Progress -Activity $activity -Status "Waiting on query to complete..."

# Wait for completion or failure of specified job
Wait-AzHDInsightJob `
    -JobId $job.JobId `
    -ClusterName $clusterName `
    -HttpCredential $creds

# Uncomment the following to see stderr output
<#
Get-AzHDInsightJobOutput `
   -Clustername $clusterName `
   -JobId $job.JobId `
   -HttpCredential $creds `
   -DisplayOutputType StandardError
#>

# Progress bar (optional)
Write-Progress -Activity $activity -Status "Retrieving output..."

# Gets the log output
Get-AzHDInsightJobOutput `
    -Clustername $clusterName `
    -JobId $job.JobId `
    -HttpCredential $creds

A saída para o trabalho do Hive deve ser semelhante ao exemplo a seguir:

100041    RIM 9650    d476f3687700442549a83fac4560c51c
100041    RIM 9650    d476f3687700442549a83fac4560c51c
100042    Apple iPhone 4.2.x    375ad9a0ddc4351536804f1d5d0ea9b9
100042    Apple iPhone 4.2.x    375ad9a0ddc4351536804f1d5d0ea9b9
100042    Apple iPhone 4.2.x    375ad9a0ddc4351536804f1d5d0ea9b9

Apache Porco UDF

Um script Python pode ser usado como um UDF do Pig através da GENERATE instrução. Você pode executar o script usando Jython ou C Python.

  • Jython é executado na JVM e pode ser chamado nativamente a partir do Pig.
  • C Python é um processo externo, portanto, os dados do Pig na JVM são enviados para o script em execução em um processo Python. A saída do script Python é enviada de volta para o Pig.

Para especificar o interpretador Python, use register ao fazer referência ao script Python. Os exemplos a seguir registram scripts com o Pig como myfuncs:

  • Para usar Jython: register '/path/to/pigudf.py' using jython as myfuncs;
  • Para usar C Python: register '/path/to/pigudf.py' using streaming_python as myfuncs;

Importante

Ao usar o Jython, o caminho para o arquivo pig_jython pode ser um caminho local ou um caminho WASBS://. No entanto, ao usar C Python, você deve fazer referência a um arquivo no sistema de arquivos local do nó que você está usando para enviar o trabalho do Pig.

Uma vez passado o registro, o Pig Latin para este exemplo é o mesmo para ambos:

LOGS = LOAD 'wasbs:///example/data/sample.log' as (LINE:chararray);
LOG = FILTER LOGS by LINE is not null;
DETAILS = FOREACH LOG GENERATE myfuncs.create_structure(LINE);
DUMP DETAILS;

Veja o que este exemplo faz:

  1. A primeira linha carrega o arquivo sample.log de dados de exemplo em LOGS. Também define cada registo como um chararrayficheiro .
  2. A próxima linha filtra quaisquer valores nulos, armazenando o resultado da operação em LOG.
  3. Em seguida, ele itera sobre os registros e LOG usa GENERATE para invocar o create_structure método contido no script Python/Jython carregado como myfuncs. LINE é usado para passar o registro atual para a função.
  4. Finalmente, as saídas são despejadas para STDOUT usando o DUMP comando. Este comando exibe os resultados após a conclusão da operação.

Criar ficheiro

Em seu ambiente de desenvolvimento, crie um arquivo de texto chamado pigudf.py. Use o seguinte código como o conteúdo do arquivo:

# Uncomment the following if using C Python
#from pig_util import outputSchema


@outputSchema("log: {(date:chararray, time:chararray, classname:chararray, level:chararray, detail:chararray)}")
def create_structure(input):
    if (input.startswith('java.lang.Exception')):
        input = input[21:len(input)] + ' - java.lang.Exception'
    date, time, classname, level, detail = input.split(' ', 4)
    return date, time, classname, level, detail

No exemplo do Pig Latin, a LINE entrada é definida como um chararray porque não há um esquema consistente para a entrada. O script Python transforma os dados em um esquema consistente para saída.

  1. A @outputSchema instrução define o formato dos dados que são retornados ao Pig. Neste caso, é um saco de dados, que é um tipo de dados Pig. O saco contém os seguintes campos, todos os quais são chararray (strings):

    • data - a data em que a entrada de log foi criada
    • time - a hora em que a entrada de log foi criada
    • classname - o nome da classe para a qual a entrada foi criada
    • nível - o nível de log
    • detalhe - detalhes detalhados para a entrada de log
  2. Em seguida, o define a função para a qual o def create_structure(input) Pig passa itens de linha.

  3. Os dados de exemplo, sample.log, geralmente estão em conformidade com a data, hora, nome da classe, nível e esquema de detalhes. No entanto, ele contém algumas linhas que começam com *java.lang.Exception*. Essas linhas devem ser modificadas para corresponder ao esquema. A if instrução verifica esses dados e, em seguida, massageia os dados de entrada para mover a *java.lang.Exception* cadeia de caracteres até o final, alinhando os dados com o esquema de saída esperado.

  4. Em seguida, o split comando é usado para dividir os dados nos primeiros quatro caracteres de espaço. A saída é atribuída em date, time, classname, level, e detail.

  5. Finalmente, os valores são devolvidos ao Pig.

Quando os dados são retornados ao Pig, eles têm um esquema consistente, conforme definido na @outputSchema instrução.

Carregar arquivo (shell)

Nos comandos abaixo, substitua sshuser pelo nome de usuário real, se diferente. Substitua mycluster pelo nome real do cluster. Verifique se o diretório de trabalho está onde o arquivo está localizado.

  1. Use scp para copiar os arquivos para o cluster HDInsight. Edite e digite o comando:

    scp pigudf.py sshuser@mycluster-ssh.azurehdinsight.net:
    
  2. Use SSH para se conectar ao cluster. Edite e digite o comando:

    ssh sshuser@mycluster-ssh.azurehdinsight.net
    
  3. Na sessão SSH, adicione os arquivos Python carregados anteriormente ao armazenamento do cluster.

    hdfs dfs -put pigudf.py /pigudf.py
    

Use Pig UDF (shell)

  1. Para se conectar ao pig, use o seguinte comando da sua sessão SSH aberta:

    pig
    
  2. Insira as seguintes instruções no grunt> prompt:

    Register wasbs:///pigudf.py using jython as myfuncs;
    LOGS = LOAD 'wasbs:///example/data/sample.log' as (LINE:chararray);
    LOG = FILTER LOGS by LINE is not null;
    DETAILS = foreach LOG generate myfuncs.create_structure(LINE);
    DUMP DETAILS;
    
  3. Depois de inserir a seguinte linha, o trabalho deve ser iniciado. Quando o trabalho for concluído, ele retornará uma saída semelhante aos seguintes dados:

    ((2012-02-03,20:11:56,SampleClass5,[TRACE],verbose detail for id 990982084))
    ((2012-02-03,20:11:56,SampleClass7,[TRACE],verbose detail for id 1560323914))
    ((2012-02-03,20:11:56,SampleClass8,[DEBUG],detail for id 2083681507))
    ((2012-02-03,20:11:56,SampleClass3,[TRACE],verbose detail for id 1718828806))
    ((2012-02-03,20:11:56,SampleClass3,[INFO],everything normal for id 530537821))
    
  4. Use quit para sair do shell Grunt e, em seguida, use o seguinte para editar o arquivo pigudf.py no sistema de arquivos local:

    nano pigudf.py
    
  5. Uma vez no editor, descomente a seguinte linha removendo o # caractere do início da linha:

    #from pig_util import outputSchema
    

    Esta linha modifica o script Python para trabalhar com C Python em vez de Jython. Depois que a alteração for feita, use Ctrl+X para sair do editor. Selecione Y e, em seguida , Enter para salvar as alterações.

  6. Use o pig comando para iniciar o shell novamente. Quando estiver no grunt> prompt, use o seguinte para executar o script Python usando o interpretador C Python.

    Register 'pigudf.py' using streaming_python as myfuncs;
    LOGS = LOAD 'wasbs:///example/data/sample.log' as (LINE:chararray);
    LOG = FILTER LOGS by LINE is not null;
    DETAILS = foreach LOG generate myfuncs.create_structure(LINE);
    DUMP DETAILS;
    

    Quando esse trabalho for concluído, você verá a mesma saída de quando executou o script anteriormente usando o Jython.

Carregar arquivo (PowerShell)

O PowerShell também pode ser usado para executar remotamente consultas do Hive. Verifique se o diretório de trabalho está onde pigudf.py está localizado. Use o seguinte script do PowerShell para executar uma consulta do Hive que usa o pigudf.py script:

# Login to your Azure subscription
# Is there an active Azure subscription?
$sub = Get-AzSubscription -ErrorAction SilentlyContinue
if(-not($sub))
{
    Connect-AzAccount
}

# If you have multiple subscriptions, set the one to use
# Select-AzSubscription -SubscriptionId "<SUBSCRIPTIONID>"

# Revise file path as needed
$pathToJythonFile = ".\pigudf.py"


# Get cluster info
$clusterName = Read-Host -Prompt "Enter the HDInsight cluster name"
$clusterInfo = Get-AzHDInsightCluster -ClusterName $clusterName
$resourceGroup = $clusterInfo.ResourceGroup
$storageAccountName=$clusterInfo.DefaultStorageAccount.split('.')[0]
$container=$clusterInfo.DefaultStorageContainer
$storageAccountKey=(Get-AzStorageAccountKey `
   -ResourceGroupName $resourceGroup `
   -Name $storageAccountName)[0].Value

# Create an Azure Storage context
$context = New-AzStorageContext `
    -StorageAccountName $storageAccountName `
    -StorageAccountKey $storageAccountKey

# Upload local files to an Azure Storage blob
Set-AzStorageBlobContent `
    -File $pathToJythonFile `
    -Blob "pigudf.py" `
    -Container $container `
    -Context $context

Usar UDF do Pig (PowerShell)

Nota

Ao enviar remotamente um trabalho usando o PowerShell, não é possível usar o C Python como interpretador.

O PowerShell também pode ser usado para executar trabalhos do Pig Latin. Para executar um trabalho Pig Latin que usa o pigudf.py script, use o seguinte script do PowerShell:

# Script should stop on failures
$ErrorActionPreference = "Stop"

# Login to your Azure subscription
# Is there an active Azure subscription?
$sub = Get-AzSubscription -ErrorAction SilentlyContinue
if(-not($sub))
{
    Connect-AzAccount
}

# Get cluster info
$clusterName = Read-Host -Prompt "Enter the HDInsight cluster name"
$creds=Get-Credential -UserName "admin" -Message "Enter the login for the cluster"


$PigQuery = "Register wasbs:///pigudf.py using jython as myfuncs;" +
            "LOGS = LOAD 'wasbs:///example/data/sample.log' as (LINE:chararray);" +
            "LOG = FILTER LOGS by LINE is not null;" +
            "DETAILS = foreach LOG generate myfuncs.create_structure(LINE);" +
            "DUMP DETAILS;"

# Create Pig job object
$jobDefinition = New-AzHDInsightPigJobDefinition -Query $PigQuery

# For status bar updates
$activity="Pig job"

# Progress bar (optional)
Write-Progress -Activity $activity -Status "Starting job..."

# Start defined Azure HDInsight job on specified cluster.
$job = Start-AzHDInsightJob `
    -ClusterName $clusterName `
    -JobDefinition $jobDefinition `
    -HttpCredential $creds

# Progress bar (optional)
Write-Progress -Activity $activity -Status "Waiting for the Pig job to complete..."

# Wait for completion or failure of specified job
Wait-AzHDInsightJob `
    -Job $job.JobId `
    -ClusterName $clusterName `
    -HttpCredential $creds

# Uncomment the following to see stderr output
<#
Get-AzHDInsightJobOutput `
    -Clustername $clusterName `
    -JobId $job.JobId `
    -HttpCredential $creds `
    -DisplayOutputType StandardError
#>

# Progress bar (optional)
Write-Progress -Activity $activity "Retrieving output..."

# Gets the log output
Get-AzHDInsightJobOutput `
    -Clustername $clusterName `
    -JobId $job.JobId `
    -HttpCredential $creds

A saída para o trabalho Pig deve ser semelhante aos seguintes dados:

((2012-02-03,20:11:56,SampleClass5,[TRACE],verbose detail for id 990982084))
((2012-02-03,20:11:56,SampleClass7,[TRACE],verbose detail for id 1560323914))
((2012-02-03,20:11:56,SampleClass8,[DEBUG],detail for id 2083681507))
((2012-02-03,20:11:56,SampleClass3,[TRACE],verbose detail for id 1718828806))
((2012-02-03,20:11:56,SampleClass3,[INFO],everything normal for id 530537821))

Resolução de Problemas

Erros ao executar trabalhos

Ao executar o trabalho de hive, você pode encontrar um erro semelhante ao seguinte texto:

Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: [Error 20001]: An error occurred while reading or writing to your custom script. It may have crashed with an error.

Esse problema pode ser causado pelas terminações de linha no arquivo Python. Muitos editores do Windows usam CRLF como terminação de linha, mas os aplicativos Linux geralmente esperam LF.

Você pode usar as seguintes instruções do PowerShell para remover os caracteres CR antes de carregar o arquivo no HDInsight:

Write-Progress -Activity $activity -Status "Waiting for the Pig job to complete..."

# Wait for completion or failure of specified job

Scripts do PowerShell

Ambos os scripts PowerShell de exemplo usados para executar os exemplos contêm uma linha comentada que exibe a saída de erro para o trabalho. Se você não estiver vendo a saída esperada para o trabalho, descomente a linha a seguir e veja se as informações de erro indicam um problema.

$activity="Pig job"

# Progress bar (optional)
Write-Progress -Activity $activity -Status "Starting job..."

As informações de erro (STDERR) e o resultado do trabalho (STDOUT) também são registrados no armazenamento do HDInsight.

Para este trabalho... Observe esses arquivos no contêiner de blob
Ramo de registo /HivePython/stderr

/HivePython/stdout

Pig /PigPython/stderr

/PigPython/stdout

Próximos passos

Se você precisar carregar módulos Python que não são fornecidos por padrão, consulte Como implantar um módulo no Azure HDInsight.

Para obter outras maneiras de usar o Pig, Hive e aprender sobre como usar o MapReduce, consulte os seguintes documentos: