How to: Use parallel_invoke to Execute Parallel Operations

This example shows how to use the concurrency::parallel_invoke algorithm to improve the performance of a program that performs multiple operations on a shared data source. Because no operations modify the source, they can be executed in parallel in a straightforward manner.

Example

Consider the following code example that creates a variable of type MyDataType, calls a function to initialize that variable, and then performs multiple lengthy operations on that data.

MyDataType data;
initialize_data(data);

lengthy_operation1(data);
lengthy_operation2(data);
lengthy_operation3(data);

If the lengthy_operation1, lengthy_operation2, and lengthy_operation3 functions do not modify the MyDataType variable, these functions can be executed in parallel without additional modifications.

The following example modifies the previous example to run in parallel. The parallel_invoke algorithm executes each task in parallel and returns after all tasks are finished.

MyDataType data;
initialize_data(data);

concurrency::parallel_invoke(
   [&data] { lengthy_operation1(data); },
   [&data] { lengthy_operation2(data); },
   [&data] { lengthy_operation3(data); }
);
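
For readers who want to compile this pattern outside the full sample, the following is a minimal, self-contained sketch. The names Summary and summarize, and the three read-only operations, are hypothetical stand-ins for MyDataType and the lengthy_operation functions; they are not part of the original example. On compilers other than Visual C++, where ppl.h is not available, the sketch falls back to a small std::async-based stand-in for concurrency::parallel_invoke. That stand-in is an assumption made for portability and is not the PPL implementation.

```cpp
#include <algorithm>
#include <numeric>
#include <utility>
#include <vector>

#if defined(_MSC_VER)
#include <ppl.h>   // Provides the real concurrency::parallel_invoke.
#else
#include <future>
// Assumption: a portable stand-in for compilers that lack <ppl.h>.
// It starts one asynchronous task per function and waits for all of them.
namespace concurrency {
   template <class... Functions>
   void parallel_invoke(Functions&&... functions)
   {
      std::future<void> tasks[] = {
         std::async(std::launch::async, std::forward<Functions>(functions))...
      };
      for (auto& task : tasks)
         task.get();   // Rethrows any exception thrown by the task.
   }
}
#endif

// Hypothetical result type: one field per read-only operation.
struct Summary
{
   long long sum;
   int smallest;
   int largest;
};

// Runs three operations that only read 'data'. Each task writes to a
// different field of 'result', so no synchronization is needed.
// Precondition: 'data' is not empty.
Summary summarize(const std::vector<int>& data)
{
   Summary result = { 0, 0, 0 };
   concurrency::parallel_invoke(
      [&] { result.sum = std::accumulate(data.begin(), data.end(), 0LL); },
      [&] { result.smallest = *std::min_element(data.begin(), data.end()); },
      [&] { result.largest = *std::max_element(data.begin(), data.end()); }
   );
   return result;
}
```

Taking the data by const reference, as summarize does here, lets the compiler reject an operation that tries to modify the shared source.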

The following example downloads The Iliad by Homer from gutenberg.org and performs multiple operations on that file. The example first performs these operations serially and then performs the same operations in parallel.

// parallel-word-mining.cpp 
// compile with: /EHsc /MD /DUNICODE /D_AFXDLL
#define _WIN32_WINNT 0x0501
#include <afxinet.h>
#include <ppl.h>
#include <string>
#include <iostream>
#include <vector>
#include <map>
#include <algorithm>

using namespace concurrency;
using namespace std;

// Calls the provided work function and returns the number of milliseconds  
// that it takes to call that function. 
template <class Function>
__int64 time_call(Function&& f)
{
   __int64 begin = GetTickCount();
   f();
   return GetTickCount() - begin;
}

// Downloads the file at the given URL.
CString get_http_file(CInternetSession& session, const CString& url);

// Adds each word in the provided string to the provided vector of strings. 
void make_word_list(const wstring& text, vector<wstring>& words);

// Finds the most common words whose lengths are greater than or equal to the
// provided minimum.
vector<pair<wstring, size_t>> find_common_words(const vector<wstring>& words, 
   size_t min_length, size_t count);

// Finds the longest sequence of words that have the same first letter.
vector<wstring> find_longest_sequence(const vector<wstring>& words);

// Finds all pairs of distinct words in the provided collection in which one
// word is the other spelled backward (for example, "spots" and "stops").
vector<pair<wstring, wstring>> find_palindromes(const vector<wstring>& words,
   size_t min_length);

int wmain()
{  
   // Manages the network connection.
   CInternetSession session(L"Microsoft Internet Browser");

   // Download 'The Iliad' from gutenberg.org.
   wcout << L"Downloading 'The Iliad'..." << endl;
   wstring file = get_http_file(session, L"http://www.gutenberg.org/files/6130/6130-0.txt");
   wcout << endl;

   // Convert the text to a list of individual words.
   vector<wstring> words;
   make_word_list(file, words);

   // Compare the time that it takes to perform several operations on the data 
   // serially and in parallel.
   __int64 elapsed;

   vector<pair<wstring, size_t>> common_words;
   vector<wstring> longest_sequence;   
   vector<pair<wstring, wstring>> palindromes;

   wcout << L"Running serial version...";
   elapsed = time_call([&] {
      common_words = find_common_words(words, 5, 9);
      longest_sequence = find_longest_sequence(words);
      palindromes = find_palindromes(words, 5);
   });
   wcout << L" took " << elapsed << L" ms." << endl;

   wcout << L"Running parallel version...";
   elapsed = time_call([&] {
      parallel_invoke(         
         [&] { common_words = find_common_words(words, 5, 9); },
         [&] { longest_sequence = find_longest_sequence(words); },
         [&] { palindromes = find_palindromes(words, 5); }
      );
   });
   wcout << L" took " << elapsed << L" ms." << endl;
   wcout << endl;

   // Print results.

   wcout << L"The most common words that have five or more letters are:" 
         << endl;
   for_each(begin(common_words), end(common_words), 
      [](const pair<wstring, size_t>& p) {
         wcout << L"   " << p.first << L" (" << p.second << L")" << endl; 
      });

   wcout << L"The longest sequence of words that have the same first letter is:" 
         << endl << L"   ";
   for_each(begin(longest_sequence), end(longest_sequence), 
      [](const wstring& s) {
         wcout << s << L' '; 
      });
   wcout << endl;

   wcout << L"The following palindromes appear in the text:" << endl;
   for_each(begin(palindromes), end(palindromes), 
      [](const pair<wstring, wstring>& p) {
         wcout << L"   "  << p.first << L" " << p.second << endl;
      });
}

// Downloads the file at the given URL.
CString get_http_file(CInternetSession& session, const CString& url)
{
   CString result;

   // Reads data from an HTTP server.
   CHttpFile* http_file = NULL;

   try
   {
      // Open URL.
      http_file = reinterpret_cast<CHttpFile*>(session.OpenURL(url, 1));

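      // Read the file in chunks. Append only the bytes that were actually
      // read, because Read does not null-terminate the buffer.
      if(http_file != NULL)
      {
         char buffer[10000];
         UINT bytes_read;
         while ((bytes_read = http_file->Read(buffer, sizeof(buffer))) > 0)
         {
            result += CString(buffer, static_cast<int>(bytes_read));
         }
      }
   }
   catch (CInternetException* e)
   {
      // MFC throws exceptions by pointer; release the exception object.
      // TODO: Handle exception
      e->Delete();
   }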

   // Clean up and return. 
   delete http_file;

   return result;
}

// Adds each word in the provided string to the provided vector of strings. 
void make_word_list(const wstring& text, vector<wstring>& words)
{
   // Add continuous sequences of alphanumeric characters to the  
   // string vector. 
   wstring current_word;
   for_each(begin(text), end(text), [&](wchar_t ch) {
      if (!iswalnum(ch))
      {
         if (current_word.length() > 0)
         {
            words.push_back(current_word);
            current_word.clear();
         }
      }
      else
      {
         current_word += ch;
      }
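   });

   // Add the final word if the text ends with an alphanumeric character.
   if (current_word.length() > 0)
      words.push_back(current_word);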
}

// Finds the most common words whose lengths are greater than or equal to the
// provided minimum.
vector<pair<wstring, size_t>> find_common_words(const vector<wstring>& words, 
   size_t min_length, size_t count)
{
   typedef pair<wstring, size_t> pair;

   // Counts the occurrences of each word.
   map<wstring, size_t> counts;

   for_each(begin(words), end(words), [&](const wstring& word) {
      // Increment the count of words that are at least the minimum length. 
      if (word.length() >= min_length)
      {
         auto find = counts.find(word);
         if (find != end(counts))
            find->second++;
         else
            counts.insert(make_pair(word, 1));
      }
   });

   // Copy the contents of the map to a vector and sort the vector by 
   // the number of occurrences of each word.
   vector<pair> wordvector;
   copy(begin(counts), end(counts), back_inserter(wordvector));

   sort(begin(wordvector), end(wordvector), [](const pair& x, const pair& y) {
      return x.second > y.second;
   });

   size_t size = min(wordvector.size(), count);
   wordvector.erase(begin(wordvector) + size, end(wordvector));

   return wordvector;
}

// Finds the longest sequence of words that have the same first letter.
vector<wstring> find_longest_sequence(const vector<wstring>& words)
{
   // The current sequence of words that have the same first letter.
   vector<wstring> candidate_list;
   // The longest sequence of words that have the same first letter.
   vector<wstring> longest_run;

   for_each(begin(words), end(words), [&](const wstring& word) {
      // Initialize the candidate list if it is empty. 
      if (candidate_list.size() == 0)
      {
         candidate_list.push_back(word);
      }
      // Add the word to the candidate sequence if the first letter 
      // of the word is the same as each word in the sequence. 
      else if (word[0] == candidate_list[0][0])
      {
         candidate_list.push_back(word);
      }
      // The initial letter has changed; reset the candidate list. 
      else 
      {
         // Update the longest sequence if needed. 
         if (candidate_list.size() > longest_run.size())
            longest_run = candidate_list;

         candidate_list.clear();
         candidate_list.push_back(word);         
      }
   });

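   // The final candidate is never compared inside the loop, so check it
   // here in case the longest sequence ends the text.
   if (candidate_list.size() > longest_run.size())
      longest_run = candidate_list;

   return longest_run;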
}

// Finds all pairs of distinct words in the provided collection in which one
// word is the other spelled backward (for example, "spots" and "stops").
vector<pair<wstring, wstring>> find_palindromes(const vector<wstring>& words, 
   size_t min_length)
{
   typedef pair<wstring, wstring> pair;
   vector<pair> result;

   // Copy the words to a new vector object and sort that vector.
   vector<wstring> wordvector;
   copy(begin(words), end(words), back_inserter(wordvector));
   sort(begin(wordvector), end(wordvector));

   // For each sufficiently long word whose reversal is a different word that
   // also exists in the collection, add the pair to the result once.
   for_each(begin(words), end(words), [&](const wstring& word) {
      if (word.length() >= min_length)
      {
         wstring rev = word;
         reverse(begin(rev), end(rev));

         if (rev != word && binary_search(begin(wordvector), end(wordvector), rev))
         {
            auto candidate1 = make_pair(word, rev);
            auto candidate2 = make_pair(rev, word);
            if (find(begin(result), end(result), candidate1) == end(result) &&
                find(begin(result), end(result), candidate2) == end(result))
               result.push_back(candidate1);
         }
      }
   });

   return result;
}

This example produces the following sample output.

Downloading 'The Iliad'...

Running serial version... took 953 ms.
Running parallel version... took 656 ms.

The most common words that have five or more letters are:
   their (953)
   shall (444)
   which (431)
   great (398)
   Hector (349)
   Achilles (309)
   through (301)
   these (268)
   chief (259)
The longest sequence of words that have the same first letter is:
   through the tempest to the tented
The following palindromes appear in the text:
   spots stops
   speed deeps
   keels sleek

This example uses the parallel_invoke algorithm to call multiple functions that act on the same data source. You can use the parallel_invoke algorithm to call any set of functions in parallel, not only those that act on the same data.
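
As a small illustration of tasks that do not share a data source, the following sketch sorts two unrelated containers at the same time. The function name sort_both is hypothetical. As an assumption made for portability, the sketch uses a std::async-based stand-in for concurrency::parallel_invoke on compilers that do not provide ppl.h; the stand-in is not the PPL implementation.

```cpp
#include <algorithm>
#include <string>
#include <utility>
#include <vector>

#if defined(_MSC_VER)
#include <ppl.h>   // Provides the real concurrency::parallel_invoke.
#else
#include <future>
// Assumption: a portable stand-in for compilers that lack <ppl.h>.
namespace concurrency {
   template <class... Functions>
   void parallel_invoke(Functions&&... functions)
   {
      std::future<void> tasks[] = {
         std::async(std::launch::async, std::forward<Functions>(functions))...
      };
      for (auto& task : tasks)
         task.get();   // Rethrows any exception thrown by the task.
   }
}
#endif

// Sorts two independent containers in parallel. Each task modifies only
// its own container, so the tasks cannot interfere with each other.
void sort_both(std::vector<int>& numbers, std::vector<std::string>& names)
{
   concurrency::parallel_invoke(
      [&numbers] { std::sort(numbers.begin(), numbers.end()); },
      [&names] { std::sort(names.begin(), names.end()); }
   );
}
```

Unlike the read-only operations in the main example, these tasks modify data. This is safe only because no two tasks touch the same object.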

Because the parallel_invoke algorithm runs the work functions in parallel and returns only after all of them finish, its running time is bounded by the function that takes the longest to finish, provided that the runtime processes each function on a separate processor. If this example performs more tasks in parallel than there are available processors, multiple tasks can run on each processor. In that case, the running time is bounded by the group of tasks that takes the longest to finish.
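
The following sketch illustrates this bound with simulated work. The functions simulated_work, time_serial, and time_parallel are hypothetical, and the sleep durations are arbitrary. The timer uses std::chrono::steady_clock rather than GetTickCount so that the sketch is portable, and on compilers without ppl.h it uses a std::async-based stand-in for concurrency::parallel_invoke (an assumption for portability, not the PPL implementation). On a computer with at least three available processors, the parallel time should be close to the longest task (about 150 ms) and the serial time close to the sum of all three (about 300 ms); actual timings will vary.

```cpp
#include <chrono>
#include <thread>
#include <utility>

#if defined(_MSC_VER)
#include <ppl.h>   // Provides the real concurrency::parallel_invoke.
#else
#include <future>
// Assumption: a portable stand-in for compilers that lack <ppl.h>.
namespace concurrency {
   template <class... Functions>
   void parallel_invoke(Functions&&... functions)
   {
      std::future<void> tasks[] = {
         std::async(std::launch::async, std::forward<Functions>(functions))...
      };
      for (auto& task : tasks)
         task.get();   // Rethrows any exception thrown by the task.
   }
}
#endif

// Returns the number of milliseconds that it takes to call 'f'.
template <class Function>
long long time_call_ms(Function&& f)
{
   auto begin = std::chrono::steady_clock::now();
   f();
   auto end = std::chrono::steady_clock::now();
   return std::chrono::duration_cast<std::chrono::milliseconds>(
      end - begin).count();
}

// Simulates a lengthy operation by sleeping for the given duration.
void simulated_work(int milliseconds)
{
   std::this_thread::sleep_for(std::chrono::milliseconds(milliseconds));
}

// Runs the three simulated operations one after another.
long long time_serial()
{
   return time_call_ms([] {
      simulated_work(50);
      simulated_work(100);
      simulated_work(150);
   });
}

// Runs the same three simulated operations in parallel.
long long time_parallel()
{
   return time_call_ms([] {
      concurrency::parallel_invoke(
         [] { simulated_work(50); },
         [] { simulated_work(100); },
         [] { simulated_work(150); }
      );
   });
}
```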

Because this example performs only three tasks in parallel, you should not expect performance to scale on computers that have more than three processors. To improve performance further, you can break the longest-running tasks into smaller tasks and run those tasks in parallel.
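
One way to break a task into smaller tasks is sketched below for the word-counting step. The names word_counts, count_range, and count_words_in_halves are hypothetical and do not appear in the sample; the sketch omits the minimum-length filter and the sorting that find_common_words performs. It counts each half of the word list in a separate task and then merges the two partial results serially. Whether this is faster in practice depends on the input size and the cost of the merge, which this sketch does not measure. On compilers without ppl.h, it uses a std::async-based stand-in for concurrency::parallel_invoke (an assumption for portability, not the PPL implementation).

```cpp
#include <cstddef>
#include <map>
#include <string>
#include <utility>
#include <vector>

#if defined(_MSC_VER)
#include <ppl.h>   // Provides the real concurrency::parallel_invoke.
#else
#include <future>
// Assumption: a portable stand-in for compilers that lack <ppl.h>.
namespace concurrency {
   template <class... Functions>
   void parallel_invoke(Functions&&... functions)
   {
      std::future<void> tasks[] = {
         std::async(std::launch::async, std::forward<Functions>(functions))...
      };
      for (auto& task : tasks)
         task.get();   // Rethrows any exception thrown by the task.
   }
}
#endif

typedef std::map<std::wstring, std::size_t> word_counts;
typedef std::vector<std::wstring>::const_iterator word_iterator;

// Counts the occurrences of each word in the range [first, last).
word_counts count_range(word_iterator first, word_iterator last)
{
   word_counts counts;
   for (; first != last; ++first)
      ++counts[*first];
   return counts;
}

// Counts the occurrences of each word by counting the two halves of the
// collection in parallel and then merging the partial results.
word_counts count_words_in_halves(const std::vector<std::wstring>& words)
{
   word_iterator middle = words.begin() + words.size() / 2;

   // Each task writes only to its own map.
   word_counts left, right;
   concurrency::parallel_invoke(
      [&] { left = count_range(words.begin(), middle); },
      [&] { right = count_range(middle, words.end()); }
   );

   // Merge the right-hand counts into the left-hand counts serially.
   for (const auto& entry : right)
      left[entry.first] += entry.second;

   return left;
}
```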

You can use the parallel_invoke algorithm instead of the concurrency::task_group and concurrency::structured_task_group classes if you do not require support for cancellation. For an example that compares the usage of the parallel_invoke algorithm versus task groups, see How to: Use parallel_invoke to Write a Parallel Sort Routine.

Compiling the Code

To compile the code, copy it and then paste it in a Visual Studio project, or paste it in a file that is named parallel-word-mining.cpp and then run the following command in a Visual Studio Command Prompt window.

cl.exe /EHsc /MD /DUNICODE /D_AFXDLL parallel-word-mining.cpp

See Also

Reference

parallel_invoke Function

Concepts

Parallel Algorithms