Jak: przesunięcie czasu oczekiwania za pomocą Nadsubskrypcji
Nadsubskrypcji można poprawić ogólną efektywność niektóre aplikacje, które zawierają zadania, które mają dużą ilość czasu oczekiwania.W tym temacie ilustruje sposób użycia nadsubskrypcji, aby zrównoważyć opóźnienia powodowanego przez odczyt danych z połączenia sieciowego.
Przykład
W tym przykładzie Biblioteka agentów asynchroniczne do pobierania plików z serwerów HTTP.http_reader Klasa pochodzi od concurrency::agent przechodzącej asynchronicznie odczytać nazwy adresu URL, w których pobierania wiadomości do zastosowań i.
http_reader Klasy zastosowań concurrency::task_group klasy jednocześnie odczytać każdego pliku.Każde zadanie wywołuje concurrency::Context::Oversubscribe metody z _BeginOversubscription ustawiono parametr true umożliwiających nadsubskrypcji w bieżącym kontekście.Każde zadanie, a następnie używa Microsoft Foundation Classes (MFC) CInternetSession i CHttpFile klasy, aby pobrać plik.Na koniec każdego zadania wywołuje Context::Oversubscribe z _BeginOversubscription ustawiono parametr false wyłączyć nadsubskrypcji.
Po włączeniu nadsubskrypcji runtime tworzy jeden wątek dodatkowe do uruchamiania zadań.Każdy z tych wątków również oversubscribe bieżącego kontekstu i tym samym tworzyć dodatkowe wątki.http_reader Klasy zastosowań concurrency::unbounded_buffer obiekt, aby ograniczyć liczbę wątków używanych przez aplikację.Agent inicjuje bufor ze stałą liczbą token wartości.Dla każdej operacji pobierania agent odczytuje wartość tokenu z bufora przed operacja rozpoczyna się, a następnie zapisuje wartość wróć do buforu po zakończeniu operacji.Gdy bufor jest pusty, agent czeka na jedną z operacji pobierania ponownie zapisać wartość do buforu.
Poniższy przykład ogranicza liczbę jednoczesnych zadań dwa razy liczba wątków dostępnego sprzętu.Ta wartość jest dobry punkt wyjścia do użycia podczas eksperymentowania z nadsubskrypcji.Można użyć wartość, która dopasowuje środowiska przetwarzania lub dynamicznie zmienić tę wartość, aby odpowiedzieć na rzeczywiste obciążenie pracą.
// download-oversubscription.cpp
// compile with: /EHsc /MD /D "_AFXDLL"
#define _WIN32_WINNT 0x0501
#include <afxinet.h>
#include <concrtrm.h>
#include <agents.h>
#include <ppl.h>
#include <sstream>
#include <iostream>
#include <array>
using namespace concurrency;
using namespace std;
// Calls the provided work function and returns the number of milliseconds
// that it takes to call that function.
template <class Function>
__int64 time_call(Function&& f)
{
__int64 begin = GetTickCount();
f();
return GetTickCount() - begin;
}
// Downloads the file at the given URL.
CString GetHttpFile(CInternetSession& session, const CString& strUrl);
// Reads files from HTTP servers.
class http_reader : public agent
{
public:
explicit http_reader(CInternetSession& session,
ISource<string>& source,
unsigned int& total_bytes,
unsigned int max_concurrent_reads)
: _session(session)
, _source(source)
, _total_bytes(total_bytes)
{
// Add one token to the available tasks buffer for each
// possible concurrent read operation. The value of each token
// is not important, but can be useful for debugging.
for (unsigned int i = 0; i < max_concurrent_reads; ++i)
send(_available_tasks, i);
}
// Signals to the agent that there are no more items to download.
static const string input_sentinel;
protected:
void run()
{
// A task group. Each task in the group downloads one file.
task_group tasks;
// Holds the total number of bytes downloaded.
combinable<unsigned int> total_bytes;
// Read from the source buffer until the application
// sends the sentinel value.
string url;
while ((url = receive(_source)) != input_sentinel)
{
// Wait for a task to release an available slot.
unsigned int token = receive(_available_tasks);
// Create a task to download the file.
tasks.run([&, token, url] {
// Print a message.
wstringstream ss;
ss << L"Downloading " << url.c_str() << L"..." << endl;
wcout << ss.str();
// Download the file.
string content = download(url);
// Update the total number of bytes downloaded.
total_bytes.local() += content.size();
// Release the slot for another task.
send(_available_tasks, token);
});
}
// Wait for all tasks to finish.
tasks.wait();
// Compute the total number of bytes download on all threads.
_total_bytes = total_bytes.combine(plus<unsigned int>());
// Set the status of the agent to agent_done.
done();
}
// Downloads the file at the given URL.
string download(const string& url)
{
// Enable oversubscription.
Context::Oversubscribe(true);
// Download the file.
string content = GetHttpFile(_session, url.c_str());
// Disable oversubscription.
Context::Oversubscribe(false);
return content;
}
private:
// Manages the network connection.
CInternetSession& _session;
// A message buffer that holds the URL names to download.
ISource<string>& _source;
// The total number of bytes downloaded
unsigned int& _total_bytes;
// Limits the agent to a given number of simultaneous tasks.
unbounded_buffer<unsigned int> _available_tasks;
};
const string http_reader::input_sentinel("");
int wmain()
{
// Create an array of URL names to download.
// A real-world application might read the names from user input.
array<string, 21> urls = {
"http://www.adatum.com/",
"https://www.adventure-works.com/",
"http://www.alpineskihouse.com/",
"http://www.cpandl.com/",
"http://www.cohovineyard.com/",
"http://www.cohowinery.com/",
"http://www.cohovineyardandwinery.com/",
"https://www.contoso.com/",
"http://www.consolidatedmessenger.com/",
"http://www.fabrikam.com/",
"https://www.fourthcoffee.com/",
"http://www.graphicdesigninstitute.com/",
"http://www.humongousinsurance.com/",
"http://www.litwareinc.com/",
"http://www.lucernepublishing.com/",
"http://www.margiestravel.com/",
"http://www.northwindtraders.com/",
"https://www.proseware.com/",
"http://www.fineartschool.net",
"http://www.tailspintoys.com/",
http_reader::input_sentinel,
};
// Manages the network connection.
CInternetSession session("Microsoft Internet Browser");
// A message buffer that enables the application to send URL names to the
// agent.
unbounded_buffer<string> source_urls;
// The total number of bytes that the agent has downloaded.
unsigned int total_bytes = 0u;
// Create an http_reader object that can oversubscribe each processor by one.
http_reader reader(session, source_urls, total_bytes, 2*GetProcessorCount());
// Compute the amount of time that it takes for the agent to download all files.
__int64 elapsed = time_call([&] {
// Start the agent.
reader.start();
// Use the message buffer to send each URL name to the agent.
for_each(begin(urls), end(urls), [&](const string& url) {
send(source_urls, url);
});
// Wait for the agent to finish downloading.
agent::wait(&reader);
});
// Print the results.
wcout << L"Downloaded " << total_bytes
<< L" bytes in " << elapsed << " ms." << endl;
}
// Downloads the file at the given URL and returns the size of that file.
CString GetHttpFile(CInternetSession& session, const CString& strUrl)
{
CString strResult;
// Reads data from an HTTP server.
CHttpFile* pHttpFile = NULL;
try
{
// Open URL.
pHttpFile = (CHttpFile*)session.OpenURL(strUrl, 1,
INTERNET_FLAG_TRANSFER_ASCII |
INTERNET_FLAG_RELOAD | INTERNET_FLAG_DONT_CACHE);
// Read the file.
if(pHttpFile != NULL)
{
UINT uiBytesRead;
do
{
char chBuffer[10000];
uiBytesRead = pHttpFile->Read(chBuffer, sizeof(chBuffer));
strResult += chBuffer;
}
while (uiBytesRead > 0);
}
}
catch (CInternetException)
{
// TODO: Handle exception
}
// Clean up and return.
delete pHttpFile;
return strResult;
}
Ten przykład generuje następujące dane wyjściowe na komputerze, który ma cztery procesory:
Downloading http://www.adatum.com/...
Downloading https://www.adventure-works.com/...
Downloading http://www.alpineskihouse.com/...
Downloading http://www.cpandl.com/...
Downloading http://www.cohovineyard.com/...
Downloading http://www.cohowinery.com/...
Downloading http://www.cohovineyardandwinery.com/...
Downloading https://www.contoso.com/...
Downloading http://www.consolidatedmessenger.com/...
Downloading http://www.fabrikam.com/...
Downloading https://www.fourthcoffee.com/...
Downloading http://www.graphicdesigninstitute.com/...
Downloading http://www.humongousinsurance.com/...
Downloading http://www.litwareinc.com/...
Downloading http://www.lucernepublishing.com/...
Downloading http://www.margiestravel.com/...
Downloading http://www.northwindtraders.com/...
Downloading https://www.proseware.com/...
Downloading http://www.fineartschool.net...
Downloading http://www.tailspintoys.com/...
Downloaded 1801040 bytes in 3276 ms.
Przykład mogą działać szybciej, gdy nadsubskrypcji jest włączone, ponieważ dodatkowe zadania uruchamiane podczas gdy inne zadania czekać na zakończenie operacji utajonej.
Kompilowanie kodu
Skopiuj przykładowy kod i wklej go w projekcie programu Visual Studio lub wkleić go w pliku o nazwie pobierania oversubscription.cpp , a następnie uruchom jeden z następujących poleceń w oknie wiersza polecenia usługi programu Visual Studio.
cl.exe /EHsc /MD /D "_AFXDLL" download-oversubscription.cpp
cl.exe /EHsc /MT download-oversubscription.cpp
Stabilne programowanie
Zawsze należy wyłączyć nadsubskrypcji, po nie jest już wymagany.Należy wziąć pod uwagę funkcji, która nie obsługuje wyjątek, który jest generowany przez inną funkcję.Jeśli nie można wyłączyć nadsubskrypcji zanim funkcja zwraca, wszelkich dodatkowych prac równolegle będą również oversubscribe bieżącego kontekstu.
Można użyć Inicjowania jest przejęcie zasobu (RAII) deseń, aby ograniczyć nadsubskrypcji do danego zakresu.W obszarze wzorzec RAII struktura danych jest przydzielane na stosie.Struktury danych inicjuje lub nabywa zasobu, gdy jest tworzony i niszczy lub zwalnia tego zasobu, kiedy niszczony jest struktura danych.Deseń RAII gwarancje destruktor nosi przed kończy zakres okalającego.W związku z tym, gdy jest wyjątek lub gdy funkcja zawiera wiele zasobu jest poprawnie zarządzane return sprawozdania.
Poniższy przykład definiuje strukturę, o nazwie scoped_blocking_signal.Konstruktor scoped_blocking_signal struktury umożliwia nadsubskrypcji i destruktor wyłącza nadsubskrypcji.
struct scoped_blocking_signal
{
scoped_blocking_signal()
{
concurrency::Context::Oversubscribe(true);
}
~scoped_blocking_signal()
{
concurrency::Context::Oversubscribe(false);
}
};
Poniższy przykład modyfikuje treści download metodę RAII do zapewnienia, że nadsubskrypcji jest wyłączone, zanim funkcja zwraca.Technika ta zapewnia, że download metoda jest bezpieczny wyjątek.
// Downloads the file at the given URL.
string download(const string& url)
{
scoped_blocking_signal signal;
// Download the file.
return string(GetHttpFile(_session, url.c_str()));
}