Ukázkový projekt koordinátoru aktivit
Tento jednoduchý příklad pro koordinátora aktivit ukazuje, jak lze rozhraní API využít k opětovnému trénování modelu na pozadí při splnění systémových podmínek.
Přehled ukázkového projektu
Podívejme se na případ aplikace pro úpravy hudby. Tato aplikace má úlohy na pozadí s vysokou prioritou, které obsluhuje požadavky uživatelů, jako je publikování obsahu do cloudového úložiště. Existují také úlohy na pozadí s nízkou prioritou, které podporují interakci uživatelů, jako je poskytování automatických doporučení ke zlepšení složení při úpravách. Kromě toho existuje sada odložených úkolů, které se nemusí provádět v konkrétním čase bez požadavku uživatele, což je v tomto příkladu naším cílem. Konkrétně bychom chtěli model doporučení pravidelně vytrénovat, pokud je dopad na uživatele minimální. K tomu můžeme použít rozhraní API koordinátora aktivit.
V tomto scénáři bychom chtěli model znovu vytrénovat, když uživatel není k dispozici. Pracovní postup opětovného vytrénování v tomto scénáři je také příjemcem GPU, takže chceme také spustit, když je vhodný čas použít GPU. Tyto požadavky můžeme určit pomocí zásad koordinátoru aktivit. Rozhraní API koordinátoru aktivit použije naši zásadu k určení, kdy jsou splněny požadavky, a odešle oznámení, kdy se má naše práce spustit nebo zastavit.
V tomto případě šablona zásad GOOD splňuje většinu našich potřeb, protože sleduje procesor, paměť, systémový disk, napájení a nečinnost uživatele. Jednoduše potřebujeme explicitně nastavit podmínku pro GPU. Je důležité si uvědomit, že i když naše úloha primárně využívá GPU, provádění naší aktivity stále ze své podstaty spotřebovává procesor, paměť, disk a napájení. Náš dopad na tyto prostředky se také může výrazně lišit mezi konfiguracemi systému. Například rychlejší GPU může vést k tomu, že procesor stráví více času podáváním GPU daty, což pak může vést k tomu, že se na disk čte nebo ukládá více dat. Rychlost tohoto disku může mít vliv také na spotřebu procesoru podobným způsobem. Konfigurací všech prostředků, které ovlivňujeme, můžeme mít jistotu, že neúmyslně nenarušíme uživatelské prostředí nebo snížíme výkon systému. Kromě toho byla samotná práce rozdělena tak, aby probíhala v malých blocích, abychom mohli odpovídajícím způsobem reagovat na koordinační oznámení, abychom se vyhnuli provozu mimo požadované podmínky.
Abychom ukázali, jak můžou vývojáři změnit nebo downgradovat zásady, přidáme také požadavek, který chceme znovu vytrénovat do 48 hodin. Prvních 24 hodin, náš měkký termín, pokusíme se spustit s naší ideální zásadou a posledních 24 hodin jsme downgradovali na menší politiku.
Příklad kódu projektu
Následující kód je ukázková aplikace pro úpravy hudby. Využívá rozhraní API koordinátora aktivit k provádění úloh na pozadí, jak je popsáno v přehledu.
#include <chrono>
#include <mutex>
#include <condition_variable>
#include <Windows.h>
#include <ActivityCoordinator.h>
#include <wil/resource.h>
// To use ActivityCoordinator, we must link to the OneCoreUAP library.
#pragma comment(lib, "OneCoreUAP.lib")
using namespace std;
using namespace chrono;
using namespace wil;
// Declare RAII wrappers for the Activity Coordinator policy and subscription.
// These behave like traditional smart pointers and will call their associated
// API cleanup functions when they go out of scope.
typedef wil::unique_any<
ACTIVITY_COORDINATOR_POLICY,
decltype(&DestroyActivityCoordinatorPolicy),
DestroyActivityCoordinatorPolicy>
unique_policy;
typedef wil::unique_any<
ACTIVITY_COORDINATOR_SUBSCRIPTION,
decltype(&UnsubscribeActivityCoordinatorPolicy),
UnsubscribeActivityCoordinatorPolicy>
unique_subscription;
struct WORKER_CONTEXT {
mutex ContextLock;
unique_threadpool_work Worker;
bool ShouldRun;
bool IsRunning;
bool IsComplete;
std::condition_variable CompletionSignal;
};
_Requires_lock_held_(workerContext->ContextLock)
void
ResumeWorker(
_In_ WORKER_CONTEXT* workerContext
)
{
workerContext->ShouldRun = true;
if (!workerContext->IsRunning && !workerContext->IsComplete) {
// No active workers, so start a new one.
workerContext->IsRunning = true;
SubmitThreadpoolWork(workerContext->Worker.get());
}
}
void
DeferredWorkEventCallback(
_In_ ACTIVITY_COORDINATOR_NOTIFICATION notificationType,
_In_ void* callbackContext
)
{
WORKER_CONTEXT* workerContext = reinterpret_cast<WORKER_CONTEXT*>(callbackContext);
// Use this callback thread to dispatch notifications to a worker thread
// about whether or not it should process the next chunk of deferred work.
// Note: Do not use this thread to perform your activity's workload.
lock_guard<mutex> scopedLock(workerContext->ContextLock);
switch (notificationType) {
case ACTIVITY_COORDINATOR_NOTIFICATION_RUN:
// Allow deferred work to be processed.
ResumeWorker(workerContext);
break;
case ACTIVITY_COORDINATOR_NOTIFICATION_STOP:
// Stop processing deferred work.
workerContext->ShouldRun = false;
break;
default:
FAIL_FAST();
break;
}
}
bool
TrainNextModelChunk(
)
{
//
// Returns true if all work is completed, or false if there is more work.
//
return false;
}
void
DeferredModelTrainingWorker(
_Inout_ PTP_CALLBACK_INSTANCE callbackInstance,
_Inout_opt_ PVOID callbackContext,
_Inout_ PTP_WORK work
)
{
// Threadpool callback instance and work are not needed for this sample.
UNREFERENCED_PARAMETER(callbackInstance);
UNREFERENCED_PARAMETER(work);
WORKER_CONTEXT* workerContext = reinterpret_cast<WORKER_CONTEXT*>(callbackContext);
bool workComplete = false;
// Keep processing work until being told to stop or all work has been completed.
while (true) {
{
lock_guard<mutex> scopedLock(workerContext->ContextLock);
if (workComplete) {
workerContext->IsComplete = true;
}
if (!workerContext->ShouldRun || workerContext->IsComplete) {
workerContext->IsRunning = false;
break;
}
}
// TrainNextModelChunk returns true when there is no more work to do.
workComplete = TrainNextModelChunk();
}
workerContext->CompletionSignal.notify_all();
}
int
__cdecl
wmain(
)
{
WORKER_CONTEXT workerContext;
workerContext.ShouldRun = false;
workerContext.IsRunning = false;
workerContext.IsComplete = false;
// Create the worker that will be started by our subscription callback.
workerContext.Worker.reset(CreateThreadpoolWork(
DeferredModelTrainingWorker,
&workerContext,
nullptr));
RETURN_LAST_ERROR_IF_NULL(workerContext.Worker);
// Allocate a policy suited for tasks that are best run when unlikely
// to cause impact to the user or system performance.
unique_policy policy;
RETURN_IF_FAILED(CreateActivityCoordinatorPolicy(
ACTIVITY_COORDINATOR_POLICY_TEMPLATE_GOOD,
&policy));
// The model training in this sample consumes GPU.
// The GOOD policy template doesn't currently include the GPU resource. We
// therefore customize the policy to include good GPU conditions to minimize
// the impact of running our work.
RETURN_IF_FAILED(SetActivityCoordinatorPolicyResourceCondition(
policy.get(),
ACTIVITY_COORDINATOR_RESOURCE_GPU,
ACTIVITY_COORDINATOR_CONDITION_GOOD));
// Subscribe to the policy for coordination notifications.
unique_subscription subscription;
RETURN_IF_FAILED(SubscribeActivityCoordinatorPolicy(
policy.get(),
DeferredWorkEventCallback,
&workerContext,
&subscription));
// Destroy the policy because we no longer need it.
policy.reset();
// We want our task to complete within 48h, so we allocate 24h under our
// ideal policy and before falling back to a downgraded policy.
bool workerCompleted;
{
unique_lock<mutex> scopedLock(workerContext.ContextLock);
workerCompleted = workerContext.CompletionSignal.wait_for(
scopedLock,
hours(24),
[&workerContext] { return workerContext.IsComplete; });
}
if (workerCompleted) {
// Since our work is complete, we should clean up our subscription by
// unsubscribing. This would normally be handled quietly by our RAII
// types, but we release them explicitly to demonstrate API flow for
// developers manually managing resources.
subscription.reset();
return S_OK;
}
// We passed our soft deadline, so downgrade the policy and wait the
// remaining 24h until our hard deadline has been reached. Since
// Subscriptions and policies are independent of each other, we need to
// create a new subscription with our downgraded policy to receive
// notifications based on its configuration.
//
// The downgraded policy uses medium conditions for all needed resources.
// This gives us the best chance to run while helping to prevent us from
// critically degrading the user experience, which we are more likely to do
// when falling back to manual execution.
RETURN_IF_FAILED(CreateActivityCoordinatorPolicy(
ACTIVITY_COORDINATOR_POLICY_TEMPLATE_MEDIUM,
&policy));
RETURN_IF_FAILED(SetActivityCoordinatorPolicyResourceCondition(
policy.get(),
ACTIVITY_COORDINATOR_RESOURCE_GPU,
ACTIVITY_COORDINATOR_CONDITION_MEDIUM));
subscription.reset();
RETURN_IF_FAILED(SubscribeActivityCoordinatorPolicy(
policy.get(),
DeferredWorkEventCallback,
&workerContext,
&subscription));
{
unique_lock<mutex> scopedLock(workerContext.ContextLock);
workerCompleted = workerContext.CompletionSignal.wait_for(
scopedLock,
hours(24),
[&workerContext] { return workerContext.IsComplete; });
}
// We passed our deadline, so unsubscribe and manually resume our task.
subscription.reset();
ResumeWorker(&workerContext);
// We destroyed our subscription, so we wait indefinitely for completion as
// there's nothing to pause execution of our task.
unique_lock<mutex> scopedLock(workerContext.ContextLock);
workerContext.CompletionSignal.wait(
scopedLock,
[&workerContext] { return workerContext.IsComplete; });
return S_OK;
}
Související témata
Přehled rozhraní API koordinátoru aktivit
rozhraní API pro koordinátora aktivit a terminologii
volba správného zásad koordinátoru aktivit