Comment : utiliser le surabonnement pour compenser la latence
Le surabonnement peut améliorer l'efficacité globale de certaines applications qui contiennent des tâches qui ont une quantité élevée de latence. Cette rubrique illustre comment utiliser le surabonnement afin de compenser la latence provoquée par la lecture de données à partir d'une connexion réseau.
Exemple
Cet exemple utilise la Bibliothèque d'agents asynchrones pour télécharger des fichiers à partir de serveurs HTTP. La classe http_reader dérive de concurrency::agent et utilise le passage de message pour lire de façon asynchrone les noms des URL à télécharger.
La classe http_reader utilise la classe concurrency::task_group pour lire chaque fichier simultanément. Chaque tâche appelle la méthode concurrency::Context::Oversubscribe avec le paramètre _BeginOversubscription défini sur la valeur true pour permettre le surabonnement dans le contexte actuel. Chaque tâche utilise ensuite les classes MFC (Microsoft Foundation Classes) CInternetSession et CHttpFile pour télécharger le fichier. Pour finir, chaque tâche appelle Context::Oversubscribe avec le paramètre _BeginOversubscription défini à false pour désactiver le surabonnement.
Lorsque le surabonnement est activé, le runtime crée un thread supplémentaire dans lequel exécuter des tâches. Chacun de ces threads peut également surabonner le contexte actuel et ainsi créer des threads supplémentaires. La classe http_reader utilise un objet concurrency::unbounded_buffer pour limiter le nombre de threads utilisés par l'application. L'agent initialise la mémoire tampon avec un nombre fixe de valeurs de jeton. Pour chaque opération de téléchargement, l'agent lit une valeur de jeton à partir de la mémoire tampon avant que l'opération démarre, puis réécrit cette valeur dans la mémoire tampon une fois l'opération terminée. Lorsque la mémoire tampon est vide, l'agent attend que l'une des opérations de téléchargement réécrive une valeur dans la mémoire tampon.
L'exemple suivant limite le nombre de tâches simultanées à deux fois le nombre de threads matériels disponibles. Cette valeur est un bon point de départ à utiliser en cas d'expérimentation avec le surabonnement. Vous pouvez utiliser une valeur adaptée à un environnement de traitement particulier ou modifier cette valeur de manière dynamique afin de répondre à la charge de travail réelle.
// download-oversubscription.cpp
// compile with: /EHsc /MD /D "_AFXDLL"
#define _WIN32_WINNT 0x0501
#include <afxinet.h>
#include <concrtrm.h>
#include <agents.h>
#include <ppl.h>
#include <sstream>
#include <iostream>
#include <array>
using namespace concurrency;
using namespace std;
// Calls the provided work function and returns the number of milliseconds
// that it takes to call that function.
template <class Function>
__int64 time_call(Function&& f)
{
__int64 begin = GetTickCount();
f();
return GetTickCount() - begin;
}
// Downloads the file at the given URL.
CString GetHttpFile(CInternetSession& session, const CString& strUrl);
// Reads files from HTTP servers.
class http_reader : public agent
{
public:
explicit http_reader(CInternetSession& session,
ISource<string>& source,
unsigned int& total_bytes,
unsigned int max_concurrent_reads)
: _session(session)
, _source(source)
, _total_bytes(total_bytes)
{
// Add one token to the available tasks buffer for each
// possible concurrent read operation. The value of each token
// is not important, but can be useful for debugging.
for (unsigned int i = 0; i < max_concurrent_reads; ++i)
send(_available_tasks, i);
}
// Signals to the agent that there are no more items to download.
static const string input_sentinel;
protected:
void run()
{
// A task group. Each task in the group downloads one file.
task_group tasks;
// Holds the total number of bytes downloaded.
combinable<unsigned int> total_bytes;
// Read from the source buffer until the application
// sends the sentinel value.
string url;
while ((url = receive(_source)) != input_sentinel)
{
// Wait for a task to release an available slot.
unsigned int token = receive(_available_tasks);
// Create a task to download the file.
tasks.run([&, token, url] {
// Print a message.
wstringstream ss;
ss << L"Downloading " << url.c_str() << L"..." << endl;
wcout << ss.str();
// Download the file.
string content = download(url);
// Update the total number of bytes downloaded.
total_bytes.local() += content.size();
// Release the slot for another task.
send(_available_tasks, token);
});
}
// Wait for all tasks to finish.
tasks.wait();
// Compute the total number of bytes download on all threads.
_total_bytes = total_bytes.combine(plus<unsigned int>());
// Set the status of the agent to agent_done.
done();
}
// Downloads the file at the given URL.
string download(const string& url)
{
// Enable oversubscription.
Context::Oversubscribe(true);
// Download the file.
string content = GetHttpFile(_session, url.c_str());
// Disable oversubscription.
Context::Oversubscribe(false);
return content;
}
private:
// Manages the network connection.
CInternetSession& _session;
// A message buffer that holds the URL names to download.
ISource<string>& _source;
// The total number of bytes downloaded
unsigned int& _total_bytes;
// Limits the agent to a given number of simultaneous tasks.
unbounded_buffer<unsigned int> _available_tasks;
};
const string http_reader::input_sentinel("");
int wmain()
{
// Create an array of URL names to download.
// A real-world application might read the names from user input.
array<string, 21> urls = {
"http://www.adatum.com/",
"https://www.adventure-works.com/",
"http://www.alpineskihouse.com/",
"http://www.cpandl.com/",
"http://www.cohovineyard.com/",
"http://www.cohowinery.com/",
"http://www.cohovineyardandwinery.com/",
"https://www.contoso.com/",
"http://www.consolidatedmessenger.com/",
"http://www.fabrikam.com/",
"https://www.fourthcoffee.com/",
"http://www.graphicdesigninstitute.com/",
"http://www.humongousinsurance.com/",
"http://www.litwareinc.com/",
"http://www.lucernepublishing.com/",
"http://www.margiestravel.com/",
"http://www.northwindtraders.com/",
"https://www.proseware.com/",
"http://www.fineartschool.net",
"http://www.tailspintoys.com/",
http_reader::input_sentinel,
};
// Manages the network connection.
CInternetSession session("Microsoft Internet Browser");
// A message buffer that enables the application to send URL names to the
// agent.
unbounded_buffer<string> source_urls;
// The total number of bytes that the agent has downloaded.
unsigned int total_bytes = 0u;
// Create an http_reader object that can oversubscribe each processor by one.
http_reader reader(session, source_urls, total_bytes, 2*GetProcessorCount());
// Compute the amount of time that it takes for the agent to download all files.
__int64 elapsed = time_call([&] {
// Start the agent.
reader.start();
// Use the message buffer to send each URL name to the agent.
for_each(begin(urls), end(urls), [&](const string& url) {
send(source_urls, url);
});
// Wait for the agent to finish downloading.
agent::wait(&reader);
});
// Print the results.
wcout << L"Downloaded " << total_bytes
<< L" bytes in " << elapsed << " ms." << endl;
}
// Downloads the file at the given URL and returns the size of that file.
CString GetHttpFile(CInternetSession& session, const CString& strUrl)
{
CString strResult;
// Reads data from an HTTP server.
CHttpFile* pHttpFile = NULL;
try
{
// Open URL.
pHttpFile = (CHttpFile*)session.OpenURL(strUrl, 1,
INTERNET_FLAG_TRANSFER_ASCII |
INTERNET_FLAG_RELOAD | INTERNET_FLAG_DONT_CACHE);
// Read the file.
if(pHttpFile != NULL)
{
UINT uiBytesRead;
do
{
char chBuffer[10000];
uiBytesRead = pHttpFile->Read(chBuffer, sizeof(chBuffer));
strResult += chBuffer;
}
while (uiBytesRead > 0);
}
}
catch (CInternetException)
{
// TODO: Handle exception
}
// Clean up and return.
delete pHttpFile;
return strResult;
}
Cet exemple produit la sortie suivante sur un ordinateur qui a quatre processeurs :
L'exemple peut s'exécuter plus rapidement lorsque le surabonnement est activé car des tâches supplémentaires s'exécutent pendant que d'autres tâches attendent qu'une opération latente se termine.
Compilation du code
Copiez l'exemple de code et collez-le dans un projet Visual Studio , ou collez-le dans un fichier nommé download-oversubscription.cpp puis exécutez l'une des commandes suivantes dans une fenêtre d'invite de commandes Visual Studio.
cl.exe /EHsc /MD /D "_AFXDLL" download-oversubscription.cpp
cl.exe /EHsc /MT download-oversubscription.cpp
Programmation fiable
Désactivez toujours le surabonnement une fois que vous n'en avez plus besoin. Considérez une fonction qui ne gère pas une exception levée par une autre fonction. Si vous ne désactivez pas le surabonnement avant que la fonction ne retourne, tout travail parallèle supplémentaire surabonnera également le contexte actuel.
Vous pouvez utiliser le modèle RAII (Resource Acquisition Is Initialization) pour limiter le surabonnement à une portée donnée. Selon le modèle RAII, une structure de données est allouée sur la pile. Cette structure de données initialise ou acquiert une ressource lorsqu'elle est créée et détruit ou libère cette ressource lorsque la structure de données est détruite. Le modèle RAII garantit que le destructeur est appelé avant que la portée englobante ne quitte. Par conséquent, la ressource est gérée correctement lorsqu'une exception est levée ou lorsqu'une fonction contient plusieurs instructions return.
L'exemple suivant définit une structure nommée scoped_blocking_signal. Le constructeur de la structure scoped_blocking_signal active le surabonnement et le destructeur désactive le surabonnement.
struct scoped_blocking_signal
{
scoped_blocking_signal()
{
concurrency::Context::Oversubscribe(true);
}
~scoped_blocking_signal()
{
concurrency::Context::Oversubscribe(false);
}
};
L'exemple suivant modifie le corps de la méthode download de façon à utiliser RAII pour s'assurer que le surabonnement est désactivé avant que la fonction ne soit retournée. Cette technique garantit que la méthode download est sécurisée du point de vue des exceptions.
// Downloads the file at the given URL.
string download(const string& url)
{
scoped_blocking_signal signal;
// Download the file.
return string(GetHttpFile(_session, url.c_str()));
}
Voir aussi
Référence
Context::Oversubscribe, méthode