Azure Functions 기능인 Durable Functions 사용하여 Python 상태 저장 서버리스 워크플로를 작성합니다. 이 빠른 시작에서는 두 가지 일반적인 오케스트레이션 패턴을 보여 주는 샘플 앱을 복제하고 실행합니다.
- 함수 연결: 순차적으로 활동을 호출합니다(도쿄 → 시애틀 → 런던).
- 팬아웃/팬인: 5개 도시에서 동시에 활동을 호출한 다음 결과를 집계합니다.
결국 지속성 작업 스케줄러 에뮬레이터를 사용하여 두 오케스트레이션을 로컬로 실행하고 대시보드에서 해당 상태를 볼 수 있습니다.
- Hello Cities 샘플 프로젝트를 복제하고 준비합니다.
- 로컬 개발을 위해 지속성 작업 스케줄러 에뮬레이터 및 Azurite를 설정합니다.
- 함수 앱을 실행하고 두 오케스트레이션을 모두 트리거합니다.
- 지속성 작업 스케줄러 대시보드에서 오케스트레이션 상태 및 출력을 검토합니다.
사전 요구 사항
- Python 3.9+ 설치되어 있습니다.
- Azure Functions Core Tools v4 이상.
- 에뮬레이터 및 Azurite를 실행하기 위한 Docker입니다.
- 빠른 시작 샘플을 사용하도록 지속성 작업 스케줄러 GitHub 리포지토리 를 복제합니다.
지속성 작업 스케줄러 에뮬레이터 설정
Durable Task Scheduler 에뮬레이터는 Azure 구독 없이 오케스트레이션을 테스트할 수 있도록 로컬 개발 환경을 제공합니다. 또한 Functions 호스트에는 로컬 스토리지에 Azurite 가 필요합니다.
두 컨테이너를 모두 시작합니다.
docker run -d --name dtsemulator -p 8080:8080 -p 8082:8082 \
mcr.microsoft.com/dts/dts-emulator:latest
docker run -d --name azurite -p 10000:10000 -p 10001:10001 -p 10002:10002 \
mcr.microsoft.com/azure-storage/azurite
Tip
에뮬레이터가 실행되면 지속성 작업 스케줄러 대시보드에 http://localhost:8082 액세스하여 오케스트레이션을 모니터링할 수 있습니다.
빠른 시작 샘플 실행
Hello Cities 샘플 디렉터리로 이동합니다.
cd samples/durable-functions/python/hello-cities가상 환경을 만들고 종속성을 설치합니다.
python -m venv .venv .venv\Scripts\activate pip install -r requirements.txtlocal.settings.json파일에 다음 구성이 포함되어 있는지 확인합니다.{ "IsEncrypted": false, "Values": { "AzureWebJobsStorage": "UseDevelopmentStorage=true", "FUNCTIONS_WORKER_RUNTIME": "python", "DURABLE_TASK_SCHEDULER_CONNECTION_STRING": "Endpoint=http://localhost:8080;TaskHub=default;Authentication=None" } }함수 앱을 시작합니다.
func start별도의 터미널에서 함수 체이닝 오케스트레이션을 실행합니다:
$response = Invoke-RestMethod -Method POST -Uri http://localhost:7071/api/StartChaining $response응답에는 오케스트레이션 인스턴스에 대한 상태 URL이 포함됩니다.
statusQueryGetUri값을 복사하고 실행하여 결과를 확인합니다.Invoke-RestMethod -Uri $response.statusQueryGetUrifan-out/fan-in 오케스트레이션을 시작합니다.
$response = Invoke-RestMethod -Method POST -Uri http://localhost:7071/api/StartFanOutFanIn Invoke-RestMethod -Uri $response.statusQueryGetUri
예상 출력
POST 요청은 상태 URL이 있는 JSON 응답을 반환합니다. 다음은 그 예입니다.
{
"id": "<instanceId>",
"statusQueryGetUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/<instanceId>?code=...",
"sendEventPostUri": "...",
"terminatePostUri": "...",
"purgeHistoryDeleteUri": "..."
}
statusQueryGetUri를 쿼리하고 오케스트레이션의 runtimeStatus이 Completed인 경우, output 필드에서 인사 결과를 찾을 수 있습니다. 체이닝 오케스트레이션은 다음을 반환합니다.
{
"name": "chaining_orchestration",
"runtimeStatus": "Completed",
"output": ["Hello Tokyo!", "Hello Seattle!", "Hello London!"]
}
팬아웃/팬인 오케스트레이션은 다음을 반환합니다.
{
"name": "fan_out_fan_in_orchestration",
"runtimeStatus": "Completed",
"output": ["Hello Tokyo!", "Hello Seattle!", "Hello London!", "Hello Paris!", "Hello Berlin!"]
}
Tip
runtimeStatus에 Running 또는 Pending가 표시되면 잠시 기다린 후 statusQueryGetUri를 다시 조회합니다.
지속성 작업 스케줄러 대시보드를 http://localhost:8082 열어 오케스트레이션 상태 및 실행 기록을 봅니다.
코드 이해
이 샘플에서는 데코레이터와 함께 Python v2 프로그래밍 모델을 사용합니다. 여기서 모든 함수는 단일 파일(function_app.py)에 정의됩니다.
작업 함수
활동은 say_hello 도시 이름을 사용하고 인사말을 반환합니다.
@app.activity_trigger(input_name="city")
def say_hello(city: str) -> str:
"""Activity function that returns a greeting for a city."""
logging.info(f"Saying hello to {city}.")
return f"Hello {city}!"
오케스트레이터 함수
체이닝 오케스트레이터는 세 도시에 대해 say_hello를 순차적으로 호출합니다.
@app.orchestration_trigger(context_name="context")
def chaining_orchestration(context: df.DurableOrchestrationContext):
"""Function chaining orchestration: calls activities sequentially."""
result1 = yield context.call_activity("say_hello", "Tokyo")
result2 = yield context.call_activity("say_hello", "Seattle")
result3 = yield context.call_activity("say_hello", "London")
return [result1, result2, result3]
팬아웃/팬인 오케스트레이터는 활동을 병렬로 실행하도록 예약합니다:
@app.orchestration_trigger(context_name="context")
def fan_out_fan_in_orchestration(context: df.DurableOrchestrationContext):
"""Fan-out/Fan-in orchestration: calls activities in parallel."""
cities = ["Tokyo", "Seattle", "London", "Paris", "Berlin"]
# Fan-out: schedule all activities in parallel
parallel_tasks = []
for city in cities:
task = context.call_activity("say_hello", city)
parallel_tasks.append(task)
# Fan-in: wait for all to complete
results = yield context.task_all(parallel_tasks)
return results
클라이언트 함수
HTTP 트리거 클라이언트 함수는 각 오케스트레이션을 시작합니다. 예를 들어 체인 시작은 다음과 같습니다.
@app.route(route="StartChaining", methods=["POST"])
@app.durable_client_input(client_name="client")
async def start_chaining(req: func.HttpRequest, client) -> func.HttpResponse:
"""HTTP trigger to start the function chaining orchestration."""
instance_id = await client.start_new("chaining_orchestration")
logging.info(f"Started chaining orchestration with ID = '{instance_id}'.")
return client.create_check_status_response(req, instance_id)
Configuration
이 샘플에서는 지속성 작업 스케줄러 에뮬레이터를 스토리지 백 엔드로 사용합니다. 이는 host.json에서 구성됩니다:
{
"version": "2.0",
"logging": {
"logLevel": {
"DurableTask.Core": "Warning"
}
},
"extensions": {
"durableTask": {
"hubName": "default",
"storageProvider": {
"type": "azureManaged",
"connectionStringName": "DURABLE_TASK_SCHEDULER_CONNECTION_STRING"
}
}
},
"extensionBundle": {
"id": "Microsoft.Azure.Functions.ExtensionBundle",
"version": "[4.*, 5.0.0)"
}
}
자원을 정리하세요
완료되면 에뮬레이터 컨테이너를 중지합니다.
docker stop dtsemulator azurite && docker rm dtsemulator azurite
Python 가상 환경을 비활성화하려면 다음을 수행합니다.
deactivate
다음 단계
- common Durable Functions 앱 패턴 대해 알아봅니다.
- Durable Functions 스토리지 공급자 대해 알아봅니다.