Use Python User Defined Functions (UDF) with Apache Hive and Apache Pig in HDInsight
Learn how to use Python user-defined functions (UDF) with Apache Hive and Apache Pig in Apache Hadoop on Azure HDInsight.
Python on HDInsight
Python 2.7 is installed by default on HDInsight 3.0 and later. Apache Hive can be used with this version of Python for stream processing. Stream processing uses STDOUT and STDIN to pass data between Hive and the UDF.
HDInsight also includes Jython, which is a Python implementation written in Java. Jython runs directly on the Java Virtual Machine and doesn't use streaming. Jython is the recommended Python interpreter when using Python with Pig.
Prerequisites
The URI scheme for your cluster's primary storage. This would be wasb:// for Azure Storage, abfs:// for Azure Data Lake Storage Gen2, or adl:// for Azure Data Lake Storage Gen1. If secure transfer is enabled for Azure Storage, the URI would be wasbs://. See also, secure transfer.
Possible change to storage configuration. See Storage configuration if using storage account kind BlobStorage.
Optional. If planning to use PowerShell, you need the AZ module installed.
Note
The storage account used in this article was Azure Storage with secure transfer enabled and thus wasbs is used throughout the article.
Storage configuration
No action is required if the storage account used is of kind Storage (general purpose v1) or StorageV2 (general purpose v2). The process in this article produces output to at least /tezstaging. A default Hadoop configuration contains /tezstaging in the fs.azure.page.blob.dir configuration variable in core-site.xml for the HDFS service. This configuration causes output to that directory to be written as page blobs, which aren't supported for the storage account kind BlobStorage. To use BlobStorage for this article, remove /tezstaging from the fs.azure.page.blob.dir configuration variable. The configuration can be accessed from the Ambari UI. Otherwise, you receive the error message: Page blob is not supported for this account type.
Warning
The steps in this document make the following assumptions:
You create the Python scripts on your local development environment.
You upload the scripts to HDInsight using either the scp command or the provided PowerShell script.
If you want to use the Azure Cloud Shell (bash) to work with HDInsight, then you must:
Create the scripts inside the cloud shell environment.
Use scp to upload the files from the cloud shell to HDInsight.
Use ssh from the cloud shell to connect to HDInsight and run the examples.
Apache Hive UDF
Python can be used as a UDF from Hive through the HiveQL TRANSFORM statement. For example, the following HiveQL invokes the hiveudf.py file stored in the default Azure Storage account for the cluster.
HiveQL
add file wasbs:///hiveudf.py;
SELECT TRANSFORM (clientid, devicemake, devicemodel)
USING 'python hiveudf.py' AS
(clientid string, phoneLabel string, phoneHash string)
FROM hivesampletable
ORDER BY clientid LIMIT 50;
Here's what this example does:
The add file statement at the beginning of the query adds the hiveudf.py file to the distributed cache, so it's accessible by all nodes in the cluster.
The SELECT TRANSFORM ... USING statement selects data from the hivesampletable. It also passes the clientid, devicemake, and devicemodel values to the hiveudf.py script.
The AS clause describes the fields returned from hiveudf.py.
Create file
On your development environment, create a text file named hiveudf.py. Use the following code as the contents of the file:
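The listing below is a minimal Python 2 sketch that matches the behavior described next: it reads tab-delimited rows from STDIN, builds a device label from devicemake and devicemodel, and writes the client ID, the label, and an MD5 hash of the label back to STDOUT.
Python
#!/usr/bin/env python
# Minimal sketch of hiveudf.py (Python 2, as described in the notes that follow)
import sys
import string
import hashlib

while True:
    # Read a tab-delimited row from STDIN; stop when input is exhausted
    line = sys.stdin.readline()
    if not line:
        break

    # Strip the trailing newline, then split the row into its fields
    line = string.strip(line, "\n ")
    clientid, devicemake, devicemodel = string.split(line, "\t")

    # Concatenate make and model, then emit clientid, the label, and an MD5 hash of the label
    phone_label = devicemake + ' ' + devicemodel
    print "\t".join([clientid, phone_label, hashlib.md5(phone_label).hexdigest()])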
The trailing newline character is removed using string.strip(line, "\n ").
When doing stream processing, a single line contains all the values with a tab character between each value. So string.split(line, "\t") can be used to split the input at each tab, returning just the fields.
When processing is complete, the output must be written to STDOUT as a single line, with a tab between each field. For example, print "\t".join([clientid, phone_label, hashlib.md5(phone_label).hexdigest()]).
The while loop repeats until no line is read.
The script output is a concatenation of the input values for devicemake and devicemodel, and a hash of the concatenated value.
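Because the script only reads STDIN and writes STDOUT, you can sanity-check it locally before uploading it (assuming python on your machine resolves to Python 2). The sample row here is made up, using the same three columns that the HiveQL above selects from hivesampletable:
Bash
echo -e '100041\tRIM\t9650' | python hiveudf.py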
Upload file (shell)
In the following command, replace sshuser with the actual username if different. Replace mycluster with the actual cluster name. Ensure your working directory is where the file is located.
Use scp to copy the files to your HDInsight cluster. Edit and enter the command:
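Bash
scp hiveudf.py sshuser@mycluster-ssh.azurehdinsight.net:
Use SSH to connect to the cluster, then add the uploaded file to the cluster's default storage. These steps mirror the Pig section later in this article:
Bash
ssh sshuser@mycluster-ssh.azurehdinsight.net
hdfs dfs -put hiveudf.py /hiveudf.py
Use Hive UDF (shell)
To connect to Hive, use Beeline from your open SSH session. The connection string shown is the usual HTTP-mode HiveServer2 endpoint on an HDInsight cluster:
Bash
beeline -u 'jdbc:hive2://headnodehost:10001/;transportMode=http'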
Enter the following query at the 0: jdbc:hive2://headnodehost:10001/> prompt:
hive
add file wasbs:///hiveudf.py;
SELECT TRANSFORM (clientid, devicemake, devicemodel)
USING 'python hiveudf.py' AS
(clientid string, phoneLabel string, phoneHash string)
FROM hivesampletable
ORDER BY clientid LIMIT 50;
After you enter the last line, the job should start. Once the job completes, it returns output similar to the following example:
Output
100041 RIM 9650 d476f3687700442549a83fac4560c51c
100041 RIM 9650 d476f3687700442549a83fac4560c51c
100042 Apple iPhone 4.2.x 375ad9a0ddc4351536804f1d5d0ea9b9
100042 Apple iPhone 4.2.x 375ad9a0ddc4351536804f1d5d0ea9b9
100042 Apple iPhone 4.2.x 375ad9a0ddc4351536804f1d5d0ea9b9
To exit Beeline, enter the following command:
hive
!q
Upload file (PowerShell)
PowerShell can also be used to upload the file to the cluster's default storage. Ensure your working directory is where hiveudf.py is located. Use the following PowerShell script to upload hiveudf.py:
PowerShell
# Login to your Azure subscription
# Is there an active Azure subscription?
$sub = Get-AzSubscription -ErrorAction SilentlyContinue
if(-not($sub))
{
    Connect-AzAccount
}

# If you have multiple subscriptions, set the one to use
# Select-AzSubscription -SubscriptionId "<SUBSCRIPTIONID>"

# Revise file path as needed
$pathToStreamingFile = ".\hiveudf.py"

# Get cluster info
$clusterName = Read-Host -Prompt "Enter the HDInsight cluster name"
$clusterInfo = Get-AzHDInsightCluster -ClusterName $clusterName
$resourceGroup = $clusterInfo.ResourceGroup
$storageAccountName = $clusterInfo.DefaultStorageAccount.split('.')[0]
$container = $clusterInfo.DefaultStorageContainer
$storageAccountKey = (Get-AzStorageAccountKey `
    -ResourceGroupName $resourceGroup `
    -Name $storageAccountName)[0].Value

# Create an Azure Storage context
$context = New-AzStorageContext `
    -StorageAccountName $storageAccountName `
    -StorageAccountKey $storageAccountKey

# Upload local files to an Azure Storage blob
Set-AzStorageBlobContent `
    -File $pathToStreamingFile `
    -Blob "hiveudf.py" `
    -Container $container `
    -Context $context
Use Hive UDF (PowerShell)
PowerShell can also be used to remotely run Hive queries. Use the following PowerShell script to run a Hive query that uses the hiveudf.py script:
PowerShell
# Script should stop on failures
$ErrorActionPreference = "Stop"

# Login to your Azure subscription
# Is there an active Azure subscription?
$sub = Get-AzSubscription -ErrorAction SilentlyContinue
if(-not($sub))
{
    Connect-AzAccount
}

# If you have multiple subscriptions, set the one to use
# Select-AzSubscription -SubscriptionId "<SUBSCRIPTIONID>"

# Get cluster info
$clusterName = Read-Host -Prompt "Enter the HDInsight cluster name"
$creds = Get-Credential -UserName "admin" -Message "Enter the login for the cluster"

$HiveQuery = "add file wasbs:///hiveudf.py;" +
             "SELECT TRANSFORM (clientid, devicemake, devicemodel) " +
             "USING 'python hiveudf.py' AS " +
             "(clientid string, phoneLabel string, phoneHash string) " +
             "FROM hivesampletable " +
             "ORDER BY clientid LIMIT 50;"

# Create Hive job object
$jobDefinition = New-AzHDInsightHiveJobDefinition `
    -Query $HiveQuery

# For status bar updates
$activity = "Hive query"

# Progress bar (optional)
Write-Progress -Activity $activity -Status "Starting query..."

# Start defined Azure HDInsight job on specified cluster.
$job = Start-AzHDInsightJob `
    -ClusterName $clusterName `
    -JobDefinition $jobDefinition `
    -HttpCredential $creds

# Progress bar (optional)
Write-Progress -Activity $activity -Status "Waiting on query to complete..."

# Wait for completion or failure of specified job
Wait-AzHDInsightJob `
    -JobId $job.JobId `
    -ClusterName $clusterName `
    -HttpCredential $creds

# Uncomment the following to see stderr output
<#
Get-AzHDInsightJobOutput `
    -Clustername $clusterName `
    -JobId $job.JobId `
    -HttpCredential $creds `
    -DisplayOutputType StandardError
#>

# Progress bar (optional)
Write-Progress -Activity $activity -Status "Retrieving output..."

# Gets the log output
Get-AzHDInsightJobOutput `
    -Clustername $clusterName `
    -JobId $job.JobId `
    -HttpCredential $creds
The output for the Hive job should appear similar to the following example:
Output
100041 RIM 9650 d476f3687700442549a83fac4560c51c
100041 RIM 9650 d476f3687700442549a83fac4560c51c
100042 Apple iPhone 4.2.x 375ad9a0ddc4351536804f1d5d0ea9b9
100042 Apple iPhone 4.2.x 375ad9a0ddc4351536804f1d5d0ea9b9
100042 Apple iPhone 4.2.x 375ad9a0ddc4351536804f1d5d0ea9b9
Apache Pig UDF
A Python script can be used as a UDF from Pig through the GENERATE statement. You can run the script using either Jython or C Python.
Jython runs on the JVM, and can natively be called from Pig.
C Python is an external process, so the data from Pig on the JVM is sent out to the script running in a Python process. The output of the Python script is sent back into Pig.
To specify the Python interpreter, use register when referencing the Python script. The following examples register scripts with Pig as myfuncs:
To use Jython: register '/path/to/pigudf.py' using jython as myfuncs;
To use C Python: register '/path/to/pigudf.py' using streaming_python as myfuncs;
Important
When using Jython, the path to the Python file can be either a local path or a wasbs:// path. However, when using C Python, you must reference a file on the local file system of the node that you are using to submit the Pig job.
After registration, the Pig Latin for this example is the same for both interpreters:
pig
LOGS = LOAD 'wasbs:///example/data/sample.log' as (LINE:chararray);
LOG = FILTER LOGS by LINE is not null;
DETAILS = FOREACH LOG GENERATE myfuncs.create_structure(LINE);
DUMP DETAILS;
Here's what this example does:
The first line loads the sample data file, sample.log, into LOGS. It also defines each record as a chararray.
The next line filters out any null values, storing the result of the operation into LOG.
Next, it iterates over the records in LOG and uses GENERATE to invoke the create_structure method contained in the Python/Jython script loaded as myfuncs. LINE is used to pass the current record to the function.
Finally, the outputs are dumped to STDOUT using the DUMP command. This command displays the results after the operation completes.
Create file
On your development environment, create a text file named pigudf.py. Use the following code as the contents of the file:
Python
# Uncomment the following if using C Python
#from pig_util import outputSchema

@outputSchema("log: {(date:chararray, time:chararray, classname:chararray, level:chararray, detail:chararray)}")
def create_structure(input):
    if (input.startswith('java.lang.Exception')):
        input = input[21:len(input)] + ' - java.lang.Exception'

    date, time, classname, level, detail = input.split(' ', 4)
    return date, time, classname, level, detail
In the Pig Latin example, the LINE input is defined as a chararray because there's no consistent schema for the input. The Python script transforms the data into a consistent schema for output.
The @outputSchema statement defines the format of the data that is returned to Pig. In this case, it's a data bag, which is a Pig data type. The bag contains the following fields, all of which are chararray (strings):
date - the date the log entry was created
time - the time the log entry was created
classname - the class name the entry was created for
level - the log level
detail - verbose details for the log entry
Next, the def create_structure(input) defines the function that Pig passes line items to.
The example data, sample.log, mostly conforms to the date, time, classname, level, and detail schema. However, it contains a few lines that begin with java.lang.Exception. These lines must be modified to match the schema. The if statement checks for those, then massages the input data to move the java.lang.Exception string to the end, bringing the data in line with the expected output schema.
Next, the split command is used to split the data at the first four space characters. The output is assigned into date, time, classname, level, and detail.
Finally, the values are returned to Pig.
When the data is returned to Pig, it has a consistent schema as defined in the @outputSchema statement.
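As a quick illustration of what the function does, the following snippet walks through the same two cases by hand. The log lines are made up, patterned on the sample output shown later in this article:
Python
# Illustrative only: split a well-formed log line into the five schema fields
line = '2012-02-03 20:11:56 SampleClass5 [TRACE] verbose detail for id 990982084'
date, time, classname, level, detail = line.split(' ', 4)
print detail    # verbose detail for id 990982084

# Lines that begin with java.lang.Exception are reshaped first so they fit the same schema
exc = 'java.lang.Exception: 2012-02-03 20:11:56 SampleClass5 [ERROR] incorrect id'
exc = exc[21:len(exc)] + ' - java.lang.Exception'
print exc       # 2012-02-03 20:11:56 SampleClass5 [ERROR] incorrect id - java.lang.Exception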
Upload file (shell)
In the commands below, replace sshuser with the actual username if different. Replace mycluster with the actual cluster name. Ensure your working directory is where the file is located.
Use scp to copy the files to your HDInsight cluster. Edit and enter the command:
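Bash
# Edit sshuser and mycluster to match your cluster, as described above
scp pigudf.py sshuser@mycluster-ssh.azurehdinsight.net: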
Use SSH to connect to the cluster. Edit and enter the command:
Windows Command Prompt
ssh sshuser@mycluster-ssh.azurehdinsight.net
From the SSH session, add the Python files uploaded previously to the storage for the cluster.
Bash
hdfs dfs -put pigudf.py /pigudf.py
Use Pig UDF (shell)
To connect to Pig, use the following command from your open SSH session:
Bash
pig
Enter the following statements at the grunt> prompt:
pig
Register wasbs:///pigudf.py using jython as myfuncs;
LOGS = LOAD 'wasbs:///example/data/sample.log' as (LINE:chararray);
LOG = FILTER LOGS by LINE is not null;
DETAILS = foreach LOG generate myfuncs.create_structure(LINE);
DUMP DETAILS;
After you enter the final line, the job should start. Once the job completes, it returns output similar to the following data:
Output
((2012-02-03,20:11:56,SampleClass5,[TRACE],verbose detail for id 990982084))
((2012-02-03,20:11:56,SampleClass7,[TRACE],verbose detail for id 1560323914))
((2012-02-03,20:11:56,SampleClass8,[DEBUG],detail for id 2083681507))
((2012-02-03,20:11:56,SampleClass3,[TRACE],verbose detail for id 1718828806))
((2012-02-03,20:11:56,SampleClass3,[INFO],everything normal for id 530537821))
Use quit to exit the Grunt shell, and then use the following to edit the pigudf.py file on the local file system:
Bash
nano pigudf.py
Once in the editor, uncomment the following line by removing the # character from the beginning of the line:
Bash
#from pig_util import outputSchema
This line modifies the Python script to work with C Python instead of Jython. Once the change has been made, use Ctrl+X to exit the editor. Select Y, and then Enter to save the changes.
Use the pig command to start the shell again. Once you are at the grunt> prompt, use the following to run the Python script using the C Python interpreter.
pig
Register 'pigudf.py' using streaming_python as myfuncs;
LOGS = LOAD 'wasbs:///example/data/sample.log' as (LINE:chararray);
LOG = FILTER LOGS by LINE is not null;
DETAILS = foreach LOG generate myfuncs.create_structure(LINE);
DUMP DETAILS;
Once this job completes, you should see the same output as when you previously ran the script using Jython.
Upload file (PowerShell)
PowerShell can also be used to upload the file to the cluster's default storage. Ensure your working directory is where pigudf.py is located. Use the following PowerShell script to upload the pigudf.py file:
PowerShell
# Login to your Azure subscription
# Is there an active Azure subscription?
$sub = Get-AzSubscription -ErrorAction SilentlyContinue
if(-not($sub))
{
    Connect-AzAccount
}

# If you have multiple subscriptions, set the one to use
# Select-AzSubscription -SubscriptionId "<SUBSCRIPTIONID>"

# Revise file path as needed
$pathToJythonFile = ".\pigudf.py"

# Get cluster info
$clusterName = Read-Host -Prompt "Enter the HDInsight cluster name"
$clusterInfo = Get-AzHDInsightCluster -ClusterName $clusterName
$resourceGroup = $clusterInfo.ResourceGroup
$storageAccountName = $clusterInfo.DefaultStorageAccount.split('.')[0]
$container = $clusterInfo.DefaultStorageContainer
$storageAccountKey = (Get-AzStorageAccountKey `
    -ResourceGroupName $resourceGroup `
    -Name $storageAccountName)[0].Value

# Create an Azure Storage context
$context = New-AzStorageContext `
    -StorageAccountName $storageAccountName `
    -StorageAccountKey $storageAccountKey

# Upload local files to an Azure Storage blob
Set-AzStorageBlobContent `
    -File $pathToJythonFile `
    -Blob "pigudf.py" `
    -Container $container `
    -Context $context
Use Pig UDF (PowerShell)
Note
When remotely submitting a job using PowerShell, it is not possible to use C Python as the interpreter.
PowerShell can also be used to run Pig Latin jobs. To run a Pig Latin job that uses the pigudf.py script, use the following PowerShell script:
PowerShell
# Script should stop on failures
$ErrorActionPreference = "Stop"

# Login to your Azure subscription
# Is there an active Azure subscription?
$sub = Get-AzSubscription -ErrorAction SilentlyContinue
if(-not($sub))
{
    Connect-AzAccount
}

# Get cluster info
$clusterName = Read-Host -Prompt "Enter the HDInsight cluster name"
$creds = Get-Credential -UserName "admin" -Message "Enter the login for the cluster"

$PigQuery = "Register wasbs:///pigudf.py using jython as myfuncs;" +
            "LOGS = LOAD 'wasbs:///example/data/sample.log' as (LINE:chararray);" +
            "LOG = FILTER LOGS by LINE is not null;" +
            "DETAILS = foreach LOG generate myfuncs.create_structure(LINE);" +
            "DUMP DETAILS;"

# Create Pig job object
$jobDefinition = New-AzHDInsightPigJobDefinition -Query $PigQuery

# For status bar updates
$activity = "Pig job"

# Progress bar (optional)
Write-Progress -Activity $activity -Status "Starting job..."

# Start defined Azure HDInsight job on specified cluster.
$job = Start-AzHDInsightJob `
    -ClusterName $clusterName `
    -JobDefinition $jobDefinition `
    -HttpCredential $creds

# Progress bar (optional)
Write-Progress -Activity $activity -Status "Waiting for the Pig job to complete..."

# Wait for completion or failure of specified job
Wait-AzHDInsightJob `
    -JobId $job.JobId `
    -ClusterName $clusterName `
    -HttpCredential $creds

# Uncomment the following to see stderr output
<#
Get-AzHDInsightJobOutput `
    -Clustername $clusterName `
    -JobId $job.JobId `
    -HttpCredential $creds `
    -DisplayOutputType StandardError
#>

# Progress bar (optional)
Write-Progress -Activity $activity -Status "Retrieving output..."

# Gets the log output
Get-AzHDInsightJobOutput `
    -Clustername $clusterName `
    -JobId $job.JobId `
    -HttpCredential $creds
The output for the Pig job should appear similar to the following data:
Output
((2012-02-03,20:11:56,SampleClass5,[TRACE],verbose detail for id 990982084))
((2012-02-03,20:11:56,SampleClass7,[TRACE],verbose detail for id 1560323914))
((2012-02-03,20:11:56,SampleClass8,[DEBUG],detail for id 2083681507))
((2012-02-03,20:11:56,SampleClass3,[TRACE],verbose detail for id 1718828806))
((2012-02-03,20:11:56,SampleClass3,[INFO],everything normal for id 530537821))
Troubleshooting
Errors when running jobs
When running the Hive job, you may encounter an error similar to the following text:
Output
Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: [Error 20001]: An error occurred while reading or writing to your custom script. It may have crashed with an error.
This problem may be caused by the line endings in the Python file. Many Windows editors default to using CRLF as the line ending, but Linux applications usually expect LF.
You can use PowerShell to remove the CR characters before uploading the file to HDInsight.
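For example, the following sketch rewrites a script with LF line endings; the file path here is illustrative, so point it at whichever UDF script you're uploading:
PowerShell
# Illustrative sketch: normalize CRLF to LF in place before uploading
$original_file = "$PWD\hiveudf.py"
$text = [IO.File]::ReadAllText($original_file) -replace "`r`n", "`n"
[IO.File]::WriteAllText($original_file, $text)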
PowerShell scripts
Both of the example PowerShell scripts used to run the examples contain a commented block that displays error output for the job. If you aren't seeing the expected output for the job, uncomment the following lines and see if the error information indicates a problem.
PowerShell
Get-AzHDInsightJobOutput `
    -Clustername $clusterName `
    -JobId $job.JobId `
    -HttpCredential $creds `
    -DisplayOutputType StandardError
The error information (STDERR) and the result of the job (STDOUT) are also logged to your HDInsight storage.