Start a Pig + Jython job in HDInsight through WebHCat

You can also use HDInsight with Hive + Python.

The drawback of the latter is that it relies on streaming between Hive and Python. In Hadoop, streaming is simply inter-process communication over stdin/stdout. So even simple operations, such as concatenating two fields as strings in Python, may be slow. The good news is that Hive has built-in functions, as well as user-defined functions, that handle the simple things (like string concatenation).
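
To make the streaming cost concrete, here is a minimal sketch of the kind of Python script Hive would call through streaming. It assumes rows arrive as tab-separated lines on stdin and leave as lines on stdout; the function names (concat_fields, main) are made up for the illustration, and rows are assumed to have at least two fields.

```python
import sys

def concat_fields(line, sep="\t"):
    """Concatenate the first two separator-delimited fields of one input row."""
    fields = line.rstrip("\n").split(sep)
    return fields[0] + fields[1]

def main(stdin=sys.stdin, stdout=sys.stdout):
    # Each row is serialized by Hive, piped into this process,
    # transformed, then piped back: two process-boundary crossings per row.
    for line in stdin:
        stdout.write(concat_fields(line) + "\n")
```

A deployed script would end with a call to main(). The point is that even this one-line concatenation pays the serialization and pipe overhead for every row, which is why a Hive built-in is preferable for such simple work.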

There is a way to use the Python language without streaming: run Python inside the JVM (remember that Hadoop is written in Java). Python on the JVM is Jython, and Pig (an alternative to Hive that has its own scripting language instead of SQL) can call Jython scripts.

With HDInsight 3.0, which recently became generally available, you can use this feature. To launch the job, I use a Python script, run from a Linux machine, that calls the WebHCat / Templeton REST API. Here is how.

The Python script that launches the job is the following:

import requests  # https://pypi.python.org/pypi/requests

clusterName = 'monclusterhadoop'
clusterAdmin = 'cornac'
clusterPassword = 'ChangeWithY0urs!'

# Get the WebHCat status
webHCatUrl = 'https://' + clusterName + '.azurehdinsight.net/templeton/v1/status'

r = requests.get(webHCatUrl, auth=(clusterAdmin, clusterPassword))

print(r.status_code)
print(r.json())

# Submit a Pig job:
# https://docs.hortonworks.com/HDPDocuments/HDP1/HDP-Win-1.3.0/ds_HCatalog/pig.html

webHCatUrl = 'https://' + clusterName + '.azurehdinsight.net/templeton/v1/pig'

# Form parameters: the Pig script to run and the directory that receives stdout/stderr
pig_params = {'user.name': clusterAdmin,
              'file': 'wasb://demo@monstockageazure.blob.core.windows.net/scripts/pig_python/pig_python.pig',
              'statusdir': '/wasbwork/pig_from_python'}

r = requests.post(webHCatUrl, auth=(clusterAdmin, clusterPassword), data=pig_params)
print(r.status_code)
print(r.json())
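
The script above only submits the job; it does not wait for it. Below is a hedged sketch of how one might poll for completion. Several things here are assumptions rather than facts from this post: that the submission response carries the job id under an 'id' key, that the status is served at templeton/v1/jobs/<job id> (older WebHCat versions may use a different path), that the payload exposes status.state, and that terminal states are named SUCCEEDED, FAILED and KILLED. The helper names are hypothetical.

```python
import time

def job_status_url(cluster_name, job_id):
    """Build the job-status URL (assumed endpoint: templeton/v1/jobs/<job id>)."""
    return ('https://' + cluster_name +
            '.azurehdinsight.net/templeton/v1/jobs/' + job_id)

def job_state(payload):
    """Return payload['status']['state'] if present, else None (assumed layout)."""
    return (payload.get('status') or {}).get('state')

def wait_for_job(fetch_status, poll_seconds=5, max_polls=120):
    """Call fetch_status() until a terminal state shows up or polls run out."""
    terminal = ('SUCCEEDED', 'FAILED', 'KILLED')  # assumed state names
    for _ in range(max_polls):
        state = job_state(fetch_status())
        if state in terminal:
            return state
        time.sleep(poll_seconds)
    return None  # gave up without seeing a terminal state

# Possible use after the submission above (not executed here):
#   url = job_status_url(clusterName, r.json()['id'])
#   fetch = lambda: requests.get(url, auth=(clusterAdmin, clusterPassword),
#                                params={'user.name': clusterAdmin}).json()
#   print(wait_for_job(fetch))
```

Passing the HTTP call in as fetch_status keeps the polling logic testable without a cluster.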

The Pig job looks like this:

Register 'wasb://demo@monstockageazure.blob.core.windows.net/scripts/pig_python/pig_python.py' using jython as myfuncs;

a = load 'wasb://demo@monstockageazure.blob.core.windows.net/data/ref_villes' using PigStorage(' ') as (ville:chararray);

b = foreach a generate ville, myfuncs.helloworld(), myfuncs.square(3);

store b into '/wasbwork/pigresult';

The Python script that Pig calls, which defines a few basic sample functions, is the following:

#!/usr/bin/python

@outputSchema("word:chararray")
def helloworld():
    return ('Hello, World')
 
@outputSchema("t:(word:chararray,num:long)")
def complex(word):
    return (str(word),long(word)*long(word))

@outputSchemaFunction("squareSchema")
def square(num):   
    return ((num)*(num))   

@schemaFunction("squareSchema") 
def squareSchema(input):   
    return input   

# No decorator - bytearray 
def concat(str):   
    return str+str
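
Pig supplies the outputSchema, outputSchemaFunction and schemaFunction decorators itself when it registers the script, so the file above fails with a NameError if you run it directly in plain Python. The sketch below shows one way to try the functions locally before submitting a job: define stand-in decorators first. The stand-ins and the helper _attribute_decorator are my own illustration, not Pig's code, and the attribute names they set are an assumption.

```python
def _attribute_decorator(attr):
    """Build a decorator factory that stores its argument on the function."""
    def factory(value):
        def decorate(func):
            setattr(func, attr, value)
            return func  # the function itself is left unchanged
        return decorate
    return factory

# Local stand-ins for the names Pig injects at registration time
outputSchema = _attribute_decorator('outputSchema')
outputSchemaFunction = _attribute_decorator('outputSchemaFunction')
schemaFunction = _attribute_decorator('schemaFunction')

@outputSchema("word:chararray")
def helloworld():
    return 'Hello, World'

@outputSchemaFunction("squareSchema")
def square(num):
    return num * num

@schemaFunction("squareSchema")
def squareSchema(input_schema):
    return input_schema

print(helloworld())
print(square(3))
```

This only checks the Python logic; schema handling and type conversion still happen inside Pig, so a run on the cluster remains the real test.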

The source data (ref_villes) looks like this (first lines):

paris
marseille
lyon
toulouse
nice
nantes
strasbourg
montpellier
bordeaux
lille
rennes
reims
le havre
saint-etienne
toulon
grenoble

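The output (part-m-00000) looks like this. Note that "le havre" comes out as "le": PigStorage(' ') splits each line on spaces, and the one-field schema keeps only the first field.

paris    Hello, World    9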
marseille    Hello, World    9
lyon    Hello, World    9
toulouse    Hello, World    9
nice    Hello, World    9
nantes    Hello, World    9
strasbourg    Hello, World    9
montpellier    Hello, World    9
bordeaux    Hello, World    9
lille    Hello, World    9
rennes    Hello, World    9
reims    Hello, World    9
le    Hello, World    9
saint-etienne    Hello, World    9
toulon    Hello, World    9
grenoble    Hello, World    9

The execution report (stderr) looks like this:

2014-03-21 11:50:59,951 [main] INFO  org.apache.pig.Main - Apache Pig version 0.12.0.2.0.7.0-1551 (r: unknown) compiled Feb 19 2014, 11:47:04
2014-03-21 11:50:59,951 [main] INFO  org.apache.pig.Main - Logging error messages to: C:\apps\dist\hadoop-2.2.0.2.0.7.0-1551\logs\pig_1395402659935.log
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/C:/apps/dist/hadoop-2.2.0.2.0.7.0-1551/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/C:/apps/dist/pig-0.12.0.2.0.7.0-1551/pig-0.12.0.2.0.7.0-1551.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See https://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
2014-03-21 11:51:00,810 [main] INFO  org.apache.pig.impl.util.Utils - Default bootup file D:\Users\hdp/.pigbootup not found
2014-03-21 11:51:00,997 [main] INFO  org.apache.hadoop.conf.Configuration.deprecation - mapred.job.tracker is deprecated. Instead, use mapreduce.jobtracker.address
2014-03-21 11:51:00,997 [main] INFO  org.apache.hadoop.conf.Configuration.deprecation - fs.default.name is deprecated. Instead, use fs.defaultFS
2014-03-21 11:51:00,997 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: wasb://monclusterhadoop@monstockageazure.blob.core.windows.net
2014-03-21 11:51:01,451 [main] INFO  org.apache.hadoop.conf.Configuration.deprecation - fs.default.name is deprecated. Instead, use fs.defaultFS
2014-03-21 11:51:01,841 [main] INFO  org.apache.pig.scripting.jython.JythonScriptEngine - created tmp python.cachedir=D:\Users\hdp\AppData\Local\Temp\pig_jython_5196260548692206718
2014-03-21 11:51:03,951 [main] WARN  org.apache.pig.scripting.jython.JythonScriptEngine - pig.cmd.args.remainders is empty. This is not expected unless on testing.
2014-03-21 11:51:04,560 [main] INFO  org.apache.pig.scripting.jython.JythonScriptEngine - Register scripting UDF: myfuncs.complex
2014-03-21 11:51:04,560 [main] INFO  org.apache.pig.scripting.jython.JythonScriptEngine - Register scripting UDF: myfuncs.square
2014-03-21 11:51:04,576 [main] INFO  org.apache.pig.scripting.jython.JythonScriptEngine - Register scripting UDF: myfuncs.helloworld
2014-03-21 11:51:04,576 [main] INFO  org.apache.pig.scripting.jython.JythonScriptEngine - Register scripting UDF: myfuncs.concat
2014-03-21 11:51:04,701 [main] INFO  org.apache.hadoop.conf.Configuration.deprecation - fs.default.name is deprecated. Instead, use fs.defaultFS
2014-03-21 11:51:04,951 [main] INFO  org.apache.pig.scripting.jython.JythonFunction - Schema 'word:chararray' defined for func helloworld
2014-03-21 11:51:05,232 [main] INFO  org.apache.pig.tools.pigstats.ScriptState - Pig features used in the script: UNKNOWN
2014-03-21 11:51:05,326 [main] INFO  org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer - {RULES_ENABLED=[AddForEach, ColumnMapKeyPrune, DuplicateForEachColumnRewrite, GroupByConstParallelSetter, ImplicitSplitInserter, LimitOptimizer, LoadTypeCastInserter, MergeFilter, MergeForEach, NewPartitionFilterOptimizer, PartitionFilterOptimizer, PushDownForEachFlatten, PushUpFilter, SplitFilter, StreamTypeCastInserter], RULES_DISABLED=[FilterLogicExpressionSimplifier]}
2014-03-21 11:51:05,482 [main] INFO  org.apache.hadoop.conf.Configuration.deprecation - mapred.textoutputformat.separator is deprecated. Instead, use mapreduce.output.textoutputformat.separator
2014-03-21 11:51:05,763 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler - File concatenation threshold: 100 optimistic? false
2014-03-21 11:51:05,810 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer - MR plan size before optimization: 1
2014-03-21 11:51:05,810 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer - MR plan size after optimization: 1
2014-03-21 11:51:06,091 [main] INFO  org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at headnode0/100.86.204.54:9010
2014-03-21 11:51:06,263 [main] INFO  org.apache.pig.tools.pigstats.ScriptState - Pig script settings are added to the job
2014-03-21 11:51:06,263 [main] INFO  org.apache.hadoop.conf.Configuration.deprecation - mapred.job.reduce.markreset.buffer.percent is deprecated. Instead, use mapreduce.reduce.markreset.buffer.percent
2014-03-21 11:51:06,263 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - mapred.job.reduce.markreset.buffer.percent is not set, set to default 0.3
2014-03-21 11:51:06,263 [main] INFO  org.apache.hadoop.conf.Configuration.deprecation - mapred.output.compress is deprecated. Instead, use mapreduce.output.fileoutputformat.compress
2014-03-21 11:51:06,279 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - creating jar file Job608857099241848139.jar
2014-03-21 11:51:14,294 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - jar file Job608857099241848139.jar created
2014-03-21 11:51:14,294 [main] INFO  org.apache.hadoop.conf.Configuration.deprecation - mapred.jar is deprecated. Instead, use mapreduce.job.jar
2014-03-21 11:51:14,341 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - Setting up single store job
2014-03-21 11:51:14,341 [main] INFO  org.apache.pig.data.SchemaTupleFrontend - Key [pig.schematuple] is false, will not generate code.
2014-03-21 11:51:14,341 [main] INFO  org.apache.pig.data.SchemaTupleFrontend - Starting process to move generated code to distributed cache
2014-03-21 11:51:14,341 [main] INFO  org.apache.pig.data.SchemaTupleFrontend - Setting key [pig.schematuple.classes] with classes to deserialize []
2014-03-21 11:51:14,404 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 1 map-reduce job(s) waiting for submission.
2014-03-21 11:51:14,404 [main] INFO  org.apache.hadoop.conf.Configuration.deprecation - mapred.job.tracker.http.address is deprecated. Instead, use mapreduce.jobtracker.http.address
2014-03-21 11:51:14,404 [JobControl] INFO  org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at headnode0/100.86.204.54:9010
2014-03-21 11:51:14,513 [JobControl] INFO  org.apache.hadoop.conf.Configuration.deprecation - fs.default.name is deprecated. Instead, use fs.defaultFS
2014-03-21 11:51:16,154 [JobControl] INFO  org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input paths to process : 2
2014-03-21 11:51:16,154 [JobControl] INFO  org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths to process : 2
2014-03-21 11:51:16,185 [JobControl] INFO  org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths (combined) to process : 1
2014-03-21 11:51:16,435 [JobControl] INFO  org.apache.hadoop.mapreduce.JobSubmitter - number of splits:1
2014-03-21 11:51:16,732 [JobControl] INFO  org.apache.hadoop.mapreduce.JobSubmitter - Submitting tokens for job: job_1395391185318_0006
2014-03-21 11:51:16,732 [JobControl] INFO  org.apache.hadoop.mapreduce.JobSubmitter - Kind: mapreduce.job, Service: job_1395391185318_0005, Ident: (org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier@45d45314)
2014-03-21 11:51:16,763 [JobControl] INFO  org.apache.hadoop.mapreduce.JobSubmitter - Kind: RM_DELEGATION_TOKEN, Service: 100.86.204.54:9010, Ident: (owner=cornac, renewer=mr token, realUser=hdp, issueDate=1395402643673, maxDate=1396007443673, sequenceNumber=5, masterKeyId=2)
2014-03-21 11:51:17,154 [JobControl] INFO  org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted application application_1395391185318_0006 to ResourceManager at headnode0/100.86.204.54:9010
2014-03-21 11:51:17,232 [JobControl] INFO  org.apache.hadoop.mapreduce.Job - The url to track the job: https://headnode0:9014/proxy/application_1395391185318_0006/
2014-03-21 11:51:17,232 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - HadoopJobId: job_1395391185318_0006
2014-03-21 11:51:17,232 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Processing aliases a,b
2014-03-21 11:51:17,232 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - detailed locations: M: a[3,4],b[-1,-1] C:  R: 
2014-03-21 11:51:17,279 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 0% complete
2014-03-21 11:51:34,575 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 50% complete
2014-03-21 11:51:37,981 [main] INFO  org.apache.hadoop.conf.Configuration.deprecation - mapred.reduce.tasks is deprecated. Instead, use mapreduce.job.reduces
2014-03-21 11:51:38,028 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 100% complete
2014-03-21 11:51:38,028 [main] INFO  org.apache.pig.tools.pigstats.SimplePigStats - Script Statistics: 

HadoopVersion    PigVersion    UserId    StartedAt    FinishedAt    Features
2.2.0.2.0.7.0-1551    0.12.0.2.0.7.0-1551    hdp    2014-03-21 11:51:06    2014-03-21 11:51:38    UNKNOWN

Success!

Job Stats (time in seconds):
JobId    Maps    Reduces    MaxMapTime    MinMapTIme    AvgMapTime    MedianMapTime    MaxReduceTime    MinReduceTime    AvgReduceTime    MedianReducetime    Alias    Feature    Outputs
job_1395391185318_0006    1    0    6    6    6    6    n/a    n/a    n/a    n/a    a,b    MAP_ONLY    /wasbwork/pigresult,

Input(s):
Successfully read 260 records from: "wasb://demo@monstockageazure.blob.core.windows.net/data/ref_villes"

Output(s):
Successfully stored 260 records in: "/wasbwork/pigresult"

Counters:
Total records written : 260
Total bytes written : 0
Spillable Memory Manager spill count : 0
Total bags proactively spilled: 0
Total records proactively spilled: 0

Job DAG:
job_1395391185318_0006


2014-03-21 11:51:38,278 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!

Benjamin (@benjguin)