Sdílet prostřednictvím


Použití uživatelem definovaných funkcí Pythonu (UDF) s Apache Hivem a Apache Pigem ve službě HDInsight

Naučte se používat uživatelem definované funkce Pythonu (UDF) s Apache Hivem a Apache Pigem v Apache Hadoopu ve službě Azure HDInsight.

Python ve službě HDInsight

Python2.7 je ve výchozím nastavení nainstalovaný ve službě HDInsight 3.0 a novějším. Apache Hive se dá použít s touto verzí Pythonu ke zpracování datových proudů. Zpracování datových proudů používá k předávání dat mezi Hivem a uživatelem definovanou funkcí STDOUT a STDIN.

HDInsight také obsahuje Jython, což je implementace Pythonu napsaná v Javě. Jython běží přímo na virtuálním počítači Java a nepoužívá streamování. Jython je doporučený interpret Pythonu při použití Pythonu s Pigem.

Požadavky

Poznámka:

Účet úložiště použitý v tomto článku byl Azure Storage s povoleným zabezpečeným přenosem , a proto wasbs se používá v celém článku.

Konfigurace úložiště

Není vyžadována žádná akce, pokud použitý účet úložiště je druh Storage (general purpose v1) nebo StorageV2 (general purpose v2). Proces popsaný v tomto článku vytvoří výstup alespoň /tezstaging. Výchozí konfigurace hadoopu obsahuje /tezstaging v fs.azure.page.blob.dir konfigurační proměnné pro core-site.xml službu HDFS. Tato konfigurace způsobí, že výstup do adresáře budou stránkové objekty blob, které nejsou podporovány pro druh účtu úložiště BlobStorage. Chcete-li použít BlobStorage pro tento článek, odeberte /tezstaging z fs.azure.page.blob.dir konfigurační proměnné. Ke konfiguraci je možné přistupovat z uživatelského rozhraní Ambari. V opačném případě se zobrazí chybová zpráva: Page blob is not supported for this account type.

Varování

Kroky v tomto dokumentu předpokládají následující předpoklady:

  • Skripty Pythonu vytvoříte v místním vývojovém prostředí.
  • Skripty nahrajete do HDInsight pomocí scp příkazu nebo zadaného skriptu PowerShellu.

Pokud chcete použít Azure Cloud Shell (bash) pro práci se službou HDInsight, musíte:

  • Vytvořte skripty v prostředí Cloud Shell.
  • Použijte scp k přenosu souborů z Cloud Shell do HDInsight.
  • Pomocí ssh cloud shellu se připojte ke službě HDInsight a spusťte příklady.

Apache Hive UDF

Python se dá použít jako UDF z Hivu prostřednictvím příkazu HiveQL TRANSFORM . Například následující HiveQL vyvolá hiveudf.py soubor uložený ve výchozím účtu Azure Storage pro cluster.

add file wasbs:///hiveudf.py;

SELECT TRANSFORM (clientid, devicemake, devicemodel)
    USING 'python hiveudf.py' AS
    (clientid string, phoneLabel string, phoneHash string)
FROM hivesampletable
ORDER BY clientid LIMIT 50;

Tady je, co tento příklad dělá:

  1. Příkaz add file na začátku souboru přidá hiveudf.py soubor do distribuované mezipaměti, takže je přístupný všemi uzly v clusteru.
  2. Příkaz SELECT TRANSFORM ... USING vybere data z objektu hivesampletable. Předává také hodnoty clientid, devicemake a devicemodel do hiveudf.py skriptu.
  3. Klauzule AS popisuje pole vrácená z hiveudf.py.

Vytvořit soubor

Ve vývojovém prostředí vytvořte textový soubor s názvem hiveudf.py. Jako obsah souboru použijte následující kód:

#!/usr/bin/env python
import sys
import string
import hashlib

while True:
    line = sys.stdin.readline()
    if not line:
        break

    line = string.strip(line, "\n ")
    clientid, devicemake, devicemodel = string.split(line, "\t")
    phone_label = devicemake + ' ' + devicemodel
    print "\t".join([clientid, phone_label, hashlib.md5(phone_label).hexdigest()])

Tento skript provádí následující akce:

  1. Načte řádek dat ze služby STDIN.
  2. Koncový znak nového řádku se odebere pomocí string.strip(line, "\n ").
  3. Při zpracování datových proudů obsahuje jeden řádek všechny hodnoty se znakem tabulátoru mezi jednotlivými hodnotami. Takže string.split(line, "\t") lze použít k rozdělení vstupu u každé tabulátory a vrátí pouze pole.
  4. Po dokončení zpracování musí být výstup zapsán do STDOUT jako jeden řádek s tabulátorem mezi poli. Například print "\t".join([clientid, phone_label, hashlib.md5(phone_label).hexdigest()]).
  5. Smyčka while se opakuje, dokud není line přečten.

Výstup skriptu je zřetězení vstupních hodnot pro devicemake a devicemodel a hash zřetězené hodnoty.

Nahrání souboru (příkazový řádek)

Následující příkaz nahradí sshuser skutečným uživatelským jménem, pokud se liší. Nahraďte mycluster skutečným názvem clusteru. Ujistěte se, že váš pracovní adresář odpovídá umístění souboru.

  1. Použijte scp ke kopírování souborů do clusteru HDInsight. Upravte a zadejte příkaz:

    scp hiveudf.py sshuser@mycluster-ssh.azurehdinsight.net:
    
  2. Připojte se ke clusteru pomocí SSH. Upravte a zadejte příkaz:

    ssh sshuser@mycluster-ssh.azurehdinsight.net
    
  3. V relaci SSH přidejte soubory Pythonu nahrané dříve do úložiště clusteru.

    hdfs dfs -put hiveudf.py /hiveudf.py
    

Použití uživatelsky definované funkce Hive (shell)

  1. Pokud se chcete připojit k Hivu, použijte následující příkaz z otevřené relace SSH:

    beeline -u 'jdbc:hive2://headnodehost:10001/;transportMode=http'
    

    Tento příkaz spustí klienta Beeline.

  2. Na příkazovém 0: jdbc:hive2://headnodehost:10001/> řádku zadejte následující dotaz:

    add file wasbs:///hiveudf.py;
    SELECT TRANSFORM (clientid, devicemake, devicemodel)
        USING 'python hiveudf.py' AS
        (clientid string, phoneLabel string, phoneHash string)
    FROM hivesampletable
    ORDER BY clientid LIMIT 50;
    
  3. Po zadání posledního řádku by se měla úloha spustit. Po dokončení úlohy vrátí výstup podobný následujícímu příkladu:

    100041    RIM 9650    d476f3687700442549a83fac4560c51c
    100041    RIM 9650    d476f3687700442549a83fac4560c51c
    100042    Apple iPhone 4.2.x    375ad9a0ddc4351536804f1d5d0ea9b9
    100042    Apple iPhone 4.2.x    375ad9a0ddc4351536804f1d5d0ea9b9
    100042    Apple iPhone 4.2.x    375ad9a0ddc4351536804f1d5d0ea9b9
    
  4. Pokud chcete beeline ukončit, zadejte následující příkaz:

    !q
    

Nahrání souboru (PowerShell)

PowerShell se dá použít také ke vzdálenému spouštění dotazů Hive. Ujistěte se, že váš pracovní adresář je tam, kde se nachází hiveudf.py. Pomocí následujícího skriptu PowerShellu spusťte dotaz Hive, který používá skript hiveudf.py.

# Login to your Azure subscription
# Is there an active Azure subscription?
$sub = Get-AzSubscription -ErrorAction SilentlyContinue
if(-not($sub))
{
    Connect-AzAccount
}

# If you have multiple subscriptions, set the one to use
# Select-AzSubscription -SubscriptionId "<SUBSCRIPTIONID>"

# Revise file path as needed
$pathToStreamingFile = ".\hiveudf.py"

# Get cluster info
$clusterName = Read-Host -Prompt "Enter the HDInsight cluster name"
$clusterInfo = Get-AzHDInsightCluster -ClusterName $clusterName
$resourceGroup = $clusterInfo.ResourceGroup
$storageAccountName=$clusterInfo.DefaultStorageAccount.split('.')[0]
$container=$clusterInfo.DefaultStorageContainer
$storageAccountKey=(Get-AzStorageAccountKey `
   -ResourceGroupName $resourceGroup `
   -Name $storageAccountName)[0].Value

# Create an Azure Storage context
$context = New-AzStorageContext `
    -StorageAccountName $storageAccountName `
    -StorageAccountKey $storageAccountKey

# Upload local files to an Azure Storage blob
Set-AzStorageBlobContent `
    -File $pathToStreamingFile `
    -Blob "hiveudf.py" `
    -Container $container `
    -Context $context

Poznámka:

Další informace o nahrávání souborů najdete v tématu Nahrání dat pro úlohy Apache Hadoop v dokumentu HDInsight .

Použijte uživatelskou funkci Hive (UDF)

# Script should stop on failures
$ErrorActionPreference = "Stop"

# Login to your Azure subscription
# Is there an active Azure subscription?
$sub = Get-AzSubscription -ErrorAction SilentlyContinue
if(-not($sub))
{
    Connect-AzAccount
}

# If you have multiple subscriptions, set the one to use
# Select-AzSubscription -SubscriptionId "<SUBSCRIPTIONID>"

# Get cluster info
$clusterName = Read-Host -Prompt "Enter the HDInsight cluster name"
$creds=Get-Credential -UserName "admin" -Message "Enter the login for the cluster"

$HiveQuery = "add file wasbs:///hiveudf.py;" +
                "SELECT TRANSFORM (clientid, devicemake, devicemodel) " +
                "USING 'python hiveudf.py' AS " +
                "(clientid string, phoneLabel string, phoneHash string) " +
                "FROM hivesampletable " +
                "ORDER BY clientid LIMIT 50;"

# Create Hive job object
$jobDefinition = New-AzHDInsightHiveJobDefinition `
    -Query $HiveQuery

# For status bar updates
$activity="Hive query"

# Progress bar (optional)
Write-Progress -Activity $activity -Status "Starting query..."

# Start defined Azure HDInsight job on specified cluster.
$job = Start-AzHDInsightJob `
    -ClusterName $clusterName `
    -JobDefinition $jobDefinition `
    -HttpCredential $creds

# Progress bar (optional)
Write-Progress -Activity $activity -Status "Waiting on query to complete..."

# Wait for completion or failure of specified job
Wait-AzHDInsightJob `
    -JobId $job.JobId `
    -ClusterName $clusterName `
    -HttpCredential $creds

# Uncomment the following to see stderr output
<#
Get-AzHDInsightJobOutput `
   -Clustername $clusterName `
   -JobId $job.JobId `
   -HttpCredential $creds `
   -DisplayOutputType StandardError
#>

# Progress bar (optional)
Write-Progress -Activity $activity -Status "Retrieving output..."

# Gets the log output
Get-AzHDInsightJobOutput `
    -Clustername $clusterName `
    -JobId $job.JobId `
    -HttpCredential $creds

Výstup pro úlohu Hive by se měl podobat následujícímu příkladu:

100041    RIM 9650    d476f3687700442549a83fac4560c51c
100041    RIM 9650    d476f3687700442549a83fac4560c51c
100042    Apple iPhone 4.2.x    375ad9a0ddc4351536804f1d5d0ea9b9
100042    Apple iPhone 4.2.x    375ad9a0ddc4351536804f1d5d0ea9b9
100042    Apple iPhone 4.2.x    375ad9a0ddc4351536804f1d5d0ea9b9

Apache Pig UDF

Skript Pythonu se dá použít jako UDF z Pigu prostřednictvím příkazu GENERATE . Skript můžete spustit pomocí Jythonu nebo C Pythonu.

  • Jython běží na JVM a lze jej volat nativně z frameworku Pig.
  • C Python je externí proces, takže data z Pigu na JVM se odesílají do skriptu spuštěného v procesu Pythonu. Výstup skriptu Pythonu se odešle zpět do Pigu.

Pokud chcete určit interpret Pythonu, použijte register ho při odkazování na skript Pythonu. Následující příklady registrují skripty ve službě Pig jako myfuncs:

  • Použití Jythonu: register '/path/to/pigudf.py' using jython as myfuncs;
  • Použití jazyka C Python: register '/path/to/pigudf.py' using streaming_python as myfuncs;

Důležité

Při použití Jythonu může být cesta k souboru pig_jython místní nebo WASBS:// cesta. Při použití jazyka C Python však musíte odkazovat na soubor v místním systému souborů uzlu, který používáte k odeslání úlohy Pig.

Jakmile projdete registrací, prasečí latina je v tomto příkladu stejná pro oba:

LOGS = LOAD 'wasbs:///example/data/sample.log' as (LINE:chararray);
LOG = FILTER LOGS by LINE is not null;
DETAILS = FOREACH LOG GENERATE myfuncs.create_structure(LINE);
DUMP DETAILS;

Zde je to, co tento příklad dělá:

  1. První řádek načte ukázkový datový soubor sample.log do LOGS. Definuje také každý záznam jako chararrayzáznam .
  2. Další řádek vyfiltruje všechny hodnoty null a uloží výsledek operace do LOG.
  3. Dále iteruje přes záznamy v LOG a používá GENERATE k vyvolání metody create_structure obsažené ve skriptu Python/Jython načteném jako myfuncs. LINE slouží k předání aktuálního záznamu do funkce.
  4. Výstupy se nakonec pomocí příkazu DUMP vypíšou do STDOUT. Tento příkaz zobrazí výsledky po dokončení operace.

Vytvořit soubor

Ve vývojovém prostředí vytvořte textový soubor s názvem pigudf.py. Jako obsah souboru použijte následující kód:

# Uncomment the following if using C Python
#from pig_util import outputSchema


@outputSchema("log: {(date:chararray, time:chararray, classname:chararray, level:chararray, detail:chararray)}")
def create_structure(input):
    if (input.startswith('java.lang.Exception')):
        input = input[21:len(input)] + ' - java.lang.Exception'
    date, time, classname, level, detail = input.split(' ', 4)
    return date, time, classname, level, detail

V příkladu Pig Latin je LINE vstup definován jako chararray, protože pro vstup neexistuje konzistentní schéma. Skript Pythonu transformuje data na konzistentní schéma pro výstup.

  1. Příkaz @outputSchema definuje formát dat vrácených službě Pig. V tomto případě se jedná o data bag, což je datový typ Pig. Taška obsahuje následující pole, z nichž všechny jsou chararray (řetězce):

    • date – datum vytvoření položky protokolu
    • time – čas vytvoření položky protokolu
    • classname – název třídy, pro kterou byla položka vytvořena
    • level – úroveň záznamu
    • detail – podrobné informace o položce protokolu
  2. Následně def create_structure(input) definuje funkci, které Pig předává řádkové položky.

  3. Ukázková data, sample.logvětšinou odpovídají datu, času, názvu třídy, úrovni a schématu podrobností. Obsahuje však několik řádků, které začínají *java.lang.Exception*. Tyto řádky musí být upraveny tak, aby odpovídaly schématu. Tento if příkaz zkontroluje tyto skutečnosti a pak upraví vstupní data, aby se řetězec *java.lang.Exception* přesunul na konec, čímž se data uvedou do souladu s očekávaným výstupním schématem.

  4. Dále se příkaz split použije k rozdělení dat na prvních čtyřech znacích mezer. Výstup je přiřazen do date, time, classname, level, a detail.

  5. Nakonec se hodnoty vrátí do Pig.

Když se data vrátí do Pigu, mají konzistentní schéma definované v @outputSchema příkazu.

Nahrání souboru (shell)

V následujících příkazech nahraďte sshuser skutečným uživatelským jménem, pokud se liší. Nahraďte mycluster skutečným názvem clusteru. Ujistěte se, že váš pracovní adresář je tam, kde je umístěn soubor.

  1. Použijte scp ke kopírování souborů do clusteru HDInsight. Upravte a zadejte příkaz:

    scp pigudf.py sshuser@mycluster-ssh.azurehdinsight.net:
    
  2. Připojte se ke clusteru pomocí SSH. Upravte a zadejte příkaz:

    ssh sshuser@mycluster-ssh.azurehdinsight.net
    
  3. V relaci SSH přidejte soubory Pythonu nahrané dříve do úložiště clusteru.

    hdfs dfs -put pigudf.py /pigudf.py
    

Použití nástroje Pig UDF (shell)

  1. Pokud se chcete připojit k pigu, použijte následující příkaz z otevřené relace SSH:

    pig
    
  2. Na příkazovém grunt> řádku zadejte následující příkazy:

    Register wasbs:///pigudf.py using jython as myfuncs;
    LOGS = LOAD 'wasbs:///example/data/sample.log' as (LINE:chararray);
    LOG = FILTER LOGS by LINE is not null;
    DETAILS = foreach LOG generate myfuncs.create_structure(LINE);
    DUMP DETAILS;
    
  3. Po zadání následujícího řádku by se měla úloha spustit. Po dokončení úlohy vrátí výstup podobný následujícím datům:

    ((2012-02-03,20:11:56,SampleClass5,[TRACE],verbose detail for id 990982084))
    ((2012-02-03,20:11:56,SampleClass7,[TRACE],verbose detail for id 1560323914))
    ((2012-02-03,20:11:56,SampleClass8,[DEBUG],detail for id 2083681507))
    ((2012-02-03,20:11:56,SampleClass3,[TRACE],verbose detail for id 1718828806))
    ((2012-02-03,20:11:56,SampleClass3,[INFO],everything normal for id 530537821))
    
  4. Ukončete shell Grunt pomocí quit, poté použijte následující příkaz k úpravě souboru pigudf.py v místním systému souborů:

    nano pigudf.py
    
  5. V editoru odkomentujte následující řádek odebráním # znaku od začátku řádku:

    #from pig_util import outputSchema
    

    Tento řádek upraví skript Pythonu tak, aby místo Jythonu fungoval s jazykem C Python. Po provedení změny ukončete editor stisknutím ctrl+X . Vyberte Y a potom enter , aby se změny uložily.

  6. Použijte příkaz pig ke znovuspuštění shellu. Jakmile budete na příkazovém grunt> řádku, pomocí následujícího příkazu spusťte skript Pythonu pomocí interpretu jazyka C Python.

    Register 'pigudf.py' using streaming_python as myfuncs;
    LOGS = LOAD 'wasbs:///example/data/sample.log' as (LINE:chararray);
    LOG = FILTER LOGS by LINE is not null;
    DETAILS = foreach LOG generate myfuncs.create_structure(LINE);
    DUMP DETAILS;
    

    Po dokončení této úlohy by se měl zobrazit stejný výstup jako při předchozím spuštění skriptu pomocí Jythonu.

Nahrání souboru (PowerShell)

PowerShell se dá použít také ke vzdálenému spouštění dotazů Hive. Ujistěte se, že váš pracovní adresář je tam, kde se nachází pigudf.py. Pomocí následujícího skriptu v PowerShellu spusťte dotaz Hive, který používá skript pigudf.py:

# Login to your Azure subscription
# Is there an active Azure subscription?
$sub = Get-AzSubscription -ErrorAction SilentlyContinue
if(-not($sub))
{
    Connect-AzAccount
}

# If you have multiple subscriptions, set the one to use
# Select-AzSubscription -SubscriptionId "<SUBSCRIPTIONID>"

# Revise file path as needed
$pathToJythonFile = ".\pigudf.py"


# Get cluster info
$clusterName = Read-Host -Prompt "Enter the HDInsight cluster name"
$clusterInfo = Get-AzHDInsightCluster -ClusterName $clusterName
$resourceGroup = $clusterInfo.ResourceGroup
$storageAccountName=$clusterInfo.DefaultStorageAccount.split('.')[0]
$container=$clusterInfo.DefaultStorageContainer
$storageAccountKey=(Get-AzStorageAccountKey `
   -ResourceGroupName $resourceGroup `
   -Name $storageAccountName)[0].Value

# Create an Azure Storage context
$context = New-AzStorageContext `
    -StorageAccountName $storageAccountName `
    -StorageAccountKey $storageAccountKey

# Upload local files to an Azure Storage blob
Set-AzStorageBlobContent `
    -File $pathToJythonFile `
    -Blob "pigudf.py" `
    -Container $container `
    -Context $context

Použití funkce Pig UDF (PowerShell)

Poznámka:

Při vzdáleném odesílání úlohy pomocí PowerShellu není možné jako interpret použít jazyk C Python.

PowerShell se dá použít také ke spouštění úloh Pig Latin. Pokud chcete spustit úlohu Pig Latin, která tento skript používá, použijte následující skript PowerShellu pigudf.py :

# Script should stop on failures
$ErrorActionPreference = "Stop"

# Login to your Azure subscription
# Is there an active Azure subscription?
$sub = Get-AzSubscription -ErrorAction SilentlyContinue
if(-not($sub))
{
    Connect-AzAccount
}

# Get cluster info
$clusterName = Read-Host -Prompt "Enter the HDInsight cluster name"
$creds=Get-Credential -UserName "admin" -Message "Enter the login for the cluster"


$PigQuery = "Register wasbs:///pigudf.py using jython as myfuncs;" +
            "LOGS = LOAD 'wasbs:///example/data/sample.log' as (LINE:chararray);" +
            "LOG = FILTER LOGS by LINE is not null;" +
            "DETAILS = foreach LOG generate myfuncs.create_structure(LINE);" +
            "DUMP DETAILS;"

# Create Pig job object
$jobDefinition = New-AzHDInsightPigJobDefinition -Query $PigQuery

# For status bar updates
$activity="Pig job"

# Progress bar (optional)
Write-Progress -Activity $activity -Status "Starting job..."

# Start defined Azure HDInsight job on specified cluster.
$job = Start-AzHDInsightJob `
    -ClusterName $clusterName `
    -JobDefinition $jobDefinition `
    -HttpCredential $creds

# Progress bar (optional)
Write-Progress -Activity $activity -Status "Waiting for the Pig job to complete..."

# Wait for completion or failure of specified job
Wait-AzHDInsightJob `
    -Job $job.JobId `
    -ClusterName $clusterName `
    -HttpCredential $creds

# Uncomment the following to see stderr output
<#
Get-AzHDInsightJobOutput `
    -Clustername $clusterName `
    -JobId $job.JobId `
    -HttpCredential $creds `
    -DisplayOutputType StandardError
#>

# Progress bar (optional)
Write-Progress -Activity $activity "Retrieving output..."

# Gets the log output
Get-AzHDInsightJobOutput `
    -Clustername $clusterName `
    -JobId $job.JobId `
    -HttpCredential $creds

Výstup úlohy Pig by se měl podobat následujícím datům:

((2012-02-03,20:11:56,SampleClass5,[TRACE],verbose detail for id 990982084))
((2012-02-03,20:11:56,SampleClass7,[TRACE],verbose detail for id 1560323914))
((2012-02-03,20:11:56,SampleClass8,[DEBUG],detail for id 2083681507))
((2012-02-03,20:11:56,SampleClass3,[TRACE],verbose detail for id 1718828806))
((2012-02-03,20:11:56,SampleClass3,[INFO],everything normal for id 530537821))

Řešení problému

Chyby při spouštění úloh

Při spuštění úlohy Hive se může zobrazit chyba podobná následujícímu textu:

Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: [Error 20001]: An error occurred while reading or writing to your custom script. It may have crashed with an error.

Příčinou tohoto problému může být konec řádku v souboru Pythonu. Mnoho editorů Windows jako konec řádku používá CRLF, ale linuxové aplikace obvykle očekávají LF.

Před nahráním souboru do HDInsight můžete pomocí následujících příkazů PowerShellu odebrat znaky CR:

Write-Progress -Activity $activity -Status "Waiting for the Pig job to complete..."

# Wait for completion or failure of specified job

Skripty PowerShellu

Oba ukázkové skripty PowerShellu použité ke spuštění příkladů obsahují okomentovaný řádek, který zobrazuje výstup chyby pro úlohu. Pokud se pro úlohu nezobrazuje očekávaný výstup, odkomentujte následující řádek a zkontrolujte, jestli informace o chybě značí problém.

$activity="Pig job"

# Progress bar (optional)
Write-Progress -Activity $activity -Status "Starting job..."

Informace o chybě (STDERR) a výsledek úlohy (STDOUT) se také zaznamenávají do vašeho úložiště HDInsight.

Pro tuto práci... Podívejte se na tyto soubory v blob kontejneru.
Hive /HivePython/stderr

/HivePython/stdout

Pig /PigPython/stderr

/PigPython/stdout

Další kroky

Pokud potřebujete načíst moduly Pythonu, které nejsou ve výchozím nastavení k dispozici, přečtěte si, jak nasadit modul do Azure HDInsight.

Další způsoby použití Pig, Hive a další informace o použití MapReduce najdete v následujících dokumentech: