Asynchronous File I/O

Synchronous I/O means that the method is blocked until the I/O operation is complete, and then the method returns its data. With asynchronous I/O, a user can call BeginRead or BeginWrite to start the operation. The main thread can continue doing other work, and the user processes the data later, when the operation has completed. Also, multiple I/O requests can be pending simultaneously.

To wait for this data to become available, you can call EndRead or EndWrite, passing in the IAsyncResult corresponding to the I/O request you issued; the call blocks until that request completes. You can also provide a callback method, which should call EndRead to find out how many bytes were read, or EndWrite to complete the write. Asynchronous I/O can offer better performance when many I/O requests are pending simultaneously, but generally requires some significant restructuring of your application to work correctly.
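
The callback pattern described above can be sketched as follows. This is a minimal illustration rather than part of the sample application: the class name BeginReadSketch and the helper ReadWithCallback are hypothetical, and a MemoryStream stands in for a real file so that the sketch is self-contained.

```csharp
using System;
using System.IO;
using System.Threading;

public static class BeginReadSketch
{
    // Hypothetical helper: starts an asynchronous read and returns the
    // number of bytes that EndRead reports for that request.
    public static int ReadWithCallback(Stream stream, byte[] buffer)
    {
        int bytesRead = -1;
        using (ManualResetEvent done = new ManualResetEvent(false))
        {
            stream.BeginRead(buffer, 0, buffer.Length, delegate(IAsyncResult ar)
            {
                try
                {
                    // The callback runs when the request completes.
                    // EndRead returns the byte count for this request.
                    Stream s = (Stream)ar.AsyncState;
                    bytesRead = s.EndRead(ar);
                }
                finally
                {
                    done.Set();
                }
            }, stream);

            // The calling thread could do other work here before waiting.
            done.WaitOne();
        }
        return bytesRead;
    }

    public static void Main()
    {
        byte[] data = { 1, 2, 3, 4, 5 };
        byte[] buffer = new byte[data.Length];
        using (MemoryStream ms = new MemoryStream(data))
        {
            int n = ReadWithCallback(ms, buffer);
            Console.WriteLine("Read {0} bytes.", n);
        }
    }
}
```

The state argument passed to BeginRead comes back as IAsyncResult.AsyncState, which is how the callback finds the stream it should call EndRead on.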

The Stream class supports the mixing of synchronous and asynchronous reads and writes on the same stream, regardless of whether the operating system allows this. Stream provides default implementations of asynchronous read and write operations in terms of their synchronous implementations, and provides default implementations of synchronous read and write operations in terms of their asynchronous implementations.

When implementing a class derived from Stream, it is necessary to provide an implementation for either the synchronous or the asynchronous Read and Write methods. While overriding Read and Write is permissible, and the default implementations of the asynchronous methods (BeginRead, EndRead, BeginWrite, and EndWrite) will work with your implementation of the synchronous methods, this does not provide the most efficient performance. Similarly, the synchronous Read and Write methods will work correctly if you provide an implementation of the asynchronous methods, but performance is generally better if you specifically implement the synchronous methods. The default implementations of ReadByte and WriteByte call the synchronous Read and Write methods with a one-element byte array. When deriving classes from Stream, if you have an internal byte buffer, it is strongly recommended that you override these methods to access your internal buffer for better performance.

A stream that connects to a backing store overrides either the synchronous or asynchronous Read and Write methods to get the functionality of the other by default. If a stream does not support asynchronous or synchronous operations, the implementer need only make the appropriate methods throw exceptions.
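
As a rough sketch of the guidance above, the following hypothetical ArrayBackedStream overrides only the synchronous Read, relies on the inherited BeginRead and EndRead, overrides ReadByte to use its internal buffer directly, and throws for the operations it does not support. The class name is an assumption made for illustration, and argument validation is omitted for brevity.

```csharp
using System;
using System.IO;

// Hypothetical read-only stream over an in-memory byte array.
public class ArrayBackedStream : Stream
{
    private readonly byte[] _buffer;
    private int _position;

    public ArrayBackedStream(byte[] buffer) { _buffer = buffer; }

    public override bool CanRead { get { return true; } }
    public override bool CanSeek { get { return false; } }
    public override bool CanWrite { get { return false; } }
    public override long Length { get { return _buffer.Length; } }
    public override long Position
    {
        get { return _position; }
        set { throw new NotSupportedException(); }
    }

    // Synchronous read; the inherited BeginRead/EndRead are built on it.
    public override int Read(byte[] buffer, int offset, int count)
    {
        int n = Math.Min(count, _buffer.Length - _position);
        Array.Copy(_buffer, _position, buffer, offset, n);
        _position += n;
        return n;
    }

    // Reads straight from the internal buffer instead of going through
    // Read with a temporary one-element array.
    public override int ReadByte()
    {
        if (_position >= _buffer.Length) return -1;
        return _buffer[_position++];
    }

    // Unsupported operations simply throw.
    public override void Write(byte[] buffer, int offset, int count) { throw new NotSupportedException(); }
    public override long Seek(long offset, SeekOrigin origin) { throw new NotSupportedException(); }
    public override void SetLength(long value) { throw new NotSupportedException(); }
    public override void Flush() { }

    public static void Main()
    {
        ArrayBackedStream s = new ArrayBackedStream(new byte[] { 10, 20, 30 });
        Console.WriteLine(s.ReadByte());   // prints 10
        byte[] rest = new byte[8];
        IAsyncResult ar = s.BeginRead(rest, 0, rest.Length, null, null);
        Console.WriteLine(s.EndRead(ar));  // prints 2
        Console.WriteLine(s.ReadByte());   // prints -1 (end of stream)
    }
}
```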

The following example is an asynchronous implementation of a hypothetical bulk image processor, followed by a synchronous implementation example. This code is designed to perform a CPU-intensive operation on every file in a directory. For more information, see the Asynchronous Programming Design Patterns topic.

Imports System
Imports System.IO
Imports System.Threading
Imports System.Runtime.InteropServices
Imports System.Security.Permissions
Imports Microsoft.Win32.SafeHandles
Module BulkImageProcAsync
 Dim ImageBaseName As String = "tmpImage-"
 Dim numImages As Integer = 200
 Dim numPixels As Integer = 512 * 512
 ' ProcessImage has a simple O(N) loop, and you can vary the number
 ' of times you repeat that loop to make the application more CPU-
 ' bound or more IO-bound.
 Dim processImageRepeats As Integer = 20
 ' Threads must decrement NumImagesToFinish, and protect
 ' their access to it through a mutex.
 Dim NumImagesToFinish As Integer = numImages
 Dim NumImagesMutex(-1) As [Object]
 ' WaitObject is signalled when all image processing is done.
 Dim WaitObject(-1) As [Object]
 Structure ImageStateObject
 Public pixels() As Byte
 Public imageNum As Integer
 Public fs As FileStream
 End Structure
 <SecurityPermissionAttribute(SecurityAction.Demand, Flags:=SecurityPermissionFlag.UnmanagedCode)> _
 Sub MakeImageFiles()
 Dim sides As Integer = Fix(Math.Sqrt(numPixels))
 Console.Write("Making {0} {1}x{1} images... ", numImages, sides)
 ' Visual Basic array bounds and For loops are inclusive, so the
 ' upper bounds below are one less than the element count.
 Dim pixels(numPixels - 1) As Byte
 Dim i As Integer
 For i = 0 To numPixels - 1
 pixels(i) = 255
 Next i
 Dim fs As FileStream
 For i = 0 To numImages - 1
 fs = New FileStream(ImageBaseName + i.ToString() + ".tmp", FileMode.Create, FileAccess.Write, FileShare.None, 8192, False)
 fs.Write(pixels, 0, pixels.Length)
 FlushFileBuffers(fs.SafeFileHandle)
 fs.Close()
 Next i
 fs = Nothing
 Console.WriteLine("Done.")
 End Sub
 Sub ReadInImageCallback(ByVal asyncResult As IAsyncResult)
 Dim state As ImageStateObject = CType(asyncResult.AsyncState, ImageStateObject)
 Dim stream As Stream = state.fs
 Dim bytesRead As Integer = stream.EndRead(asyncResult)
 If bytesRead <> numPixels Then
 Throw New Exception(String.Format("In ReadInImageCallback, got the wrong number of " + "bytes from the image: {0}.", bytesRead))
 End If
 ProcessImage(state.pixels, state.imageNum)
 stream.Close()
 ' Now write out the image. 
 ' Using asynchronous I/O here appears not to be best practice.
 ' It ends up swamping the threadpool, because the threadpool
 ' threads are blocked on I/O requests that were just queued to
 ' the threadpool. 
 Dim fs As New FileStream(ImageBaseName + state.imageNum.ToString() + ".done", FileMode.Create, FileAccess.Write, FileShare.None, 4096, False)
 fs.Write(state.pixels, 0, numPixels)
 fs.Close()
 ' This application model uses too much memory.
 ' Releasing memory as soon as possible is a good idea, 
 ' especially global state.
 state.pixels = Nothing
 fs = Nothing
 ' Record that an image is finished now.
 SyncLock NumImagesMutex
 NumImagesToFinish -= 1
 If NumImagesToFinish = 0 Then
 Monitor.Enter(WaitObject)
 Monitor.Pulse(WaitObject)
 Monitor.Exit(WaitObject)
 End If
 End SyncLock
 End Sub
 Sub ProcessImage(ByVal pixels() As Byte, ByVal imageNum As Integer)
 Console.WriteLine("ProcessImage {0}", imageNum)
 Dim y As Integer
 ' Perform some CPU-intensive operation on the image.
 Dim x As Integer
 For x = 0 To processImageRepeats - 1
 For y = 0 To numPixels - 1
 pixels(y) = 1
 Next y
 Next x
 Console.WriteLine("ProcessImage {0} done.", imageNum)
 End Sub
 Sub ProcessImagesInBulk()
 Console.WriteLine("Processing images... ")
 Dim t0 As Long = Environment.TickCount
 NumImagesToFinish = numImages
 Dim readImageCallback As New AsyncCallback(AddressOf ReadInImageCallback)
 Dim i As Integer
 For i = 0 To numImages - 1
 Dim state As New ImageStateObject()
 state.pixels = New Byte(numPixels - 1) {}
 state.imageNum = i
 ' Very large items are read only once, so you can make the 
 ' buffer on the FileStream very small to save memory.
 Dim fs As New FileStream(ImageBaseName + i.ToString() + ".tmp", FileMode.Open, FileAccess.Read, FileShare.Read, 1, True)
 state.fs = fs
 fs.BeginRead(state.pixels, 0, numPixels, readImageCallback, state)
 Next i
 ' Block until all images have been processed. The count is
 ' checked while holding the WaitObject lock, so the final
 ' Pulse cannot arrive between the check and the Wait.
 SyncLock WaitObject
 If NumImagesToFinish > 0 Then
 Console.WriteLine("All worker threads are queued. " + "Blocking until they complete. numLeft: {0}", NumImagesToFinish)
 End If
 While NumImagesToFinish > 0
 Monitor.Wait(WaitObject)
 End While
 End SyncLock
 Dim t1 As Long = Environment.TickCount
 Console.WriteLine("Total time processing images: {0}ms", t1 - t0)
 End Sub
 Sub Cleanup()
 Dim i As Integer
 For i = 0 To numImages - 1
 File.Delete(ImageBaseName + i.ToString + ".tmp")
 File.Delete(ImageBaseName + i.ToString + ".done")
 Next i
 End Sub
 Sub TryToClearDiskCache()
 ' Try to force all pending writes to disk, and clear the
 ' disk cache of any data.
 Dim bytes(100 * (1 << 20)) As Byte
 Dim i As Integer
 For i = 0 To bytes.Length - 1
 bytes(i) = 0
 Next i
 bytes = Nothing
 GC.Collect()
 Thread.Sleep(2000)
 End Sub
 Sub Main(ByVal args() As String)
 Console.WriteLine("Bulk image processing sample application," + " using asynchronous IO")
 Console.WriteLine("Simulates applying a simple " + "transformation to {0} ""images""", numImages)
 Console.WriteLine("(Async FileStream & Threadpool benchmark)")
 Console.WriteLine("Warning - this test requires {0} " + "bytes of temporary space", numPixels * numImages * 2)
 If args.Length = 1 Then
 processImageRepeats = Int32.Parse(args(0))
 Console.WriteLine("ProcessImage inner loop - {0}.", processImageRepeats)
 End If
 MakeImageFiles()
 TryToClearDiskCache()
 ProcessImagesInBulk()
 Cleanup()
 End Sub
 <DllImport("KERNEL32", SetLastError:=True)> _
 Sub FlushFileBuffers(ByVal handle As SafeFileHandle)
 End Sub
End Module

using System;
using System.IO;
using System.Threading;
using System.Runtime.InteropServices;
using System.Security.Permissions;
using Microsoft.Win32.SafeHandles;
public class BulkImageProcAsync
{
 public const String ImageBaseName = "tmpImage-";
 public const int numImages = 200;
 public const int numPixels = 512 * 512;
 // ProcessImage has a simple O(N) loop, and you can vary the number
 // of times you repeat that loop to make the application more CPU-
 // bound or more IO-bound.
 public static int processImageRepeats = 20;
 // Threads must decrement NumImagesToFinish, and protect
 // their access to it through a mutex.
 public static int NumImagesToFinish = numImages;
 public static Object[] NumImagesMutex = new Object[0];
 // WaitObject is signalled when all image processing is done.
 public static Object[] WaitObject = new Object[0];
 public class ImageStateObject
 {
 public byte[] pixels;
 public int imageNum;
 public FileStream fs;
 }
 [SecurityPermissionAttribute(SecurityAction.Demand, Flags=SecurityPermissionFlag.UnmanagedCode)]
 public static void MakeImageFiles()
 {
 int sides = (int)Math.Sqrt(numPixels);
 Console.Write("Making {0} {1}x{1} images... ", numImages,
 sides);
 byte[] pixels = new byte[numPixels];
 int i;
 for (i = 0; i < numPixels; i++)
 pixels[i] = (byte)i;
 FileStream fs;
 for (i = 0; i < numImages; i++)
 {
 fs = new FileStream(ImageBaseName + i + ".tmp",
 FileMode.Create, FileAccess.Write, FileShare.None,
 8192, false);
 fs.Write(pixels, 0, pixels.Length);
 FlushFileBuffers(fs.SafeFileHandle);
 fs.Close();
 }
 fs = null;
 Console.WriteLine("Done.");
 }
 public static void ReadInImageCallback(IAsyncResult asyncResult)
 {
 ImageStateObject state = (ImageStateObject)asyncResult.AsyncState;
 Stream stream = state.fs;
 int bytesRead = stream.EndRead(asyncResult);
 if (bytesRead != numPixels)
 throw new Exception(String.Format
 ("In ReadInImageCallback, got the wrong number of " +
 "bytes from the image: {0}.", bytesRead));
 ProcessImage(state.pixels, state.imageNum);
 stream.Close();
 // Now write out the image. 
 // Using asynchronous I/O here appears not to be best practice.
 // It ends up swamping the threadpool, because the threadpool
 // threads are blocked on I/O requests that were just queued to
 // the threadpool. 
 FileStream fs = new FileStream(ImageBaseName + state.imageNum +
 ".done", FileMode.Create, FileAccess.Write, FileShare.None,
 4096, false);
 fs.Write(state.pixels, 0, numPixels);
 fs.Close();
 // This application model uses too much memory.
 // Releasing memory as soon as possible is a good idea, 
 // especially global state.
 state.pixels = null;
 fs = null;
 // Record that an image is finished now.
 lock (NumImagesMutex)
 {
 NumImagesToFinish--;
 if (NumImagesToFinish == 0)
 {
 Monitor.Enter(WaitObject);
 Monitor.Pulse(WaitObject);
 Monitor.Exit(WaitObject);
 }
 }
 }
 public static void ProcessImage(byte[] pixels, int imageNum)
 {
 Console.WriteLine("ProcessImage {0}", imageNum);
 int y;
 // Perform some CPU-intensive operation on the image.
 for (int x = 0; x < processImageRepeats; x += 1)
 for (y = 0; y < numPixels; y += 1)
 pixels[y] += 1;
 Console.WriteLine("ProcessImage {0} done.", imageNum);
 }
 public static void ProcessImagesInBulk()
 {
 Console.WriteLine("Processing images... ");
 long t0 = Environment.TickCount;
 NumImagesToFinish = numImages;
 AsyncCallback readImageCallback = new
 AsyncCallback(ReadInImageCallback);
 for (int i = 0; i < numImages; i++)
 {
 ImageStateObject state = new ImageStateObject();
 state.pixels = new byte[numPixels];
 state.imageNum = i;
 // Very large items are read only once, so you can make the 
 // buffer on the FileStream very small to save memory.
 FileStream fs = new FileStream(ImageBaseName + i + ".tmp",
 FileMode.Open, FileAccess.Read, FileShare.Read, 1, true);
 state.fs = fs;
 fs.BeginRead(state.pixels, 0, numPixels, readImageCallback,
 state);
 }
 // Block until all images have been processed. The count is
 // checked while holding the WaitObject lock, so the final
 // Pulse cannot arrive between the check and the Wait.
 lock (WaitObject)
 {
 if (NumImagesToFinish > 0)
 Console.WriteLine("All worker threads are queued. " +
 "Blocking until they complete. numLeft: {0}",
 NumImagesToFinish);
 while (NumImagesToFinish > 0)
 Monitor.Wait(WaitObject);
 }
 long t1 = Environment.TickCount;
 Console.WriteLine("Total time processing images: {0}ms",
 (t1 - t0));
 }
 public static void Cleanup()
 {
 for (int i = 0; i < numImages; i++)
 {
 File.Delete(ImageBaseName + i + ".tmp");
 File.Delete(ImageBaseName + i + ".done");
 }
 }
 public static void TryToClearDiskCache()
 {
 // Try to force all pending writes to disk, and clear the
 // disk cache of any data.
 byte[] bytes = new byte[100 * (1 << 20)];
 for (int i = 0; i < bytes.Length; i++)
 bytes[i] = 0;
 bytes = null;
 GC.Collect();
 Thread.Sleep(2000);
 }
 public static void Main(String[] args)
 {
 Console.WriteLine("Bulk image processing sample application," +
 " using asynchronous IO");
 Console.WriteLine("Simulates applying a simple " +
 "transformation to {0} \"images\"", numImages);
 Console.WriteLine("(Async FileStream & Threadpool benchmark)");
 Console.WriteLine("Warning - this test requires {0} " +
 "bytes of temporary space", (numPixels * numImages * 2));
 if (args.Length == 1)
 {
 processImageRepeats = Int32.Parse(args[0]);
 Console.WriteLine("ProcessImage inner loop - {0}.",
 processImageRepeats);
 }
 MakeImageFiles();
 TryToClearDiskCache();
 ProcessImagesInBulk();
 Cleanup();
 }
 [DllImport("KERNEL32", SetLastError = true)]
 private static extern void FlushFileBuffers(SafeFileHandle handle);
}
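
The completion counting used in the asynchronous example can also be expressed with a single lock object. The following is a minimal sketch of that idea, not the sample's own code: the names CountdownSketch, Gate, and RunAndWait are hypothetical, and a short Thread.Sleep stands in for the real work. The waiting thread re-checks the count under the same lock that the workers use to decrement it, so a pulse cannot be missed.

```csharp
using System;
using System.Threading;

public static class CountdownSketch
{
    private static readonly object Gate = new object();
    private static int remaining;

    // Called by each work item when it has finished.
    private static void SignalOne()
    {
        lock (Gate)
        {
            remaining--;
            if (remaining == 0)
                Monitor.PulseAll(Gate);
        }
    }

    // Queues workCount items to the thread pool and blocks until
    // every one of them has called SignalOne.
    public static void RunAndWait(int workCount)
    {
        lock (Gate)
        {
            remaining = workCount;
        }
        for (int i = 0; i < workCount; i++)
        {
            ThreadPool.QueueUserWorkItem(delegate(object state)
            {
                Thread.Sleep(10); // stand-in for real work
                SignalOne();
            });
        }
        lock (Gate)
        {
            // Re-check the count under the lock before every wait.
            while (remaining > 0)
                Monitor.Wait(Gate);
        }
    }

    public static void Main()
    {
        RunAndWait(8);
        Console.WriteLine("All work items finished.");
    }
}
```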

Here is a synchronous example of the same idea.

Imports System
Imports System.IO
Imports System.Threading
Imports System.Runtime.InteropServices
Imports System.Security.Permissions
Imports Microsoft.Win32.SafeHandles
Module BulkImageProcSync
 Dim ImageBaseName As String = "tmpImage-"
 Dim numImages As Integer = 200
 Dim numPixels As Integer = 512 * 512
 ' ProcessImage has a simple O(N) loop, and you can vary the number
 ' of times you repeat that loop to make the application more CPU-
 ' bound or more IO-bound.
 Dim processImageRepeats As Integer = 20
 <SecurityPermissionAttribute(SecurityAction.Demand, Flags:=SecurityPermissionFlag.UnmanagedCode)> _
 Sub MakeImageFiles()
 Dim sides As Integer = Fix(Math.Sqrt(numPixels))
 Console.Write("Making {0} {1}x{1} images... ", numImages, sides)
 ' Visual Basic array bounds and For loops are inclusive, so the
 ' upper bounds below are one less than the element count.
 Dim pixels(numPixels - 1) As Byte
 Dim i As Integer
 For i = 0 To numPixels - 1
 pixels(i) = 255
 Next i
 Dim fs As FileStream
 For i = 0 To numImages - 1
 fs = New FileStream(ImageBaseName + i.ToString + ".tmp", FileMode.Create, FileAccess.Write, FileShare.None, 8192, False)
 fs.Write(pixels, 0, pixels.Length)
 FlushFileBuffers(fs.SafeFileHandle)
 fs.Close()
 Next i
 fs = Nothing
 Console.WriteLine("Done.")
 End Sub
 Sub ProcessImage(ByVal pixels() As Byte, ByVal imageNum As Integer)
 Console.WriteLine("ProcessImage {0}", imageNum)
 Dim y As Integer
 ' Perform some CPU-intensive operation on the image.
 Dim x As Integer
 For x = 0 To processImageRepeats - 1
 For y = 0 To numPixels - 1
 pixels(y) = 1
 Next y
 Next x
 Console.WriteLine("ProcessImage {0} done.", imageNum)
 End Sub
 Sub ProcessImagesInBulk()
 Console.WriteLine("Processing images... ")
 Dim t0 As Long = Environment.TickCount
 Dim pixels(numPixels - 1) As Byte
 Dim input As FileStream
 Dim output As FileStream
 Dim i As Integer
 For i = 0 To numImages - 1
 input = New FileStream(ImageBaseName + i.ToString + ".tmp", FileMode.Open, FileAccess.Read, FileShare.Read, 4096, False)
 input.Read(pixels, 0, numPixels)
 input.Close()
 ProcessImage(pixels, i)
 output = New FileStream(ImageBaseName + i.ToString + ".done", FileMode.Create, FileAccess.Write, FileShare.None, 4096, False)
 output.Write(pixels, 0, numPixels)
 output.Close()
 Next i
 input = Nothing
 output = Nothing
 Dim t1 As Long = Environment.TickCount
 Console.WriteLine("Total time processing images: {0}ms", t1 - t0)
 End Sub
 Sub Cleanup()
 Dim i As Integer
 For i = 0 To numImages - 1
 File.Delete(ImageBaseName + i.ToString + ".tmp")
 File.Delete(ImageBaseName + i.ToString + ".done")
 Next i
 End Sub
 Sub TryToClearDiskCache()
 Dim bytes(100 * (1 << 20)) As Byte
 Dim i As Integer
 For i = 0 To bytes.Length - 1
 bytes(i) = 0
 Next i
 bytes = Nothing
 GC.Collect()
 Thread.Sleep(2000)
 End Sub
 Sub Main(ByVal args() As String)
 Console.WriteLine("Bulk image processing sample application," + " using synchronous I/O.")
 Console.WriteLine("Simulates applying a simple " + "transformation to {0} ""images.""", numImages)
 Console.WriteLine("(ie, Sync FileStream benchmark).")
 Console.WriteLine("Warning - this test requires {0} " + "bytes of temporary space", numPixels * numImages * 2)
 If args.Length = 1 Then
 processImageRepeats = Int32.Parse(args(0))
 Console.WriteLine("ProcessImage inner loop {0}", processImageRepeats)
 End If
 MakeImageFiles()
 TryToClearDiskCache()
 ProcessImagesInBulk()
 Cleanup()
 End Sub
 <DllImport("KERNEL32", SetLastError:=True)> _
 Sub FlushFileBuffers(ByVal handle As SafeFileHandle)
 End Sub
End Module

using System;
using System.IO;
using System.Threading;
using System.Runtime.InteropServices;
using System.Security.Permissions;
using Microsoft.Win32.SafeHandles;
public class BulkImageProcSync
{
 public const String ImageBaseName = "tmpImage-";
 public const int numImages = 200;
 public const int numPixels = 512 * 512;
 // ProcessImage has a simple O(N) loop, and you can vary the number
 // of times you repeat that loop to make the application more CPU-
 // bound or more IO-bound.
 public static int processImageRepeats = 20;
 [SecurityPermissionAttribute(SecurityAction.Demand, Flags=SecurityPermissionFlag.UnmanagedCode)]
 public static void MakeImageFiles()
 {
 int sides = (int)Math.Sqrt(numPixels);
 Console.Write("Making {0} {1}x{1} images... ", numImages,
 sides);
 byte[] pixels = new byte[numPixels];
 int i;
 for (i = 0; i < numPixels; i++)
 pixels[i] = (byte)i;
 FileStream fs;
 for (i = 0; i < numImages; i++)
 {
 fs = new FileStream(ImageBaseName + i + ".tmp",
 FileMode.Create, FileAccess.Write, FileShare.None,
 8192, false);
 fs.Write(pixels, 0, pixels.Length);
 FlushFileBuffers(fs.SafeFileHandle);
 fs.Close();
 }
 fs = null;
 Console.WriteLine("Done.");
 }
 public static void ProcessImage(byte[] pixels, int imageNum)
 {
 Console.WriteLine("ProcessImage {0}", imageNum);
 int y;
 // Perform some CPU-intensive operation on the image.
 for (int x = 0; x < processImageRepeats; x += 1)
 for (y = 0; y < numPixels; y += 1)
 pixels[y] += 1;
 Console.WriteLine("ProcessImage {0} done.", imageNum);
 }
 public static void ProcessImagesInBulk()
 {
 Console.WriteLine("Processing images... ");
 long t0 = Environment.TickCount;
 byte[] pixels = new byte[numPixels];
 FileStream input;
 FileStream output;
 for (int i = 0; i < numImages; i++)
 {
 input = new FileStream(ImageBaseName + i + ".tmp",
 FileMode.Open, FileAccess.Read, FileShare.Read,
 4096, false);
 input.Read(pixels, 0, numPixels);
 input.Close();
 ProcessImage(pixels, i);
 output = new FileStream(ImageBaseName + i + ".done",
 FileMode.Create, FileAccess.Write, FileShare.None,
 4096, false);
 output.Write(pixels, 0, numPixels);
 output.Close();
 }
 input = null;
 output = null;
 long t1 = Environment.TickCount;
 Console.WriteLine("Total time processing images: {0}ms",
 (t1 - t0));
 }
 public static void Cleanup()
 {
 for (int i = 0; i < numImages; i++)
 {
 File.Delete(ImageBaseName + i + ".tmp");
 File.Delete(ImageBaseName + i + ".done");
 }
 }
 public static void TryToClearDiskCache()
 {
 byte[] bytes = new byte[100 * (1 << 20)];
 for (int i = 0; i < bytes.Length; i++)
 bytes[i] = 0;
 bytes = null;
 GC.Collect();
 Thread.Sleep(2000);
 }
 public static void Main(String[] args)
 {
 Console.WriteLine("Bulk image processing sample application," +
 " using synchronous I/O.");
 Console.WriteLine("Simulates applying a simple " +
 "transformation to {0} \"images.\"", numImages);
 Console.WriteLine("(ie, Sync FileStream benchmark).");
 Console.WriteLine("Warning - this test requires {0} " +
 "bytes of temporary space", (numPixels * numImages * 2));
 if (args.Length == 1)
 {
 processImageRepeats = Int32.Parse(args[0]);
 Console.WriteLine("ProcessImage inner loop - {0}.",
 processImageRepeats);
 }
 MakeImageFiles();
 TryToClearDiskCache();
 ProcessImagesInBulk();
 Cleanup();
 }
 [DllImport("KERNEL32", SetLastError = true)]
 private static extern void FlushFileBuffers(SafeFileHandle handle);
}

See Also

Reference

Stream

Stream.Read

Stream.Write

Stream.BeginRead

Stream.BeginWrite

Stream.EndRead

Stream.EndWrite

IAsyncResult

Mutex

Other Resources

File and Stream I/O


  • Last updated on February 4, 2013