Пример проекта координатора действий

В этом простом примере для координатора действий показано, как API можно использовать для переобучения модели в фоновом режиме при выполнении системных условий.

Обзор примера проекта

Рассмотрим случай приложения для редактирования музыки. Это приложение имеет высокоприоритетные фоновые задачи, которые запрашивают пользователей службы, например публикацию содержимого в облачном хранилище. Существуют также фоновые задачи с низким приоритетом, поддерживающие взаимодействие с пользователем, например предоставление автоматических рекомендаций по улучшению композиции во время редактирования. Наконец, существует набор отложенных задач, которые не должны выполняться в определенное время без запроса пользователя, что является нашим фокусом в этом примере. В частности, мы хотели бы периодически переобучить модель рекомендаций при минимальном влиянии пользователя. Для этого можно использовать API координатора действий.

Для этого сценария мы хотели бы переобучить модель, когда пользователь не присутствует. Рабочий процесс переобучения в этом сценарии также является потребителем GPU, поэтому мы также хотим запустить, когда это хорошее время для использования GPU. Эти требования можно указать с помощью политик координатора действий. API координатора действий будет использовать нашу политику для определения соответствия требованиям и отправки уведомлений о том, когда начать или прекратить выполнение нашей работы.

В этом случае шаблон политики GOOD соответствует большинству наших потребностей, так как отслеживает ЦП, память, системный диск, питание и простой пользователя. Нам просто нужно явно задать условие для GPU. Важно помнить, что, хотя наша рабочая нагрузка в основном будет использовать GPU, выполнение нашей деятельности по-прежнему по-прежнему потребляет ЦП, память, диск и мощность. Наше влияние на эти ресурсы также может значительно отличаться между конфигурациями системы. Например, более быстрый GPU может привести к тому, что ЦП тратит больше времени на кормление GPU данными, что может привести к тому, что данные считываются или сохраняются на диске. Скорость этого диска также может повлиять на потребление ЦП аналогичным образом. Настроив все ресурсы, которые мы влияем, мы можем быть уверены, что непреднамеренно вмешиваемся в работу пользователя или ухудшаем производительность системы. Кроме того, сама работа была разбита на небольшие блоки, чтобы мы могли адекватно реагировать на уведомления о координации, чтобы избежать выполнения вне требуемых условий.

Чтобы продемонстрировать, как разработчики могут изменять или уменьшать политики, мы также добавляем требование, которое мы хотим повторно обучить в течение 48 часов. Первые 24 часа, наш мягкий крайний срок, мы пытаемся запустить с нашей идеальной политикой, и последние 24-часов мы понижение до меньшей политики.

Пример кода проекта

Следующий код — это пример приложения для редактирования музыки. Он использует API координатора действий для выполнения фоновых задач, как описано в обзоре.

#include <chrono>
#include <mutex>
#include <condition_variable>
#include <Windows.h>
#include <ActivityCoordinator.h>
#include <wil/resource.h>

// To use ActivityCoordinator, we must link to the OneCoreUAP library.

#pragma comment(lib, "OneCoreUAP.lib")

using namespace std;
using namespace chrono;
using namespace wil;

// Declare RAII wrappers for the Activity Coordinator policy and subscription.
// These behave like traditional smart pointers and will call their associated
// API cleanup functions when they go out of scope.

typedef wil::unique_any<
        ACTIVITY_COORDINATOR_POLICY,
        decltype(&DestroyActivityCoordinatorPolicy),
        DestroyActivityCoordinatorPolicy>
    unique_policy;

typedef wil::unique_any<
        ACTIVITY_COORDINATOR_SUBSCRIPTION,
        decltype(&UnsubscribeActivityCoordinatorPolicy),
        UnsubscribeActivityCoordinatorPolicy>
    unique_subscription;

struct WORKER_CONTEXT {
    mutex ContextLock;
    unique_threadpool_work Worker;
    bool ShouldRun;
    bool IsRunning;
    bool IsComplete;
    std::condition_variable CompletionSignal;
};

_Requires_lock_held_(workerContext->ContextLock)
void
ResumeWorker(
    _In_ WORKER_CONTEXT* workerContext
    )
{
    workerContext->ShouldRun = true;
    if (!workerContext->IsRunning && !workerContext->IsComplete) {

        // No active workers, so start a new one.

        workerContext->IsRunning = true;
        SubmitThreadpoolWork(workerContext->Worker.get());
    }
}

void
DeferredWorkEventCallback(
    _In_ ACTIVITY_COORDINATOR_NOTIFICATION notificationType,
    _In_ void* callbackContext
    )
{
    WORKER_CONTEXT* workerContext = reinterpret_cast<WORKER_CONTEXT*>(callbackContext);

    // Use this callback thread to dispatch notifications to a worker thread
    // about whether or not it should process the next chunk of deferred work.

    // Note: Do not use this thread to perform your activity's workload.

    lock_guard<mutex> scopedLock(workerContext->ContextLock);
    switch (notificationType) {
    case ACTIVITY_COORDINATOR_NOTIFICATION_RUN:

        // Allow deferred work to be processed.

        ResumeWorker(workerContext);

        break;

    case ACTIVITY_COORDINATOR_NOTIFICATION_STOP:

        // Stop processing deferred work.

        workerContext->ShouldRun = false;

        break;

    default:
        FAIL_FAST();
        break;
    }
}

bool
TrainNextModelChunk(
    )
{
    //
    // Returns true if all work is completed, or false if there is more work.
    //

    return false;
}

void
DeferredModelTrainingWorker(
    _Inout_ PTP_CALLBACK_INSTANCE callbackInstance,
    _Inout_opt_ PVOID callbackContext,
    _Inout_ PTP_WORK work
    )
{
    // Threadpool callback instance and work are not needed for this sample.

    UNREFERENCED_PARAMETER(callbackInstance);
    UNREFERENCED_PARAMETER(work);

    WORKER_CONTEXT* workerContext = reinterpret_cast<WORKER_CONTEXT*>(callbackContext);
    bool workComplete = false;

    // Keep processing work until being told to stop or all work has been completed.

    while (true) {
        {
            lock_guard<mutex> scopedLock(workerContext->ContextLock);

            if (workComplete) {
                workerContext->IsComplete = true;
            }

            if (!workerContext->ShouldRun || workerContext->IsComplete) {
                workerContext->IsRunning = false;
                break;
            }
        }

        // TrainNextModelChunk returns true when there is no more work to do.

        workComplete = TrainNextModelChunk();
    }

    workerContext->CompletionSignal.notify_all();
}

int
__cdecl
wmain(
    )
{
    WORKER_CONTEXT workerContext;
    workerContext.ShouldRun = false;
    workerContext.IsRunning = false;
    workerContext.IsComplete = false;

    // Create the worker that will be started by our subscription callback.

    workerContext.Worker.reset(CreateThreadpoolWork(
        DeferredModelTrainingWorker,
        &workerContext,
        nullptr));
    RETURN_LAST_ERROR_IF_NULL(workerContext.Worker);

    // Allocate a policy suited for tasks that are best run when unlikely
    // to cause impact to the user or system performance.

    unique_policy policy;
    RETURN_IF_FAILED(CreateActivityCoordinatorPolicy(
        ACTIVITY_COORDINATOR_POLICY_TEMPLATE_GOOD,
        &policy));

    // The model training in this sample consumes GPU. As per the MSDN docs, the
    // GOOD policy template doesn't currently include the GPU resource. We
    // therefore customize the policy to include good GPU conditions to minimize
    // the impact of running our work.

    RETURN_IF_FAILED(SetActivityCoordinatorPolicyResourceCondition(
        policy.get(),
        ACTIVITY_COORDINATOR_RESOURCE_GPU,
        ACTIVITY_COORDINATOR_CONDITION_GOOD));

    // Subscribe to the policy for coordination notifications.

    unique_subscription subscription;
    RETURN_IF_FAILED(SubscribeActivityCoordinatorPolicy(
        policy.get(),
        DeferredWorkEventCallback,
        &workerContext,
        &subscription));;

    // Destroy the policy because we no longer need it.

    policy.reset();

    // We want our task to complete within 48h, so we allocate 24h under our
    // ideal policy and before falling back to a downgraded policy.

    bool workerCompleted;

    {
        unique_lock<mutex> scopedLock(workerContext.ContextLock);
        workerCompleted = workerContext.CompletionSignal.wait_for(
            scopedLock,
            hours(24),
            [&workerContext] { return workerContext.IsComplete; });
    }

    if (workerCompleted) {

        // Since our work is complete, we should clean up our subscription by
        // unsubscribing. This would normally be handled quietly by our RAII
        // types, but we release them explicitly to demonstrate API flow for
        // developers manually managing resources.

        subscription.reset();
        return S_OK;
    }

    // We passed our soft deadline, so downgrade the policy and wait the
    // remaining 24h until our hard deadline has been reached. Since
    // Subscriptions and policies are independent of each other, we need to
    // create a new subscription with our downgraded policy to receive
    // notifications based on its configuration.
    // 
    // The downgraded policy uses medium conditions for all needed resources.
    // This gives us the best chance to run while helping to prevent us from
    // critically degrading the user experience, which we are more likely to do
    // when falling back to manual execution.

    RETURN_IF_FAILED(CreateActivityCoordinatorPolicy(
        ACTIVITY_COORDINATOR_POLICY_TEMPLATE_MEDIUM,
        &policy));

    RETURN_IF_FAILED(SetActivityCoordinatorPolicyResourceCondition(
        policy.get(),
        ACTIVITY_COORDINATOR_RESOURCE_GPU,
        ACTIVITY_COORDINATOR_CONDITION_MEDIUM));

    subscription.reset();
    RETURN_IF_FAILED(SubscribeActivityCoordinatorPolicy(
        policy.get(),
        DeferredWorkEventCallback,
        &workerContext,
        &subscription));

    {
        unique_lock<mutex> scopedLock(workerContext.ContextLock);
        workerCompleted = workerContext.CompletionSignal.wait_for(
            scopedLock,
            hours(24),
            [&workerContext] { return workerContext.IsComplete; });
    }

    // We passed our deadline, so unsubscribe and manually resume our task.

    subscription.reset();
    ResumeWorker(&workerContext);

    // We destroyed our subscription, so we wait indefinitely for completion as
    // there's nothing to pause execution of our task.

    unique_lock<mutex> scopedLock(workerContext.ContextLock);
    workerContext.CompletionSignal.wait(
        scopedLock,
        [&workerContext] { return workerContext.IsComplete; });

    return S_OK;
}

Общие сведения об API координатора действий

API и терминология координатора действий

Выбор правильной политики координатора действий