Partager via


Exemple de projet du coordinateur d’activité

Cet exemple simple pour le coordinateur d’activité montre comment l’API peut être exploitée pour réentraîner un modèle en arrière-plan lorsque les conditions système sont remplies.

Exemple de vue d’ensemble de projet

Prenons le cas d’une application d’édition musicale. Cette application a des tâches en arrière-plan hautement prioritaires qui permettent de traiter les demandes des utilisateurs, comme la publication de contenu dans le stockage cloud. Il existe également des tâches en arrière-plan de faible priorité qui prennent en charge l’interaction utilisateur, comme la fourniture de recommandations automatiques pour améliorer une composition lors de la modification. Enfin, il existe un ensemble de tâches différées qui n’ont pas besoin de se produire à un moment spécifique sans la demande de l’utilisateur, ce qui est notre priorité dans cet exemple. En particulier, nous aimerions réentraîner régulièrement le modèle de recommandation lorsque l’impact utilisateur est minimal. Pour ce faire, nous pouvons utiliser l’API Du coordinateur d’activité.

Pour ce scénario, nous aimerions réentraîner le modèle lorsque l’utilisateur n’est pas présent. Le flux de travail de réentraînement dans ce scénario est également un consommateur de GPU. Nous voulons donc également exécuter quand il est temps d’utiliser le GPU. Nous pouvons spécifier ces exigences à l’aide de stratégies de coordinateur d’activité. L’API Coordinateur d’activité utilise notre stratégie pour déterminer quand les conditions sont remplies et envoyer des notifications pour quand commencer ou arrêter l’exécution de notre travail.

Dans ce cas, le modèle de stratégie GOOD répond à la plupart de nos besoins, car il effectue le suivi du processeur, de la mémoire, du disque système, de l’alimentation et de l’inactivité de l’utilisateur. Il nous suffit de définir explicitement une condition pour le GPU. Il est important de se rappeler que même si notre charge de travail utilise principalement le GPU, l’exécution de notre activité consomme toujours intrinsèquement du processeur, de la mémoire, du disque et de l’alimentation. Notre impact sur ces ressources peut également varier considérablement d’une configuration système à l’autre. Par instance, un GPU plus rapide peut faire en sorte que le processeur passe plus de temps à alimenter le GPU avec des données, ce qui peut entraîner la lecture ou l’enregistrement d’un plus grand nombre de données sur le disque. La vitesse de ce disque peut également affecter la consommation du processeur de la même manière. En configurant toutes les ressources que nous affectons, nous pouvons être sûrs de ne pas interférer par inadvertance avec l’expérience utilisateur ou dégrader les performances du système. En outre, le travail lui-même a été décomposé pour se produire en petits morceaux, afin que nous puissions répondre adéquatement aux notifications de coordination afin d’éviter de s’exécuter en dehors des conditions souhaitées.

Pour montrer comment les développeurs peuvent modifier ou rétrograder des stratégies, nous ajoutons également l’exigence que nous voulons que le réentraînement se termine dans les 48 heures. Les premières 24 heures, notre échéance molle, nous essayons de fonctionner avec notre stratégie idéale, et les dernières 24 heures, nous rétrogradons à une stratégie inférieure.

Exemple de code de projet

Le code suivant est l’exemple d’application d’édition de musique. Il tire parti de l’API Du coordinateur d’activité pour effectuer des tâches en arrière-plan, comme décrit dans la vue d’ensemble.

#include <chrono>
#include <mutex>
#include <condition_variable>
#include <Windows.h>
#include <ActivityCoordinator.h>
#include <wil/resource.h>

// To use ActivityCoordinator, we must link to the OneCoreUAP library.

#pragma comment(lib, "OneCoreUAP.lib")

using namespace std;
using namespace chrono;
using namespace wil;

// Declare RAII wrappers for the Activity Coordinator policy and subscription.
// These behave like traditional smart pointers and will call their associated
// API cleanup functions when they go out of scope.

typedef wil::unique_any<
        ACTIVITY_COORDINATOR_POLICY,
        decltype(&DestroyActivityCoordinatorPolicy),
        DestroyActivityCoordinatorPolicy>
    unique_policy;

typedef wil::unique_any<
        ACTIVITY_COORDINATOR_SUBSCRIPTION,
        decltype(&UnsubscribeActivityCoordinatorPolicy),
        UnsubscribeActivityCoordinatorPolicy>
    unique_subscription;

struct WORKER_CONTEXT {
    mutex ContextLock;
    unique_threadpool_work Worker;
    bool ShouldRun;
    bool IsRunning;
    bool IsComplete;
    std::condition_variable CompletionSignal;
};

_Requires_lock_held_(workerContext->ContextLock)
void
ResumeWorker(
    _In_ WORKER_CONTEXT* workerContext
    )
{
    workerContext->ShouldRun = true;
    if (!workerContext->IsRunning && !workerContext->IsComplete) {

        // No active workers, so start a new one.

        workerContext->IsRunning = true;
        SubmitThreadpoolWork(workerContext->Worker.get());
    }
}

void
DeferredWorkEventCallback(
    _In_ ACTIVITY_COORDINATOR_NOTIFICATION notificationType,
    _In_ void* callbackContext
    )
{
    WORKER_CONTEXT* workerContext = reinterpret_cast<WORKER_CONTEXT*>(callbackContext);

    // Use this callback thread to dispatch notifications to a worker thread
    // about whether or not it should process the next chunk of deferred work.

    // Note: Do not use this thread to perform your activity's workload.

    lock_guard<mutex> scopedLock(workerContext->ContextLock);
    switch (notificationType) {
    case ACTIVITY_COORDINATOR_NOTIFICATION_RUN:

        // Allow deferred work to be processed.

        ResumeWorker(workerContext);

        break;

    case ACTIVITY_COORDINATOR_NOTIFICATION_STOP:

        // Stop processing deferred work.

        workerContext->ShouldRun = false;

        break;

    default:
        FAIL_FAST();
        break;
    }
}

bool
TrainNextModelChunk(
    )
{
    //
    // Returns true if all work is completed, or false if there is more work.
    //

    return false;
}

void
DeferredModelTrainingWorker(
    _Inout_ PTP_CALLBACK_INSTANCE callbackInstance,
    _Inout_opt_ PVOID callbackContext,
    _Inout_ PTP_WORK work
    )
{
    // Threadpool callback instance and work are not needed for this sample.

    UNREFERENCED_PARAMETER(callbackInstance);
    UNREFERENCED_PARAMETER(work);

    WORKER_CONTEXT* workerContext = reinterpret_cast<WORKER_CONTEXT*>(callbackContext);
    bool workComplete = false;

    // Keep processing work until being told to stop or all work has been completed.

    while (true) {
        {
            lock_guard<mutex> scopedLock(workerContext->ContextLock);

            if (workComplete) {
                workerContext->IsComplete = true;
            }

            if (!workerContext->ShouldRun || workerContext->IsComplete) {
                workerContext->IsRunning = false;
                break;
            }
        }

        // TrainNextModelChunk returns true when there is no more work to do.

        workComplete = TrainNextModelChunk();
    }

    workerContext->CompletionSignal.notify_all();
}

int
__cdecl
wmain(
    )
{
    WORKER_CONTEXT workerContext;
    workerContext.ShouldRun = false;
    workerContext.IsRunning = false;
    workerContext.IsComplete = false;

    // Create the worker that will be started by our subscription callback.

    workerContext.Worker.reset(CreateThreadpoolWork(
        DeferredModelTrainingWorker,
        &workerContext,
        nullptr));
    RETURN_LAST_ERROR_IF_NULL(workerContext.Worker);

    // Allocate a policy suited for tasks that are best run when unlikely
    // to cause impact to the user or system performance.

    unique_policy policy;
    RETURN_IF_FAILED(CreateActivityCoordinatorPolicy(
        ACTIVITY_COORDINATOR_POLICY_TEMPLATE_GOOD,
        &policy));

    // The model training in this sample consumes GPU. As per the MSDN docs, the
    // GOOD policy template doesn't currently include the GPU resource. We
    // therefore customize the policy to include good GPU conditions to minimize
    // the impact of running our work.

    RETURN_IF_FAILED(SetActivityCoordinatorPolicyResourceCondition(
        policy.get(),
        ACTIVITY_COORDINATOR_RESOURCE_GPU,
        ACTIVITY_COORDINATOR_CONDITION_GOOD));

    // Subscribe to the policy for coordination notifications.

    unique_subscription subscription;
    RETURN_IF_FAILED(SubscribeActivityCoordinatorPolicy(
        policy.get(),
        DeferredWorkEventCallback,
        &workerContext,
        &subscription));;

    // Destroy the policy because we no longer need it.

    policy.reset();

    // We want our task to complete within 48h, so we allocate 24h under our
    // ideal policy and before falling back to a downgraded policy.

    bool workerCompleted;

    {
        unique_lock<mutex> scopedLock(workerContext.ContextLock);
        workerCompleted = workerContext.CompletionSignal.wait_for(
            scopedLock,
            hours(24),
            [&workerContext] { return workerContext.IsComplete; });
    }

    if (workerCompleted) {

        // Since our work is complete, we should clean up our subscription by
        // unsubscribing. This would normally be handled quietly by our RAII
        // types, but we release them explicitly to demonstrate API flow for
        // developers manually managing resources.

        subscription.reset();
        return S_OK;
    }

    // We passed our soft deadline, so downgrade the policy and wait the
    // remaining 24h until our hard deadline has been reached. Since
    // Subscriptions and policies are independent of each other, we need to
    // create a new subscription with our downgraded policy to receive
    // notifications based on its configuration.
    // 
    // The downgraded policy uses medium conditions for all needed resources.
    // This gives us the best chance to run while helping to prevent us from
    // critically degrading the user experience, which we are more likely to do
    // when falling back to manual execution.

    RETURN_IF_FAILED(CreateActivityCoordinatorPolicy(
        ACTIVITY_COORDINATOR_POLICY_TEMPLATE_MEDIUM,
        &policy));

    RETURN_IF_FAILED(SetActivityCoordinatorPolicyResourceCondition(
        policy.get(),
        ACTIVITY_COORDINATOR_RESOURCE_GPU,
        ACTIVITY_COORDINATOR_CONDITION_MEDIUM));

    subscription.reset();
    RETURN_IF_FAILED(SubscribeActivityCoordinatorPolicy(
        policy.get(),
        DeferredWorkEventCallback,
        &workerContext,
        &subscription));

    {
        unique_lock<mutex> scopedLock(workerContext.ContextLock);
        workerCompleted = workerContext.CompletionSignal.wait_for(
            scopedLock,
            hours(24),
            [&workerContext] { return workerContext.IsComplete; });
    }

    // We passed our deadline, so unsubscribe and manually resume our task.

    subscription.reset();
    ResumeWorker(&workerContext);

    // We destroyed our subscription, so we wait indefinitely for completion as
    // there's nothing to pause execution of our task.

    unique_lock<mutex> scopedLock(workerContext.ContextLock);
    workerContext.CompletionSignal.wait(
        scopedLock,
        [&workerContext] { return workerContext.IsComplete; });

    return S_OK;
}

Vue d’ensemble de l’API Du coordinateur d’activité

API et terminologie du coordinateur d’activité

Choix de la stratégie de coordinateur d’activité appropriée