Activity Coordinator 예제 프로젝트
Activity Coordinator에 대한 이 간단한 예제는 시스템 조건이 충족될 때 API를 활용하여 백그라운드에서 모델을 다시 학습하는 방법을 보여 줍니다.
프로젝트 개요 예제
음악 편집 앱의 경우를 살펴보겠습니다. 이 앱에는 클라우드 스토리지에 콘텐츠 게시와 같이 사용자가 요청하는 우선 순위가 높은 백그라운드 작업이 있습니다. 편집하는 동안 컴퍼지션을 개선하기 위한 자동 권장 사항을 제공하는 등 사용자 상호 작용을 지원하는 우선 순위가 낮은 백그라운드 작업도 있습니다. 마지막으로 사용자의 요청 없이 특정 시간에 발생할 필요가 없는 지연된 작업 집합이 있습니다. 이 예제에서는 이 작업을 중점적으로 설명합니다. 특히 사용자 영향이 최소화된 경우 권장 사항 모델을 주기적으로 다시 학습하려고 합니다. 이를 위해 활동 코디네이터 API를 사용할 수 있습니다.
이 시나리오에서는 사용자가 없을 때 모델을 다시 학습하려고 합니다. 이 시나리오의 재학습 워크플로는 GPU 소비자이기도 하므로 GPU를 사용하기에 적절한 시기에 실행하려고 합니다. 활동 코디네이터 정책을 사용하여 이러한 요구 사항을 지정할 수 있습니다. 활동 코디네이터 API는 정책을 사용하여 요구 사항이 충족되는 시기를 결정하고 작업 실행을 시작하거나 중지할 시기에 대한 알림을 보냅니다.
이 경우 GOOD 정책 템플릿은 CPU, 메모리, 시스템 디스크, 전원 및 사용자 유휴 상태를 추적하므로 대부분의 요구 사항을 충족합니다. GPU에 대한 조건을 명시적으로 설정하기만 하면 합니다. 워크로드가 주로 GPU를 활용하더라도 작업의 실행은 여전히 기본적으로 CPU, 메모리, 디스크 및 전원을 소비한다는 점을 기억해야 합니다. 이러한 리소스에 미치는 영향도 시스템 구성에 따라 크게 달라질 수 있습니다. 예를 들어 GPU가 빨라지면 CPU가 GPU에 데이터를 공급하는 데 더 많은 시간을 소비할 수 있으며, 이로 인해 더 많은 데이터를 읽거나 디스크에 저장할 수 있습니다. 이 디스크의 속도도 비슷한 방식으로 CPU 사용량에 영향을 줄 수 있습니다. 영향을 주는 모든 리소스를 구성하여 실수로 사용자 환경을 방해하거나 시스템 성능을 저하하지 않도록 할 수 있습니다. 또한 작업 자체는 작은 청크 단위로 세분화되어 조정 알림에 적절하게 응답하여 원하는 조건 외부에서 실행되지 않도록 할 수 있습니다.
개발자가 정책을 변경하거나 다운그레이드하는 방법을 보여주기 위해 48시간 이내에 재학습을 완료해야 하는 요구 사항도 추가합니다. 처음 24시간, 소프트 마감일은 이상적인 정책으로 실행하려고 시도하며, 지난 24시간 동안 우리는 더 작은 정책으로 다운그레이드합니다.
예제 프로젝트 코드
다음 코드는 음악 편집 샘플 애플리케이션입니다. 개요에 설명된 대로 활동 코디네이터 API를 활용하여 백그라운드 작업을 수행합니다.
#include <chrono>
#include <mutex>
#include <condition_variable>
#include <Windows.h>
#include <ActivityCoordinator.h>
#include <wil/resource.h>
// To use ActivityCoordinator, we must link to the OneCoreUAP library.
#pragma comment(lib, "OneCoreUAP.lib")
using namespace std;
using namespace chrono;
using namespace wil;
// Declare RAII wrappers for the Activity Coordinator policy and subscription.
// These behave like traditional smart pointers and will call their associated
// API cleanup functions when they go out of scope.
typedef wil::unique_any<
ACTIVITY_COORDINATOR_POLICY,
decltype(&DestroyActivityCoordinatorPolicy),
DestroyActivityCoordinatorPolicy>
unique_policy;
typedef wil::unique_any<
ACTIVITY_COORDINATOR_SUBSCRIPTION,
decltype(&UnsubscribeActivityCoordinatorPolicy),
UnsubscribeActivityCoordinatorPolicy>
unique_subscription;
struct WORKER_CONTEXT {
mutex ContextLock;
unique_threadpool_work Worker;
bool ShouldRun;
bool IsRunning;
bool IsComplete;
std::condition_variable CompletionSignal;
};
_Requires_lock_held_(workerContext->ContextLock)
void
ResumeWorker(
_In_ WORKER_CONTEXT* workerContext
)
{
workerContext->ShouldRun = true;
if (!workerContext->IsRunning && !workerContext->IsComplete) {
// No active workers, so start a new one.
workerContext->IsRunning = true;
SubmitThreadpoolWork(workerContext->Worker.get());
}
}
void
DeferredWorkEventCallback(
_In_ ACTIVITY_COORDINATOR_NOTIFICATION notificationType,
_In_ void* callbackContext
)
{
WORKER_CONTEXT* workerContext = reinterpret_cast<WORKER_CONTEXT*>(callbackContext);
// Use this callback thread to dispatch notifications to a worker thread
// about whether or not it should process the next chunk of deferred work.
// Note: Do not use this thread to perform your activity's workload.
lock_guard<mutex> scopedLock(workerContext->ContextLock);
switch (notificationType) {
case ACTIVITY_COORDINATOR_NOTIFICATION_RUN:
// Allow deferred work to be processed.
ResumeWorker(workerContext);
break;
case ACTIVITY_COORDINATOR_NOTIFICATION_STOP:
// Stop processing deferred work.
workerContext->ShouldRun = false;
break;
default:
FAIL_FAST();
break;
}
}
bool
TrainNextModelChunk(
)
{
//
// Returns true if all work is completed, or false if there is more work.
//
return false;
}
void
DeferredModelTrainingWorker(
_Inout_ PTP_CALLBACK_INSTANCE callbackInstance,
_Inout_opt_ PVOID callbackContext,
_Inout_ PTP_WORK work
)
{
// Threadpool callback instance and work are not needed for this sample.
UNREFERENCED_PARAMETER(callbackInstance);
UNREFERENCED_PARAMETER(work);
WORKER_CONTEXT* workerContext = reinterpret_cast<WORKER_CONTEXT*>(callbackContext);
bool workComplete = false;
// Keep processing work until being told to stop or all work has been completed.
while (true) {
{
lock_guard<mutex> scopedLock(workerContext->ContextLock);
if (workComplete) {
workerContext->IsComplete = true;
}
if (!workerContext->ShouldRun || workerContext->IsComplete) {
workerContext->IsRunning = false;
break;
}
}
// TrainNextModelChunk returns true when there is no more work to do.
workComplete = TrainNextModelChunk();
}
workerContext->CompletionSignal.notify_all();
}
int
__cdecl
wmain(
)
{
WORKER_CONTEXT workerContext;
workerContext.ShouldRun = false;
workerContext.IsRunning = false;
workerContext.IsComplete = false;
// Create the worker that will be started by our subscription callback.
workerContext.Worker.reset(CreateThreadpoolWork(
DeferredModelTrainingWorker,
&workerContext,
nullptr));
RETURN_LAST_ERROR_IF_NULL(workerContext.Worker);
// Allocate a policy suited for tasks that are best run when unlikely
// to cause impact to the user or system performance.
unique_policy policy;
RETURN_IF_FAILED(CreateActivityCoordinatorPolicy(
ACTIVITY_COORDINATOR_POLICY_TEMPLATE_GOOD,
&policy));
// The model training in this sample consumes GPU.
// The GOOD policy template doesn't currently include the GPU resource. We
// therefore customize the policy to include good GPU conditions to minimize
// the impact of running our work.
RETURN_IF_FAILED(SetActivityCoordinatorPolicyResourceCondition(
policy.get(),
ACTIVITY_COORDINATOR_RESOURCE_GPU,
ACTIVITY_COORDINATOR_CONDITION_GOOD));
// Subscribe to the policy for coordination notifications.
unique_subscription subscription;
RETURN_IF_FAILED(SubscribeActivityCoordinatorPolicy(
policy.get(),
DeferredWorkEventCallback,
&workerContext,
&subscription));;
// Destroy the policy because we no longer need it.
policy.reset();
// We want our task to complete within 48h, so we allocate 24h under our
// ideal policy and before falling back to a downgraded policy.
bool workerCompleted;
{
unique_lock<mutex> scopedLock(workerContext.ContextLock);
workerCompleted = workerContext.CompletionSignal.wait_for(
scopedLock,
hours(24),
[&workerContext] { return workerContext.IsComplete; });
}
if (workerCompleted) {
// Since our work is complete, we should clean up our subscription by
// unsubscribing. This would normally be handled quietly by our RAII
// types, but we release them explicitly to demonstrate API flow for
// developers manually managing resources.
subscription.reset();
return S_OK;
}
// We passed our soft deadline, so downgrade the policy and wait the
// remaining 24h until our hard deadline has been reached. Since
// Subscriptions and policies are independent of each other, we need to
// create a new subscription with our downgraded policy to receive
// notifications based on its configuration.
//
// The downgraded policy uses medium conditions for all needed resources.
// This gives us the best chance to run while helping to prevent us from
// critically degrading the user experience, which we are more likely to do
// when falling back to manual execution.
RETURN_IF_FAILED(CreateActivityCoordinatorPolicy(
ACTIVITY_COORDINATOR_POLICY_TEMPLATE_MEDIUM,
&policy));
RETURN_IF_FAILED(SetActivityCoordinatorPolicyResourceCondition(
policy.get(),
ACTIVITY_COORDINATOR_RESOURCE_GPU,
ACTIVITY_COORDINATOR_CONDITION_MEDIUM));
subscription.reset();
RETURN_IF_FAILED(SubscribeActivityCoordinatorPolicy(
policy.get(),
DeferredWorkEventCallback,
&workerContext,
&subscription));
{
unique_lock<mutex> scopedLock(workerContext.ContextLock);
workerCompleted = workerContext.CompletionSignal.wait_for(
scopedLock,
hours(24),
[&workerContext] { return workerContext.IsComplete; });
}
// We passed our deadline, so unsubscribe and manually resume our task.
subscription.reset();
ResumeWorker(&workerContext);
// We destroyed our subscription, so we wait indefinitely for completion as
// there's nothing to pause execution of our task.
unique_lock<mutex> scopedLock(workerContext.ContextLock);
workerContext.CompletionSignal.wait(
scopedLock,
[&workerContext] { return workerContext.IsComplete; });
return S_OK;
}