Jak: pomocí zaznamenán posun čekací doba
Podávání zlepšit celkové účinnosti některé aplikace, které obsahují úkoly, které mají vysoké částky čekací doby.Toto téma ukazuje, jak pomocí zaznamenán posun čekací doba, která je způsobena čtení dat ze síťového připojení.
Příklad
V tomto příkladu Asynchronní agenti knihovny stahování souborů ze serverů HTTP.http_reader Je odvozen z třídy concurrency::agent a používá zpráva předávání asynchronně číst názvy URL ke stažení.
http_reader Používá třídy concurrency::task_group třídy současně číst každý soubor.Každý úkol volá concurrency::Context::Oversubscribe metodou s _BeginOversubscription parametr nastaven na hodnotu true povolení zaznamenán v aktuálním kontextu.Každý úkol použije tříd Microsoft Foundation (MFC) CInternetSession a CHttpFile třídy ke stažení souboru.Nakonec každý úkol volá Context::Oversubscribe se _BeginOversubscription parametru false zakázat podávání.
Podávání povolena, vytvoří jeden další podproces, ve kterém se úlohy modulu runtime.Každý z těchto podprocesů také oversubscribe aktuální kontext a tím vytvořit další podprocesy.http_reader Používá třídy concurrency::unbounded_buffer objektu omezit počet podprocesů, které aplikace používá.Agent inicializuje s pevným počtem token hodnoty vyrovnávací paměti.Pro každou operaci stahování jsou v agent přečte hodnotu token z vyrovnávací paměti před operace začíná a zapíše tuto hodnotu zpět do vyrovnávací paměti po dokončení operace.Pokud vyrovnávací paměti je prázdný, agent čeká jedna operace stažení zapisovat hodnoty zpět do vyrovnávací paměti.
Následující příklad omezuje počet současných úloh dvakrát počet podprocesů hardwaru k dispozici.Tato hodnota je vhodný výchozí bod při experimentování s zaznamenán.Můžete použít hodnotu, která odpovídá zejména zpracování prostředí nebo dynamicky měnit tuto hodnotu reagovat na skutečné pracovní zátěže.
// download-oversubscription.cpp
// compile with: /EHsc /MD /D "_AFXDLL"
#define _WIN32_WINNT 0x0501
#include <afxinet.h>
#include <concrtrm.h>
#include <agents.h>
#include <ppl.h>
#include <sstream>
#include <iostream>
#include <array>
using namespace concurrency;
using namespace std;
// Calls the provided work function and returns the number of milliseconds
// that it takes to call that function.
template <class Function>
__int64 time_call(Function&& f)
{
__int64 begin = GetTickCount();
f();
return GetTickCount() - begin;
}
// Downloads the file at the given URL.
CString GetHttpFile(CInternetSession& session, const CString& strUrl);
// Reads files from HTTP servers.
class http_reader : public agent
{
public:
explicit http_reader(CInternetSession& session,
ISource<string>& source,
unsigned int& total_bytes,
unsigned int max_concurrent_reads)
: _session(session)
, _source(source)
, _total_bytes(total_bytes)
{
// Add one token to the available tasks buffer for each
// possible concurrent read operation. The value of each token
// is not important, but can be useful for debugging.
for (unsigned int i = 0; i < max_concurrent_reads; ++i)
send(_available_tasks, i);
}
// Signals to the agent that there are no more items to download.
static const string input_sentinel;
protected:
void run()
{
// A task group. Each task in the group downloads one file.
task_group tasks;
// Holds the total number of bytes downloaded.
combinable<unsigned int> total_bytes;
// Read from the source buffer until the application
// sends the sentinel value.
string url;
while ((url = receive(_source)) != input_sentinel)
{
// Wait for a task to release an available slot.
unsigned int token = receive(_available_tasks);
// Create a task to download the file.
tasks.run([&, token, url] {
// Print a message.
wstringstream ss;
ss << L"Downloading " << url.c_str() << L"..." << endl;
wcout << ss.str();
// Download the file.
string content = download(url);
// Update the total number of bytes downloaded.
total_bytes.local() += content.size();
// Release the slot for another task.
send(_available_tasks, token);
});
}
// Wait for all tasks to finish.
tasks.wait();
// Compute the total number of bytes download on all threads.
_total_bytes = total_bytes.combine(plus<unsigned int>());
// Set the status of the agent to agent_done.
done();
}
// Downloads the file at the given URL.
string download(const string& url)
{
// Enable oversubscription.
Context::Oversubscribe(true);
// Download the file.
string content = GetHttpFile(_session, url.c_str());
// Disable oversubscription.
Context::Oversubscribe(false);
return content;
}
private:
// Manages the network connection.
CInternetSession& _session;
// A message buffer that holds the URL names to download.
ISource<string>& _source;
// The total number of bytes downloaded
unsigned int& _total_bytes;
// Limits the agent to a given number of simultaneous tasks.
unbounded_buffer<unsigned int> _available_tasks;
};
const string http_reader::input_sentinel("");
int wmain()
{
// Create an array of URL names to download.
// A real-world application might read the names from user input.
array<string, 21> urls = {
"http://www.adatum.com/",
"https://www.adventure-works.com/",
"http://www.alpineskihouse.com/",
"http://www.cpandl.com/",
"http://www.cohovineyard.com/",
"http://www.cohowinery.com/",
"http://www.cohovineyardandwinery.com/",
"https://www.contoso.com/",
"http://www.consolidatedmessenger.com/",
"http://www.fabrikam.com/",
"https://www.fourthcoffee.com/",
"http://www.graphicdesigninstitute.com/",
"http://www.humongousinsurance.com/",
"http://www.litwareinc.com/",
"http://www.lucernepublishing.com/",
"http://www.margiestravel.com/",
"http://www.northwindtraders.com/",
"https://www.proseware.com/",
"http://www.fineartschool.net",
"http://www.tailspintoys.com/",
http_reader::input_sentinel,
};
// Manages the network connection.
CInternetSession session("Microsoft Internet Browser");
// A message buffer that enables the application to send URL names to the
// agent.
unbounded_buffer<string> source_urls;
// The total number of bytes that the agent has downloaded.
unsigned int total_bytes = 0u;
// Create an http_reader object that can oversubscribe each processor by one.
http_reader reader(session, source_urls, total_bytes, 2*GetProcessorCount());
// Compute the amount of time that it takes for the agent to download all files.
__int64 elapsed = time_call([&] {
// Start the agent.
reader.start();
// Use the message buffer to send each URL name to the agent.
for_each(begin(urls), end(urls), [&](const string& url) {
send(source_urls, url);
});
// Wait for the agent to finish downloading.
agent::wait(&reader);
});
// Print the results.
wcout << L"Downloaded " << total_bytes
<< L" bytes in " << elapsed << " ms." << endl;
}
// Downloads the file at the given URL and returns the size of that file.
CString GetHttpFile(CInternetSession& session, const CString& strUrl)
{
CString strResult;
// Reads data from an HTTP server.
CHttpFile* pHttpFile = NULL;
try
{
// Open URL.
pHttpFile = (CHttpFile*)session.OpenURL(strUrl, 1,
INTERNET_FLAG_TRANSFER_ASCII |
INTERNET_FLAG_RELOAD | INTERNET_FLAG_DONT_CACHE);
// Read the file.
if(pHttpFile != NULL)
{
UINT uiBytesRead;
do
{
char chBuffer[10000];
uiBytesRead = pHttpFile->Read(chBuffer, sizeof(chBuffer));
strResult += chBuffer;
}
while (uiBytesRead > 0);
}
}
catch (CInternetException)
{
// TODO: Handle exception
}
// Clean up and return.
delete pHttpFile;
return strResult;
}
Tento příklad vytvoří v počítači, který má čtyři procesory následující výstup:
Downloading http://www.adatum.com/...
Downloading https://www.adventure-works.com/...
Downloading http://www.alpineskihouse.com/...
Downloading http://www.cpandl.com/...
Downloading http://www.cohovineyard.com/...
Downloading http://www.cohowinery.com/...
Downloading http://www.cohovineyardandwinery.com/...
Downloading https://www.contoso.com/...
Downloading http://www.consolidatedmessenger.com/...
Downloading http://www.fabrikam.com/...
Downloading https://www.fourthcoffee.com/...
Downloading http://www.graphicdesigninstitute.com/...
Downloading http://www.humongousinsurance.com/...
Downloading http://www.litwareinc.com/...
Downloading http://www.lucernepublishing.com/...
Downloading http://www.margiestravel.com/...
Downloading http://www.northwindtraders.com/...
Downloading https://www.proseware.com/...
Downloading http://www.fineartschool.net...
Downloading http://www.tailspintoys.com/...
Downloaded 1801040 bytes in 3276 ms.
V příkladu můžete rychleji při podávání je povolena, protože ostatní úkoly čekat na dokončení latentní operace spustit další úkoly.
Probíhá kompilace kódu
Příklad kódu zkopírujte a vložte do projektu Visual Studio nebo vložit do souboru s názvem stahování oversubscription.cpp a potom spusťte jeden z následujících příkazů v okně příkazového řádku Visual Studio.
cl.exe /EHsc /MD /D "_AFXDLL" download-oversubscription.cpp
cl.exe /EHsc /MT download-oversubscription.cpp
Robustní programování
Vždy zakážete podávání po již nevyžadují.Zvažte funkce, která zpracovává výjimku, která je vyvolána jiné funkce.Pokud není zaznamenán před vrátí, žádné další paralelní práce také oversubscribe v aktuálním kontextu.
Můžete použít Inicializace je získání prostředků (RAII) vzorek omezit zaznamenán do daného oboru.Podle vzoru RAII je struktura dat přiřazené v zásobníku.Že struktura dat inicializuje nebo získá zdroje při vytvoření a ničí nebo uvolní prostředku při datovou strukturu je zničen.Vzorek RAII zaručuje, že se objekt je volána před ukončí ohraničujícím oboru.Proto je prostředek správně spravována při vyvolání výjimky nebo funkce obsahuje více return příkazy.
Následující příklad definuje strukturu s názvem scoped_blocking_signal.Konstruktoru scoped_blocking_signal struktury umožňuje podávání a jež zakáže se objekt.
struct scoped_blocking_signal
{
scoped_blocking_signal()
{
concurrency::Context::Oversubscribe(true);
}
~scoped_blocking_signal()
{
concurrency::Context::Oversubscribe(false);
}
};
Následující příklad změní text download metoda RAII zajistit, že podávání je zakázáno před vrátí.Tato metoda zajišťuje, že download metoda je bezpečné výjimku.
// Downloads the file at the given URL.
string download(const string& url)
{
scoped_blocking_signal signal;
// Download the file.
return string(GetHttpFile(_session, url.c_str()));
}