Aangepaste activiteiten gebruiken in een Azure Data Factory- of Azure Synapse Analytics-pijplijn

Van toepassing op: Azure Data Factory Azure Synapse Analytics

Tip

Data Factory in Microsoft Fabric is de volgende generatie van Azure Data Factory, met een eenvoudigere architectuur, ingebouwde AI en nieuwe functies. Als u nieuw bent in gegevensintegratie, begint u met Fabric Data Factory. Bestaande ADF-workloads kunnen upgraden naar Fabric om toegang te krijgen tot nieuwe mogelijkheden voor gegevenswetenschap, realtime analyses en rapportage.

Er zijn twee soorten activiteiten die u kunt gebruiken in een Azure Data Factory- of Synapse-pijplijn.

Als u gegevens wilt verplaatsen naar/van een gegevensarchief dat de service niet ondersteunt of gegevens wilt transformeren/verwerken op een manier die niet wordt ondersteund door de service, kunt u een aangepaste activiteit maken met uw eigen gegevensverplaatsings- of transformatielogica en de activiteit in een pijplijn gebruiken. Met de aangepaste activiteit wordt uw aangepaste codelogica uitgevoerd op een Azure Batch-pool met virtuele machines.

Notitie

U wordt aangeraden de Azure Az PowerShell-module te gebruiken om te communiceren met Azure. Zie Install Azure PowerShell om aan de slag te gaan. Zie Migrate Azure PowerShell van AzureRM naar Az voor meer informatie over het migreren naar de Az PowerShell-module.

Zie de volgende artikelen als u geen gebruik hebt van Azure Batch service:

Belangrijk

Bij het maken van een nieuwe Azure Batch-pool moet VirtualMachineConfiguration worden gebruikt en NIET 'CloudServiceConfiguration'.

Aangepaste activiteiten toevoegen aan een pijplijn met de gebruikersinterface (UI)

Voer de volgende stappen uit om een aangepaste activiteit in een pijplijn te gebruiken:

  1. Zoek naar Custom in het deelvenster Activiteiten van de pijplijn en sleep een Custom-activiteit naar het pijplijncanvas.

  2. Selecteer de nieuwe aangepaste activiteit op het canvas als deze nog niet is geselecteerd.

  3. Selecteer het tabblad Azure Batch om een nieuwe Azure Batch gekoppelde service te selecteren of te maken waarmee de aangepaste activiteit wordt uitgevoerd.

    Toont de gebruikersinterface voor een aangepaste activiteit.

  4. Selecteer het tabblad Settings en geef een opdracht op die moet worden uitgevoerd op de Azure Batch en optionele geavanceerde details.

    Toont de gebruikersinterface voor het tabblad Instellingen voor een aangepaste activiteit.

Azure Batch gekoppelde service

Met de volgende JSON wordt een voorbeeld van een Azure Batch gekoppelde service gedefinieerd. Zie Ondersteunde rekenomgevingen voor meer informatie

{
    "name": "AzureBatchLinkedService",
    "properties": {
        "type": "AzureBatch",
        "typeProperties": {
            "accountName": "batchaccount",
            "accessKey": {
                "type": "SecureString",
                "value": "access key"
            },
            "batchUri": "https://batchaccount.region.batch.azure.com",
            "poolName": "poolname",
            "linkedServiceName": {
                "referenceName": "StorageLinkedService",
                "type": "LinkedServiceReference"
            }
        }
    }
}

Voor meer informatie over Azure Batch gekoppelde services, zie het artikel Compute gekoppelde services.

Aangepaste activiteit

Het volgende JSON-fragment definieert een pijplijn met een eenvoudige aangepaste activiteit. De activiteitsdefinitie heeft een verwijzing naar de Azure Batch gekoppelde service.

{
  "name": "MyCustomActivityPipeline",
  "properties": {
    "description": "Custom activity sample",
    "activities": [{
      "type": "Custom",
      "name": "MyCustomActivity",
      "linkedServiceName": {
        "referenceName": "AzureBatchLinkedService",
        "type": "LinkedServiceReference"
      },
      "typeProperties": {
        "command": "helloworld.exe",
        "folderPath": "customactv2/helloworld",
        "resourceLinkedService": {
          "referenceName": "StorageLinkedService",
          "type": "LinkedServiceReference"
        }
      }
    }]
  }
}

In dit voorbeeld is de helloworld.exe een aangepaste toepassing die is opgeslagen in de map customactv2/helloworld van het Azure Storage-account dat wordt gebruikt in de resourceLinkedService. De aangepaste activiteit verzendt deze aangepaste toepassing die moet worden uitgevoerd op Azure Batch. U kunt de opdracht vervangen door elke gewenste toepassing die kan worden uitgevoerd op het doelbesturingssysteem van de Azure Batch Pool-knooppunten.

In de volgende tabel worden namen en beschrijvingen beschreven van eigenschappen die specifiek zijn voor deze activiteit.

Eigenschap Beschrijving Vereist
naam Naam van de activiteit in de pijplijn Ja
beschrijving Tekst die beschrijft wat de activiteit doet. Nee
type Voor aangepaste activiteit is het activiteitstype Aangepast. Ja
naam van gekoppelde service Gekoppelde service voor Azure Batch. Zie het artikel Gekoppelde services berekenen om meer te weten te komen over deze gekoppelde service. Ja
opdracht Opdracht van de aangepaste toepassing die moet worden uitgevoerd. Als de toepassing al beschikbaar is op het Azure Batch-poolknooppunt, kunnen de resourceLinkedService en folderPath worden overgeslagen. U kunt bijvoorbeeld de opdracht opgeven die moet worden cmd /c dir, wat systeemeigen wordt ondersteund door de Windows Batch-poolknooppunt. Ja
resourceLinkedService Azure Storage gekoppelde service aan het opslagaccount waar de aangepaste toepassing is opgeslagen Nee*
folderPath Pad naar de map van de aangepaste toepassing en alle bijbehorende afhankelijkheden

Als u afhankelijkheden hebt opgeslagen in submappen, dat wil gezegd: in een hiërarchische mapstructuur onder folderPath: de mapstructuur wordt momenteel afgevlakt wanneer de bestanden naar Azure Batch worden gekopieerd. Dat wil gezegd, alle bestanden worden gekopieerd naar één map zonder submappen. U kunt dit gedrag omzeilen door de bestanden te comprimeren, het gecomprimeerde bestand te kopiëren en het vervolgens op te heffen met aangepaste code op de gewenste locatie.
Nee*
referentieobjecten Een matrix van bestaande gekoppelde services en gegevenssets. De gekoppelde services en gegevenssets waarnaar wordt verwezen, worden doorgegeven aan de aangepaste toepassing in JSON-indeling, zodat uw aangepaste code kan verwijzen naar resources van de service Nee
uitgebreideEigenschappen Door de gebruiker gedefinieerde eigenschappen die kunnen worden doorgegeven aan de aangepaste toepassing in JSON-indeling, zodat uw aangepaste code kan verwijzen naar aanvullende eigenschappen Nee
retentietijdInDagen De bewaartijd voor de bestanden die zijn verzonden voor aangepaste activiteit. De standaardwaarde is 30 dagen. Nee

* De eigenschappen resourceLinkedService en folderPath moeten beide worden opgegeven of beide worden weggelaten.

Notitie

Als u gekoppelde services doorgeeft als referenceObjects in Custom Activity, is het een goede beveiligingspraktijk om een gekoppelde Azure Key Vault ingeschakelde service door te geven (omdat deze geen beveiligde tekenreeksen bevat) en de referenties rechtstreeks vanuit Key Vault uit de code op te halen met behulp van een geheime naam. U vindt een voorbeeld hier dat verwijst naar een AKV-ingeschakelde gekoppelde service, de referenties ophaalt uit Key Vault en vervolgens de opslag in de code benadert.

Notitie

Momenteel wordt alleen Azure Blob-opslag ondersteund voor resourceLinkedService in aangepaste activiteit. Dit is de enige gekoppelde service die standaard wordt gemaakt en geen optie om andere connectors zoals ADLS Gen2 te kiezen.

Aangepaste activiteitsmachtigingen

Met de aangepaste activiteit wordt het Azure Batch automatisch gebruikersaccount ingesteld op On-beheerderstoegang met taakbereik (de standaardspecificatie voor automatische gebruikers). U kunt het machtigingsniveau van het automatische gebruikersaccount niet wijzigen. Voor meer informatie, zie Taken uitvoeren onder gebruikersaccounts in Batch | Automatische gebruikersaccounts.

Opdrachten uitvoeren

U kunt een opdracht rechtstreeks uitvoeren met behulp van Aangepaste activiteit. In het volgende voorbeeld wordt de opdracht 'echo hello world' uitgevoerd op de doel-Azure Batch Pool-knooppunten en wordt de uitvoer afgedrukt naar stdout.

{
  "name": "MyCustomActivity",
  "properties": {
    "description": "Custom activity sample",
    "activities": [{
      "type": "Custom",
      "name": "MyCustomActivity",
      "linkedServiceName": {
        "referenceName": "AzureBatchLinkedService",
        "type": "LinkedServiceReference"
      },
      "typeProperties": {
        "command": "cmd /c echo hello world"
      }
    }]
  }
}

Objecten en eigenschappen doorgeven

In dit voorbeeld ziet u hoe u de referenceObjects en extendedProperties kunt gebruiken om objecten en door de gebruiker gedefinieerde eigenschappen van de service door te geven aan uw aangepaste toepassing.

{
  "name": "MyCustomActivityPipeline",
  "properties": {
    "description": "Custom activity sample",
    "activities": [{
      "type": "Custom",
      "name": "MyCustomActivity",
      "linkedServiceName": {
        "referenceName": "AzureBatchLinkedService",
        "type": "LinkedServiceReference"
      },
      "typeProperties": {
        "command": "SampleApp.exe",
        "folderPath": "customactv2/SampleApp",
        "resourceLinkedService": {
          "referenceName": "StorageLinkedService",
          "type": "LinkedServiceReference"
        },
        "referenceObjects": {
          "linkedServices": [{
            "referenceName": "AzureBatchLinkedService",
            "type": "LinkedServiceReference"
          }]
        },
        "extendedProperties": {          
          "connectionString": {
            "type": "SecureString",
            "value": "aSampleSecureString"
          },
          "PropertyBagPropertyName1": "PropertyBagValue1",
          "propertyBagPropertyName2": "PropertyBagValue2",
          "dateTime1": "2015-04-12T12:13:14Z"
        }
      }
    }]
  }
}

Wanneer de activiteit wordt uitgevoerd, worden referenceObjects en extendedProperties opgeslagen in de volgende bestanden die zijn geïmplementeerd in dezelfde uitvoeringsmap van de SampleApp.exe:

  • activity.json

    Slaat de extendedProperties en eigenschappen van de aangepaste activiteit op.

  • linkedServices.json

    Slaat een matrix van gekoppelde services op die zijn gedefinieerd in de eigenschap referenceObjects.

  • datasets.json

    Slaat een matrix op van gegevenssets die zijn gedefinieerd in de eigenschap referenceObjects.

De volgende voorbeeldcode laat zien hoe de SampleApp.exe toegang heeft tot de vereiste gegevens uit JSON-bestanden:

using Newtonsoft.Json;
using System;
using System.IO;

namespace SampleApp
{
    class Program
    {
        static void Main(string[] args)
        {
            //From Extend Properties
            dynamic activity = JsonConvert.DeserializeObject(File.ReadAllText("activity.json"));
            Console.WriteLine(activity.typeProperties.extendedProperties.connectionString.value);

            // From LinkedServices
            dynamic linkedServices = JsonConvert.DeserializeObject(File.ReadAllText("linkedServices.json"));
            Console.WriteLine(linkedServices[0].properties.typeProperties.accountName);
        }
    }
}

Uitvoerresultaten ophalen

U kunt een pijplijnuitvoering starten met behulp van de volgende PowerShell-opdracht:

$runId = Invoke-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineName $pipelineName

Wanneer de pijplijn wordt uitgevoerd, kunt u de uitvoeringen controleren met de volgende opdrachten:

while ($True) {
    $result = Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $runId -RunStartedAfter (Get-Date).AddMinutes(-30) -RunStartedBefore (Get-Date).AddMinutes(30)

    if(!$result) {
        Write-Host "Waiting for pipeline to start..." -foregroundcolor "Yellow"
    }
    elseif (($result | Where-Object { $_.Status -eq "InProgress" } | Measure-Object).count -ne 0) {
        Write-Host "Pipeline run status: In Progress" -foregroundcolor "Yellow"
    }
    else {
        Write-Host "Pipeline '"$pipelineName"' run finished. Result:" -foregroundcolor "Yellow"
        $result
        break
    }
    ($result | Format-List | Out-String)
    Start-Sleep -Seconds 15
}

Write-Host "Activity `Output` section:" -foregroundcolor "Yellow"
$result.Output -join "`r`n"

Write-Host "Activity `Error` section:" -foregroundcolor "Yellow"
$result.Error -join "`r`n"

De stdout en stderr van uw aangepaste toepassing worden opgeslagen in de adfjobs container in de Azure Storage Gekoppelde service die u hebt gedefinieerd bij het maken van Azure Batch Gekoppelde service met een GUID van de taak. U kunt het volledige pad vinden in de activiteitsuitvoer zoals weergegeven in het volgende fragment.

Pipeline ' MyCustomActivity' run finished. Result:

ResourceGroupName : resourcegroupname
DataFactoryName   : datafactoryname
ActivityName      : MyCustomActivity
PipelineRunId     : xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
PipelineName      : MyCustomActivity
Input             : {command}
Output            : {exitcode, outputs, effectiveIntegrationRuntime}
LinkedServiceName :
ActivityRunStart  : 10/5/2017 3:33:06 PM
ActivityRunEnd    : 10/5/2017 3:33:28 PM
DurationInMs      : 21203
Status            : Succeeded
Error             : {errorCode, message, failureType, target}

Activity Output section:
"exitcode": 0
"outputs": [
  "https://<container>.blob.core.windows.net/adfjobs/<GUID>/output/stdout.txt",
  "https://<container>.blob.core.windows.net/adfjobs/<GUID>/output/stderr.txt"
]
"effectiveIntegrationRuntime": "DefaultIntegrationRuntime (East US)"
Activity Error section:
"errorCode": ""
"message": ""
"failureType": ""
"target": "MyCustomActivity"

Als u de inhoud van stdout.txt in downstreamactiviteiten wilt gebruiken, kunt u het pad naar het stdout.txt bestand ophalen in de expressie '@activity('MyCustomActivity').output.outputs[0]'.

Belangrijk

  • De activity.json, linkedServices.json en datasets.json worden opgeslagen in de runtimemap van de Batch-taak. In dit voorbeeld worden de bestanden activity.json, linkedServices.json en datasets.json opgeslagen in deze locatie https://adfv2storage.blob.core.windows.net/adfjobs/<GUID>/runtime/. Indien nodig moet u ze afzonderlijk opschonen.
  • Voor gekoppelde services die gebruikmaken van de Self-Hosted Integration Runtime, worden de gevoelige informatie, zoals sleutels of wachtwoorden, versleuteld door de Self-Hosted Integration Runtime om ervoor te zorgen dat referenties in de door de klant gedefinieerde privénetwerkomgeving blijven. Sommige gevoelige velden kunnen ontbreken wanneer er op deze manier naar uw aangepaste toepassingscode wordt verwezen. Gebruik SecureString in extendedProperties in plaats van indien nodig verwijzingen naar gekoppelde services te gebruiken.

Uitvoer doorgeven aan een andere activiteit

U kunt aangepaste waarden vanuit uw code in een aangepaste activiteit terugsturen naar de service. U kunt dit doen door ze naar outputs.json te schrijven vanuit uw toepassing. De service kopieert de inhoud van outputs.json en voegt deze toe aan de activiteitsuitvoer als de waarde van de customOutput eigenschap. (De groottelimiet is 2 MB.) Als u de inhoud van outputs.json in downstreamactiviteiten wilt gebruiken, kunt u de waarde ophalen met behulp van de expressie @activity('<MyCustomActivity>').output.customOutput.

SecureString-uitvoer ophalen

Gevoelige eigenschapswaarden die zijn aangewezen als type SecureString, zoals wordt weergegeven in sommige voorbeelden in dit artikel, worden gemaskeerd op het tabblad Bewaking in de gebruikersinterface. Bij de uitvoering van de pijplijn wordt echter een SecureString-eigenschap als platte tekst geserialiseerd in het activity.json bestand als JSON. Voorbeeld:

"extendedProperties": {
  "connectionString": {
    "type": "SecureString",
    "value": "aSampleSecureString"
  }
}

Deze serialisatie is niet echt veilig en is niet bedoeld om veilig te zijn. Het doel is een aanwijzing voor de dienst om de waarde op het tabblad Bewaking te maskeren.

Als u toegang wilt krijgen tot eigenschappen van het type SecureString vanuit een aangepaste activiteit, leest u het activity.json bestand, dat in dezelfde map wordt geplaatst als uw .EXE, deserialiseert u de JSON en opent u vervolgens de JSON-eigenschap (extendedProperties => [propertyName] => value).

Automatisch schalen van Azure Batch

U kunt ook een Azure Batch-pool maken met de functie autoscale. U kunt bijvoorbeeld een Azure batchgroep maken met 0 toegewezen VM's en een formule voor automatisch schalen op basis van het aantal taken dat in behandeling is.

De voorbeeldformule hier bereikt het volgende gedrag: Wanneer de pool in eerste instantie wordt gemaakt, begint deze met 1 VM. $PendingTasks metrie definieert het aantal taken in de status 'uitvoerend' + 'actief' (in wachtrij). De formule zoekt het gemiddelde aantal taken in behandeling in de afgelopen 180 seconden en stelt TargetDedicated dienovereenkomstig in. Het zorgt ervoor dat TargetDedicated nooit meer dan 25 VM's overschrijdt. Naarmate er nieuwe taken worden verzonden, groeit de pool automatisch en wanneer taken zijn voltooid, komen VM's één voor één vrij en krimpt het autoschalen deze VM's. startingNumberOfVMs en maxNumberofVMs kunnen worden aangepast aan uw behoeften.

Formule voor automatisch schalen:

startingNumberOfVMs = 1;
maxNumberofVMs = 25;
pendingTaskSamplePercent = $PendingTasks.GetSamplePercent(180 * TimeInterval_Second);
pendingTaskSamples = pendingTaskSamplePercent < 70 ? startingNumberOfVMs : avg($PendingTasks.GetSample(180 * TimeInterval_Second));
$TargetDedicated=min(maxNumberofVMs,pendingTaskSamples);

Zie Automatisch rekenknooppunten schalen in een Azure Batch-pool voor meer informatie.

Als de pool gebruikmaakt van de standaard autoScaleEvaluationInterval, kan de Batch-service 15-30 minuten duren om de VIRTUELE machine voor te bereiden voordat de aangepaste activiteit wordt uitgevoerd. Als de pool een andere autoScaleEvaluationInterval gebruikt, kan de Batch-service autoScaleEvaluationInterval + 10 minuten duren.

Zie de volgende artikelen waarin wordt uitgelegd hoe u gegevens op andere manieren kunt transformeren: