Udostępnij za pośrednictwem


Scenariusz fan-out/fan-in w usłudze Durable Functions — przykład tworzenia kopii zapasowej w chmurze

Fan-out/fan-in odnosi się do wzorca wykonywania wielu funkcji jednocześnie, a następnie wykonywania agregacji na wynikach. W tym artykule wyjaśniono przykład, który używa rozszerzenia Durable Functions do implementowania scenariusza fan-in/fan-out. Przykład jest trwałą funkcją, która tworzy kopię zapasową całej zawartości witryny aplikacji lub jej zawartości w usłudze Azure Storage.

Uwaga

Wersja 4 modelu programowania Node.js dla usługi Azure Functions jest ogólnie dostępna. Nowy model w wersji 4 został zaprojektowany z myślą o bardziej elastycznym i intuicyjnym środowisku dla deweloperów języków JavaScript i TypeScript. Dowiedz się więcej o różnicach między wersjami 3 i v4 w przewodniku migracji.

W poniższych fragmentach kodu javaScript (PM4) oznacza model programowania W wersji 4, nowe środowisko.

Wymagania wstępne

Omówienie scenariusza

W tym przykładzie funkcje przekazują wszystkie pliki w określonym katalogu rekursywnie do magazynu obiektów blob. Zliczają również łączną liczbę przekazanych bajtów.

Można napisać jedną funkcję, która zajmuje się wszystkim. Głównym problemem, który można napotkać, jest skalowalność. Wykonywanie jednej funkcji może być uruchamiane tylko na jednej maszynie wirtualnej, więc przepływność będzie ograniczona przez przepływność tej pojedynczej maszyny wirtualnej. Innym problemem jest niezawodność. Jeśli wystąpił błąd w połowie procesu lub cały proces trwa dłużej niż 5 minut, tworzenie kopii zapasowej może zakończyć się niepowodzeniem w stanie częściowo ukończonym. Następnie należy go ponownie uruchomić.

Bardziej niezawodną metodą byłoby zapisanie dwóch regularnych funkcji: jeden wylicza pliki i dodaje nazwy plików do kolejki, a drugi odczytuje z kolejki i przekazuje pliki do magazynu obiektów blob. Takie podejście jest lepsze pod względem przepływności i niezawodności, ale wymaga aprowizacji kolejki i zarządzania nią. Co ważniejsze, znacząca złożoność jest wprowadzana w zakresie zarządzania stanem i koordynacji , jeśli chcesz zrobić coś więcej, na przykład zgłosić łączną liczbę przekazanych bajtów.

Podejście Durable Functions zapewnia wszystkie wymienione korzyści z bardzo niskim obciążeniem.

Funkcje

W tym artykule opisano następujące funkcje w przykładowej aplikacji:

  • E2_BackupSiteContent: Funkcja orkiestratora, która wywołuje E2_GetFileList metodę uzyskiwania listy plików do utworzenia kopii zapasowej, a następnie wywołuje E2_CopyFileToBlob kopię zapasową każdego pliku.
  • E2_GetFileList: funkcja działania zwracająca listę plików w katalogu.
  • E2_CopyFileToBlob: funkcja działania, która tworzy kopię zapasową pojedynczego pliku w usłudze Azure Blob Storage.

funkcja orkiestratora E2_BackupSiteContent

Ta funkcja orkiestratora zasadniczo wykonuje następujące czynności:

  1. rootDirectory Przyjmuje wartość jako parametr wejściowy.
  2. Wywołuje funkcję , aby uzyskać cykliczną listę plików w obszarze rootDirectory.
  3. Wykonuje wiele równoległych wywołań funkcji w celu przekazania każdego pliku do usługi Azure Blob Storage.
  4. Czeka na ukończenie wszystkich operacji przekazywania.
  5. Zwraca sumę całkowitą bajtów przekazanych do usługi Azure Blob Storage.

Oto kod implementujący funkcję orkiestratora:

[FunctionName("E2_BackupSiteContent")]
public static async Task<long> Run(
    [OrchestrationTrigger] IDurableOrchestrationContext backupContext)
{
    string rootDirectory = backupContext.GetInput<string>()?.Trim();
    if (string.IsNullOrEmpty(rootDirectory))
    {
        rootDirectory = Directory.GetParent(typeof(BackupSiteContent).Assembly.Location).FullName;
    }

    string[] files = await backupContext.CallActivityAsync<string[]>(
        "E2_GetFileList",
        rootDirectory);

    var tasks = new Task<long>[files.Length];
    for (int i = 0; i < files.Length; i++)
    {
        tasks[i] = backupContext.CallActivityAsync<long>(
            "E2_CopyFileToBlob",
            files[i]);
    }

    await Task.WhenAll(tasks);

    long totalBytes = tasks.Sum(t => t.Result);
    return totalBytes;
}

Zwróć uwagę na await Task.WhenAll(tasks); wiersz. Wszystkie poszczególne wywołania E2_CopyFileToBlob funkcji nie były oczekiwane, co umożliwia ich równoległe uruchamianie. Gdy przekażemy tę tablicę zadań do programu , wrócimy do Task.WhenAllzadania, które nie zostanie ukończone, dopóki wszystkie operacje kopiowania nie zostaną ukończone. Jeśli znasz bibliotekę równoległą zadań (TPL) na platformie .NET, to nie jest to dla Ciebie nowość. Różnica polega na tym, że te zadania mogą być uruchomione na wielu maszynach wirtualnych jednocześnie, a rozszerzenie Durable Functions gwarantuje, że kompleksowe wykonywanie jest odporne na odtwarzanie procesów.

Po oczekiwaniu na polecenie Task.WhenAllwiemy, że wszystkie wywołania funkcji zostały ukończone i zwróciły do nas wartości. Każde wywołanie funkcji zwraca E2_CopyFileToBlob liczbę przekazanych bajtów, dlatego obliczenie łącznej liczby bajtów jest kwestią dodania wszystkich tych zwracanych wartości.

Funkcje działania pomocnika

Funkcje działania pomocnika, podobnie jak w przypadku innych przykładów, są tylko zwykłymi funkcjami korzystającymi z activityTrigger powiązania wyzwalacza.

funkcja działania E2_GetFileList

[FunctionName("E2_GetFileList")]
public static string[] GetFileList(
    [ActivityTrigger] string rootDirectory, 
    ILogger log)
{
    log.LogInformation($"Searching for files under '{rootDirectory}'...");
    string[] files = Directory.GetFiles(rootDirectory, "*", SearchOption.AllDirectories);
    log.LogInformation($"Found {files.Length} file(s) under {rootDirectory}.");

    return files;
}

Uwaga

Być może zastanawiasz się, dlaczego nie można po prostu umieścić tego kodu bezpośrednio w funkcji orkiestratora. Można jednak przerwać jedną z podstawowych reguł funkcji orkiestratora, która polega na tym, że nigdy nie powinny wykonywać operacji we/wy, w tym lokalnego dostępu do systemu plików. Aby uzyskać więcej informacji, zobacz Ograniczenia kodu funkcji programu Orchestrator.

funkcja działania E2_CopyFileToBlob

[FunctionName("E2_CopyFileToBlob")]
public static async Task<long> CopyFileToBlob(
    [ActivityTrigger] string filePath,
    Binder binder,
    ILogger log)
{
    long byteCount = new FileInfo(filePath).Length;

    // strip the drive letter prefix and convert to forward slashes
    string blobPath = filePath
        .Substring(Path.GetPathRoot(filePath).Length)
        .Replace('\\', '/');
    string outputLocation = $"backups/{blobPath}";

    log.LogInformation($"Copying '{filePath}' to '{outputLocation}'. Total bytes = {byteCount}.");

    // copy the file contents into a blob
    using (Stream source = File.Open(filePath, FileMode.Open, FileAccess.Read, FileShare.Read))
    using (Stream destination = await binder.BindAsync<CloudBlobStream>(
        new BlobAttribute(outputLocation, FileAccess.Write)))
    {
        await source.CopyToAsync(destination);
    }

    return byteCount;
}

Uwaga

Aby uruchomić przykładowy kod, należy zainstalować Microsoft.Azure.WebJobs.Extensions.Storage pakiet NuGet.

Funkcja używa niektórych zaawansowanych funkcji powiązań usługi Azure Functions (czyli użycia parametruBinder), ale nie musisz martwić się o te szczegóły na potrzeby tego przewodnika.

Implementacja ładuje plik z dysku i asynchronicznie przesyła strumieniowo zawartość do obiektu blob o tej samej nazwie w kontenerze "kopie zapasowe". Wartość zwracana jest liczbą bajtów skopiowanych do magazynu, która jest następnie używana przez funkcję orkiestratora do obliczenia sumy agregującej.

Uwaga

Jest to doskonały przykład przenoszenia operacji we/wy do activityTrigger funkcji. Praca może być dystrybuowana nie tylko na wielu różnych maszynach, ale także korzyści wynikające z tworzenia punktów kontrolnych postępu. Jeśli proces hosta zostanie przerwany z jakiegokolwiek powodu, wiesz, które przekazywanie zostało już ukończone.

Uruchamianie aplikacji przykładowej

Orkiestrację można uruchomić w systemie Windows, wysyłając następujące żądanie HTTP POST.

POST http://{host}/orchestrators/E2_BackupSiteContent
Content-Type: application/json
Content-Length: 20

"D:\\home\\LogFiles"

Alternatywnie w aplikacji funkcji systemu Linux (język Python działa obecnie tylko w systemie Linux dla usługi App Service), możesz uruchomić aranżację w następujący sposób:

POST http://{host}/orchestrators/E2_BackupSiteContent
Content-Type: application/json
Content-Length: 20

"/home/site/wwwroot"

Uwaga

Wywoływana HttpStart funkcja działa tylko z zawartością sformatowaną w formacie JSON. Z tego powodu Content-Type: application/json nagłówek jest wymagany, a ścieżka katalogu jest kodowana jako ciąg JSON. Ponadto fragment kodu HTTP zakłada, że w host.json pliku znajduje się wpis, który usuwa domyślny api/ prefiks ze wszystkich adresów URL funkcji wyzwalacza HTTP. Znaczniki dla tej konfiguracji można znaleźć w pliku w host.json przykładach.

To żądanie HTTP wyzwala E2_BackupSiteContent koordynatora i przekazuje ciąg D:\home\LogFiles jako parametr. Odpowiedź zawiera link umożliwiający uzyskanie stanu operacji tworzenia kopii zapasowej:

HTTP/1.1 202 Accepted
Content-Length: 719
Content-Type: application/json; charset=utf-8
Location: http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}

(...trimmed...)

W zależności od liczby plików dziennika w aplikacji funkcji wykonanie tej operacji może potrwać kilka minut. Najnowszy stan można uzyskać, wysyłając zapytanie o adres URL w Location nagłówku poprzedniej odpowiedzi HTTP 202.

GET http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}
HTTP/1.1 202 Accepted
Content-Length: 148
Content-Type: application/json; charset=utf-8
Location: http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}

{"runtimeStatus":"Running","input":"D:\\home\\LogFiles","output":null,"createdTime":"2019-06-29T18:50:55Z","lastUpdatedTime":"2019-06-29T18:51:16Z"}

W tym przypadku funkcja jest nadal uruchomiona. Możesz zobaczyć dane wejściowe zapisane w stanie orkiestratora i czas ostatniej aktualizacji. Możesz nadal używać wartości nagłówka do sondowania pod kątem Location ukończenia. Gdy stan to "Ukończono", zostanie wyświetlona wartość odpowiedzi HTTP podobna do następującej:

HTTP/1.1 200 OK
Content-Length: 152
Content-Type: application/json; charset=utf-8

{"runtimeStatus":"Completed","input":"D:\\home\\LogFiles","output":452071,"createdTime":"2019-06-29T18:50:55Z","lastUpdatedTime":"2019-06-29T18:51:26Z"}

Teraz możesz zobaczyć, że aranżacja jest ukończona i w przybliżeniu ile czasu zajęło ukończenie. Zostanie również wyświetlona wartość output pola, która wskazuje, że przekazano około 450 KB dzienników.

Następne kroki

W tym przykładzie pokazano, jak zaimplementować wzorzec fan-out/fan-in. W następnym przykładzie pokazano, jak zaimplementować wzorzec monitora przy użyciu trwałych czasomierzy.