Używanie działań niestandardowych w potoku Azure Data Factory w wersji 1

Uwaga

Ten artykuł dotyczy wersji 1 usługi Data Factory. Jeśli używasz bieżącej wersji usługi Data Factory, zobacz Działania niestandardowe w wersji 2.

Istnieją dwa typy działań, których można użyć w potoku Azure Data Factory.

Aby przenieść dane do/z magazynu danych, do którego usługa Data Factory nie obsługuje, utwórz działanie niestandardowe z własną logiką przenoszenia danych i użyj działania w potoku. Podobnie w celu przekształcania/przetwarzania danych w sposób, który nie jest obsługiwany przez usługę Data Factory, utwórz działanie niestandardowe z własną logiką przekształcania danych i użyj działania w potoku.

Możesz skonfigurować działanie niestandardowe do uruchamiania w puli Azure Batch maszyn wirtualnych. W przypadku korzystania z Azure Batch można użyć tylko istniejącej puli Azure Batch.

Poniższy przewodnik zawiera instrukcje krok po kroku dotyczące tworzenia niestandardowego działania platformy .NET i używania niestandardowego działania w potoku. Przewodnik używa połączonej usługi Azure Batch.

Ważne

  • Nie można użyć bramy Zarządzanie danymi z działania niestandardowego w celu uzyskania dostępu do lokalnych źródeł danych. Obecnie usługa Zarządzanie danymi Gateway obsługuje tylko działanie kopiowania i działanie procedury składowanej w usłudze Data Factory.

Przewodnik: tworzenie działania niestandardowego

Wymagania wstępne

Azure Batch wymagania wstępne

W przewodniku uruchamiasz niestandardowe działania platformy .NET przy użyciu Azure Batch jako zasobu obliczeniowego. Azure Batch to usługa platformy do wydajnego uruchamiania aplikacji równoległych i obliczeń o wysokiej wydajności (HPC) na dużą skalę w chmurze. Azure Batch planuje wykonywanie prac intensywnie korzystających z obliczeń w ramach zarządzanej kolekcji maszyn wirtualnych i umożliwia automatyczne skalowanie zasobów obliczeniowych w celu spełnienia wymagań zadań. Aby uzyskać szczegółowe omówienie usługi Azure Batch, zobacz artykuł Azure Batch Basics (Podstawy).

W tym samouczku utwórz konto Azure Batch z pulą maszyn wirtualnych. Oto konkretne kroki:

  1. Utwórz konto Azure Batch przy użyciu Azure Portal. Aby uzyskać instrukcje, zobacz Tworzenie konta Azure Batch i zarządzanie nim.

  2. Zanotuj nazwę konta Azure Batch, klucz konta, identyfikator URI i nazwę puli. Musisz utworzyć połączoną usługę Azure Batch.

    1. Na stronie głównej konta Azure Batch zostanie wyświetlony adres URL w następującym formacie: https://myaccount.westus.batch.azure.com. W tym przykładzie myaccount jest nazwą konta Azure Batch. Identyfikator URI używany w definicji połączonej usługi to adres URL bez nazwy konta. Na przykład: https://<region>.batch.azure.com.
    2. Kliknij pozycję Klucze w menu po lewej stronie i skopiuj KLUCZ DOSTĘPU PODSTAWOWEgo.
    3. Aby użyć istniejącej puli, kliknij pozycję Pule w menu i zanotuj identyfikator puli. Jeśli nie masz istniejącej puli, przejdź do następnego kroku.
  3. Utwórz pulę Azure Batch.

    1. W Azure Portal kliknij pozycję Przeglądaj w menu po lewej stronie, a następnie kliknij pozycję Konta usługi Batch.
    2. Wybierz konto Azure Batch, aby otworzyć blok Konto usługi Batch.
    3. Kliknij kafelek Pule .
    4. W bloku Pule kliknij przycisk Dodaj na pasku narzędzi, aby dodać pulę.
      1. Wprowadź identyfikator puli (identyfikator puli). Zanotuj identyfikator puli; jest on potrzebny podczas tworzenia rozwiązania usługi Data Factory.
      2. Określ Windows Server 2012 R2 dla ustawienia Rodzina systemów operacyjnych.
      3. Wybierz warstwę cenową węzła.
      4. Wprowadź wartość 2 jako wartość ustawienia Docelowa dedykowana .
      5. Wprowadź wartość 2 jako wartość ustawienia Maksymalna liczba zadań na węzeł .
    5. Kliknij przycisk OK, aby utworzyć pulę.
    6. Zanotuj identyfikator puli.

Kroki ogólne

Poniżej przedstawiono dwa ogólne kroki wykonywane w ramach tego przewodnika:

  1. Utwórz działanie niestandardowe zawierające prostą logikę przekształcania/przetwarzania danych.
  2. Utwórz fabrykę danych platformy Azure przy użyciu potoku, który używa działania niestandardowego.

Tworzenie działania niestandardowego

Aby utworzyć działanie niestandardowe platformy .NET, utwórz projekt Biblioteka klas platformy .NET z klasą implementowaną przez interfejs IDotNetActivity . Ten interfejs ma tylko jedną metodę: Wykonaj , a jego podpis to:

public IDictionary<string, string> Execute(
    IEnumerable<LinkedService> linkedServices,
    IEnumerable<Dataset> datasets,
    Activity activity,
    IActivityLogger logger)

Metoda przyjmuje cztery parametry:

  • linkedServices. Ta właściwość jest wyliczalną listą połączonych usług magazynu danych, do których odwołują się zestawy danych wejściowych/wyjściowych dla działania.
  • zestawy danych. Ta właściwość jest wyliczalną listą zestawów danych wejściowych/wyjściowych dla działania. Tego parametru można użyć do pobrania lokalizacji i schematów zdefiniowanych przez wejściowe i wyjściowe zestawy danych.
  • działanie. Ta właściwość reprezentuje bieżące działanie. Może służyć do uzyskiwania dostępu do rozszerzonych właściwości skojarzonych z działaniem niestandardowym. Aby uzyskać szczegółowe informacje, zobacz Uzyskiwanie dostępu do właściwości rozszerzonych .
  • rejestrator. Ten obiekt umożliwia zapisywanie komentarzy debugowania, które są wyświetlane w dzienniku użytkownika dla potoku.

Metoda zwraca słownik, który może służyć do łączenia działań niestandardowych w przyszłości. Ta funkcja nie jest jeszcze zaimplementowana, dlatego zwraca pusty słownik z metody .

Procedura

  1. Utwórz projekt biblioteki klas platformy .NET .

  2. Kliknij pozycję Narzędzia, wskaż pozycję Menedżer pakietów NuGet, a następnie kliknij pozycję Konsola menedżera pakietów.

  3. W konsoli Menedżera pakietów wykonaj następujące polecenie, aby zaimportować Microsoft. Azure.Management.DataFactories.

    Install-Package Microsoft.Azure.Management.DataFactories
    
  4. Zaimportuj pakiet NuGet usługi Azure Storage do projektu.

    Install-Package WindowsAzure.Storage -Version 4.3.0
    

    Ważne

    Uruchamianie usługi Data Factory wymaga wersji 4.3 systemu WindowsAzure.Storage. Jeśli dodasz odwołanie do nowszej wersji zestawu usługi Azure Storage w projekcie działań niestandardowych, po wykonaniu działania zostanie wyświetlony błąd. Aby rozwiązać ten problem, zobacz sekcję Izolacja w domenie aplikacji .

  5. Dodaj następujące instrukcje using do pliku źródłowego w projekcie.

    
    // Comment these lines if using VS 2017
    using System.IO;
    using System.Globalization;
    using System.Diagnostics;
    using System.Linq;
    // --------------------
    
    // Comment these lines if using <= VS 2015
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Threading.Tasks;
    // ---------------------
    
    using Microsoft.Azure.Management.DataFactories.Models;
    using Microsoft.Azure.Management.DataFactories.Runtime;
    
    using Microsoft.WindowsAzure.Storage;
    using Microsoft.WindowsAzure.Storage.Blob;
    
  6. Zmień nazwę przestrzeni nazw na MyDotNetActivityNS.

    namespace MyDotNetActivityNS
    
  7. Zmień nazwę klasy na MyDotNetActivity i utwórz ją z interfejsu IDotNetActivity , jak pokazano w poniższym fragmencie kodu:

    public class MyDotNetActivity : IDotNetActivity
    
  8. Zaimplementuj (Dodaj) metodę Execute interfejsu IDotNetActivity do klasy MyDotNetActivity i skopiuj następujący przykładowy kod do metody .

    Poniższy przykład zlicza liczbę wystąpień terminu wyszukiwania ("Microsoft") w każdym obiekcie blob skojarzonym z wycinkiem danych.

    /// <summary>
    /// Execute method is the only method of IDotNetActivity interface you must implement.
    /// In this sample, the method invokes the Calculate method to perform the core logic.
    /// </summary>
    
    public IDictionary<string, string> Execute(
        IEnumerable<LinkedService> linkedServices,
        IEnumerable<Dataset> datasets,
        Activity activity,
        IActivityLogger logger)
    {
        // get extended properties defined in activity JSON definition
        // (for example: SliceStart)
        DotNetActivity dotNetActivity = (DotNetActivity)activity.TypeProperties;
        string sliceStartString = dotNetActivity.ExtendedProperties["SliceStart"];
    
        // to log information, use the logger object
        // log all extended properties
        IDictionary<string, string> extendedProperties = dotNetActivity.ExtendedProperties;
        logger.Write("Logging extended properties if any...");
        foreach (KeyValuePair<string, string> entry in extendedProperties)
        {
            logger.Write("<key:{0}> <value:{1}>", entry.Key, entry.Value);
        }
    
        // linked service for input and output data stores
        // in this example, same storage is used for both input/output
        AzureStorageLinkedService inputLinkedService;
    
        // get the input dataset
        Dataset inputDataset = datasets.Single(dataset => dataset.Name == activity.Inputs.Single().Name);
    
        // declare variables to hold type properties of input/output datasets
        AzureBlobDataset inputTypeProperties, outputTypeProperties;
    
        // get type properties from the dataset object
        inputTypeProperties = inputDataset.Properties.TypeProperties as AzureBlobDataset;
    
        // log linked services passed in linkedServices parameter
        // you will see two linked services of type: AzureStorage
        // one for input dataset and the other for output dataset
        foreach (LinkedService ls in linkedServices)
            logger.Write("linkedService.Name {0}", ls.Name);
    
        // get the first Azure Storage linked service from linkedServices object
        // using First method instead of Single since we are using the same
        // Azure Storage linked service for input and output.
        inputLinkedService = linkedServices.First(
            linkedService =>
            linkedService.Name ==
            inputDataset.Properties.LinkedServiceName).Properties.TypeProperties
            as AzureStorageLinkedService;
    
        // get the connection string in the linked service
        string connectionString = inputLinkedService.ConnectionString;
    
        // get the folder path from the input dataset definition
        string folderPath = GetFolderPath(inputDataset);
        string output = string.Empty; // for use later.
    
        // create storage client for input. Pass the connection string.
        CloudStorageAccount inputStorageAccount = CloudStorageAccount.Parse(connectionString);
        CloudBlobClient inputClient = inputStorageAccount.CreateCloudBlobClient();
    
        // initialize the continuation token before using it in the do-while loop.
        BlobContinuationToken continuationToken = null;
        do
        {   // get the list of input blobs from the input storage client object.
            BlobResultSegment blobList = inputClient.ListBlobsSegmented(folderPath,
                                     true,
                                     BlobListingDetails.Metadata,
                                     null,
                                     continuationToken,
                                     null,
                                     null);
    
            // Calculate method returns the number of occurrences of
            // the search term ("Microsoft") in each blob associated
            // with the data slice. definition of the method is shown in the next step.
    
            output = Calculate(blobList, logger, folderPath, ref continuationToken, "Microsoft");
    
        } while (continuationToken != null);
    
        // get the output dataset using the name of the dataset matched to a name in the Activity output collection.
        Dataset outputDataset = datasets.Single(dataset => dataset.Name == activity.Outputs.Single().Name);
    
        // get type properties for the output dataset
        outputTypeProperties = outputDataset.Properties.TypeProperties as AzureBlobDataset;
    
        // get the folder path from the output dataset definition
        folderPath = GetFolderPath(outputDataset);
    
        // log the output folder path
        logger.Write("Writing blob to the folder: {0}", folderPath);
    
        // create a storage object for the output blob.
        CloudStorageAccount outputStorageAccount = CloudStorageAccount.Parse(connectionString);
        // write the name of the file.
        Uri outputBlobUri = new Uri(outputStorageAccount.BlobEndpoint, folderPath + "/" + GetFileName(outputDataset));
    
        // log the output file name
        logger.Write("output blob URI: {0}", outputBlobUri.ToString());
    
        // create a blob and upload the output text.
        CloudBlockBlob outputBlob = new CloudBlockBlob(outputBlobUri, outputStorageAccount.Credentials);
        logger.Write("Writing {0} to the output blob", output);
        outputBlob.UploadText(output);
    
        // The dictionary can be used to chain custom activities together in the future.
        // This feature is not implemented yet, so just return an empty dictionary.
    
        return new Dictionary<string, string>();
    }
    
  9. Dodaj następujące metody pomocnika:

    /// <summary>
    /// Gets the folderPath value from the input/output dataset.
    /// </summary>
    
    private static string GetFolderPath(Dataset dataArtifact)
    {
        if (dataArtifact == null || dataArtifact.Properties == null)
        {
            return null;
        }
    
        // get type properties of the dataset
        AzureBlobDataset blobDataset = dataArtifact.Properties.TypeProperties as AzureBlobDataset;
        if (blobDataset == null)
        {
            return null;
        }
    
        // return the folder path found in the type properties
        return blobDataset.FolderPath;
    }
    
    /// <summary>
    /// Gets the fileName value from the input/output dataset.
    /// </summary>
    
    private static string GetFileName(Dataset dataArtifact)
    {
        if (dataArtifact == null || dataArtifact.Properties == null)
        {
            return null;
        }
    
        // get type properties of the dataset
        AzureBlobDataset blobDataset = dataArtifact.Properties.TypeProperties as AzureBlobDataset;
        if (blobDataset == null)
        {
            return null;
        }
    
        // return the blob/file name in the type properties
        return blobDataset.FileName;
    }
    
    /// <summary>
    /// Iterates through each blob (file) in the folder, counts the number of instances of search term in the file,
    /// and prepares the output text that is written to the output blob.
    /// </summary>
    
    public static string Calculate(BlobResultSegment Bresult, IActivityLogger logger, string folderPath, ref BlobContinuationToken token, string searchTerm)
    {
        string output = string.Empty;
        logger.Write("number of blobs found: {0}", Bresult.Results.Count<IListBlobItem>());
        foreach (IListBlobItem listBlobItem in Bresult.Results)
        {
            CloudBlockBlob inputBlob = listBlobItem as CloudBlockBlob;
            if ((inputBlob != null) && (inputBlob.Name.IndexOf("$$$.$$$") == -1))
            {
                string blobText = inputBlob.DownloadText(Encoding.ASCII, null, null, null);
                logger.Write("input blob text: {0}", blobText);
                string[] source = blobText.Split(new char[] { '.', '?', '!', ' ', ';', ':', ',' }, StringSplitOptions.RemoveEmptyEntries);
                var matchQuery = from word in source
                                 where word.ToLowerInvariant() == searchTerm.ToLowerInvariant()
                                 select word;
                int wordCount = matchQuery.Count();
                output += string.Format("{0} occurrences(s) of the search term \"{1}\" were found in the file {2}.\r\n", wordCount, searchTerm, inputBlob.Name);
            }
        }
        return output;
    }
    

    Metoda GetFolderPath zwraca ścieżkę do folderu, do którego wskazuje zestaw danych, a metoda GetFileName zwraca nazwę obiektu blob/pliku, do którego wskazuje zestaw danych. Jeśli masz parametr folderPath definiuje użycie zmiennych, takich jak {Year}, {Month}, {Day} itp., metoda zwraca ciąg bez zastępowania ich wartościami środowiska uruchomieniowego. Aby uzyskać szczegółowe informacje na temat uzyskiwania dostępu do fragmentatoraStart, SliceEnd itp., zobacz sekcję Uzyskiwanie dostępu do właściwości rozszerzonych.

    "name": "InputDataset",
    "properties": {
        "type": "AzureBlob",
        "linkedServiceName": "AzureStorageLinkedService",
        "typeProperties": {
            "fileName": "file.txt",
            "folderPath": "adftutorial/inputfolder/",
    

    Metoda Calculate oblicza liczbę wystąpień słowa kluczowego Microsoft w plikach wejściowych (obiekty blob w folderze). Termin wyszukiwania ("Microsoft") jest zakodowany w kodzie.

  10. Skompiluj projekt. Kliknij pozycję Kompiluj z menu i kliknij pozycję Kompiluj rozwiązanie.

    Ważne

    Ustaw wersję 4.5.2 .NET Framework jako strukturę docelową projektu: kliknij projekt prawym przyciskiem myszy, a następnie kliknij pozycję Właściwości, aby ustawić strukturę docelową. Usługa Data Factory nie obsługuje działań niestandardowych skompilowanych na .NET Framework wersjach nowszych niż 4.5.2.

  11. Uruchom Eksploratora Windows i przejdź do folderu bin\debug lub bin\release w zależności od typu kompilacji.

  12. Utwórz plik zip MyDotNetActivity.zip zawierający wszystkie pliki binarne w <folderze> projektu\bin\Debug. Dołącz plik MyDotNetActivity.pdb , aby uzyskać dodatkowe szczegóły, takie jak numer wiersza w kodzie źródłowym, który spowodował problem, jeśli wystąpił błąd.

    Ważne

    Wszystkie pliki w archiwum ZIP działania niestandardowego muszą znajdować się na najwyższym poziomie, bez podfolderów.

    Wyjściowe pliki binarne

  13. Utwórz kontener obiektów blob o nazwie customactivitycontainer , jeśli jeszcze nie istnieje.

  14. Przekaż MyDotNetActivity.zip jako obiekt blob do obiektu customactivitycontainer w usłudze Azure Blob Storage ogólnego przeznaczenia (nie gorąca/chłodna usługa Blob Storage), która jest określana przez usługę AzureStorageLinkedService.

Ważne

Jeśli dodasz ten projekt działań platformy .NET do rozwiązania w programie Visual Studio, który zawiera projekt usługi Data Factory i dodasz odwołanie do projektu działań platformy .NET z projektu aplikacji usługi Data Factory, nie musisz wykonywać dwóch ostatnich kroków ręcznego tworzenia pliku zip i przekazywania go do ogólnego przeznaczenia usługi Azure Blob Storage. Po opublikowaniu jednostek usługi Data Factory przy użyciu programu Visual Studio te kroki są wykonywane automatycznie przez proces publikowania. Aby uzyskać więcej informacji, zobacz Projekt usługi Data Factory w programie Visual Studio .

Tworzenie potoku z działaniem niestandardowym

Utworzono działanie niestandardowe i przekazano plik zip z plikami binarnymi do kontenera obiektów blob na koncie usługi Azure Storage ogólnego przeznaczenia . W tej sekcji utworzysz fabrykę danych platformy Azure z potokiem, który używa działania niestandardowego.

Wejściowy zestaw danych dla działania niestandardowego reprezentuje obiekty blob (pliki) w folderze customactivityinput kontenera adftutorial w magazynie obiektów blob. Wyjściowy zestaw danych dla działania reprezentuje wyjściowe obiekty blob w folderze customactivityoutput kontenera adftutorial w magazynie obiektów blob.

Utwórz plikfile.txt z następującą zawartością i przekaż go do folderu customactivityinput kontenera adftutorial . Utwórz kontener adftutorial, jeśli jeszcze nie istnieje.

test custom activity Microsoft test custom activity Microsoft

Folder wejściowy odpowiada wycinkowi w Azure Data Factory nawet wtedy, gdy folder ma co najmniej dwa pliki. Gdy każdy wycinek jest przetwarzany przez potok, działanie niestandardowe iteruje wszystkie obiekty blob w folderze wejściowym dla tego wycinka.

Zostanie wyświetlony jeden plik wyjściowy z folderem adftutorial\customactivityoutput z co najmniej jednym wierszem (takim samym jak liczba obiektów blob w folderze wejściowym):

2 occurrences(s) of the search term "Microsoft" were found in the file inputfolder/2016-11-16-00/file.txt.

Poniżej przedstawiono kroki wykonywane w tej sekcji:

  1. Utwórz fabrykę danych.
  2. Utwórz połączone usługi dla puli Azure Batch maszyn wirtualnych, na których działa działanie niestandardowe, oraz usługę Azure Storage, która przechowuje wejściowe/wyjściowe obiekty blob.
  3. Utwórz wejściowe i wyjściowe zestawy danych reprezentujące dane wejściowe i wyjściowe działania niestandardowego.
  4. Utwórz potok , który używa działania niestandardowego.

Uwaga

Utwórz file.txt i przekaż go do kontenera obiektów blob, jeśli jeszcze tego nie zrobiono. Zobacz instrukcje w poprzedniej sekcji.

Krok 1. Tworzenie fabryki danych

  1. Po zalogowaniu się do Azure Portal wykonaj następujące czynności:

    1. Kliknij pozycję Utwórz zasób w menu po lewej stronie.

    2. Kliknij pozycję Dane i analiza w bloku Nowy .

    3. Kliknij przycisk Fabryka danych w bloku Analiza danych.

      Menu Nowy Azure Data Factory

  2. W bloku Nowa fabryka danych wprowadź wartość CustomActivityFactory w polu Nazwa. Nazwa fabryki danych Azure musi być globalnie unikatowa. Jeśli wystąpi błąd: Nazwa fabryki danych "CustomActivityFactory" jest niedostępna, zmień nazwę fabryki danych (na przykład yournameCustomActivityFactory) i spróbuj utworzyć ponownie.

    Nowy blok Azure Data Factory

  3. Kliknij pozycję NAZWA GRUPY ZASOBÓW i wybierz istniejącą grupę zasobów lub utwórz grupę zasobów.

  4. Sprawdź, czy używasz odpowiedniej subskrypcji i regionu , w którym chcesz utworzyć fabrykę danych.

  5. Kliknij przycisk Utwórz w bloku Nowa fabryka danych.

  6. Zostanie wyświetlona fabryka danych utworzona na pulpicie nawigacyjnym Azure Portal.

  7. Po pomyślnym utworzeniu fabryki danych zostanie wyświetlony blok Fabryka danych, który pokazuje zawartość fabryki danych.

    Blok Fabryka danych

Krok 2. Tworzenie połączonych usług

Połączone usługi łączą magazyny danych lub usługi obliczeniowe z fabryką danych Azure. W tym kroku połączysz konto usługi Azure Storage i Azure Batch konto z fabryką danych.

Tworzenie połączonej usługi Azure Storage

  1. Kliknij kafelek Autor i wdróż w bloku FABRYKA DANYCH dla pozycji CustomActivityFactory. Pojawi się Edytor fabryki danych.

  2. Kliknij pozycję Nowy magazyn danych na pasku poleceń i wybierz pozycję Azure Storage. Powinien zostać wyświetlony skrypt JSON do tworzenia połączonej usługi Azure Storage w edytorze.

    Nowy magazyn danych — Azure Storage

  3. Zastąp <accountname> ciąg nazwą konta usługi Azure Storage i <accountkey> kluczem dostępu konta usługi Azure Storage. Aby dowiedzieć się, jak uzyskać klucz dostępu do magazynu, zobacz Zarządzanie kluczami dostępu do konta magazynu.

    Usługa lubiana w usłudze Azure Storage

  4. Kliknij przycisk Wdróż na pasku poleceń, aby wdrożyć połączoną usługę.

Tworzenie połączonej usługi Azure Batch

  1. W Edytorze fabryki danych kliknij pozycję ... Więcej informacji na pasku poleceń, kliknij pozycję Nowe obliczenia, a następnie wybierz Azure Batch z menu.

    Nowe obliczenia — Azure Batch

  2. Wprowadź następujące zmiany w skryscie JSON:

    1. Określ nazwę konta Azure Batch dla właściwości accountName. Adres URL z bloku konta Azure Batch ma następujący format: http://accountname.region.batch.azure.com. Dla właściwości batchUri w formacie JSON należy usunąć accountname. element z adresu URL i użyć accountnameaccountName właściwości JSON.

    2. Określ klucz konta Azure Batch dla właściwości accessKey.

    3. Określ nazwę puli utworzonej w ramach wymagań wstępnych dla właściwości poolName . Można również określić identyfikator puli zamiast nazwy puli.

    4. Określ identyfikator URI Azure Batch dla właściwości batchUri. Przykład: https://westus.batch.azure.com.

    5. Określ właściwość AzureStorageLinkedService dla właściwości linkedServiceName .

      {
        "name": "AzureBatchLinkedService",
        "properties": {
          "type": "AzureBatch",
          "typeProperties": {
            "accountName": "myazurebatchaccount",
            "batchUri": "https://westus.batch.azure.com",
            "accessKey": "<yourbatchaccountkey>",
            "poolName": "myazurebatchpool",
            "linkedServiceName": "AzureStorageLinkedService"
          }
        }
      }
      

      Dla właściwości poolName można również określić identyfikator puli zamiast nazwy puli.

Krok 3. Tworzenie zestawów danych

W tym kroku utworzysz zestawy danych reprezentujące dane wejściowe i wyjściowe.

Tworzenie wejściowego zestawu danych

  1. W edytorze fabryki danych kliknij pozycję ... Więcej informacji na pasku poleceń, kliknij pozycję Nowy zestaw danych, a następnie wybierz pozycję Azure Blob Storage z menu rozwijanego.

  2. Zastąp kod JSON w okienku po prawej stronie następującym fragmentem kodu JSON:

    {
        "name": "InputDataset",
        "properties": {
            "type": "AzureBlob",
            "linkedServiceName": "AzureStorageLinkedService",
            "typeProperties": {
                "folderPath": "adftutorial/customactivityinput/",
                "format": {
                    "type": "TextFormat"
                }
            },
            "availability": {
                "frequency": "Hour",
                "interval": 1
            },
            "external": true,
            "policy": {}
        }
    }
    

    Potok zostanie utworzony w dalszej części tego przewodnika z godziną rozpoczęcia: 2016-11-16T00:00:00Z i godzina zakończenia: 2016-11-16T05:00:00Z. Zaplanowano tworzenie danych co godzinę, więc istnieje pięć wycinków wejściowych/wyjściowych (od 00:00:00 do>05:00:00:00).

    Częstotliwość i interwał wejściowego zestawu danych są ustawione na Godzinę i 1, co oznacza, że wycinek wejściowy jest dostępny co godzinę. W tym przykładzie jest to ten sam plik (file.txt) w folderze intputfolder.

    Poniżej przedstawiono godziny rozpoczęcia dla każdego wycinka, który jest reprezentowany przez zmienną systemową SliceStart w powyższym fragmencie kodu JSON.

  3. Kliknij pozycję Wdróż na pasku narzędzi, aby utworzyć i wdrożyć zestaw InputDataset. Upewnij się, że na pasku tytułu w edytorze wyświetlany jest komunikat TABELA ZOSTAŁA UTWORZONA POMYŚLNIE.

Tworzenie wyjściowego zestawu danych

  1. W edytorze usługi Data Factory kliknij pozycję ... Więcej informacji na pasku poleceń kliknij pozycję Nowy zestaw danych, a następnie wybierz pozycję Azure Blob Storage.

  2. Zastąp skrypt JSON w okienku po prawej stronie następującym skryptem JSON:

    {
        "name": "OutputDataset",
        "properties": {
            "type": "AzureBlob",
            "linkedServiceName": "AzureStorageLinkedService",
            "typeProperties": {
                "fileName": "{slice}.txt",
                "folderPath": "adftutorial/customactivityoutput/",
                "partitionedBy": [
                    {
                        "name": "slice",
                        "value": {
                            "type": "DateTime",
                            "date": "SliceStart",
                            "format": "yyyy-MM-dd-HH"
                        }
                    }
                ]
            },
            "availability": {
                "frequency": "Hour",
                "interval": 1
            }
        }
    }
    

    Lokalizacja wyjściowa to adftutorial/customactivityoutput/ i nazwa pliku wyjściowego jest yyyy-MM-dd-HH.txt gdzie rrrr-MM-dd-HH jest rokiem, miesiącem, datą i godziną produkcji wycinka. Aby uzyskać szczegółowe informacje, zobacz Dokumentacja dla deweloperów .

    Wyjściowy obiekt blob/plik jest generowany dla każdego wycinka danych wejściowych. Poniżej przedstawiono sposób, w jaki plik wyjściowy ma nazwę dla każdego wycinka. Wszystkie pliki wyjściowe są generowane w jednym folderze wyjściowym: adftutorial\customactivityoutput.

    Plasterek Godzina rozpoczęcia Plik wyjściowy
    1 2016-11-16T00:00:00 2016-11-16-00.txt
    2 2016-11-16T01:00:00 2016-11-16-01.txt
    3 2016-11-16T02:00:00 2016-11-16-02.txt
    4 2016-11-16T03:00:00 2016-11-16-03.txt
    5 2016-11-16T04:00:00 2016-11-16-04.txt

    Pamiętaj, że wszystkie pliki w folderze wejściowym są częścią wycinka z godzinami rozpoczęcia wymienionymi powyżej. Po przetworzeniu tego wycinka działanie niestandardowe skanuje za pomocą każdego pliku i tworzy wiersz w pliku wyjściowym z liczbą wystąpień terminu wyszukiwania ("Microsoft"). Jeśli w folderze wejściowym znajdują się trzy wiersze, w pliku wyjściowym znajdują się trzy wiersze dla każdego wycinka godzinowego: 2016-11-16-00.txt, 2016-11-16:01:00:00.txt itp.

  3. Aby wdrożyć zestaw OutputDataset, kliknij pozycję Wdróż na pasku poleceń.

Tworzenie i uruchamianie potoku korzystającego z działania niestandardowego

  1. W Edytorze fabryki danych kliknij pozycję ... Więcej, a następnie wybierz pozycję Nowy potok na pasku poleceń.

  2. Zastąp kod JSON w okienku po prawej stronie następującym skryptem JSON:

    {
      "name": "ADFTutorialPipelineCustom",
      "properties": {
        "description": "Use custom activity",
        "activities": [
          {
            "Name": "MyDotNetActivity",
            "Type": "DotNetActivity",
            "Inputs": [
              {
                "Name": "InputDataset"
              }
            ],
            "Outputs": [
              {
                "Name": "OutputDataset"
              }
            ],
            "LinkedServiceName": "AzureBatchLinkedService",
            "typeProperties": {
              "AssemblyName": "MyDotNetActivity.dll",
              "EntryPoint": "MyDotNetActivityNS.MyDotNetActivity",
              "PackageLinkedService": "AzureStorageLinkedService",
              "PackageFile": "customactivitycontainer/MyDotNetActivity.zip",
              "extendedProperties": {
                "SliceStart": "$$Text.Format('{0:yyyyMMddHH-mm}', Time.AddMinutes(SliceStart, 0))"
              }
            },
            "Policy": {
              "Concurrency": 2,
              "ExecutionPriorityOrder": "OldestFirst",
              "Retry": 3,
              "Timeout": "00:30:00",
              "Delay": "00:00:00"
            }
          }
        ],
        "start": "2016-11-16T00:00:00Z",
        "end": "2016-11-16T05:00:00Z",
        "isPaused": false
      }
    }
    

    Pamiętaj o następujących kwestiach:

    • Współbieżność jest ustawiona na 2, dzięki czemu dwa wycinki są przetwarzane równolegle przez 2 maszyny wirtualne w puli Azure Batch.
    • Istnieje jedno działanie w sekcji działania i jest to typ : DotNetActivity.
    • Nazwa_zestawu jest ustawiona na nazwę biblioteki DLL: MyDotnetActivity.dll.
    • Program EntryPoint jest ustawiony na Wartość MyDotNetActivityNS.MyDotNetActivity.
    • Parametr PackageLinkedService jest ustawiony na wartość AzureStorageLinkedService , która wskazuje magazyn obiektów blob zawierający niestandardowy plik zip działania. Jeśli używasz różnych kont usługi Azure Storage dla plików wejściowych/wyjściowych i niestandardowego pliku zip działania, utworzysz inną połączoną usługę Azure Storage. W tym artykule założono, że używasz tego samego konta usługi Azure Storage.
    • Parametr PackageFile jest ustawiony na wartość customactivitycontainer/MyDotNetActivity.zip. Jest on w formacie: containerforthezip/nameofthezip.zip.
    • Działanie niestandardowe przyjmuje parametr InputDataset jako dane wejściowe i OutputDataset jako dane wyjściowe.
    • Właściwość linkedServiceName niestandardowych punktów działania do usługi AzureBatchLinkedService, która informuje Azure Data Factory, że działanie niestandardowe musi być uruchamiane na maszynach wirtualnych Azure Batch.
    • właściwość isPaused jest domyślnie ustawiona na wartość false . Potok jest uruchamiany natychmiast w tym przykładzie, ponieważ wycinki zaczynają się w przeszłości. Możesz ustawić tę właściwość na wartość true, aby wstrzymać potok i ustawić go z powrotem na wartość false, aby ponownie uruchomić.
    • Czas rozpoczęcia i czas zakończenia to pięć godzin, a wycinki są tworzone co godzinę, więc pięć wycinków jest generowanych przez potok.
  3. Aby wdrożyć potok, kliknij pozycję Wdróż na pasku poleceń.

Monitorowanie potoku

  1. W bloku Fabryka danych w Azure Portal kliknij pozycję Diagram.

    Kafelek Diagram

  2. W widoku diagramu kliknij pozycję OutputDataset.

    Widok diagramu

  3. Powinny zostać wyświetlone pięć wycinków wyjściowych w stanie Gotowe. Jeśli nie są w stanie Gotowe, nie zostały jeszcze wyprodukowane.

    Wycinki danych wyjściowych

  4. Sprawdź, czy pliki wyjściowe są generowane w magazynie obiektów blob w kontenerze adftutorial .

    dane wyjściowe z działań niestandardowych

  5. Po otwarciu pliku wyjściowego powinny zostać wyświetlone dane wyjściowe podobne do następujących danych wyjściowych:

    2 occurrences(s) of the search term "Microsoft" were found in the file inputfolder/2016-11-16-00/file.txt.
    
  6. Użyj poleceń cmdlet Azure Portal lub Azure PowerShell, aby monitorować fabrykę danych, potoki i zestawy danych. Komunikaty z dziennika ActivityLogger można wyświetlić w kodzie dla działania niestandardowego w dziennikach (w szczególności user-0.log), które można pobrać z portalu lub przy użyciu poleceń cmdlet.

    pobieranie dzienników z działań niestandardowych

Zobacz Monitorowanie potoków i zarządzanie nimi, aby uzyskać szczegółowe instrukcje dotyczące monitorowania zestawów danych i potoków.

Projekt usługi Data Factory w programie Visual Studio

Jednostki usługi Data Factory można tworzyć i publikować przy użyciu programu Visual Studio zamiast Azure Portal. Aby uzyskać szczegółowe informacje na temat tworzenia i publikowania jednostek usługi Data Factory przy użyciu programu Visual Studio, zobacz Tworzenie pierwszego potoku przy użyciu programu Visual Studio i Kopiowanie danych z usługi Azure Blob do Azure SQL artykułów.

Wykonaj następujące dodatkowe kroki, jeśli tworzysz projekt usługi Data Factory w programie Visual Studio:

  1. Dodaj projekt usługi Data Factory do rozwiązania programu Visual Studio, które zawiera projekt działań niestandardowych.

  2. Dodaj odwołanie do projektu działań platformy .NET z projektu usługi Data Factory. Kliknij prawym przyciskiem myszy projekt usługi Data Factory, wskaż polecenie Dodaj, a następnie kliknij przycisk Odwołanie.

  3. W oknie dialogowym Dodawanie odwołania wybierz projekt MyDotNetActivity , a następnie kliknij przycisk OK.

  4. Skompiluj i opublikuj rozwiązanie.

    Ważne

    Podczas publikowania jednostek usługi Data Factory plik zip jest automatycznie tworzony i przekazywany do kontenera obiektów blob: customactivitycontainer. Jeśli kontener obiektów blob nie istnieje, zostanie on również utworzony automatycznie.

Integracja z usługą Data Factory i usługą Batch

Usługa Data Factory tworzy zadanie w Azure Batch o nazwie adf-poolname: job-xxx. Kliknij pozycję Zadania w menu po lewej stronie.

Azure Data Factory — zadania usługi Batch

Zadanie jest tworzone dla każdego uruchomienia działania wycinka. Jeśli w tym zadaniu jest gotowych do przetworzenia pięć wycinków, w tym zadaniu są tworzone pięć zadań. Jeśli w puli usługi Batch istnieje wiele węzłów obliczeniowych, co najmniej dwa wycinki mogą działać równolegle. Jeśli maksymalna liczba zadań na węzeł obliczeniowy jest ustawiona na > 1, możesz również mieć więcej niż jeden wycinek uruchomiony w ramach tego samego obliczenia.

Azure Data Factory — zadania zadań usługi Batch

Na poniższym diagramie przedstawiono relację między zadaniami Azure Data Factory i Batch.

Usługa Data Factory & Batch

Rozwiązywanie problemów

Rozwiązywanie problemów składa się z kilku podstawowych technik:

  1. Jeśli zostanie wyświetlony następujący błąd, możesz użyć magazynu obiektów blob Gorąca/Chłodna zamiast używania ogólnego przeznaczenia usługi Azure Blob Storage. Przekaż plik zip do konta usługi Azure Storage ogólnego przeznaczenia.

    Error in Activity: Job encountered scheduling error. Code: BlobDownloadMiscError Category: ServerError Message: Miscellaneous error encountered while downloading one of the specified Azure Blob(s).
    
  2. Jeśli zostanie wyświetlony następujący błąd, upewnij się, że nazwa klasy w pliku CS jest zgodna z nazwą określoną dla właściwości EntryPoint w formacie JSON potoku. W przewodniku nazwa klasy to: MyDotNetActivity, a punkt wejścia w formacie JSON to: MyDotNetActivityNS. MyDotNetActivity.

    MyDotNetActivity assembly does not exist or doesn't implement the type Microsoft.DataFactories.Runtime.IDotNetActivity properly
    

    Jeśli nazwy są zgodne, upewnij się, że wszystkie pliki binarne znajdują się w folderze głównym pliku zip. Oznacza to, że po otwarciu pliku zip wszystkie pliki w folderze głównym powinny być widoczne, a nie w żadnych podfolderach.

  3. Jeśli wycinek wejściowy nie jest ustawiony na Gotowe, upewnij się, że struktura folderów wejściowych jest poprawna i file.txt istnieje w folderach wejściowych.

  4. W metodzie Execute działania niestandardowego użyj obiektu IActivityLogger , aby rejestrować informacje ułatwiające rozwiązywanie problemów. Zarejestrowane komunikaty są wyświetlane w plikach dziennika użytkownika (co najmniej jeden plik o nazwie: user-0.log, user-1.log, user-2.log itp.).

    W bloku OutputDataset kliknij wycinek, aby wyświetlić blok FRAGMENT DANYCH dla tego wycinka. Zobaczysz uruchomienia działań dla tego wycinka. Powinno zostać wyświetlone jedno uruchomienie działania dla wycinka. Po kliknięciu przycisku Uruchom na pasku poleceń możesz uruchomić inne działanie dla tego samego wycinka.

    Po kliknięciu przebiegu działania zostanie wyświetlony blok SZCZEGÓŁY URUCHOMIENIA DZIAŁANIA z listą plików dziennika. W pliku user_0.log są wyświetlane zarejestrowane komunikaty. Gdy wystąpi błąd, zobaczysz trzy uruchomienia działania, ponieważ liczba ponownych prób jest ustawiona na 3 w formacie JSON potoku/działania. Po kliknięciu przebiegu działania zobaczysz pliki dziennika, które można przejrzeć, aby rozwiązać problem z błędem.

    Na liście plików dziennika kliknij plik user-0.log. W panelu po prawej stronie są wyniki użycia metody IActivityLogger.Write . Jeśli nie widzisz wszystkich komunikatów, sprawdź, czy masz więcej plików dziennika o nazwie: user_1.log, user_2.log itp. W przeciwnym razie kod mógł zakończyć się niepowodzeniem po ostatnim zalogowanym komunikacie.

    Ponadto sprawdź plik system-0.log pod kątem wszelkich komunikatów o błędach systemowych i wyjątków.

  5. Dołącz plik PDB do pliku zip, aby szczegóły błędu zawierały informacje, takie jak stos wywołań w przypadku wystąpienia błędu.

  6. Wszystkie pliki w archiwum ZIP działania niestandardowego muszą znajdować się na najwyższym poziomie, bez podfolderów.

  7. Upewnij się, że parametr assemblyName (MyDotNetActivity.dll), entryPoint(MyDotNetActivityNS.MyDotNetActivity), packageFile (customactivitycontainer/MyDotNetActivity.zip) i packageLinkedService (powinien wskazywać magazyn obiektów blob platformy Azure ogólnego przeznaczenia, który zawiera plik zip) są ustawione na poprawne wartości.

  8. Jeśli naprawiono błąd i chcesz przetworzyć wycinek ponownie, kliknij prawym przyciskiem wycinek w bloku OutputDataset i kliknij polecenie Uruchom.

  9. Jeśli zostanie wyświetlony następujący błąd, używasz pakietu usługi Azure Storage w wersji > 4.3.0. Uruchamianie usługi Data Factory wymaga wersji 4.3 systemu WindowsAzure.Storage. Jeśli musisz użyć nowszej wersji zestawu usługi Azure Storage, zobacz sekcję Izolacja domeny aplikacji .

    Error in Activity: Unknown error in module: System.Reflection.TargetInvocationException: Exception has been thrown by the target of an invocation. ---> System.TypeLoadException: Could not load type 'Microsoft.WindowsAzure.Storage.Blob.CloudBlob' from assembly 'Microsoft.WindowsAzure.Storage, Version=4.3.0.0, Culture=neutral,
    

    Jeśli możesz użyć pakietu usługi Azure Storage w wersji 4.3.0, usuń istniejące odwołanie do pakietu usługi Azure Storage w wersji > 4.3.0. Następnie uruchom następujące polecenie w konsoli Menedżera pakietów NuGet.

    Install-Package WindowsAzure.Storage -Version 4.3.0
    

    Skompiluj projekt. Usuń zestaw Azure.Storage w wersji > 4.3.0 z folderu bin\Debug. Utwórz plik zip z plikami binarnymi i plikiem PDB. Zastąp stary plik zip tym plikiem w kontenerze obiektów blob (customactivitycontainer). Uruchom ponownie wycinki, które zakończyły się niepowodzeniem (kliknij prawym przyciskiem myszy wycinek, a następnie kliknij polecenie Uruchom).

  10. Działanie niestandardowe nie używa pliku app.config z pakietu. W związku z tym, jeśli kod odczytuje parametry połączenia z pliku konfiguracji, nie działa w czasie wykonywania. Najlepszym rozwiązaniem w przypadku używania Azure Batch jest utrzymywanie jakichkolwiek wpisów tajnych w usłudze Azure KeyVault, używanie jednostki usługi opartej na certyfikatach w celu ochrony magazynu kluczy i dystrybuowanie certyfikatu do puli Azure Batch. Niestandardowe działanie .NET będzie miało w takiej sytuacji dostęp do danych poufnych z magazynu KeyVault w czasie uruchomienia. To rozwiązanie jest ogólnym rozwiązaniem i umożliwia skalowanie do dowolnego typu wpisu tajnego, a nie tylko parametrów połączenia.

    Istnieje łatwiejsze obejście (ale nie najlepsze rozwiązanie): można utworzyć Azure SQL połączoną usługę z ustawieniami parametrów połączenia, utworzyć zestaw danych używający połączonej usługi i połączyć zestaw danych jako fikcyjny wejściowy zestaw danych z niestandardowym działaniem platformy .NET. Następnie możesz uzyskać dostęp do parametrów połączenia połączonej usługi w niestandardowym kodzie działania.

Aktualizowanie działania niestandardowego

Jeśli zaktualizujesz kod działania niestandardowego, skompiluj go i przekaż plik zip zawierający nowe pliki binarne do magazynu obiektów blob.

Izolacja domeny aplikacji

Zobacz Przykład Cross AppDomain , który pokazuje, jak utworzyć działanie niestandardowe, które nie jest ograniczone do wersji zestawów używanych przez moduł uruchamiania usługi Data Factory (na przykład: WindowsAzure.Storage v4.3.0, Newtonsoft.Json v6.0.x itp.).

Uzyskiwanie dostępu do właściwości rozszerzonych

Właściwości rozszerzone można zadeklarować w kodzie JSON działania, jak pokazano w poniższym przykładzie:

"typeProperties": {
  "AssemblyName": "MyDotNetActivity.dll",
  "EntryPoint": "MyDotNetActivityNS.MyDotNetActivity",
  "PackageLinkedService": "AzureStorageLinkedService",
  "PackageFile": "customactivitycontainer/MyDotNetActivity.zip",
  "extendedProperties": {
    "SliceStart": "$$Text.Format('{0:yyyyMMddHH-mm}', Time.AddMinutes(SliceStart, 0))",
    "DataFactoryName": "CustomActivityFactory"
  }
},

W tym przykładzie istnieją dwie właściwości rozszerzone: SliceStart i DataFactoryName. Wartość sliceStart jest oparta na zmiennej systemowej SliceStart. Aby uzyskać listę obsługiwanych zmiennych systemowych, zobacz Zmienne systemowe . Wartość elementu DataFactoryName jest trwale zakodowana na wartość CustomActivityFactory.

Aby uzyskać dostęp do tych właściwości rozszerzonych w metodzie Execute , użyj kodu podobnego do następującego kodu:

// to get extended properties (for example: SliceStart)
DotNetActivity dotNetActivity = (DotNetActivity)activity.TypeProperties;
string sliceStartString = dotNetActivity.ExtendedProperties["SliceStart"];

// to log all extended properties
IDictionary<string, string> extendedProperties = dotNetActivity.ExtendedProperties;
logger.Write("Logging extended properties if any...");
foreach (KeyValuePair<string, string> entry in extendedProperties)
{
    logger.Write("<key:{0}> <value:{1}>", entry.Key, entry.Value);
}

Automatyczne skalowanie Azure Batch

Możesz również utworzyć pulę Azure Batch z funkcją autoskalowania. Można na przykład utworzyć pulę usługi Azure Batch z 0 dedykowanymi maszynami wirtualnymi i formułą autoskalowania na podstawie liczby oczekujących zadań.

Przykładowa formuła w tym miejscu zapewnia następujące zachowanie: po początkowym utworzeniu puli rozpoczyna się od 1 maszyny wirtualnej. $PendingTasks metryka definiuje liczbę zadań w stanie uruchomionym i aktywnym (w kolejce). Formuła znajduje średnią liczbę oczekujących zadań w ciągu ostatnich 180 sekund i odpowiednio ustawia element TargetDedicated. Gwarantuje to, że element TargetDedicated nigdy nie przekracza 25 maszyn wirtualnych. W miarę przesyłania nowych zadań pula automatycznie rośnie i w miarę wykonywania zadań maszyny wirtualne stają się wolne od jednego do jednego, a skalowanie automatyczne zmniejsza te maszyny wirtualne. maszyny startNumberOfVMs i maxNumberofVM można dostosować do Twoich potrzeb.

Formuła autoskalu:

startingNumberOfVMs = 1;
maxNumberofVMs = 25;
pendingTaskSamplePercent = $PendingTasks.GetSamplePercent(180 * TimeInterval_Second);
pendingTaskSamples = pendingTaskSamplePercent < 70 ? startingNumberOfVMs : avg($PendingTasks.GetSample(180 * TimeInterval_Second));
$TargetDedicated=min(maxNumberofVMs,pendingTaskSamples);

Aby uzyskać szczegółowe informacje, zobacz Automatyczne skalowanie węzłów obliczeniowych w puli Azure Batch.

Jeśli pula używa domyślnej wartości autoScaleEvaluationInterval, przygotowanie maszyny wirtualnej przed uruchomieniem działania niestandardowego może potrwać 15–30 minut. Jeśli pula korzysta z innej wartości autoScaleEvaluationInterval, usługa Batch może potrwać autoScaleEvaluationInterval + 10 minut.

Tworzenie działania niestandardowego przy użyciu zestawu SDK platformy .NET

W przewodniku w tym artykule utworzysz fabrykę danych z potokiem, który używa działania niestandardowego przy użyciu Azure Portal. Poniższy kod pokazuje, jak utworzyć fabrykę danych przy użyciu zestawu SDK platformy .NET. Więcej szczegółów na temat używania zestawu SDK do programowego tworzenia potoków można znaleźć w artykule Tworzenie potoku za pomocą działania kopiowania przy użyciu interfejsu API platformy .NET .

using System;
using System.Configuration;
using System.Collections.ObjectModel;
using System.Threading;
using System.Threading.Tasks;

using Microsoft.Azure;
using Microsoft.Azure.Management.DataFactories;
using Microsoft.Azure.Management.DataFactories.Models;
using Microsoft.Azure.Management.DataFactories.Common.Models;

using Microsoft.IdentityModel.Clients.ActiveDirectory;
using System.Collections.Generic;

namespace DataFactoryAPITestApp
{
    class Program
    {
        static void Main(string[] args)
        {
            // create data factory management client

            // TODO: replace ADFTutorialResourceGroup with the name of your resource group.
            string resourceGroupName = "ADFTutorialResourceGroup";

            // TODO: replace APITutorialFactory with a name that is globally unique. For example: APITutorialFactory04212017
            string dataFactoryName = "APITutorialFactory";

            TokenCloudCredentials aadTokenCredentials = new TokenCloudCredentials(
                ConfigurationManager.AppSettings["SubscriptionId"],
                GetAuthorizationHeader().Result);

            Uri resourceManagerUri = new Uri(ConfigurationManager.AppSettings["ResourceManagerEndpoint"]);

            DataFactoryManagementClient client = new DataFactoryManagementClient(aadTokenCredentials, resourceManagerUri);

            Console.WriteLine("Creating a data factory");
            client.DataFactories.CreateOrUpdate(resourceGroupName,
                new DataFactoryCreateOrUpdateParameters()
                {
                    DataFactory = new DataFactory()
                    {
                        Name = dataFactoryName,
                        Location = "westus",
                        Properties = new DataFactoryProperties()
                    }
                }
            );

            // create a linked service for input data store: Azure Storage
            Console.WriteLine("Creating Azure Storage linked service");
            client.LinkedServices.CreateOrUpdate(resourceGroupName, dataFactoryName,
                new LinkedServiceCreateOrUpdateParameters()
                {
                    LinkedService = new LinkedService()
                    {
                        Name = "AzureStorageLinkedService",
                        Properties = new LinkedServiceProperties
                        (
                            // TODO: Replace <accountname> and <accountkey> with name and key of your Azure Storage account.
                            new AzureStorageLinkedService("DefaultEndpointsProtocol=https;AccountName=<accountname>;AccountKey=<accountkey>")
                        )
                    }
                }
            );

            // create a linked service for output data store: Azure SQL Database
            Console.WriteLine("Creating Azure Batch linked service");
            client.LinkedServices.CreateOrUpdate(resourceGroupName, dataFactoryName,
                new LinkedServiceCreateOrUpdateParameters()
                {
                    LinkedService = new LinkedService()
                    {
                        Name = "AzureBatchLinkedService",
                        Properties = new LinkedServiceProperties
                        (
                            // TODO: replace <batchaccountname> and <yourbatchaccountkey> with name and key of your Azure Batch account
                            new AzureBatchLinkedService("<batchaccountname>", "https://westus.batch.azure.com", "<yourbatchaccountkey>", "myazurebatchpool", "AzureStorageLinkedService")
                        )
                    }
                }
            );

            // create input and output datasets
            Console.WriteLine("Creating input and output datasets");
            string Dataset_Source = "InputDataset";
            string Dataset_Destination = "OutputDataset";

            Console.WriteLine("Creating input dataset of type: Azure Blob");
            client.Datasets.CreateOrUpdate(resourceGroupName, dataFactoryName,

                new DatasetCreateOrUpdateParameters()
                {
                    Dataset = new Dataset()
                    {
                        Name = Dataset_Source,
                        Properties = new DatasetProperties()
                        {
                            LinkedServiceName = "AzureStorageLinkedService",
                            TypeProperties = new AzureBlobDataset()
                            {
                                FolderPath = "adftutorial/customactivityinput/",
                                Format = new TextFormat()
                            },
                            External = true,
                            Availability = new Availability()
                            {
                                Frequency = SchedulePeriod.Hour,
                                Interval = 1,
                            },

                            Policy = new Policy() { }
                        }
                    }
                });

            Console.WriteLine("Creating output dataset of type: Azure Blob");
            client.Datasets.CreateOrUpdate(resourceGroupName, dataFactoryName,
                new DatasetCreateOrUpdateParameters()
                {
                    Dataset = new Dataset()
                    {
                        Name = Dataset_Destination,
                        Properties = new DatasetProperties()
                        {
                            LinkedServiceName = "AzureStorageLinkedService",
                            TypeProperties = new AzureBlobDataset()
                            {
                                FileName = "{slice}.txt",
                                FolderPath = "adftutorial/customactivityoutput/",
                                PartitionedBy = new List<Partition>()
                                {
                                    new Partition()
                                    {
                                        Name = "slice",
                                        Value = new DateTimePartitionValue()
                                        {
                                            Date = "SliceStart",
                                            Format = "yyyy-MM-dd-HH"
                                        }
                                    }
                                }
                            },
                            Availability = new Availability()
                            {
                                Frequency = SchedulePeriod.Hour,
                                Interval = 1,
                            },
                        }
                    }
                });

            Console.WriteLine("Creating a custom activity pipeline");
            DateTime PipelineActivePeriodStartTime = new DateTime(2017, 3, 9, 0, 0, 0, 0, DateTimeKind.Utc);
            DateTime PipelineActivePeriodEndTime = PipelineActivePeriodStartTime.AddMinutes(60);
            string PipelineName = "ADFTutorialPipelineCustom";

            client.Pipelines.CreateOrUpdate(resourceGroupName, dataFactoryName,
                new PipelineCreateOrUpdateParameters()
                {
                    Pipeline = new Pipeline()
                    {
                        Name = PipelineName,
                        Properties = new PipelineProperties()
                        {
                            Description = "Use custom activity",

                            // Initial value for pipeline's active period. With this, you won't need to set slice status
                            Start = PipelineActivePeriodStartTime,
                            End = PipelineActivePeriodEndTime,
                            IsPaused = false,

                            Activities = new List<Activity>()
                            {
                                new Activity()
                                {
                                    Name = "MyDotNetActivity",
                                    Inputs = new List<ActivityInput>()
                                    {
                                        new ActivityInput() {
                                            Name = Dataset_Source
                                        }
                                    },
                                    Outputs = new List<ActivityOutput>()
                                    {
                                        new ActivityOutput()
                                        {
                                            Name = Dataset_Destination
                                        }
                                    },
                                    LinkedServiceName = "AzureBatchLinkedService",
                                    TypeProperties = new DotNetActivity()
                                    {
                                        AssemblyName = "MyDotNetActivity.dll",
                                        EntryPoint = "MyDotNetActivityNS.MyDotNetActivity",
                                        PackageLinkedService = "AzureStorageLinkedService",
                                        PackageFile = "customactivitycontainer/MyDotNetActivity.zip",
                                        ExtendedProperties = new Dictionary<string, string>()
                                        {
                                            { "SliceStart", "$$Text.Format('{0:yyyyMMddHH-mm}', Time.AddMinutes(SliceStart, 0))"}
                                        }
                                    },
                                    Policy = new ActivityPolicy()
                                    {
                                        Concurrency = 2,
                                        ExecutionPriorityOrder = "OldestFirst",
                                        Retry = 3,
                                        Timeout = new TimeSpan(0,0,30,0),
                                        Delay = new TimeSpan()
                                    }
                                }
                            }
                        }
                    }
                });
        }

        public static async Task<string> GetAuthorizationHeader()
        {
            AuthenticationContext context = new AuthenticationContext(ConfigurationManager.AppSettings["ActiveDirectoryEndpoint"] + ConfigurationManager.AppSettings["ActiveDirectoryTenantId"]);
            ClientCredential credential = new ClientCredential(
                ConfigurationManager.AppSettings["ApplicationId"],
                ConfigurationManager.AppSettings["Password"]);
            AuthenticationResult result = await context.AcquireTokenAsync(
                resource: ConfigurationManager.AppSettings["WindowsManagementUri"],
                clientCredential: credential);

            if (result != null)
                return result.AccessToken;

            throw new InvalidOperationException("Failed to acquire token");
        }
    }
}

Debugowanie działań niestandardowych w programie Visual Studio

Przykład Azure Data Factory — środowisko lokalne w usłudze GitHub zawiera narzędzie umożliwiające debugowanie niestandardowych działań platformy .NET w programie Visual Studio.

Przykładowe działania niestandardowe w usłudze GitHub

Przykład Co robi działanie niestandardowe
Narzędzie do pobierania danych HTTP. Pobiera dane z punktu końcowego HTTP do Azure Blob Storage przy użyciu niestandardowego działania języka C# w usłudze Data Factory.
Uruchom skrypt języka R. Wywołuje skrypt języka R, uruchamiając RScript.exe w klastrze usługi HDInsight, na którym jest już zainstalowany język R.
Działanie platformy .NET między domenami aplikacji Używa różnych wersji zestawów z tych używanych przez moduł uruchamiania usługi Data Factory
Ponowne przetwarzanie modelu w Azure Analysis Services Ponowne przetwarzanie modelu w Azure Analysis Services.