Używanie działań niestandardowych w potoku Azure Data Factory lub Azure Synapse Analytics

DOTYCZY: Azure Data Factory Azure Synapse Analytics

Wskazówka

Data Factory w usłudze Microsoft Fabric jest następną generacją Azure Data Factory z prostszą architekturą, wbudowaną sztuczną inteligencją i nowymi funkcjami. Jeśli dopiero zaczynasz integrować dane, zacznij od Fabric Data Factory. Istniejące obciążenia ADF można zaktualizować do Fabric, aby uzyskać dostęp do nowych możliwości w zakresie nauki o danych, analiz w czasie rzeczywistym oraz raportowania.

Istnieją dwa typy czynności, których można użyć w kanale Azure Data Factory lub Synapse.

Aby przenieść dane do/z magazynu danych, którego usługa nie obsługuje, lub przekształcić lub przetworzyć dane w sposób, który nie jest obsługiwany przez usługę, możesz utworzyć niestandardowe działanie z własną logiką przenoszenia lub przekształcania danych i używać tego działania w przepływie pracy. Działanie niestandardowe uruchamia dostosowaną logikę kodu na Azure Batch puli maszyn wirtualnych.

Uwaga

Zalecamy użycie modułu Azure Az programu PowerShell do interakcji z Azure. Aby rozpocząć, zobacz Install Azure PowerShell. Aby dowiedzieć się, jak przeprowadzić migrację do modułu Az programu PowerShell, zobacz Migrate Azure PowerShell z modułu AzureRM do modułu Az.

Jeśli dopiero zaczynasz korzystać z usługi Azure Batch, zobacz następujące artykuły:

Ważne

Podczas tworzenia nowej puli Azure Batch należy użyć "VirtualMachineConfiguration" i NIE "CloudServiceConfiguration".

Dodawanie działań niestandardowych do potoku za pomocą interfejsu użytkownika

Aby użyć aktywności niestandardowej w potoku, wykonaj następujące kroki:

  1. Wyszukaj ciąg Niestandardowy w okienku Działania potoku i przeciągnij działanie niestandardowe na kanwę potoku.

  2. Wybierz nowe działanie niestandardowe na płótnie, jeśli nie zostało jeszcze wybrane.

  3. Wybierz kartę Azure Batch aby wybrać lub utworzyć nową połączoną usługę Azure Batch, która wykona działanie niestandardowe.

    Pokazuje interfejs użytkownika dla działania niestandardowego.

  4. Wybierz kartę Settings i określ polecenie do wykonania na Azure Batch i opcjonalne szczegóły zaawansowane.

    Pokazuje interfejs użytkownika dla karty Ustawienia dla działania niestandardowego.

połączona usługa Azure Batch

Poniższy kod JSON definiuje przykładową usługę powiązaną Azure Batch. Aby uzyskać szczegółowe informacje, zobacz Obsługiwane środowiska obliczeniowe

{
    "name": "AzureBatchLinkedService",
    "properties": {
        "type": "AzureBatch",
        "typeProperties": {
            "accountName": "batchaccount",
            "accessKey": {
                "type": "SecureString",
                "value": "access key"
            },
            "batchUri": "https://batchaccount.region.batch.azure.com",
            "poolName": "poolname",
            "linkedServiceName": {
                "referenceName": "StorageLinkedService",
                "type": "LinkedServiceReference"
            }
        }
    }
}

Aby dowiedzieć się więcej na temat połączonej usługi Azure Batch, zobacz artykuł Połączone usługi Compute.

Działanie niestandardowe

Poniższy fragment kodu JSON definiuje potok danych z prostą Aktywnością Niestandardową. Definicja działania zawiera odwołanie do połączonej usługi Azure Batch.

{
  "name": "MyCustomActivityPipeline",
  "properties": {
    "description": "Custom activity sample",
    "activities": [{
      "type": "Custom",
      "name": "MyCustomActivity",
      "linkedServiceName": {
        "referenceName": "AzureBatchLinkedService",
        "type": "LinkedServiceReference"
      },
      "typeProperties": {
        "command": "helloworld.exe",
        "folderPath": "customactv2/helloworld",
        "resourceLinkedService": {
          "referenceName": "StorageLinkedService",
          "type": "LinkedServiceReference"
        }
      }
    }]
  }
}

W tym przykładzie helloworld.exe to aplikacja niestandardowa przechowywana w folderze customactv2/helloworld konta Azure Storage używanego w usłudze resourceLinkedService. Działanie niestandardowe przesyła tę aplikację niestandardową do wykonania w Azure Batch. Polecenie można zastąpić dowolną preferowaną aplikacją, która może być wykonywana w docelowym systemie operacyjnym węzłów puli Azure Batch.

W poniższej tabeli opisano nazwy i opisy właściwości specyficznych dla tego działania.

Właściwość Opis Wymagane
nazwa Nazwa działania w rurociągu Yes
opis Tekst opisujący działanie. Nie
typ W przypadku działania niestandardowego typ działania to Niestandardowy. Yes
linkedServiceName Połączona usługa z Azure Batch. Aby dowiedzieć się więcej o tej połączonej usłudze, zobacz artykuł Dotyczący połączonych usług obliczeniowych . Yes
polecenie Polecenie do wykonania dla aplikacji niestandardowej. Jeśli aplikacja jest już dostępna w węźle puli Azure Batch, można pominąć parametr resourceLinkedService i folderPath. Można na przykład określić polecenie cmd /c dir, które jest natywnie obsługiwane przez węzeł puli Windows Batch. Yes
resourceLinkedService Połączona usługa Azure Storage do konta magazynu, na którym przechowywana jest aplikacja niestandardowa Nie*
folderPath Ścieżka do folderu aplikacji niestandardowej i wszystkich jej zależności

Jeśli masz zależności przechowywane w podfolderach — czyli w strukturze folderów hierarchicznych w folderPath — struktura folderów jest obecnie spłaszczana, gdy pliki są kopiowane do Azure Batch. Oznacza to, że wszystkie pliki są kopiowane do jednego folderu bez podfolderów. Aby obejść to zachowanie, rozważ skompresowanie plików, skopiowanie skompresowanego pliku, a następnie rozpakowanie go przy użyciu kodu niestandardowego w żądanej lokalizacji.
Nie*
obiekty referencyjne Tablica istniejących połączonych usług i zestawów danych. Przywoływane połączone usługi i zestawy danych są przekazywane do aplikacji niestandardowej w formacie JSON, dzięki czemu kod niestandardowy może odwoływać się do zasobów usługi Nie
rozszerzoneWłaściwości Właściwości zdefiniowane przez użytkownika, które można przekazać do aplikacji niestandardowej w formacie JSON, aby kod niestandardowy mógł odwoływać się do dodatkowych właściwości Nie
czasPrzechowywaniaWDniach Czas przechowywania plików przesłanych do aktywności niestandardowej. Wartość domyślna to 30 dni. Nie

* Właściwości resourceLinkedService i folderPath muszą zostać określone albo oba te właściwości zostaną pominięte.

Uwaga

Jeśli przekazujesz połączone usługi jako referenceObjects w działaniu niestandardowym, dobrą praktyką bezpieczeństwa jest użycie połączonej usługi z włączonym Azure Key Vault (ponieważ nie zawiera żadnych bezpiecznych ciągów) i pobranie poświadczeń, stosując nazwę sekretu, bezpośrednio z Key Vault w kodzie. Możesz znaleźć przykład tutaj, który odnosi się do usługi połączonej z AKV, pobiera dane uwierzytelniające z Key Vault, a następnie uzyskuje dostęp do magazynu w kodzie.

Uwaga

Obecnie tylko usługa Blob Storage Azure jest obsługiwana dla zasobuLinkedService w działaniu niestandardowym i jest to jedyna połączona usługa, która jest tworzona domyślnie i nie ma opcji wyboru innych łączników, takich jak ADLS Gen2.

Uprawnienia do działań niestandardowych

Działanie niestandardowe ustawia konto użytkownika automatycznego Azure Batch na Dostęp nieadministracyjny z zakresem zadań (domyślna specyfikacja użytkownika automatycznego). Nie można zmienić poziomu uprawnień konta użytkownika automatycznego. Aby uzyskać więcej informacji, zobacz Uruchamianie zadań w ramach kont użytkowników w usłudze Batch | Konta użytkowników automatycznych.

Wykonywanie poleceń

Polecenie można wykonać bezpośrednio przy użyciu Aktywności Niestandardowej. Poniższy przykład uruchamia polecenie "echo hello world" na docelowych węzłach puli Azure Batch i wyświetla dane wyjściowe do stdout.

{
  "name": "MyCustomActivity",
  "properties": {
    "description": "Custom activity sample",
    "activities": [{
      "type": "Custom",
      "name": "MyCustomActivity",
      "linkedServiceName": {
        "referenceName": "AzureBatchLinkedService",
        "type": "LinkedServiceReference"
      },
      "typeProperties": {
        "command": "cmd /c echo hello world"
      }
    }]
  }
}

Przekazywanie obiektów i właściwości

W tym przykładzie pokazano, jak za pomocą obiektów referenceObjects i extendedProperties przekazywać obiekty i właściwości zdefiniowane przez użytkownika z usługi do aplikacji niestandardowej.

{
  "name": "MyCustomActivityPipeline",
  "properties": {
    "description": "Custom activity sample",
    "activities": [{
      "type": "Custom",
      "name": "MyCustomActivity",
      "linkedServiceName": {
        "referenceName": "AzureBatchLinkedService",
        "type": "LinkedServiceReference"
      },
      "typeProperties": {
        "command": "SampleApp.exe",
        "folderPath": "customactv2/SampleApp",
        "resourceLinkedService": {
          "referenceName": "StorageLinkedService",
          "type": "LinkedServiceReference"
        },
        "referenceObjects": {
          "linkedServices": [{
            "referenceName": "AzureBatchLinkedService",
            "type": "LinkedServiceReference"
          }]
        },
        "extendedProperties": {          
          "connectionString": {
            "type": "SecureString",
            "value": "aSampleSecureString"
          },
          "PropertyBagPropertyName1": "PropertyBagValue1",
          "propertyBagPropertyName2": "PropertyBagValue2",
          "dateTime1": "2015-04-12T12:13:14Z"
        }
      }
    }]
  }
}

Po wykonaniu działania obiekty referenceObjects i extendedProperties są przechowywane w następujących plikach wdrożonych w tym samym folderze wykonywania SampleApp.exe:

  • activity.json

    Przechowuje właściwości rozszerzone i właściwości działania niestandardowego.

  • linkedServices.json

    Przechowuje tablicę połączonych usług zdefiniowanych we właściwości referenceObjects.

  • datasets.json

    Przechowuje tablicę zestawów danych zdefiniowanych we właściwości referenceObjects.

Poniższy przykładowy kod pokazuje, jak SampleApp.exe może uzyskać dostęp do wymaganych informacji z plików JSON:

using Newtonsoft.Json;
using System;
using System.IO;

namespace SampleApp
{
    class Program
    {
        static void Main(string[] args)
        {
            //From Extend Properties
            dynamic activity = JsonConvert.DeserializeObject(File.ReadAllText("activity.json"));
            Console.WriteLine(activity.typeProperties.extendedProperties.connectionString.value);

            // From LinkedServices
            dynamic linkedServices = JsonConvert.DeserializeObject(File.ReadAllText("linkedServices.json"));
            Console.WriteLine(linkedServices[0].properties.typeProperties.accountName);
        }
    }
}

Pobieranie wyników eksperacji

Można uruchomić potok przy użyciu następującego polecenia PowerShell:

$runId = Invoke-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineName $pipelineName

Po uruchomieniu potoku możesz sprawdzić wynik wykonania przy użyciu następujących poleceń:

while ($True) {
    $result = Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $runId -RunStartedAfter (Get-Date).AddMinutes(-30) -RunStartedBefore (Get-Date).AddMinutes(30)

    if(!$result) {
        Write-Host "Waiting for pipeline to start..." -foregroundcolor "Yellow"
    }
    elseif (($result | Where-Object { $_.Status -eq "InProgress" } | Measure-Object).count -ne 0) {
        Write-Host "Pipeline run status: In Progress" -foregroundcolor "Yellow"
    }
    else {
        Write-Host "Pipeline '"$pipelineName"' run finished. Result:" -foregroundcolor "Yellow"
        $result
        break
    }
    ($result | Format-List | Out-String)
    Start-Sleep -Seconds 15
}

Write-Host "Activity `Output` section:" -foregroundcolor "Yellow"
$result.Output -join "`r`n"

Write-Host "Activity `Error` section:" -foregroundcolor "Yellow"
$result.Error -join "`r`n"

stdout i stderr twojej aplikacji niestandardowej są zapisywane do kontenera adfjobs w usłudze Azure Storage połączonej za pomocą zdefiniowanej przez Ciebie usługi połączonej Azure Batch z identyfikatorem GUID zadania. Szczegółową ścieżkę można uzyskać z danych wyjściowych uruchomienia działania, jak pokazano w poniższym fragmencie kodu:

Pipeline ' MyCustomActivity' run finished. Result:

ResourceGroupName : resourcegroupname
DataFactoryName   : datafactoryname
ActivityName      : MyCustomActivity
PipelineRunId     : xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
PipelineName      : MyCustomActivity
Input             : {command}
Output            : {exitcode, outputs, effectiveIntegrationRuntime}
LinkedServiceName :
ActivityRunStart  : 10/5/2017 3:33:06 PM
ActivityRunEnd    : 10/5/2017 3:33:28 PM
DurationInMs      : 21203
Status            : Succeeded
Error             : {errorCode, message, failureType, target}

Activity Output section:
"exitcode": 0
"outputs": [
  "https://<container>.blob.core.windows.net/adfjobs/<GUID>/output/stdout.txt",
  "https://<container>.blob.core.windows.net/adfjobs/<GUID>/output/stderr.txt"
]
"effectiveIntegrationRuntime": "DefaultIntegrationRuntime (East US)"
Activity Error section:
"errorCode": ""
"message": ""
"failureType": ""
"target": "MyCustomActivity"

Jeśli chcesz korzystać z zawartości pliku stdout.txt w dalszych działaniach, możesz uzyskać ścieżkę do pliku stdout.txt w wyrażeniu "@activity('MyCustomActivity').output.outputs[0]".

Ważne

  • activity.json, linkedServices.jsoni datasets.json są przechowywane w folderze uruchomieniowym zadania usługi Batch. W tym przykładzie activity.json, linkedServices.jsoni datasets.json są przechowywane w https://adfv2storage.blob.core.windows.net/adfjobs/<GUID>/runtime/ ścieżce. W razie potrzeby należy je wyczyścić oddzielnie.
  • W przypadku połączonych usług korzystających z Self-Hosted Integration Runtime poufne informacje, takie jak klucze lub hasła, są szyfrowane przez Self-Hosted Integration Runtime w celu zapewnienia, że poświadczenia pozostają w środowisku sieci prywatnej zdefiniowanej przez klienta. Niektórych poufnych pól może brakować, jeśli w ten sposób odwołuje się do kodu aplikacji niestandardowej. W razie potrzeby użyj SecureString w extendedProperties zamiast odwołania do usługi powiązanej.

Przekazywanie danych wyjściowych do innego działania

Możesz wysyłać wartości niestandardowe z kodu w działaniu niestandardowym z powrotem do usługi. Możesz to zrobić, zapisując je w outputs.json aplikacji. Usługa kopiuje zawartość outputs.json i dołącza ją do wyników działania jako wartość właściwości customOutput. (Limit rozmiaru wynosi 2 MB.) Jeśli chcesz korzystać z zawartości outputs.json w działaniach podrzędnych, możesz uzyskać wartość przy użyciu wyrażenia @activity('<MyCustomActivity>').output.customOutput.

Pobieranie danych wyjściowych funkcji SecureString

Poufne wartości właściwości wyznaczone jako typ SecureString, jak wskazano w niektórych przykładach w tym artykule, są ukrywane na karcie Monitorowanie w interfejsie użytkownika. Jednak w rzeczywistym wykonaniu potoku właściwość SecureString jest serializowana jako JSON w activity.json pliku jako zwykły tekst. Przykład:

"extendedProperties": {
  "connectionString": {
    "type": "SecureString",
    "value": "aSampleSecureString"
  }
}

Ta serializacja nie jest naprawdę bezpieczna i nie ma być bezpieczna. Intencją jest wskazówka dla usługi maskowania wartości na karcie Monitorowanie.

Aby uzyskać dostęp do właściwości typu SecureString z działania niestandardowego, odczytaj plik activity.json, który znajduje się w tym samym folderze co .EXE, zdeserializuj JSON-a, a następnie uzyskaj dostęp do jego właściwości (extendedProperties => [propertyName] => wartość).

Automatyczne skalowanie Azure Batch

Możesz również utworzyć pulę Azure Batch za pomocą funkcji autoscale. Można na przykład utworzyć pulę wsadową Azure z zerem dedykowanych maszyn wirtualnych i formułą autoskalowania opartą na liczbie oczekujących zadań.

Przykładowa formuła w tym miejscu osiąga następujące zachowanie: po początkowym utworzeniu puli rozpoczyna się od 1 maszyny wirtualnej. $PendingTasks metryka definiuje liczbę zadań w stanie uruchomienia i aktywnym (w kolejce). Formuła znajduje średnią liczbę oczekujących zadań w ciągu ostatnich 180 sekund i ustawia wartość TargetDedicated odpowiednio. Gwarantuje to, że targetDedicated nigdy nie wykracza poza 25 maszyn wirtualnych. W miarę przesyłania nowych zadań pula automatycznie rośnie, a wraz z kończeniem zadań maszyny wirtualne zwalniają się pojedynczo, a automatyczne skalowanie zmniejsza ich liczbę. startNumberOfVMs i maxNumberofVMs można dostosować do Twoich potrzeb.

Formuła autoskalowania:

startingNumberOfVMs = 1;
maxNumberofVMs = 25;
pendingTaskSamplePercent = $PendingTasks.GetSamplePercent(180 * TimeInterval_Second);
pendingTaskSamples = pendingTaskSamplePercent < 70 ? startingNumberOfVMs : avg($PendingTasks.GetSample(180 * TimeInterval_Second));
$TargetDedicated=min(maxNumberofVMs,pendingTaskSamples);

Aby uzyskać szczegółowe informacje, zobacz Automatycznie skaluj węzły obliczeniowe w puli Azure Batch.

Jeśli pula używa domyślnego autoScaleEvaluationInterval, usługa Batch może zająć 15–30 minut na przygotowanie maszyny wirtualnej, zanim uruchomi działanie niestandardowe. Jeśli pula używa innego autoScaleEvaluationInterval, usługa Batch może potrwać autoScaleEvalInterval + 10 minut.

Zapoznaj się z następującymi artykułami, które wyjaśniają sposób przekształcania danych na inne sposoby: