활동 코디네이터 예제 프로젝트

활동 코디네이터에 대한 이 간단한 예제는 API를 활용하여 시스템 조건이 충족될 때 백그라운드에서 모델을 다시 학습하는 방법을 보여 줍니다.

예제 프로젝트 개요

음악 편집 앱의 경우를 살펴보겠습니다. 이 앱에는 클라우드 스토리지에 콘텐츠 게시와 같이 사용자가 요청하는 우선 순위가 높은 백그라운드 작업이 있습니다. 편집하는 동안 컴퍼지션을 개선하기 위한 자동 권장 사항을 제공하는 등 사용자 상호 작용을 지원하는 우선 순위가 낮은 백그라운드 작업도 있습니다. 마지막으로, 사용자의 요청 없이 특정 시간에 발생할 필요가 없는 지연된 작업 집합이 있습니다. 이는 이 예제에서 중점을 줍니다. 특히 사용자 영향이 최소화된 경우 권장 사항 모델을 주기적으로 다시 학습하려고 합니다. 이를 위해 활동 코디네이터 API를 사용할 수 있습니다.

이 시나리오에서는 사용자가 없을 때 모델을 다시 학습하려고 합니다. 이 시나리오의 재학습 워크플로는 GPU 소비자이기도 하므로 GPU를 사용하기에 적절한 시기일 때도 실행하려고 합니다. Activity Coordinator 정책을 사용하여 이러한 요구 사항을 지정할 수 있습니다. 활동 코디네이터 API는 정책을 사용하여 요구 사항이 충족되는 시기를 확인하고 작업 실행을 시작하거나 중지할 시기에 대한 알림을 보냅니다.

이 경우 GOOD 정책 템플릿은 CPU, 메모리, 시스템 디스크, 전원 및 사용자 유휴 상태를 추적하므로 대부분의 요구 사항을 충족합니다. GPU에 대한 조건을 명시적으로 설정하기만 하면됩니다. 워크로드가 주로 GPU를 활용하지만 작업의 실행은 여전히 본질적으로 CPU, 메모리, 디스크 및 전원을 소비한다는 점을 기억해야 합니다. 이러한 리소스에 미치는 영향은 시스템 구성마다 크게 다를 수 있습니다. instance 경우 GPU가 빨라지면 CPU가 GPU에 데이터를 공급하는 데 더 많은 시간을 소비할 수 있으며, 이로 인해 더 많은 데이터가 디스크에 읽거나 저장될 수 있습니다. 이 디스크의 속도도 비슷한 방식으로 CPU 사용량에 영향을 줄 수 있습니다. 영향을 받는 모든 리소스를 구성하여 실수로 사용자 환경을 방해하거나 시스템 성능을 저하하지 않도록 할 수 있습니다. 또한 작업 자체는 작은 청크에서 발생하도록 세분화되었으므로 원하는 조건 외부에서 실행되지 않도록 조정 알림에 적절하게 응답할 수 있습니다.

개발자가 정책을 변경하거나 다운그레이드하는 방법을 보여주기 위해 48시간 이내에 재학습을 완료해야 하는 요구 사항도 추가합니다. 처음 24시간, 소프트 마감일, 우리는 이상적인 정책으로 실행하려고 시도하며, 지난 24시간 동안 우리는 더 작은 정책으로 다운그레이드합니다.

예제 프로젝트 코드

다음 코드는 음악 편집 샘플 애플리케이션입니다. 개요에 설명된 대로 활동 코디네이터 API를 활용하여 백그라운드 작업을 수행합니다.

#include <chrono>
#include <mutex>
#include <condition_variable>
#include <Windows.h>
#include <ActivityCoordinator.h>
#include <wil/resource.h>

// To use ActivityCoordinator, we must link to the OneCoreUAP library.

#pragma comment(lib, "OneCoreUAP.lib")

using namespace std;
using namespace chrono;
using namespace wil;

// Declare RAII wrappers for the Activity Coordinator policy and subscription.
// These behave like traditional smart pointers and will call their associated
// API cleanup functions when they go out of scope.

typedef wil::unique_any<
        ACTIVITY_COORDINATOR_POLICY,
        decltype(&DestroyActivityCoordinatorPolicy),
        DestroyActivityCoordinatorPolicy>
    unique_policy;

typedef wil::unique_any<
        ACTIVITY_COORDINATOR_SUBSCRIPTION,
        decltype(&UnsubscribeActivityCoordinatorPolicy),
        UnsubscribeActivityCoordinatorPolicy>
    unique_subscription;

struct WORKER_CONTEXT {
    mutex ContextLock;
    unique_threadpool_work Worker;
    bool ShouldRun;
    bool IsRunning;
    bool IsComplete;
    std::condition_variable CompletionSignal;
};

_Requires_lock_held_(workerContext->ContextLock)
void
ResumeWorker(
    _In_ WORKER_CONTEXT* workerContext
    )
{
    workerContext->ShouldRun = true;
    if (!workerContext->IsRunning && !workerContext->IsComplete) {

        // No active workers, so start a new one.

        workerContext->IsRunning = true;
        SubmitThreadpoolWork(workerContext->Worker.get());
    }
}

void
DeferredWorkEventCallback(
    _In_ ACTIVITY_COORDINATOR_NOTIFICATION notificationType,
    _In_ void* callbackContext
    )
{
    WORKER_CONTEXT* workerContext = reinterpret_cast<WORKER_CONTEXT*>(callbackContext);

    // Use this callback thread to dispatch notifications to a worker thread
    // about whether or not it should process the next chunk of deferred work.

    // Note: Do not use this thread to perform your activity's workload.

    lock_guard<mutex> scopedLock(workerContext->ContextLock);
    switch (notificationType) {
    case ACTIVITY_COORDINATOR_NOTIFICATION_RUN:

        // Allow deferred work to be processed.

        ResumeWorker(workerContext);

        break;

    case ACTIVITY_COORDINATOR_NOTIFICATION_STOP:

        // Stop processing deferred work.

        workerContext->ShouldRun = false;

        break;

    default:
        FAIL_FAST();
        break;
    }
}

bool
TrainNextModelChunk(
    )
{
    //
    // Returns true if all work is completed, or false if there is more work.
    //

    return false;
}

void
DeferredModelTrainingWorker(
    _Inout_ PTP_CALLBACK_INSTANCE callbackInstance,
    _Inout_opt_ PVOID callbackContext,
    _Inout_ PTP_WORK work
    )
{
    // Threadpool callback instance and work are not needed for this sample.

    UNREFERENCED_PARAMETER(callbackInstance);
    UNREFERENCED_PARAMETER(work);

    WORKER_CONTEXT* workerContext = reinterpret_cast<WORKER_CONTEXT*>(callbackContext);
    bool workComplete = false;

    // Keep processing work until being told to stop or all work has been completed.

    while (true) {
        {
            lock_guard<mutex> scopedLock(workerContext->ContextLock);

            if (workComplete) {
                workerContext->IsComplete = true;
            }

            if (!workerContext->ShouldRun || workerContext->IsComplete) {
                workerContext->IsRunning = false;
                break;
            }
        }

        // TrainNextModelChunk returns true when there is no more work to do.

        workComplete = TrainNextModelChunk();
    }

    workerContext->CompletionSignal.notify_all();
}

int
__cdecl
wmain(
    )
{
    WORKER_CONTEXT workerContext;
    workerContext.ShouldRun = false;
    workerContext.IsRunning = false;
    workerContext.IsComplete = false;

    // Create the worker that will be started by our subscription callback.

    workerContext.Worker.reset(CreateThreadpoolWork(
        DeferredModelTrainingWorker,
        &workerContext,
        nullptr));
    RETURN_LAST_ERROR_IF_NULL(workerContext.Worker);

    // Allocate a policy suited for tasks that are best run when unlikely
    // to cause impact to the user or system performance.

    unique_policy policy;
    RETURN_IF_FAILED(CreateActivityCoordinatorPolicy(
        ACTIVITY_COORDINATOR_POLICY_TEMPLATE_GOOD,
        &policy));

    // The model training in this sample consumes GPU. As per the MSDN docs, the
    // GOOD policy template doesn't currently include the GPU resource. We
    // therefore customize the policy to include good GPU conditions to minimize
    // the impact of running our work.

    RETURN_IF_FAILED(SetActivityCoordinatorPolicyResourceCondition(
        policy.get(),
        ACTIVITY_COORDINATOR_RESOURCE_GPU,
        ACTIVITY_COORDINATOR_CONDITION_GOOD));

    // Subscribe to the policy for coordination notifications.

    unique_subscription subscription;
    RETURN_IF_FAILED(SubscribeActivityCoordinatorPolicy(
        policy.get(),
        DeferredWorkEventCallback,
        &workerContext,
        &subscription));;

    // Destroy the policy because we no longer need it.

    policy.reset();

    // We want our task to complete within 48h, so we allocate 24h under our
    // ideal policy and before falling back to a downgraded policy.

    bool workerCompleted;

    {
        unique_lock<mutex> scopedLock(workerContext.ContextLock);
        workerCompleted = workerContext.CompletionSignal.wait_for(
            scopedLock,
            hours(24),
            [&workerContext] { return workerContext.IsComplete; });
    }

    if (workerCompleted) {

        // Since our work is complete, we should clean up our subscription by
        // unsubscribing. This would normally be handled quietly by our RAII
        // types, but we release them explicitly to demonstrate API flow for
        // developers manually managing resources.

        subscription.reset();
        return S_OK;
    }

    // We passed our soft deadline, so downgrade the policy and wait the
    // remaining 24h until our hard deadline has been reached. Since
    // Subscriptions and policies are independent of each other, we need to
    // create a new subscription with our downgraded policy to receive
    // notifications based on its configuration.
    // 
    // The downgraded policy uses medium conditions for all needed resources.
    // This gives us the best chance to run while helping to prevent us from
    // critically degrading the user experience, which we are more likely to do
    // when falling back to manual execution.

    RETURN_IF_FAILED(CreateActivityCoordinatorPolicy(
        ACTIVITY_COORDINATOR_POLICY_TEMPLATE_MEDIUM,
        &policy));

    RETURN_IF_FAILED(SetActivityCoordinatorPolicyResourceCondition(
        policy.get(),
        ACTIVITY_COORDINATOR_RESOURCE_GPU,
        ACTIVITY_COORDINATOR_CONDITION_MEDIUM));

    subscription.reset();
    RETURN_IF_FAILED(SubscribeActivityCoordinatorPolicy(
        policy.get(),
        DeferredWorkEventCallback,
        &workerContext,
        &subscription));

    {
        unique_lock<mutex> scopedLock(workerContext.ContextLock);
        workerCompleted = workerContext.CompletionSignal.wait_for(
            scopedLock,
            hours(24),
            [&workerContext] { return workerContext.IsComplete; });
    }

    // We passed our deadline, so unsubscribe and manually resume our task.

    subscription.reset();
    ResumeWorker(&workerContext);

    // We destroyed our subscription, so we wait indefinitely for completion as
    // there's nothing to pause execution of our task.

    unique_lock<mutex> scopedLock(workerContext.ContextLock);
    workerContext.CompletionSignal.wait(
        scopedLock,
        [&workerContext] { return workerContext.IsComplete; });

    return S_OK;
}

활동 코디네이터 API 개요

활동 코디네이터 API 및 용어

올바른 활동 코디네이터 정책 선택