Condividi tramite


Completamento asincrono I/O

Utilizzando gli eventi di completamento I/O, un thread del pool dei thread elabora i dati solo quando questi vengono ricevuti. Al termine dell'elaborazione, poi, il thread torna al pool dei thread.

Per effettuare una chiamata I/O asincrona, un handle di I/O del sistema operativo deve essere associato al pool dei thread ed è necessario specificare un metodo di callback. Al termine dell'operazione di I/O, un thread del pool di thread richiama il metodo di callback.

Nell'esempio di codice C# che segue viene dimostrata una semplice operazione di I/O asincrona.

Nota   Questo esempio richiede più di 100 MB di memoria disponibile.

using System;
using System.IO;
using System.Threading;
using System.Runtime.InteropServices;

public class BulkImageProcAsync{
   public const String ImageBaseName = "tmpImage-";
   public const int numImages = 200;
   public const int numPixels = 512*512;

   // ProcessImage has a simple O(N) loop, and you can vary the number
   // of times you repeat that loop to make the application more CPU-bound
   // or more I/O-bound.
   public static int processImageRepeats = 20;

   // Threads must decrement NumImagesToFinish, and protect
   // their access to it through a mutex.
   public static int NumImagesToFinish = numImages;
   public static Object NumImagesMutex = new Object[0];
   
   // WaitObject is signalled when all image processing is done.
   public static Object WaitObject = new Object[0];

   public class ImageStateObject{
      public byte[] pixels;
      public int imageNum;
      public FileStream fs;
   }

   public static void MakeImageFiles(){
      int sides = (int) Math.Sqrt(numPixels);
      Console.Write("Making "+numImages+" "+sides+"x"+sides+" images...  ");
      byte[] pixels = new byte[numPixels];
      for(int i=0; i<numPixels; i++)
      pixels[i] = (byte) i;

      for(int i=0; i<numImages; i++){
         FileStream fs = new FileStream(
            ImageBaseName+i+".tmp", 
            FileMode.Create,
            FileAccess.Write, 
            FileShare.None, 
            8192, 
            false
         );
         fs.Write(pixels, 0, pixels.Length);
         FlushFileBuffers(fs.Handle);
         fs.Close();
      }
      Console.WriteLine("Done.");
   }

   public static void ReadInImageCallback(IAsyncResult asyncResult){

      ImageStateObject state = (ImageStateObject) asyncResult.AsyncState;

      Stream stream = state.fs;
      int bytesRead = stream.EndRead(asyncResult);
      if (bytesRead != numPixels)
         throw new Exception("In ReadInImageCallback, got wrong number of bytes from the image!  got: " + bytesRead);

      ProcessImage(state.pixels, state.imageNum);
      stream.Close();

      // Now write out the image. No async here.
      FileStream fs = new FileStream(
         ImageBaseName + state.imageNum + ".done", 
         FileMode.Create, 
         FileAccess.Write, 
         FileShare.None, 
         4096, 
         false
      );

      fs.Write(state.pixels, 0, numPixels);
      fs.Close();

      // This application model uses too much memory.
      // Releasing memory as soon as possible is a good idea, 
      // especially global state.
      state.pixels = null;

      // Record that an image is done now.
      lock(NumImagesMutex){
         NumImagesToFinish--;
         if (NumImagesToFinish==0){
            lock(WaitObject){
               Monitor.Pulse(WaitObject);
            }
         }
      }
   }

   public static void ProcessImage(byte[] pixels, int imageNum){

      Console.WriteLine("ProcessImage "+imageNum);

      // Do some CPU-intensive operation on the image.
      for(int i=0; i<processImageRepeats; i++)
         for(int j=0; j<numPixels; j++)
            pixels[j] += 1;

      Console.WriteLine("ProcessImage "+imageNum+" done.");
   }

   public static void ProcessImagesInBulk(){

      Console.WriteLine("Processing images...  ");
      long t0 = Environment.TickCount;
      NumImagesToFinish = numImages;

      AsyncCallback readImageCallback = new AsyncCallback(ReadInImageCallback);
      for(int i=0; i<numImages; i++){

         ImageStateObject state = new ImageStateObject();
         state.pixels = new byte[numPixels];
         state.imageNum = i;

         // Very large items are read only once, so the 
         // buffer on the file stream can be very small to save memory.

         FileStream fs = new FileStream(
            ImageBaseName+i+".tmp",
            FileMode.Open, 
            FileAccess.Read, 
            FileShare.Read, 
            1, 
            true
         );
         state.fs = fs;
         fs.BeginRead(state.pixels, 0, numPixels, readImageCallback, state);
      }

      // Determine whether all images are done being processed.  
      // If not, block until all are finished.
      bool mustBlock = false;
      lock (NumImagesMutex){
         if (NumImagesToFinish > 0)
            mustBlock = true;
      }
      if (mustBlock){
         Console.WriteLine(
            "All worker threads are queued... Blocking until they complete.  numLeft: " + NumImagesToFinish
         );
            lock(WaitObject){
            Monitor.Pulse(WaitObject);
            }      
      }

      long t1 = Environment.TickCount;
      Console.WriteLine("Total time processing images: {0} ms", (t1-t0));
   }

   public static void Cleanup(){
      for(int i=0; i<numImages; i++){
         File.Delete(ImageBaseName+i+".tmp");
         File.Delete(ImageBaseName+i+".done");
      }
   }

   public static void TryToClearDiskCache(){

      // Try to force all pending writes to disk, and clear the
      // disk cache of any data.
      byte[] bytes = new byte[100*(1<<20)];
      for(int i=0; i<bytes.Length; i++)
      bytes[i] = 0;
      bytes = null;
      GC.Collect();
      Thread.Sleep(2000);
   }

   public static void Main(String[] args){

      Console.WriteLine("Bulk image processing sample application, using asynchronous I/O");
      Console.WriteLine("Simulates applying a simple transformation to "+numImages+" \"images\"");
      Console.WriteLine("(ie, Async FileStream & Threadpool benchmark)");
      Console.WriteLine("Warning - this test requires "+(numPixels * numImages * 2)+" bytes of tmp space");

      if (args.Length==1){
         processImageRepeats = Int32.Parse(args[0]);
         Console.WriteLine("ProcessImage inner loop - "+processImageRepeats);
      }

      MakeImageFiles();
      TryToClearDiskCache();
      ProcessImagesInBulk();
      Cleanup();
   }
 
   [DllImport("KERNEL32", SetLastError=true)]
   private static extern void FlushFileBuffers(IntPtr handle);

}

Vedere anche

Oggetti e funzionalità del threading | ThreadPool