Szybki start: tworzenie aplikacji Python Durable Functions

Użyj funkcji Durable Functions platformy Azure Functions, aby tworzyć stanowe bezserwerowe przepływy pracy w języku Python. W tym przewodniku Szybki start sklonujesz i uruchomisz przykładową aplikację, która demonstruje dwa typowe wzorce aranżacji:

  • Łańcuch funkcji: sekwencyjnie wywołuje działania (Tokio → Seattle → Londynie).
  • Fan-out/fan-in: Wywołuje działania równolegle w pięciu miastach, a następnie agreguje wyniki.

Na koniec obie orkiestracje będą uruchomione lokalnie z emulatorem Durable Task Scheduler, a ich stan będzie można wyświetlić w pulpicie nawigacyjnym.

  • Sklonuj i przygotuj przykładowy projekt Hello Cities.
  • Skonfiguruj emulator narzędzia Durable Task Scheduler i Azurite na potrzeby programowania lokalnego.
  • Uruchom aplikację funkcji i uruchom obie orkiestracje.
  • Przejrzyj stan orkiestracji i dane wyjściowe na pulpicie nawigacyjnym narzędzia Durable Task Scheduler.

Wymagania wstępne

Konfigurowanie emulatora harmonogramu zadań Durable Task Scheduler

Emulator harmonogramu zadań Durable udostępnia lokalne środowisko programistyczne, dzięki czemu można testować aranżacje bez subskrypcji Azure. Host funkcji wymaga również Azurite do lokalnego magazynu danych.

Uruchom oba kontenery:

docker run -d --name dtsemulator -p 8080:8080 -p 8082:8082 \
  mcr.microsoft.com/dts/dts-emulator:latest

docker run -d --name azurite -p 10000:10000 -p 10001:10001 -p 10002:10002 \
  mcr.microsoft.com/azure-storage/azurite

Wskazówka

Po uruchomieniu emulatora możesz uzyskać dostęp do pulpitu nawigacyjnego narzędzia Durable Task Scheduler pod adresem , http://localhost:8082 aby monitorować aranżacje.

Uruchom przykład szybkiego startu

  1. Przejdź do przykładowego katalogu Hello Cities:

    cd samples/durable-functions/python/hello-cities
    
  2. Utwórz środowisko wirtualne i zainstaluj zależności:

    python -m venv .venv
    .venv\Scripts\activate
    pip install -r requirements.txt
    
  3. Sprawdź, czy local.settings.json plik zawiera następującą konfigurację:

    {
      "IsEncrypted": false,
      "Values": {
        "AzureWebJobsStorage": "UseDevelopmentStorage=true",
        "FUNCTIONS_WORKER_RUNTIME": "python",
        "DURABLE_TASK_SCHEDULER_CONNECTION_STRING": "Endpoint=http://localhost:8080;TaskHub=default;Authentication=None"
      }
    }
    
  4. Uruchom aplikację funkcji:

    func start
    
  5. W osobnym terminalu wyzwól orkiestrację łańcucha funkcji :

    $response = Invoke-RestMethod -Method POST -Uri http://localhost:7071/api/StartChaining
    $response
    

    Odpowiedź zawiera adresy URL stanu wystąpienia orkiestracji. statusQueryGetUri Skopiuj wartość i uruchom ją, aby sprawdzić wynik:

    Invoke-RestMethod -Uri $response.statusQueryGetUri
    
  6. Uruchom orkiestrację fan-out/fan-in:

    $response = Invoke-RestMethod -Method POST -Uri http://localhost:7071/api/StartFanOutFanIn
    Invoke-RestMethod -Uri $response.statusQueryGetUri
    

Oczekiwane dane wyjściowe

Żądanie POST zwraca odpowiedź w formacie JSON z adresami URL statusu. Przykład:

{
  "id": "<instanceId>",
  "statusQueryGetUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/<instanceId>?code=...",
  "sendEventPostUri": "...",
  "terminatePostUri": "...",
  "purgeHistoryDeleteUri": "..."
}

Gdy wykonasz zapytanie do statusQueryGetUri i gdy element runtimeStatus orkiestracji ma wartość Completed, wyniki powitania znajdziesz w polu output. Orkiestracja łańcuchowa zwraca:

{
  "name": "chaining_orchestration",
  "runtimeStatus": "Completed",
  "output": ["Hello Tokyo!", "Hello Seattle!", "Hello London!"]
}

Orkiestracja typu fan-out/fan-in zwraca:

{
  "name": "fan_out_fan_in_orchestration",
  "runtimeStatus": "Completed",
  "output": ["Hello Tokyo!", "Hello Seattle!", "Hello London!", "Hello Paris!", "Hello Berlin!"]
}

Wskazówka

Jeśli runtimeStatus pokazuje Running lub Pending, zaczekaj chwilę i ponownie wykonaj zapytanie do statusQueryGetUri.

Otwórz pulpit narzędzia Durable Task Scheduler pod adresem http://localhost:8082, aby wyświetlić stan orkiestracji i historię wykonywania.

Omówienie kodu

W przykładzie użyto modelu programowania Python w wersji 2 z dekoratorami, gdzie wszystkie funkcje są zdefiniowane w jednym pliku (function_app.py).

Funkcja aktywności

Działanie say_hello przyjmuje nazwę miasta i zwraca powitanie:

@app.activity_trigger(input_name="city")
def say_hello(city: str) -> str:
    """Activity function that returns a greeting for a city."""
    logging.info(f"Saying hello to {city}.")
    return f"Hello {city}!"

Funkcje programu Orchestrator

Orkiestrator łańcuchowy wywołuje say_hello sekwencyjnie dla trzech miast:

@app.orchestration_trigger(context_name="context")
def chaining_orchestration(context: df.DurableOrchestrationContext):
    """Function chaining orchestration: calls activities sequentially."""
    result1 = yield context.call_activity("say_hello", "Tokyo")
    result2 = yield context.call_activity("say_hello", "Seattle")
    result3 = yield context.call_activity("say_hello", "London")
    return [result1, result2, result3]

Orkiestrator fan-out/fan-in harmonogramuje aktywności równolegle:

@app.orchestration_trigger(context_name="context")
def fan_out_fan_in_orchestration(context: df.DurableOrchestrationContext):
    """Fan-out/Fan-in orchestration: calls activities in parallel."""
    cities = ["Tokyo", "Seattle", "London", "Paris", "Berlin"]

    # Fan-out: schedule all activities in parallel
    parallel_tasks = []
    for city in cities:
        task = context.call_activity("say_hello", city)
        parallel_tasks.append(task)

    # Fan-in: wait for all to complete
    results = yield context.task_all(parallel_tasks)
    return results

Funkcje klienta

Funkcje klienta wyzwalane przez protokół HTTP uruchamiają każdą aranżację. Na przykład starter łańcucha:

@app.route(route="StartChaining", methods=["POST"])
@app.durable_client_input(client_name="client")
async def start_chaining(req: func.HttpRequest, client) -> func.HttpResponse:
    """HTTP trigger to start the function chaining orchestration."""
    instance_id = await client.start_new("chaining_orchestration")
    logging.info(f"Started chaining orchestration with ID = '{instance_id}'.")
    return client.create_check_status_response(req, instance_id)

Konfiguracja

W przykładzie jako zaplecze magazynu danych użyto emulatora Durable Task Scheduler. Jest to skonfigurowane w programie host.json:

{
  "version": "2.0",
  "logging": {
    "logLevel": {
      "DurableTask.Core": "Warning"
    }
  },
  "extensions": {
    "durableTask": {
      "hubName": "default",
      "storageProvider": {
        "type": "azureManaged",
        "connectionStringName": "DURABLE_TASK_SCHEDULER_CONNECTION_STRING"
      }
    }
  },
  "extensionBundle": {
    "id": "Microsoft.Azure.Functions.ExtensionBundle",
    "version": "[4.*, 5.0.0)"
  }
}

Uprzątnij zasoby

Zatrzymaj kontenery emulatora, gdy skończysz:

docker stop dtsemulator azurite && docker rm dtsemulator azurite

Aby dezaktywować środowisko wirtualne Python:

deactivate

Następne kroki