Beispielprojekt "Aktivitätskoordinator"

In diesem einfachen Beispiel für den Aktivitätskoordinator wird veranschaulicht, wie die API genutzt werden kann, um ein Modell im Hintergrund erneut zu trainieren, wenn Systembedingungen erfüllt sind.

Beispielprojektübersicht

Betrachten wir den Fall einer Musikbearbeitungs-App. Diese App verfügt über Hintergrundaufgaben mit hoher Priorität, die Benutzeranforderungen verarbeiten, z. B. das Veröffentlichen von Inhalten im Cloudspeicher. Es gibt auch Hintergrundaufgaben mit niedriger Priorität, die die Benutzerinteraktion unterstützen, z. B. die Bereitstellung automatischer Empfehlungen zur Verbesserung einer Komposition während der Bearbeitung. Schließlich gibt es eine Reihe von verzögerten Aufgaben, die nicht zu einem bestimmten Zeitpunkt ohne die Anforderung des Benutzers ausgeführt werden müssen, was in diesem Beispiel unser Schwerpunkt ist. Insbesondere möchten wir das Empfehlungsmodell regelmäßig erneut trainieren, wenn die Auswirkungen auf den Benutzer minimal sind. Dazu können wir die Aktivitätskoordinator-API verwenden.

In diesem Szenario möchten wir das Modell erneut trainieren, wenn der Benutzer nicht anwesend ist. Der Erneuttrainingsworkflow in diesem Szenario ist auch ein GPU-Consumer, daher möchten wir auch ausführen, wenn es ein guter Zeitpunkt ist, die GPU zu verwenden. Wir können diese Anforderungen mithilfe von Aktivitätskoordinatorrichtlinien angeben. Die Aktivitätskoordinator-API verwendet unsere Richtlinie, um zu bestimmen, wann die Anforderungen erfüllt sind, und sendet Benachrichtigungen darüber, wann die Ausführung unserer Arbeit gestartet oder beendet werden soll.

In diesem Fall erfüllt die GOOD-Richtlinienvorlage die meisten unserer Anforderungen, da sie CPU, Arbeitsspeicher, Systemdatenträger, Energie und Benutzer im Leerlauf nachverfolgt. Wir müssen lediglich explizit eine Bedingung für GPU festlegen. Es ist wichtig, daran zu denken, dass, obwohl unsere Workload in erster Linie die GPU nutzt, die Ausführung unserer Aktivität immer noch von Natur aus CPU, Arbeitsspeicher, Datenträger und Energie verbraucht. Unsere Auswirkungen auf diese Ressourcen können auch zwischen den Systemkonfigurationen stark variieren. Für instance kann eine schnellere GPU dazu führen, dass die CPU mehr Zeit damit verbringt, die GPU mit Daten zu versorgen, was dann dazu führen kann, dass mehr Daten gelesen oder auf dem Datenträger gespeichert werden. Die Geschwindigkeit dieses Datenträgers kann sich auch auf ähnliche Weise auf den CPU-Verbrauch auswirken. Durch das Konfigurieren aller ressourcen, auf die wir uns auswirken, können wir sicher sein, dass wir die Benutzererfahrung nicht versehentlich beeinträchtigen oder die Systemleistung beeinträchtigen. Darüber hinaus wurde die Arbeit selbst in kleine Blöcke unterteilt, sodass wir angemessen auf Koordinierungsbenachrichtigungen reagieren können, um zu vermeiden, dass außerhalb der gewünschten Bedingungen ausgeführt wird.

Um zu veranschaulichen, wie Entwickler Richtlinien ändern oder herabstufen können, fügen wir auch die Anforderung hinzu, dass das erneute Training innerhalb von 48 Stunden abgeschlossen werden soll. Die ersten 24 Stunden, unseren weichen Stichtag, versuchen wir, mit unserer idealen Richtlinie zu arbeiten, und die letzten 24 Stunden führen wir zu einer niedrigeren Richtlinie herab.

Beispielprojektcode

Der folgende Code ist die Beispielanwendung für die Musikbearbeitung. Es nutzt die Aktivitätskoordinator-API, um Hintergrundaufgaben auszuführen, wie in der Übersicht beschrieben.

#include <chrono>
#include <mutex>
#include <condition_variable>
#include <Windows.h>
#include <ActivityCoordinator.h>
#include <wil/resource.h>

// To use ActivityCoordinator, we must link to the OneCoreUAP library.

#pragma comment(lib, "OneCoreUAP.lib")

using namespace std;
using namespace chrono;
using namespace wil;

// Declare RAII wrappers for the Activity Coordinator policy and subscription.
// These behave like traditional smart pointers and will call their associated
// API cleanup functions when they go out of scope.

typedef wil::unique_any<
        ACTIVITY_COORDINATOR_POLICY,
        decltype(&DestroyActivityCoordinatorPolicy),
        DestroyActivityCoordinatorPolicy>
    unique_policy;

typedef wil::unique_any<
        ACTIVITY_COORDINATOR_SUBSCRIPTION,
        decltype(&UnsubscribeActivityCoordinatorPolicy),
        UnsubscribeActivityCoordinatorPolicy>
    unique_subscription;

struct WORKER_CONTEXT {
    mutex ContextLock;
    unique_threadpool_work Worker;
    bool ShouldRun;
    bool IsRunning;
    bool IsComplete;
    std::condition_variable CompletionSignal;
};

_Requires_lock_held_(workerContext->ContextLock)
void
ResumeWorker(
    _In_ WORKER_CONTEXT* workerContext
    )
{
    workerContext->ShouldRun = true;
    if (!workerContext->IsRunning && !workerContext->IsComplete) {

        // No active workers, so start a new one.

        workerContext->IsRunning = true;
        SubmitThreadpoolWork(workerContext->Worker.get());
    }
}

void
DeferredWorkEventCallback(
    _In_ ACTIVITY_COORDINATOR_NOTIFICATION notificationType,
    _In_ void* callbackContext
    )
{
    WORKER_CONTEXT* workerContext = reinterpret_cast<WORKER_CONTEXT*>(callbackContext);

    // Use this callback thread to dispatch notifications to a worker thread
    // about whether or not it should process the next chunk of deferred work.

    // Note: Do not use this thread to perform your activity's workload.

    lock_guard<mutex> scopedLock(workerContext->ContextLock);
    switch (notificationType) {
    case ACTIVITY_COORDINATOR_NOTIFICATION_RUN:

        // Allow deferred work to be processed.

        ResumeWorker(workerContext);

        break;

    case ACTIVITY_COORDINATOR_NOTIFICATION_STOP:

        // Stop processing deferred work.

        workerContext->ShouldRun = false;

        break;

    default:
        FAIL_FAST();
        break;
    }
}

bool
TrainNextModelChunk(
    )
{
    //
    // Returns true if all work is completed, or false if there is more work.
    //

    return false;
}

void
DeferredModelTrainingWorker(
    _Inout_ PTP_CALLBACK_INSTANCE callbackInstance,
    _Inout_opt_ PVOID callbackContext,
    _Inout_ PTP_WORK work
    )
{
    // Threadpool callback instance and work are not needed for this sample.

    UNREFERENCED_PARAMETER(callbackInstance);
    UNREFERENCED_PARAMETER(work);

    WORKER_CONTEXT* workerContext = reinterpret_cast<WORKER_CONTEXT*>(callbackContext);
    bool workComplete = false;

    // Keep processing work until being told to stop or all work has been completed.

    while (true) {
        {
            lock_guard<mutex> scopedLock(workerContext->ContextLock);

            if (workComplete) {
                workerContext->IsComplete = true;
            }

            if (!workerContext->ShouldRun || workerContext->IsComplete) {
                workerContext->IsRunning = false;
                break;
            }
        }

        // TrainNextModelChunk returns true when there is no more work to do.

        workComplete = TrainNextModelChunk();
    }

    workerContext->CompletionSignal.notify_all();
}

int
__cdecl
wmain(
    )
{
    WORKER_CONTEXT workerContext;
    workerContext.ShouldRun = false;
    workerContext.IsRunning = false;
    workerContext.IsComplete = false;

    // Create the worker that will be started by our subscription callback.

    workerContext.Worker.reset(CreateThreadpoolWork(
        DeferredModelTrainingWorker,
        &workerContext,
        nullptr));
    RETURN_LAST_ERROR_IF_NULL(workerContext.Worker);

    // Allocate a policy suited for tasks that are best run when unlikely
    // to cause impact to the user or system performance.

    unique_policy policy;
    RETURN_IF_FAILED(CreateActivityCoordinatorPolicy(
        ACTIVITY_COORDINATOR_POLICY_TEMPLATE_GOOD,
        &policy));

    // The model training in this sample consumes GPU. As per the MSDN docs, the
    // GOOD policy template doesn't currently include the GPU resource. We
    // therefore customize the policy to include good GPU conditions to minimize
    // the impact of running our work.

    RETURN_IF_FAILED(SetActivityCoordinatorPolicyResourceCondition(
        policy.get(),
        ACTIVITY_COORDINATOR_RESOURCE_GPU,
        ACTIVITY_COORDINATOR_CONDITION_GOOD));

    // Subscribe to the policy for coordination notifications.

    unique_subscription subscription;
    RETURN_IF_FAILED(SubscribeActivityCoordinatorPolicy(
        policy.get(),
        DeferredWorkEventCallback,
        &workerContext,
        &subscription));;

    // Destroy the policy because we no longer need it.

    policy.reset();

    // We want our task to complete within 48h, so we allocate 24h under our
    // ideal policy and before falling back to a downgraded policy.

    bool workerCompleted;

    {
        unique_lock<mutex> scopedLock(workerContext.ContextLock);
        workerCompleted = workerContext.CompletionSignal.wait_for(
            scopedLock,
            hours(24),
            [&workerContext] { return workerContext.IsComplete; });
    }

    if (workerCompleted) {

        // Since our work is complete, we should clean up our subscription by
        // unsubscribing. This would normally be handled quietly by our RAII
        // types, but we release them explicitly to demonstrate API flow for
        // developers manually managing resources.

        subscription.reset();
        return S_OK;
    }

    // We passed our soft deadline, so downgrade the policy and wait the
    // remaining 24h until our hard deadline has been reached. Since
    // Subscriptions and policies are independent of each other, we need to
    // create a new subscription with our downgraded policy to receive
    // notifications based on its configuration.
    // 
    // The downgraded policy uses medium conditions for all needed resources.
    // This gives us the best chance to run while helping to prevent us from
    // critically degrading the user experience, which we are more likely to do
    // when falling back to manual execution.

    RETURN_IF_FAILED(CreateActivityCoordinatorPolicy(
        ACTIVITY_COORDINATOR_POLICY_TEMPLATE_MEDIUM,
        &policy));

    RETURN_IF_FAILED(SetActivityCoordinatorPolicyResourceCondition(
        policy.get(),
        ACTIVITY_COORDINATOR_RESOURCE_GPU,
        ACTIVITY_COORDINATOR_CONDITION_MEDIUM));

    subscription.reset();
    RETURN_IF_FAILED(SubscribeActivityCoordinatorPolicy(
        policy.get(),
        DeferredWorkEventCallback,
        &workerContext,
        &subscription));

    {
        unique_lock<mutex> scopedLock(workerContext.ContextLock);
        workerCompleted = workerContext.CompletionSignal.wait_for(
            scopedLock,
            hours(24),
            [&workerContext] { return workerContext.IsComplete; });
    }

    // We passed our deadline, so unsubscribe and manually resume our task.

    subscription.reset();
    ResumeWorker(&workerContext);

    // We destroyed our subscription, so we wait indefinitely for completion as
    // there's nothing to pause execution of our task.

    unique_lock<mutex> scopedLock(workerContext.ContextLock);
    workerContext.CompletionSignal.wait(
        scopedLock,
        [&workerContext] { return workerContext.IsComplete; });

    return S_OK;
}

Übersicht über die Aktivitätskoordinator-API

Aktivitätskoordinator-API und Terminologie

Auswählen der richtigen Aktivitätskoordinatorrichtlinie