Catatan
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba masuk atau mengubah direktori.
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba mengubah direktori.
Contoh sederhana untuk Koordinator Aktivitas ini menunjukkan bagaimana API dapat dimanfaatkan untuk melatih kembali model di latar belakang saat kondisi sistem terpenuhi.
Contoh gambaran umum proyek
Mari kita pertimbangkan kasus aplikasi pengeditan musik. Aplikasi ini memiliki tugas latar belakang prioritas tinggi yang diminta pengguna layanan, seperti menerbitkan konten ke penyimpanan cloud. Ada juga tugas latar belakang berprioritas rendah yang mendukung interaksi pengguna, seperti memberikan rekomendasi otomatis untuk meningkatkan komposisi saat mengedit. Terakhir, ada serangkaian tugas yang ditangguhkan yang tidak perlu terjadi pada waktu tertentu tanpa permintaan pengguna, yang merupakan fokus kami dalam contoh ini. Secara khusus, kami ingin melatakan kembali model rekomendasi secara berkala ketika dampak pengguna minimal. Kita dapat menggunakan API Koordinator Aktivitas untuk mencapai hal ini.
Untuk skenario ini, kami ingin melatakan kembali model saat pengguna tidak ada. Alur kerja pelatihan ulang dalam skenario ini juga merupakan konsumen GPU, jadi kami juga ingin menjalankan ketika ini adalah waktu yang tepat untuk menggunakan GPU. Kami dapat menentukan persyaratan ini menggunakan kebijakan Koordinator Aktivitas. API Koordinator Aktivitas akan menggunakan kebijakan kami untuk menentukan kapan persyaratan terpenuhi dan mengirim pemberitahuan kapan harus memulai atau berhenti menjalankan pekerjaan kami.
Dalam hal ini, templat kebijakan GOOD memenuhi sebagian besar kebutuhan kita karena melacak CPU, memori, disk sistem, daya, dan user-idle. Kita hanya perlu secara eksplisit menetapkan kondisi untuk GPU. Penting untuk diingat bahwa meskipun beban kerja kami terutama akan menggunakan GPU, eksekusi aktivitas kami masih secara inheren menggunakan CPU, memori, disk, dan daya. Dampak kami terhadap sumber daya ini juga dapat sangat bervariasi di antara konfigurasi sistem. Misalnya, GPU yang lebih cepat dapat mengakibatkan CPU menghabiskan lebih banyak waktu memberi makan GPU dengan data, yang kemudian dapat mengakibatkan lebih banyak data dibaca atau disimpan ke disk. Kecepatan disk ini juga dapat memengaruhi konsumsi CPU dengan cara yang sama. Dengan mengonfigurasi semua sumber daya yang kami pengaruhi, kami dapat memastikan bahwa kami tidak secara tidak sengaja mengganggu pengalaman pengguna atau menurunkan performa sistem. Selain itu, pekerjaan itu sendiri telah dipecah terjadi dalam gugus kecil, sehingga kita dapat menanggapi pemberitahuan koordinasi secara memadai untuk menghindari berjalan di luar kondisi yang diinginkan.
Untuk menunjukkan bagaimana pengembang dapat mengubah atau menurunkan kebijakan, kami juga menambahkan persyaratan yang ingin kami latih ulang untuk diselesaikan dalam waktu 48 jam. 24 jam pertama, tenggat waktu sementara kami, kami mencoba menjalankan dengan kebijakan ideal kami, dan 24 jam terakhir kami menurunkan ke kebijakan yang lebih rendah.
Contoh kode proyek
Kode berikut adalah aplikasi sampel pengeditan musik. Ini memanfaatkan API Koordinator Aktivitas untuk melakukan tugas latar belakang, seperti yang dijelaskan dalam gambaran umum.
#include <chrono>
#include <mutex>
#include <condition_variable>
#include <Windows.h>
#include <ActivityCoordinator.h>
#include <wil/resource.h>
// To use ActivityCoordinator, we must link to the OneCoreUAP library.
#pragma comment(lib, "OneCoreUAP.lib")
using namespace std;
using namespace chrono;
using namespace wil;
// Declare RAII wrappers for the Activity Coordinator policy and subscription.
// These behave like traditional smart pointers and will call their associated
// API cleanup functions when they go out of scope.
typedef wil::unique_any<
ACTIVITY_COORDINATOR_POLICY,
decltype(&DestroyActivityCoordinatorPolicy),
DestroyActivityCoordinatorPolicy>
unique_policy;
typedef wil::unique_any<
ACTIVITY_COORDINATOR_SUBSCRIPTION,
decltype(&UnsubscribeActivityCoordinatorPolicy),
UnsubscribeActivityCoordinatorPolicy>
unique_subscription;
struct WORKER_CONTEXT {
mutex ContextLock;
unique_threadpool_work Worker;
bool ShouldRun;
bool IsRunning;
bool IsComplete;
std::condition_variable CompletionSignal;
};
_Requires_lock_held_(workerContext->ContextLock)
void
ResumeWorker(
_In_ WORKER_CONTEXT* workerContext
)
{
workerContext->ShouldRun = true;
if (!workerContext->IsRunning && !workerContext->IsComplete) {
// No active workers, so start a new one.
workerContext->IsRunning = true;
SubmitThreadpoolWork(workerContext->Worker.get());
}
}
void
DeferredWorkEventCallback(
_In_ ACTIVITY_COORDINATOR_NOTIFICATION notificationType,
_In_ void* callbackContext
)
{
WORKER_CONTEXT* workerContext = reinterpret_cast<WORKER_CONTEXT*>(callbackContext);
// Use this callback thread to dispatch notifications to a worker thread
// about whether or not it should process the next chunk of deferred work.
// Note: Do not use this thread to perform your activity's workload.
lock_guard<mutex> scopedLock(workerContext->ContextLock);
switch (notificationType) {
case ACTIVITY_COORDINATOR_NOTIFICATION_RUN:
// Allow deferred work to be processed.
ResumeWorker(workerContext);
break;
case ACTIVITY_COORDINATOR_NOTIFICATION_STOP:
// Stop processing deferred work.
workerContext->ShouldRun = false;
break;
default:
FAIL_FAST();
break;
}
}
bool
TrainNextModelChunk(
)
{
//
// Returns true if all work is completed, or false if there is more work.
//
return false;
}
void
DeferredModelTrainingWorker(
_Inout_ PTP_CALLBACK_INSTANCE callbackInstance,
_Inout_opt_ PVOID callbackContext,
_Inout_ PTP_WORK work
)
{
// Threadpool callback instance and work are not needed for this sample.
UNREFERENCED_PARAMETER(callbackInstance);
UNREFERENCED_PARAMETER(work);
WORKER_CONTEXT* workerContext = reinterpret_cast<WORKER_CONTEXT*>(callbackContext);
bool workComplete = false;
// Keep processing work until being told to stop or all work has been completed.
while (true) {
{
lock_guard<mutex> scopedLock(workerContext->ContextLock);
if (workComplete) {
workerContext->IsComplete = true;
}
if (!workerContext->ShouldRun || workerContext->IsComplete) {
workerContext->IsRunning = false;
break;
}
}
// TrainNextModelChunk returns true when there is no more work to do.
workComplete = TrainNextModelChunk();
}
workerContext->CompletionSignal.notify_all();
}
int
__cdecl
wmain(
)
{
WORKER_CONTEXT workerContext;
workerContext.ShouldRun = false;
workerContext.IsRunning = false;
workerContext.IsComplete = false;
// Create the worker that will be started by our subscription callback.
workerContext.Worker.reset(CreateThreadpoolWork(
DeferredModelTrainingWorker,
&workerContext,
nullptr));
RETURN_LAST_ERROR_IF_NULL(workerContext.Worker);
// Allocate a policy suited for tasks that are best run when unlikely
// to cause impact to the user or system performance.
unique_policy policy;
RETURN_IF_FAILED(CreateActivityCoordinatorPolicy(
ACTIVITY_COORDINATOR_POLICY_TEMPLATE_GOOD,
&policy));
// The model training in this sample consumes GPU.
// The GOOD policy template doesn't currently include the GPU resource. We
// therefore customize the policy to include good GPU conditions to minimize
// the impact of running our work.
RETURN_IF_FAILED(SetActivityCoordinatorPolicyResourceCondition(
policy.get(),
ACTIVITY_COORDINATOR_RESOURCE_GPU,
ACTIVITY_COORDINATOR_CONDITION_GOOD));
// Subscribe to the policy for coordination notifications.
unique_subscription subscription;
RETURN_IF_FAILED(SubscribeActivityCoordinatorPolicy(
policy.get(),
DeferredWorkEventCallback,
&workerContext,
&subscription));
// Destroy the policy because we no longer need it.
policy.reset();
// We want our task to complete within 48h, so we allocate 24h under our
// ideal policy and before falling back to a downgraded policy.
bool workerCompleted;
{
unique_lock<mutex> scopedLock(workerContext.ContextLock);
workerCompleted = workerContext.CompletionSignal.wait_for(
scopedLock,
hours(24),
[&workerContext] { return workerContext.IsComplete; });
}
if (workerCompleted) {
// Since our work is complete, we should clean up our subscription by
// unsubscribing. This would normally be handled quietly by our RAII
// types, but we release them explicitly to demonstrate API flow for
// developers manually managing resources.
subscription.reset();
return S_OK;
}
// We passed our soft deadline, so downgrade the policy and wait the
// remaining 24h until our hard deadline has been reached. Since
// Subscriptions and policies are independent of each other, we need to
// create a new subscription with our downgraded policy to receive
// notifications based on its configuration.
//
// The downgraded policy uses medium conditions for all needed resources.
// This gives us the best chance to run while helping to prevent us from
// critically degrading the user experience, which we are more likely to do
// when falling back to manual execution.
RETURN_IF_FAILED(CreateActivityCoordinatorPolicy(
ACTIVITY_COORDINATOR_POLICY_TEMPLATE_MEDIUM,
&policy));
RETURN_IF_FAILED(SetActivityCoordinatorPolicyResourceCondition(
policy.get(),
ACTIVITY_COORDINATOR_RESOURCE_GPU,
ACTIVITY_COORDINATOR_CONDITION_MEDIUM));
subscription.reset();
RETURN_IF_FAILED(SubscribeActivityCoordinatorPolicy(
policy.get(),
DeferredWorkEventCallback,
&workerContext,
&subscription));
{
unique_lock<mutex> scopedLock(workerContext.ContextLock);
workerCompleted = workerContext.CompletionSignal.wait_for(
scopedLock,
hours(24),
[&workerContext] { return workerContext.IsComplete; });
}
// We passed our deadline, so unsubscribe and manually resume our task.
subscription.reset();
ResumeWorker(&workerContext);
// We destroyed our subscription, so we wait indefinitely for completion as
// there's nothing to pause execution of our task.
unique_lock<mutex> scopedLock(workerContext.ContextLock);
workerContext.CompletionSignal.wait(
scopedLock,
[&workerContext] { return workerContext.IsComplete; });
return S_OK;
}
Topik terkait
Gambaran umum API Koordinator Aktivitas