Sdílet prostřednictvím


Použití uživatelem definovaných funkcí Pythonu (UDF) s Apache Hivem a Apache Pigem ve službě HDInsight

Naučte se používat uživatelem definované funkce Pythonu (UDF) s Apache Hivem a Apache Pigem v Apache Hadoopu ve službě Azure HDInsight.

Python ve službě HDInsight

Python2.7 je ve výchozím nastavení nainstalovaný ve službě HDInsight 3.0 a novějším. Apache Hive se dá použít s touto verzí Pythonu ke zpracování datových proudů. Zpracování datových proudů používá k předávání dat mezi Hivem a uživatelem definovaným uživatelem stDOUT a STDIN.

HDInsight také obsahuje Jython, což je implementace Pythonu napsaná v Javě. Jython běží přímo na virtuálním počítači Java a nepoužívá streamování. Jython je doporučený interpret Pythonu při použití Pythonu s Pigem.

Požadavky

Poznámka:

Účet úložiště použitý v tomto článku byl Azure Storage s povoleným zabezpečeným přenosem , a proto wasbs se používá v celém článku.

Konfigurace úložiště

Není vyžadována žádná akce, pokud použitý účet úložiště je druh Storage (general purpose v1) nebo StorageV2 (general purpose v2). Proces v tomto článku vytvoří výstup alespoň /tezstaging. Výchozí konfigurace hadoopu obsahuje /tezstaging v fs.azure.page.blob.dir konfigurační proměnné pro core-site.xml službu HDFS. Tato konfigurace způsobí, že výstup adresáře bude objekty blob stránky, které nejsou podporované pro druh BlobStorageúčtu úložiště . Chcete-li použít BlobStorage pro tento článek, odeberte /tezstaging z fs.azure.page.blob.dir konfigurační proměnné. Ke konfiguraci je možné přistupovat z uživatelského rozhraní Ambari. V opačném případě se zobrazí chybová zpráva: Page blob is not supported for this account type.

Upozorňující

Kroky v tomto dokumentu předpokládají následující předpoklady:

  • Skripty Pythonu vytvoříte v místním vývojovém prostředí.
  • Skripty nahrajete do HDInsight pomocí scp příkazu nebo zadaného skriptu PowerShellu.

Pokud chcete použít Azure Cloud Shell (bash) pro práci se službou HDInsight, musíte:

  • Vytvořte skripty v prostředí Cloud Shell.
  • Slouží scp k nahrání souborů z Cloud Shellu do SLUŽBY HDInsight.
  • Pomocí ssh cloud shellu se připojte ke službě HDInsight a spusťte příklady.

Apache Hive UDF

Python se dá použít jako UDF z Hivu prostřednictvím příkazu HiveQL TRANSFORM . Například následující HiveQL vyvolá hiveudf.py soubor uložený ve výchozím účtu Azure Storage pro cluster.

add file wasbs:///hiveudf.py;

SELECT TRANSFORM (clientid, devicemake, devicemodel)
    USING 'python hiveudf.py' AS
    (clientid string, phoneLabel string, phoneHash string)
FROM hivesampletable
ORDER BY clientid LIMIT 50;

Tady je tento příklad:

  1. Příkaz add file na začátku souboru přidá hiveudf.py soubor do distribuované mezipaměti, takže je přístupný všemi uzly v clusteru.
  2. Příkaz SELECT TRANSFORM ... USING vybere data z objektu hivesampletable. Předává také hodnoty clientid, devicemake a devicemodel do hiveudf.py skriptu.
  3. Klauzule AS popisuje pole vrácená z hiveudf.py.

Vytvořit soubor

Ve vývojovém prostředí vytvořte textový soubor s názvem hiveudf.py. Jako obsah souboru použijte následující kód:

#!/usr/bin/env python
import sys
import string
import hashlib

while True:
    line = sys.stdin.readline()
    if not line:
        break

    line = string.strip(line, "\n ")
    clientid, devicemake, devicemodel = string.split(line, "\t")
    phone_label = devicemake + ' ' + devicemodel
    print "\t".join([clientid, phone_label, hashlib.md5(phone_label).hexdigest()])

Tento skript provádí následující akce:

  1. Načte řádek dat ze služby STDIN.
  2. Koncový znak nového řádku se odebere pomocí string.strip(line, "\n ")znaku .
  3. Při zpracování datových proudů obsahuje jeden řádek všechny hodnoty se znakem tabulátoru mezi jednotlivými hodnotami. Vstup je tedy string.split(line, "\t") možné použít k rozdělení vstupu na každé kartě a vrácení pouze polí.
  4. Po dokončení zpracování musí být výstup zapsán do výstupu STDOUT jako jeden řádek s tabulátorem mezi jednotlivými poli. Například print "\t".join([clientid, phone_label, hashlib.md5(phone_label).hexdigest()]).
  5. Smyčka while se opakuje, dokud se nečte line .

Výstup skriptu je zřetězení vstupních hodnot a devicemake devicemodelhodnota hash zřetězené hodnoty.

Nahrání souboru (prostředí)

Následující příkaz nahradí sshuser skutečné uživatelské jméno, pokud se liší. Nahraďte mycluster skutečným názvem clusteru. Ujistěte se, že je v pracovním adresáři umístění souboru.

  1. Slouží scp ke kopírování souborů do clusteru HDInsight. Upravte a zadejte příkaz:

    scp hiveudf.py sshuser@mycluster-ssh.azurehdinsight.net:
    
  2. Připojte se ke clusteru pomocí SSH. Upravte a zadejte příkaz:

    ssh sshuser@mycluster-ssh.azurehdinsight.net
    
  3. V relaci SSH přidejte soubory Pythonu nahrané dříve do úložiště clusteru.

    hdfs dfs -put hiveudf.py /hiveudf.py
    

Použití uživatelem definovaného uživatelem Hivu (shell)

  1. Pokud se chcete připojit k Hivu, použijte následující příkaz z otevřené relace SSH:

    beeline -u 'jdbc:hive2://headnodehost:10001/;transportMode=http'
    

    Tento příkaz spustí klienta Beeline.

  2. Na příkazovém 0: jdbc:hive2://headnodehost:10001/> řádku zadejte následující dotaz:

    add file wasbs:///hiveudf.py;
    SELECT TRANSFORM (clientid, devicemake, devicemodel)
        USING 'python hiveudf.py' AS
        (clientid string, phoneLabel string, phoneHash string)
    FROM hivesampletable
    ORDER BY clientid LIMIT 50;
    
  3. Po zadání posledního řádku by se měla úloha spustit. Po dokončení úlohy vrátí výstup podobný následujícímu příkladu:

    100041    RIM 9650    d476f3687700442549a83fac4560c51c
    100041    RIM 9650    d476f3687700442549a83fac4560c51c
    100042    Apple iPhone 4.2.x    375ad9a0ddc4351536804f1d5d0ea9b9
    100042    Apple iPhone 4.2.x    375ad9a0ddc4351536804f1d5d0ea9b9
    100042    Apple iPhone 4.2.x    375ad9a0ddc4351536804f1d5d0ea9b9
    
  4. Pokud chcete beeline ukončit, zadejte následující příkaz:

    !q
    

Nahrání souboru (PowerShell)

PowerShell se dá použít také ke vzdálenému spouštění dotazů Hive. Ujistěte se, že hiveudf.py se nachází váš pracovní adresář. Pomocí následujícího skriptu PowerShellu spusťte dotaz Hive, který tento skript používá hiveudf.py :

# Login to your Azure subscription
# Is there an active Azure subscription?
$sub = Get-AzSubscription -ErrorAction SilentlyContinue
if(-not($sub))
{
    Connect-AzAccount
}

# If you have multiple subscriptions, set the one to use
# Select-AzSubscription -SubscriptionId "<SUBSCRIPTIONID>"

# Revise file path as needed
$pathToStreamingFile = ".\hiveudf.py"

# Get cluster info
$clusterName = Read-Host -Prompt "Enter the HDInsight cluster name"
$clusterInfo = Get-AzHDInsightCluster -ClusterName $clusterName
$resourceGroup = $clusterInfo.ResourceGroup
$storageAccountName=$clusterInfo.DefaultStorageAccount.split('.')[0]
$container=$clusterInfo.DefaultStorageContainer
$storageAccountKey=(Get-AzStorageAccountKey `
   -ResourceGroupName $resourceGroup `
   -Name $storageAccountName)[0].Value

# Create an Azure Storage context
$context = New-AzStorageContext `
    -StorageAccountName $storageAccountName `
    -StorageAccountKey $storageAccountKey

# Upload local files to an Azure Storage blob
Set-AzStorageBlobContent `
    -File $pathToStreamingFile `
    -Blob "hiveudf.py" `
    -Container $container `
    -Context $context

Poznámka:

Další informace o nahrávání souborů najdete v tématu Nahrání dat pro úlohy Apache Hadoop v dokumentu HDInsight .

Použití uživatelem definovaného uživatelem Hivu

# Script should stop on failures
$ErrorActionPreference = "Stop"

# Login to your Azure subscription
# Is there an active Azure subscription?
$sub = Get-AzSubscription -ErrorAction SilentlyContinue
if(-not($sub))
{
    Connect-AzAccount
}

# If you have multiple subscriptions, set the one to use
# Select-AzSubscription -SubscriptionId "<SUBSCRIPTIONID>"

# Get cluster info
$clusterName = Read-Host -Prompt "Enter the HDInsight cluster name"
$creds=Get-Credential -UserName "admin" -Message "Enter the login for the cluster"

$HiveQuery = "add file wasbs:///hiveudf.py;" +
                "SELECT TRANSFORM (clientid, devicemake, devicemodel) " +
                "USING 'python hiveudf.py' AS " +
                "(clientid string, phoneLabel string, phoneHash string) " +
                "FROM hivesampletable " +
                "ORDER BY clientid LIMIT 50;"

# Create Hive job object
$jobDefinition = New-AzHDInsightHiveJobDefinition `
    -Query $HiveQuery

# For status bar updates
$activity="Hive query"

# Progress bar (optional)
Write-Progress -Activity $activity -Status "Starting query..."

# Start defined Azure HDInsight job on specified cluster.
$job = Start-AzHDInsightJob `
    -ClusterName $clusterName `
    -JobDefinition $jobDefinition `
    -HttpCredential $creds

# Progress bar (optional)
Write-Progress -Activity $activity -Status "Waiting on query to complete..."

# Wait for completion or failure of specified job
Wait-AzHDInsightJob `
    -JobId $job.JobId `
    -ClusterName $clusterName `
    -HttpCredential $creds

# Uncomment the following to see stderr output
<#
Get-AzHDInsightJobOutput `
   -Clustername $clusterName `
   -JobId $job.JobId `
   -HttpCredential $creds `
   -DisplayOutputType StandardError
#>

# Progress bar (optional)
Write-Progress -Activity $activity -Status "Retrieving output..."

# Gets the log output
Get-AzHDInsightJobOutput `
    -Clustername $clusterName `
    -JobId $job.JobId `
    -HttpCredential $creds

Výstup pro úlohu Hive by se měl podobat následujícímu příkladu:

100041    RIM 9650    d476f3687700442549a83fac4560c51c
100041    RIM 9650    d476f3687700442549a83fac4560c51c
100042    Apple iPhone 4.2.x    375ad9a0ddc4351536804f1d5d0ea9b9
100042    Apple iPhone 4.2.x    375ad9a0ddc4351536804f1d5d0ea9b9
100042    Apple iPhone 4.2.x    375ad9a0ddc4351536804f1d5d0ea9b9

Apache Pig UDF

Skript Pythonu se dá použít jako UDF z Pigu prostřednictvím příkazu GENERATE . Skript můžete spustit pomocí Jythonu nebo C Pythonu.

  • Jython běží na JVM a lze volat z Pig.
  • C Python je externí proces, takže data z Pigu na JVM se odesílají do skriptu spuštěného v procesu Pythonu. Výstup skriptu Pythonu se odešle zpět do Pigu.

Pokud chcete určit interpret Pythonu, použijte register ho při odkazování na skript Pythonu. Následující příklady registrují skripty ve službě Pig jako myfuncs:

  • Použití Jythonu: register '/path/to/pigudf.py' using jython as myfuncs;
  • Použití jazyka C Python: register '/path/to/pigudf.py' using streaming_python as myfuncs;

Důležité

Při použití Jythonu může být cesta k souboru pig_jython místní nebo WASBS:// cesta. Při použití jazyka C Python však musíte odkazovat na soubor v místním systému souborů uzlu, který používáte k odeslání úlohy Pig.

Po předchozí registraci je v tomto příkladu latinka pig stejná pro oba:

LOGS = LOAD 'wasbs:///example/data/sample.log' as (LINE:chararray);
LOG = FILTER LOGS by LINE is not null;
DETAILS = FOREACH LOG GENERATE myfuncs.create_structure(LINE);
DUMP DETAILS;

Tady je tento příklad:

  1. První řádek načte ukázkový datový soubor sample.log do LOGSsouboru . Definuje také každý záznam jako chararrayzáznam .
  2. Další řádek vyfiltruje všechny hodnoty null a uloží výsledek operace do LOG.
  3. Dále iteruje přes záznamy a LOG používá GENERATE k vyvolání create_structure metody obsažené ve skriptu Python/Jython načtený jako myfuncs. LINE slouží k předání aktuálního záznamu do funkce.
  4. Výstupy se nakonec pomocí příkazu vypíše do výstupu DUMP STDOUT. Tento příkaz zobrazí výsledky po dokončení operace.

Vytvořit soubor

Ve vývojovém prostředí vytvořte textový soubor s názvem pigudf.py. Jako obsah souboru použijte následující kód:

# Uncomment the following if using C Python
#from pig_util import outputSchema


@outputSchema("log: {(date:chararray, time:chararray, classname:chararray, level:chararray, detail:chararray)}")
def create_structure(input):
    if (input.startswith('java.lang.Exception')):
        input = input[21:len(input)] + ' - java.lang.Exception'
    date, time, classname, level, detail = input.split(' ', 4)
    return date, time, classname, level, detail

V příkladu Pig Latin je vstup definován jako chararray, LINE protože pro vstup neexistuje konzistentní schéma. Skript Pythonu transformuje data na konzistentní schéma pro výstup.

  1. Příkaz @outputSchema definuje formát dat vrácených službě Pig. V tomto případě se jedná o datovou tašku, což je datový typ Pig. Taška obsahuje následující pole, z nichž všechny jsou chararray (řetězce):

    • date – datum vytvoření položky protokolu
    • time – čas vytvoření položky protokolu
    • classname – název třídy, pro kterou byla položka vytvořena
    • level – úroveň protokolu
    • detail – podrobné podrobnosti o položce protokolu
  2. Dále definuje funkci, def create_structure(input) do které Pig předává řádkové položky.

  3. Ukázková data, sample.logvětšinou odpovídají datu, času, názvu třídy, úrovni a schématu podrobností. Obsahuje však několik řádků, které začínají *java.lang.Exception*. Tyto řádky musí být upraveny tak, aby odpovídaly schématu. Tento if příkaz zkontroluje tyto údaje a pak provede masáž vstupních dat, aby se řetězec přesunul *java.lang.Exception* na konec, čímž se data zarovnají do řádku s očekávaným výstupním schématem.

  4. Dále se split příkaz použije k rozdělení dat na prvních čtyřech znamech mezer. Výstup je přiřazen do date, time, classname, level, a detail.

  5. Nakonec se hodnoty vrátí do Pigu.

Když se data vrátí do Pigu, mají konzistentní schéma definované v @outputSchema příkazu.

Nahrání souboru (prostředí)

Vnásledujícíchch sshuser Nahraďte mycluster skutečným názvem clusteru. Ujistěte se, že je v pracovním adresáři umístění souboru.

  1. Slouží scp ke kopírování souborů do clusteru HDInsight. Upravte a zadejte příkaz:

    scp pigudf.py sshuser@mycluster-ssh.azurehdinsight.net:
    
  2. Připojte se ke clusteru pomocí SSH. Upravte a zadejte příkaz:

    ssh sshuser@mycluster-ssh.azurehdinsight.net
    
  3. V relaci SSH přidejte soubory Pythonu nahrané dříve do úložiště clusteru.

    hdfs dfs -put pigudf.py /pigudf.py
    

Použití nástroje Pig UDF (shell)

  1. Pokud se chcete připojit k pigu, použijte následující příkaz z otevřené relace SSH:

    pig
    
  2. Na příkazovém grunt> řádku zadejte následující příkazy:

    Register wasbs:///pigudf.py using jython as myfuncs;
    LOGS = LOAD 'wasbs:///example/data/sample.log' as (LINE:chararray);
    LOG = FILTER LOGS by LINE is not null;
    DETAILS = foreach LOG generate myfuncs.create_structure(LINE);
    DUMP DETAILS;
    
  3. Po zadání následujícího řádku by se měla úloha spustit. Po dokončení úlohy vrátí výstup podobný následujícím datům:

    ((2012-02-03,20:11:56,SampleClass5,[TRACE],verbose detail for id 990982084))
    ((2012-02-03,20:11:56,SampleClass7,[TRACE],verbose detail for id 1560323914))
    ((2012-02-03,20:11:56,SampleClass8,[DEBUG],detail for id 2083681507))
    ((2012-02-03,20:11:56,SampleClass3,[TRACE],verbose detail for id 1718828806))
    ((2012-02-03,20:11:56,SampleClass3,[INFO],everything normal for id 530537821))
    
  4. Ukončete quit prostředí Grunt a pak pomocí následujícího příkazu upravte soubor pigudf.py v místním systému souborů:

    nano pigudf.py
    
  5. V editoru odkomentujte následující řádek odebráním # znaku od začátku řádku:

    #from pig_util import outputSchema
    

    Tento řádek upraví skript Pythonu tak, aby místo Jythonu fungoval s jazykem C Python. Po provedení změny ukončete editor stisknutím ctrl+X . Vyberte Y a potom enter , aby se změny uložily.

  6. pig Pomocí příkazu znovu spusťte prostředí. Jakmile budete na příkazovém grunt> řádku, pomocí následujícího příkazu spusťte skript Pythonu pomocí interpretu jazyka C Python.

    Register 'pigudf.py' using streaming_python as myfuncs;
    LOGS = LOAD 'wasbs:///example/data/sample.log' as (LINE:chararray);
    LOG = FILTER LOGS by LINE is not null;
    DETAILS = foreach LOG generate myfuncs.create_structure(LINE);
    DUMP DETAILS;
    

    Po dokončení této úlohy by se měl zobrazit stejný výstup jako při předchozím spuštění skriptu pomocí Jythonu.

Nahrání souboru (PowerShell)

PowerShell se dá použít také ke vzdálenému spouštění dotazů Hive. Ujistěte se, že pigudf.py se nachází váš pracovní adresář. Pomocí následujícího skriptu PowerShellu spusťte dotaz Hive, který tento skript používá pigudf.py :

# Login to your Azure subscription
# Is there an active Azure subscription?
$sub = Get-AzSubscription -ErrorAction SilentlyContinue
if(-not($sub))
{
    Connect-AzAccount
}

# If you have multiple subscriptions, set the one to use
# Select-AzSubscription -SubscriptionId "<SUBSCRIPTIONID>"

# Revise file path as needed
$pathToJythonFile = ".\pigudf.py"


# Get cluster info
$clusterName = Read-Host -Prompt "Enter the HDInsight cluster name"
$clusterInfo = Get-AzHDInsightCluster -ClusterName $clusterName
$resourceGroup = $clusterInfo.ResourceGroup
$storageAccountName=$clusterInfo.DefaultStorageAccount.split('.')[0]
$container=$clusterInfo.DefaultStorageContainer
$storageAccountKey=(Get-AzStorageAccountKey `
   -ResourceGroupName $resourceGroup `
   -Name $storageAccountName)[0].Value

# Create an Azure Storage context
$context = New-AzStorageContext `
    -StorageAccountName $storageAccountName `
    -StorageAccountKey $storageAccountKey

# Upload local files to an Azure Storage blob
Set-AzStorageBlobContent `
    -File $pathToJythonFile `
    -Blob "pigudf.py" `
    -Container $container `
    -Context $context

Použití funkce Pig UDF (PowerShell)

Poznámka:

Při vzdáleném odesílání úlohy pomocí PowerShellu není možné jako interpret použít jazyk C Python.

PowerShell se dá použít také ke spouštění úloh Pig Latin. Pokud chcete spustit úlohu Pig Latin, která tento skript používá, použijte následující skript PowerShellu pigudf.py :

# Script should stop on failures
$ErrorActionPreference = "Stop"

# Login to your Azure subscription
# Is there an active Azure subscription?
$sub = Get-AzSubscription -ErrorAction SilentlyContinue
if(-not($sub))
{
    Connect-AzAccount
}

# Get cluster info
$clusterName = Read-Host -Prompt "Enter the HDInsight cluster name"
$creds=Get-Credential -UserName "admin" -Message "Enter the login for the cluster"


$PigQuery = "Register wasbs:///pigudf.py using jython as myfuncs;" +
            "LOGS = LOAD 'wasbs:///example/data/sample.log' as (LINE:chararray);" +
            "LOG = FILTER LOGS by LINE is not null;" +
            "DETAILS = foreach LOG generate myfuncs.create_structure(LINE);" +
            "DUMP DETAILS;"

# Create Pig job object
$jobDefinition = New-AzHDInsightPigJobDefinition -Query $PigQuery

# For status bar updates
$activity="Pig job"

# Progress bar (optional)
Write-Progress -Activity $activity -Status "Starting job..."

# Start defined Azure HDInsight job on specified cluster.
$job = Start-AzHDInsightJob `
    -ClusterName $clusterName `
    -JobDefinition $jobDefinition `
    -HttpCredential $creds

# Progress bar (optional)
Write-Progress -Activity $activity -Status "Waiting for the Pig job to complete..."

# Wait for completion or failure of specified job
Wait-AzHDInsightJob `
    -Job $job.JobId `
    -ClusterName $clusterName `
    -HttpCredential $creds

# Uncomment the following to see stderr output
<#
Get-AzHDInsightJobOutput `
    -Clustername $clusterName `
    -JobId $job.JobId `
    -HttpCredential $creds `
    -DisplayOutputType StandardError
#>

# Progress bar (optional)
Write-Progress -Activity $activity "Retrieving output..."

# Gets the log output
Get-AzHDInsightJobOutput `
    -Clustername $clusterName `
    -JobId $job.JobId `
    -HttpCredential $creds

Výstup úlohy Pig by se měl podobat následujícím datům:

((2012-02-03,20:11:56,SampleClass5,[TRACE],verbose detail for id 990982084))
((2012-02-03,20:11:56,SampleClass7,[TRACE],verbose detail for id 1560323914))
((2012-02-03,20:11:56,SampleClass8,[DEBUG],detail for id 2083681507))
((2012-02-03,20:11:56,SampleClass3,[TRACE],verbose detail for id 1718828806))
((2012-02-03,20:11:56,SampleClass3,[INFO],everything normal for id 530537821))

Řešení problému

Chyby při spouštění úloh

Při spuštění úlohy Hive se může zobrazit chyba podobná následujícímu textu:

Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: [Error 20001]: An error occurred while reading or writing to your custom script. It may have crashed with an error.

Příčinou tohoto problému může být konec řádku v souboru Pythonu. Mnoho editorů Windows jako konec řádku používá CRLF, ale linuxové aplikace obvykle očekávají LF.

Před nahráním souboru do HDInsight můžete pomocí následujících příkazů PowerShellu odebrat znaky CR:

Write-Progress -Activity $activity -Status "Waiting for the Pig job to complete..."

# Wait for completion or failure of specified job

Skripty PowerShellu

Oba ukázkové skripty PowerShellu použité ke spuštění příkladů obsahují okomentovaný řádek, který zobrazuje výstup chyby pro úlohu. Pokud se pro úlohu nezobrazuje očekávaný výstup, odkomentujte následující řádek a zkontrolujte, jestli informace o chybě značí problém.

$activity="Pig job"

# Progress bar (optional)
Write-Progress -Activity $activity -Status "Starting job..."

Informace o chybě (STDERR) a výsledek úlohy (STDOUT) se také zaznamenávají do vašeho úložiště HDInsight.

Pro tuto úlohu... Podívejte se na tyto soubory v kontejneru objektů blob.
Hive /HivePython/stderr

/HivePython/stdout

Pig /PigPython/stderr

/PigPython/stdout

Další kroky

Pokud potřebujete načíst moduly Pythonu, které nejsou ve výchozím nastavení k dispozici, přečtěte si, jak nasadit modul do Azure HDInsight.

Další způsoby použití Pig, Hive a další informace o použití MapReduce najdete v následujících dokumentech: