引言
本文主要从线程的基础用法,CLR线程池当中工作者线程与I/O线程的开发,并行操作PLINQ等多个方面介绍多线程的开发。
其中委托的BeginInvoke方法以及回调函数最为常用。而 I/O线程可能容易遭到大家的忽略,其实在开发多线程系统,更应该多留意I/O线程的操作。特别是在ASP.NET开发当中,可能更多人只会留意在客户端使用Ajax或者在服务器端使用UpdatePanel。其实合理使用I/O线程在通讯项目或文件下载时,能尽量降低IIS的压力。并行编程是Framework4.0中极力推广的异步操作方式,更值得更深入地学习。希望本篇文章能对各位的学习研究有所帮助,当中有所错漏的地方敬请点评。
目录
五、CLR线程池的I/O线程
在前一节所介绍的线程都属于CLR线程池的工作者线程,这一节开始为大家介绍一下CLR线程池的I/O线程
I/O 线程是.NET专为访问外部资源所设置的一种线程,因为访问外部资源常常要受到外界因素的影响,为了防止让主线程受影响而长期处于阻塞状态,.NET为多个I/O操作都建立起了异步方法,例如:FileStream、TCP/IP、WebRequest、WebService等等,而且每个异步方法的使用方式都非常类似,都是以BeginXXX为开始,以EndXXX结束,下面为大家一一解说。
5.1 异步读写 FileStream
需要在 FileStream 异步调用 I/O线程,必须使用以下构造函数建立 FileStream 对象,并把useAsync设置为 true。
FileStream stream = new FileStream ( string path, FileMode mode, FileAccess access, FileShare share, int bufferSize,bool useAsync ) ;
其中 path 是文件的相对路径或绝对路径; mode 确定如何打开或创建文件; access 确定访问文件的方式; share 确定文件如何进程共享; bufferSize 是代表缓冲区大小,一般默认最小值为8,在启动异步读取或写入时,文件大小一般大于缓冲大小; userAsync代表是否启动异步I/O线程。
5.1.1 异步写入
FileStream中包含BeginWrite、EndWrite 方法可以启动I/O线程进行异步写入。
public override IAsyncResult BeginWrite ( byte[] array, int offset, int numBytes, AsyncCallback userCallback, Object stateObject )
public override void EndWrite (IAsyncResult asyncResult )
BeginWrite 返回值为IAsyncResult, 使用方式与委托的BeginInvoke方法相似,最好就是使用回调函数,避免线程阻塞。在最后两个参数中,参数AsyncCallback用于绑定回调函数; 参数Object用于传递外部数据。要注意一点:AsyncCallback所绑定的回调函数必须是带单个 IAsyncResult 参数的无返回值方法。
在例子中,把FileStream作为外部数据传递到回调函数当中,然后在回调函数中利用IAsyncResult.AsyncState获取FileStream对象,最后通过FileStream.EndWrite(IAsyncResult)结束写入。1 class Program 2 { 3 static void Main(string[] args) 4 { 5 //把线程池的最大值设置为1000 6 ThreadPool.SetMaxThreads(1000, 1000); 7 ThreadPoolMessage("Start"); 8 9 //新立文件File.sour 10 FileStream stream = new FileStream("File.sour", FileMode.OpenOrCreate, 11 FileAccess.ReadWrite,FileShare.ReadWrite,1024,true); 12 byte[] bytes = new byte[16384]; 13 string message = "An operating-system ThreadId has no fixed relationship........"; 14 bytes = Encoding.Unicode.GetBytes(message); 15 16 //启动异步写入 17 stream.BeginWrite(bytes, 0, (int)bytes.Length,new AsyncCallback(Callback),stream); 18 stream.Flush(); 19 20 Console.ReadKey(); 21 } 22 23 static void Callback(IAsyncResult result) 24 { 25 //显示线程池现状 26 Thread.Sleep(200); 27 ThreadPoolMessage("AsyncCallback"); 28 //结束异步写入 29 FileStream stream = (FileStream)result.AsyncState; 30 stream.EndWrite(result); 31 stream.Close(); 32 } 33 34 //显示线程池现状 35 static void ThreadPoolMessage(string data) 36 { 37 int a, b; 38 ThreadPool.GetAvailableThreads(out a, out b); 39 string message = string.Format("{0}\n CurrentThreadId is {1}\n "+ 40 "WorkerThreads is:{2} CompletionPortThreads is :{3}", 41 data, Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString()); 42 Console.WriteLine(message); 43 } 44 }
由输出结果可以看到,在使用FileStream.BeginWrite方法后,系统将自动启动CLR线程池中I/O线程。
5.1.2 异步读取
FileStream 中包含 BeginRead 与 EndRead 可以异步调用I/O线程进行读取。
public override IAsyncResult BeginRead ( byte[] array,int offset,int numBytes, AsyncCallback userCallback,Object stateObject)
public override int EndRead(IAsyncResult asyncResult)
其使用方式与BeginWrite和EndWrite相似,AsyncCallback用于绑定回调函数; Object用于传递外部数据。在回调函数只需要使用IAsyncResut.AsyncState就可获取外部数据。EndWrite 方法会返回从流读取到的字节数量。
首先定义 FileData 类,里面包含FileStream对象,byte[] 数组和长度。然后把FileData对象作为外部数据传到回调函数,在回调函数中,把IAsyncResult.AsyncState强制转换为FileData,然后通过FileStream.EndRead(IAsyncResult)结束读取。最后比较一下长度,若读取到的长度与输入的数据长度不一至,则抛出异常。
1 class Program 2 { 3 public class FileData 4 { 5 public FileStream Stream; 6 public int Length; 7 public byte[] ByteData; 8 } 9 10 static void Main(string[] args) 11 { 12 //把线程池的最大值设置为1000 13 ThreadPool.SetMaxThreads(1000, 1000); 14 ThreadPoolMessage("Start"); 15 ReadFile(); 16 17 Console.ReadKey(); 18 } 19 20 static void ReadFile() 21 { 22 byte[] byteData=new byte[80961024]; 23 FileStream stream = new FileStream("File1.sour", FileMode.OpenOrCreate, 24 FileAccess.ReadWrite, FileShare.ReadWrite, 1024, true); 25 26 //把FileStream对象,byte[]对象,长度等有关数据绑定到FileData对象中,以附带属性方式送到回调函数 27 FileData fileData = new FileData(); 28 fileData.Stream = stream; 29 fileData.Length = (int)stream.Length; 30 fileData.ByteData = byteData; 31 32 //启动异步读取 33 stream.BeginRead(byteData, 0, fileData.Length, new AsyncCallback(Completed), fileData); 34 } 35 36 static void Completed(IAsyncResult result) 37 { 38 ThreadPoolMessage("Completed"); 39 40 //把AsyncResult.AsyncState转换为FileData对象,以FileStream.EndRead完成异步读取 41 FileData fileData = (FileData)result.AsyncState; 42 int length=fileData.Stream.EndRead(result); 43 fileData.Stream.Close(); 44 45 //如果读取到的长度与输入长度不一致,则抛出异常 46 if (length != fileData.Length) 47 throw new Exception("Stream is not complete!"); 48 49 string data=Encoding.ASCII.GetString(fileData.ByteData, 0, fileData.Length); 50 Console.WriteLine(data.Substring(2,22)); 51 } 52 53 //显示线程池现状 54 static void ThreadPoolMessage(string data) 55 { 56 int a, b; 57 ThreadPool.GetAvailableThreads(out a, out b); 58 string message = string.Format("{0}\n CurrentThreadId is {1}\n "+ 59 "WorkerThreads is:{2} CompletionPortThreads is :{3}", 60 data, Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString()); 61 Console.WriteLine(message); 62 } 63 64 }
由输出结果可以看到,在使用FileStream.BeginRead方法后,系统将自动启动CLR线程池中I/O线程。
5.2 异步操作TCP/IP套接字
在介绍 TCP/IP 套接字前先简单介绍一下 NetworkStream 类,它是用于网络访问的基础数据流。 NetworkStream 提供了好几个方法控制套接字数据的发送与接收, 其中BeginRead、EndRead、BeginWrite、EndWrite 能够实现异步操作,而且异步线程是来自于CLR线程池的I/O线程。
public override int ReadByte ()
public override int Read (byte[] buffer,int offset, int size)public override void WriteByte (byte value)
public override void Write (byte[] buffer,int offset, int size)public override IAsyncResult BeginRead (byte [] buffer, int offset, int size, AsyncCallback callback, Object state )
public override int EndRead(IAsyncResult result)public override IAsyncResult BeginWrite (byte [] buffer, int offset, int size, AsyncCallback callback, Object state )
public override void EndWrite(IAsyncResult result)
若要创建 NetworkStream,必须提供已连接的 Socket。而在.NET中使用TCP/IP套接字不需要直接与Socket打交道,因为.NET把Socket的大部分操作都放在System.Net.TcpListener和System.Net.Sockets.TcpClient里面,这两个类大大地简化了Socket的操作。一般套接字对象Socket包含一个Accept()方法,此方法能产生阻塞来等待客户端的请求,而在TcpListener类里也包含了一个相似的方法 public TcpClient AcceptTcpClient()用于等待客户端的请求。此方法将会返回一个TcpClient 对象,通过 TcpClient 的 public NetworkStream GetStream()方法就能获取NetworkStream对象,控制套接字数据的发送与接收。
下面以一个例子说明异步调用TCP/IP套接字收发数据的过程。
首先在服务器端建立默认地址127.0.0.1用于收发信息,使用此地址与端口500新建TcpListener对象,调用TcpListener.Start 侦听传入的连接请求,再使用一个死循环来监听信息。
在ChatClient类包括有接收信息与发送信息两个功能:当接收到客户端请求时,它会利用 NetworkStream.BeginRead 读取客户端信息,并在回调函数ReceiveAsyncCallback中输出信息内容,若接收到的信息的大小小于1时,它将会抛出一个异常。当信息成功接收后,再使用 NetworkStream.BeginWrite 方法回馈信息到客户端
1 class Program 2 { 3 static void Main(string[] args) 4 { 5 //设置CLR线程池最大线程数 6 ThreadPool.SetMaxThreads(1000, 1000); 7 8 //默认地址为127.0.0.1 9 IPAddress ipAddress = IPAddress.Parse("127.0.0.1"); 10 TcpListener tcpListener = new TcpListener(ipAddress, 500); 11 tcpListener.Start(); 12 13 //以一个死循环来实现监听 14 while (true) 15 { //调用一个ChatClient对象来实现监听 16 ChatClient chatClient = new ChatClient(tcpListener.AcceptTcpClient()); 17 } 18 } 19 } 20 21 public class ChatClient 22 { 23 static TcpClient tcpClient; 24 static byte[] byteMessage; 25 static string clientEndPoint; 26 27 public ChatClient(TcpClient tcpClient1) 28 { 29 tcpClient = tcpClient1; 30 byteMessage = new byte[tcpClient.ReceiveBufferSize]; 31 32 //显示客户端信息 33 clientEndPoint = tcpClient.Client.RemoteEndPoint.ToString(); 34 Console.WriteLine("Client's endpoint is " + clientEndPoint); 35 36 //使用NetworkStream.BeginRead异步读取信息 37 NetworkStream networkStream = tcpClient.GetStream(); 38 networkStream.BeginRead(byteMessage, 0, tcpClient.ReceiveBufferSize, 39 new AsyncCallback(ReceiveAsyncCallback), null); 40 } 41 42 public void ReceiveAsyncCallback(IAsyncResult iAsyncResult) 43 { 44 //显示CLR线程池状态 45 Thread.Sleep(100); 46 ThreadPoolMessage("\nMessage is receiving"); 47 48 //使用NetworkStream.EndRead结束异步读取 49 NetworkStream networkStreamRead = tcpClient.GetStream(); 50 int length=networkStreamRead.EndRead(iAsyncResult); 51 52 //如果接收到的数据长度少于1则抛出异常 53 if (length < 1) 54 { 55 tcpClient.GetStream().Close(); 56 throw new Exception("Disconnection!"); 57 } 58 59 //显示接收信息 60 string message = Encoding.UTF8.GetString(byteMessage, 0, length); 61 Console.WriteLine("Message:" + message); 62 63 //使用NetworkStream.BeginWrite异步发送信息 64 byte[] sendMessage = Encoding.UTF8.GetBytes("Message is received!"); 65 NetworkStream networkStreamWrite=tcpClient.GetStream(); 66 networkStreamWrite.BeginWrite(sendMessage, 0, sendMessage.Length, 67 new AsyncCallback(SendAsyncCallback), null); 68 } 69 70 //把信息转换成二进制数据,然后发送到客户端 71 public void SendAsyncCallback(IAsyncResult iAsyncResult) 72 { 73 //显示CLR线程池状态 74 Thread.Sleep(100); 75 ThreadPoolMessage("\nMessage is sending"); 76 77 //使用NetworkStream.EndWrite结束异步发送 78 tcpClient.GetStream().EndWrite(iAsyncResult); 79 80 //重新监听 81 tcpClient.GetStream().BeginRead(byteMessage, 0, tcpClient.ReceiveBufferSize, 82 new AsyncCallback(ReceiveAsyncCallback), null); 83 } 84 85 //显示线程池现状 86 static void ThreadPoolMessage(string data) 87 { 88 int a, b; 89 ThreadPool.GetAvailableThreads(out a, out b); 90 string message = string.Format("{0}\n CurrentThreadId is {1}\n " + 91 "WorkerThreads is:{2} CompletionPortThreads is :{3}\n", 92 data, Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString()); 93 94 Console.WriteLine(message); 95 } 96 }
而在客户端只是使用简单的开发方式,利用TcpClient连接到服务器端,然后调用NetworkStream.Write方法发送信息,最后调用NetworkStream.Read方法读取回馈信息
1 static void Main(string[] args) 2 { 3 //连接服务端 4 TcpClient tcpClient = new TcpClient("127.0.0.1", 500); 5 6 //发送信息 7 NetworkStream networkStream = tcpClient.GetStream(); 8 byte[] sendMessage = Encoding.UTF8.GetBytes("Client request connection!"); 9 networkStream.Write(sendMessage, 0, sendMessage.Length); 10 networkStream.Flush(); 11 12 //接收信息 13 byte[] receiveMessage=new byte[1024]; 14 int count=networkStream.Read(receiveMessage, 0,1024); 15 Console.WriteLine(Encoding.UTF8.GetString(receiveMessage)); 16 Console.ReadKey(); 17 }
注意观察运行结果,服务器端的异步操作线程都是来自于CLR线程池的I/O线程
5.3 异步WebRequest
System.Net.WebRequest 是 .NET 为实现访问 Internet 的 “请求/响应模型” 而开发的一个 abstract 基类, 它主要有三个子类:FtpWebRequest、HttpWebRequest、FileWebRequest。当使用WebRequest.Create(string uri)创建对象时,应用程序就可以根据请求协议判断实现类来进行操作。FileWebRequest、FtpWebRequest、HttpWebRequest 各有其作用:FileWebRequest 使用 “file://路径” 的URI方式实现对本地资源和内部文件的请求/响应、FtpWebRequest 使用FTP文件传输协议实现文件请求/响应、HttpWebRequest 用于处理HTTP的页面请求/响应。由于使用方法相类似,下面就以常用的HttpWebRequest为例子介绍一下异步WebRequest的使用方法。
在使用ASP.NET开发网站的时候,往往会忽略了HttpWebRequest的使用,因为开发都假设客户端是使用浏览器等工具去阅读页面的。但如果你对REST开发方式有所了解,那对 HttpWebRequest 就应该非常熟悉。它可以在路径参数、头文件、页面主体、Cookie 等多处地方加入请求条件,然后对回复数据进行适当处理。HttpWebRequest 包含有以下几个常用方法用于处理请求/响应:
public override Stream GetRequestStream ()
public override WebResponse GetResponse ()public override IAsyncResult BeginGetRequestStream ( AsyncCallback callback, Object state )
public override Stream EndGetRequestStream ( IAsyncResult asyncResult )public override IAsyncResult BeginGetResponse ( AsyncCallback callback, Object state )public override WebResponse EndGetResponse ( IAsyncResult asyncResult )其中BeginGetRequestStream、EndGetRequestStream 用于异步向HttpWebRequest对象写入请求信息; BeginGetResponse、EndGetResponse 用于异步发送页面请求并获取返回信息。使用异步方式操作Internet的“请求/响应”,避免主线程长期处于等待状态,而操作期间异步线程是来自CLR线程池的I/O线程。
下面以简单的例子介绍一下异步请求的用法。
首先为Person类加上可序列化特性,在服务器端建立Hanlder.ashx,通过Request.InputStream 获取到请求数据并把数据转化为String对象,此实例中数据是以 “Id:1” 的形式实现传送的。然后根据Id查找对应的Person对象,并把Person对象写入Response.OutStream 中返还到客户端。
在客户端先把 HttpWebRequird.Method 设置为 "post",使用异步方式通过BeginGetRequireStream获取请求数据流,然后写入请求数据 “Id:1”。再使用异步方法BeginGetResponse 获取回复数据,最后把数据反序列化为Person对象显示出来。
注意:HttpWebRequire.Method默认为get,在写入请求前必须把HttpWebRequire.Method设置为post,否则在使用BeginGetRequireStream 获取请求数据流的时候,系统就会发出 “无法发送具有此谓词类型的内容正文" 的异常。
Model
1 namespace Model 2 { 3 [Serializable] 4 public class Person 5 { 6 public int ID 7 { 8 get; 9 set; 10 } 11 public string Name 12 { 13 get; 14 set; 15 } 16 public int Age 17 { 18 get; 19 set; 20 } 21 } 22 }
服务器端
1 public class Handler : IHttpHandler { 2 3 public void ProcessRequest(HttpContext context) 4 { 5 //把信息转换为String,找出输入条件Id 6 byte[] bytes=new byte[1024]; 7 int length=context.Request.InputStream.Read(bytes,0,1024); 8 string condition = Encoding.Default.GetString(bytes); 9 int id = int.Parse(condition.Split(new string[] { ":" }, 10 StringSplitOptions.RemoveEmptyEntries)[1]); 11 12 //根据Id查找对应Person对象 13 var person = GetPersonList().Where(x => x.ID == id).First(); 14 15 //所Person格式化为二进制数据写入OutputStream 16 BinaryFormatter formatter = new BinaryFormatter(); 17 formatter.Serialize(context.Response.OutputStream, person); 18 } 19 20 //模拟源数据 21 private IListGetPersonList() 22 { 23 var personList = new List (); 24 25 var person1 = new Person(); 26 person1.ID = 1; 27 person1.Name = "Leslie"; 28 person1.Age = 30; 29 personList.Add(person1); 30 ........... 31 return personList; 32 } 33 34 public bool IsReusable 35 { 36 get { return true;} 37 } 38 }
客户端
1 class Program 2 { 3 static void Main(string[] args) 4 { 5 ThreadPool.SetMaxThreads(1000, 1000); 6 Request(); 7 Console.ReadKey(); 8 } 9 10 static void Request() 11 { 12 ThreadPoolMessage("Start"); 13 //使用WebRequest.Create方法建立HttpWebRequest对象 14 HttpWebRequest webRequest = (HttpWebRequest)WebRequest.Create( 15 "http://localhost:5700/Handler.ashx"); 16 webRequest.Method = "post"; 17 18 //对写入数据的RequestStream对象进行异步请求 19 IAsyncResult result=webRequest.BeginGetRequestStream( 20 new AsyncCallback(EndGetRequestStream),webRequest); 21 } 22 23 static void EndGetRequestStream(IAsyncResult result) 24 { 25 ThreadPoolMessage("RequestStream Complete"); 26 //获取RequestStream 27 HttpWebRequest webRequest = (HttpWebRequest)result.AsyncState; 28 Stream stream=webRequest.EndGetRequestStream(result); 29 30 //写入请求条件 31 byte[] condition = Encoding.Default.GetBytes("Id:1"); 32 stream.Write(condition, 0, condition.Length); 33 34 //异步接收回传信息 35 IAsyncResult responseResult = webRequest.BeginGetResponse( 36 new AsyncCallback(EndGetResponse), webRequest); 37 } 38 39 static void EndGetResponse(IAsyncResult result) 40 { 41 //显出线程池现状 42 ThreadPoolMessage("GetResponse Complete"); 43 44 //结束异步请求,获取结果 45 HttpWebRequest webRequest = (HttpWebRequest)result.AsyncState; 46 WebResponse webResponse = webRequest.EndGetResponse(result); 47 48 //把输出结果转化为Person对象 49 Stream stream = webResponse.GetResponseStream(); 50 BinaryFormatter formatter = new BinaryFormatter(); 51 var person=(Person)formatter.Deserialize(stream); 52 Console.WriteLine(string.Format("Person Id:{0} Name:{1} Age:{2}", 53 person.ID, person.Name, person.Age)); 54 } 55 56 //显示线程池现状 57 static void ThreadPoolMessage(string data) 58 { 59 int a, b; 60 ThreadPool.GetAvailableThreads(out a, out b); 61 string message = string.Format("{0}\n CurrentThreadId is {1}\n " + 62 "WorkerThreads is:{2} CompletionPortThreads is :{3}\n", 63 data, Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString()); 64 65 Console.WriteLine(message); 66 } 67 }
从运行结果可以看到,BeginGetRequireStream、BeginGetResponse方法是使用CLR线程池的I/O线程。
5.4 异步调用WebService
相比TCP/IP套接字,在使用WebService的时候,服务器端需要更复杂的操作处理,使用时间往往会更长。为了避免客户端长期处于等待状态,在配置服务引用时选择 “生成异步操作”,系统可以自动建立异步调用的方式。
以.NET 2.0以前,系统都是使用ASMX来设计WebService,而近年来WCF可说是火热登场,下面就以WCF为例子简单介绍一下异步调用WebService的例子。
由于系统可以自动生成异步方法,使用起来非常简单,首先在服务器端建立服务ExampleService,里面包含方法Method。客户端引用此服务时,选择 “生成异步操作”。然后使用 BeginMethod 启动异步方法, 在回调函数中调用EndMethod结束异步调用。
服务端
1 [ServiceContract] 2 public interface IExampleService 3 { 4 [OperationContract] 5 string Method(string name); 6 } 7 8 public class ExampleService : IExampleService 9 { 10 public string Method(string name) 11 { 12 return "Hello " + name; 13 } 14 } 15 16 class Program 17 { 18 static void Main(string[] args) 19 { 20 ServiceHost host = new ServiceHost(typeof(ExampleService)); 21 host.Open(); 22 Console.ReadKey(); 23 host.Close(); 24 } 25 } 26 2728 29 4530 4431 4332 3633 3534 37 38 4239 4140
客户端
1 class Program 2 { 3 static void Main(string[] args) 4 { 5 //设置最大线程数 6 ThreadPool.SetMaxThreads(1000, 1000); 7 ThreadPoolMessage("Start"); 8 9 //建立服务对象,异步调用服务方法 10 ExampleServiceReference.ExampleServiceClient exampleService = new 11 ExampleServiceReference.ExampleServiceClient(); 12 exampleService.BeginMethod("Leslie",new AsyncCallback(AsyncCallbackMethod), 13 exampleService); 14 Console.ReadKey(); 15 } 16 17 static void AsyncCallbackMethod(IAsyncResult result) 18 { 19 Thread.Sleep(1000); 20 ThreadPoolMessage("Complete"); 21 ExampleServiceReference.ExampleServiceClient example = 22 (ExampleServiceReference.ExampleServiceClient)result.AsyncState; 23 string data=example.EndMethod(result); 24 Console.WriteLine(data); 25 } 26 27 //显示线程池现状 28 static void ThreadPoolMessage(string data) 29 { 30 int a, b; 31 ThreadPool.GetAvailableThreads(out a, out b); 32 string message = string.Format("{0}\n CurrentThreadId is {1}\n " + 33 "WorkerThreads is:{2} CompletionPortThreads is :{3}\n", 34 data, Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString()); 35 36 Console.WriteLine(message); 37 } 38 } 39 4041 42 7343 6244 6150 6052 53 54 5956 58 63 7267 7168 7069
注意观察运行结果,异步调用服务时,回调函数都是运行于CLR线程池的I/O线程当中。
六、异步 SqlCommand
从ADO.NET 2.0开始,SqlCommand就新增了几个异步方法执行SQL命令。相对于同步执行方式,它使主线程不需要等待数据库的返回结果,在使用复杂性查询或批量插入时将有效提高主线程的效率。使用异步SqlCommand的时候,请注意把ConnectionString 的 Asynchronous Processing 设置为 true 。
注意:SqlCommand异步操作的特别之处在于线程并不依赖于CLR线程池,而是由Windows内部提供,这比使用异步委托更有效率。但如果需要使用回调函数的时候,回调函数的线程依然是来自于CLR线程池的工作者线程。
SqlCommand有以下几个方法支持异步操作:
public IAsyncResult BeginExecuteNonQuery (......)
public int EndExecuteNonQuery(IAsyncResult)public IAsyncResult BeginExecuteReader(......)
public SqlDataReader EndExecuteReader(IAsyncResult)public IAsyncResult BeginExecuteXmlReader (......)
public XmlReader EndExecuteXmlReader(IAsyncResult)
由于使用方式相似,此处就以 BeginExecuteNonQuery 为例子,介绍一下异步SqlCommand的使用。首先建立connectionString,注意把Asynchronous Processing设置为true来启动异步命令,然后把SqlCommand.CommandText设置为 WAITFOR DELAY "0:0:3" 来虚拟数据库操作。再通过BeginExecuteNonQuery启动异步操作,利用轮询方式监测操作情况。最后在操作完成后使用EndExecuteNonQuery完成异步操作。
1 class Program 2 { 3 //把Asynchronous Processing设置为true 4 static string connectionString = "Data Source=LESLIE-PC;Initial Catalog=Business;“+ 5 "Integrated Security=True;Asynchronous Processing=true"; 6 7 static void Main(string[] args) 8 { 9 //把CLR线程池最大线程数设置为1000 10 ThreadPool.SetMaxThreads(1000, 1000); 11 ThreadPoolMessage("Start"); 12 13 //使用WAITFOR DELAY命令来虚拟操作 14 SqlConnection connection = new SqlConnection(connectionString); 15 SqlCommand command = new SqlCommand("WAITFOR DELAY '0:0:3';", connection); 16 connection.Open(); 17 18 //启动异步SqlCommand操作,利用轮询方式监测操作 19 IAsyncResult result = command.BeginExecuteNonQuery(); 20 ThreadPoolMessage("BeginRead"); 21 while (!result.AsyncWaitHandle.WaitOne(500)) 22 Console.WriteLine("Main thread do work........"); 23 24 //结束异步SqlCommand 25 int count= command.EndExecuteNonQuery(result); 26 ThreadPoolMessage("\nCompleted"); 27 Console.ReadKey(); 28 } 29 30 //显示线程池现状 31 static void ThreadPoolMessage(string data) 32 { 33 int a, b; 34 ThreadPool.GetAvailableThreads(out a, out b); 35 string message = string.Format("{0}\n CurrentThreadId is {1}\n "+ 36 "WorkerThreads is:{2} CompletionPortThreads is :{3}\n", 37 data, Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString()); 38 Console.WriteLine(message); 39 } 40 }
注意运行结果,SqlCommand的异步执行线程并不属于CLR线程池。
如果觉得使用轮询方式过于麻烦,可以使用回调函数,但要注意当调用回调函数时,线程是来自于CLR线程池的工作者线程。
1 class Program 2 { 3 //把Asynchronous Processing设置为true 4 static string connectionString = "Data Source=LESLIE-PC;Initial Catalog=Business;”+ 5 “Integrated Security=True;Asynchronous Processing=true"; 6 static void Main(string[] args) 7 { 8 //把CLR线程池最大线程数设置为1000 9 ThreadPool.SetMaxThreads(1000, 1000); 10 ThreadPoolMessage("Start"); 11 12 //使用WAITFOR DELAY命令来虚拟操作 13 SqlConnection connection = new SqlConnection(connectionString); 14 SqlCommand command = new SqlCommand("WAITFOR DELAY '0:0:3';", connection); 15 connection.Open(); 16 17 //启动异步SqlCommand操作,并把SqlCommand对象传递到回调函数 18 IAsyncResult result = command.BeginExecuteNonQuery( 19 new AsyncCallback(AsyncCallbackMethod),command); 20 Console.ReadKey(); 21 } 22 23 static void AsyncCallbackMethod(IAsyncResult result) 24 { 25 Thread.Sleep(200); 26 ThreadPoolMessage("AsyncCallback"); 27 SqlCommand command = (SqlCommand)result.AsyncState; 28 int count=command.EndExecuteNonQuery(result); 29 command.Connection.Close(); 30 } 31 32 //显示线程池现状 33 static void ThreadPoolMessage(string data) 34 { 35 int a, b; 36 ThreadPool.GetAvailableThreads(out a, out b); 37 string message = string.Format("{0}\n CurrentThreadId is {1}\n "+ 38 "WorkerThreads is:{2} CompletionPortThreads is :{3}\n", 39 data, Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString()); 40 41 Console.WriteLine(message); 42 } 43 }
运行结果:
七、并行编程与PLINQ
要使用多线程开发,必须非常熟悉Thread的使用,而且在开发过程中可能会面对很多未知的问题。为了简化开发,.NET 4.0 特别提供一个并行编程库System.Threading.Tasks,它可以简化并行开发,你无需直接跟线程或线程池打交道,就可以简单建立多线程应用程序。此外,.NET还提供了新的一组扩展方法PLINQ,它具有自动分析查询功能,如果并行查询能提高系统效率,则同时运行,如果查询未能从并行查询中受益,则按原顺序查询。下面将详细介绍并行操作的方式。
7.1 泛型委托
使用并行编程可以同时操作多个委托,在介绍并行编程前先简单介绍一下两个泛型委托System.Func<>与System.Action<>。
Func<>是一个能接受多个参数和一个返回值的泛型委托,它能接受0个到16个输入参数, 其中 T1,T2,T3,T4......T16 代表自定的输入类型,TResult为自定义的返回值。
public delegate TResult Func<TResult>()public delegate TResult Func<T1,TResult>(T1 arg1)public delegate TResult Func<T1,T2, TResult>(T1 arg1,T2 arg2)public delegate TResult Func<T1,T2, T3, TResult>(T1 arg1,T2 arg2,T3 arg3)public delegate TResult Func<T1,T2, T3, ,T4, TResult>(T1 arg1,T2 arg2,T3 arg3,T4 arg4)..............public delegate TResult Func<T1,T2, T3, ,T4, ...... ,T16,TResult>(T1 arg1,T2 arg2,T3 arg3,T4 arg4,...... ,T16 arg16)Action<>与Func<>十分相似,不同在于Action<>的返回值为void,Action能接受0~16个参数
public delegate void Action<T1>() public delegate void Action<T1,T2>(T1 arg1,T2 arg2) public delegate void Action<T1,T2, T3>(T1 arg1,T2 arg2, T3 arg3)............. public delegate void Action<T1,T2, T3, ,T4, ...... ,T16>(T1 arg1,T2 arg2,T3 arg3,T4 arg4,...... ,T16 arg16)
7.2 任务并行库(TPL)
System.Threading.Tasks中的类被统称为任务并行库(Task Parallel Library,TPL),TPL使用CLR线程池把工作分配到CPU,并能自动处理工作分区、线程调度、取消支持、状态管理以及其他低级别的细节操作,极大地简化了多线程的开发。
TPL包括常用的数据并行与任务并行两种执行方式:
7.2.1 数据并行
数据并行的核心类就是System.Threading.Tasks.Parallel,它包含两个静态方法 Parallel.For 与 Parallel.ForEach, 使用方式与for、foreach相仿。通过这两个方法可以并行处理System.Func<>、System.Action<>委托。
以下一个例子就是利用 public static ParallelLoopResult For( int from, int max, Action<int>) 方法对List<Person>进行并行查询。
假设使用单线程方式查询3个Person对象,需要用时大约6秒,在使用并行方式,只需使用2秒就能完成查询,而且能够避开Thread的繁琐处理。1 class Program 2 { 3 static void Main(string[] args) 4 { 5 //设置最大线程数 6 ThreadPool.SetMaxThreads(1000, 1000); 7 //并行查询 8 Parallel.For(0, 3,n => 9 { 10 Thread.Sleep(2000); //模拟查询 11 ThreadPoolMessage(GetPersonList()[n]); 12 }); 13 Console.ReadKey(); 14 } 15 16 //模拟源数据 17 static IListGetPersonList() 18 { 19 var personList = new List (); 20 21 var person1 = new Person(); 22 person1.ID = 1; 23 person1.Name = "Leslie"; 24 person1.Age = 30; 25 personList.Add(person1); 26 ........... 27 return personList; 28 } 29 30 //显示线程池现状 31 static void ThreadPoolMessage(Person person) 32 { 33 int a, b; 34 ThreadPool.GetAvailableThreads(out a, out b); 35 string message = string.Format("Person ID:{0} Name:{1} Age:{2}\n" + 36 " CurrentThreadId is {3}\n WorkerThreads is:{4}" + 37 " CompletionPortThreads is :{5}\n", 38 person.ID, person.Name, person.Age, 39 Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString()); 40 41 Console.WriteLine(message); 42 } 43 }
观察运行结果,对象并非按照原排列顺序进行查询,而是使用并行方式查询。
若想停止操作,可以利用ParallelLoopState参数,下面以ForEach作为例子。
public static ParallelLoopResult ForEach<TSource>( IEnumerable<TSource> source, Action<TSource, ParallelLoopState> action)其中source为数据集,在Action<TSource,ParallelLoopState>委托的ParallelLoopState参数当中包含有Break()和 Stop()两个方法都可以使迭代停止。Break的使用跟传统for里面的使用方式相似,但因为处于并行处理当中,使用Break并不能保证所有运行能立即停止,在当前迭代之前的迭代会继续执行。若想立即停止操作,可以使用Stop方法,它能保证立即终止所有的操作,无论它们是处于当前迭代的之前还是之后。1 class Program 2 { 3 static void Main(string[] args) 4 { 5 //设置最大线程数 6 ThreadPool.SetMaxThreads(1000, 1000); 7 8 //并行查询 9 Parallel.ForEach(GetPersonList(), (person, state) => 10 { 11 if (person.ID == 2) 12 state.Stop(); 13 ThreadPoolMessage(person); 14 }); 15 Console.ReadKey(); 16 } 17 18 //模拟源数据 19 static IListGetPersonList() 20 { 21 var personList = new List (); 22 23 var person1 = new Person(); 24 person1.ID = 1; 25 person1.Name = "Leslie"; 26 person1.Age = 30; 27 personList.Add(person1); 28 .......... 29 return personList; 30 } 31 32 //显示线程池现状 33 static void ThreadPoolMessage(Person person) 34 { 35 int a, b; 36 ThreadPool.GetAvailableThreads(out a, out b); 37 string message = string.Format("Person ID:{0} Name:{1} Age:{2}\n" + 38 " CurrentThreadId is {3}\n WorkerThreads is:{4}" + 39 " CompletionPortThreads is :{5}\n", 40 person.ID, person.Name, person.Age, 41 Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString()); 42 43 Console.WriteLine(message); 44 } 45 }
观察运行结果,当Person的ID等于2时,运行将会停止。
当要在多个线程中调用本地变量,可以使用以下方法:
public static ParallelLoopResult ForEach<TSource, TLocal>(IEnumerable<Of TSource>, Func<Of TLocal>, Func<Of TSource,ParallelLoopState,TLocal,TLocal>, Action<Of TLocal>)其中第一个参数为数据集;第二个参数是一个Func委托,用于在每个线程执行前进行初始化;第 三个参数是委托Func<Of T1,T2,T3,TResult>,它能对数据集的每个成员进行迭代,当中T1是数据集的成员,T2是一个ParallelLoopState对 象,它可以控制迭代的状态,T3是线程中的本地变量;第四个参数是一个Action委托,用于对每个线程的最终状态进行最终操作。在以下例子中,使用ForEach计算多个Order的总体价格。在ForEach方法中,首先把参数初始化为0f,然后用把同一个Order的多个OrderItem价格进行累加,计算出Order的价格,最后把多个Order的价格进行累加,计算出多个Order的总体价格。
1 public class Order 2 { 3 public int ID; 4 public float Price; 5 } 6 7 public class OrderItem 8 { 9 public int ID; 10 public string Goods; 11 public int OrderID; 12 public float Price; 13 public int Count; 14 } 15 16 class Program 17 { 18 static void Main(string[] args) 19 { 20 //设置最大线程数 21 ThreadPool.SetMaxThreads(1000, 1000); 22 float totalPrice = 0f; 23 //并行查询 24 var parallelResult = Parallel.ForEach(GetOrderList(), 25 () => 0f, //把参数初始值设为0 26 (order, state, orderPrice) => 27 { 28 //计算单个Order的价格 29 orderPrice = GetOrderItem().Where(item => item.OrderID == order.ID) 30 .Sum(item => item.Price * item.Count); 31 order.Price = orderPrice; 32 ThreadPoolMessage(order); 33 34 return orderPrice; 35 }, 36 (finallyPrice) => 37 { 38 totalPrice += finallyPrice;//计算多个Order的总体价格 39 } 40 ); 41 42 while (!parallelResult.IsCompleted) 43 Console.WriteLine("Doing Work!"); 44 45 Console.WriteLine("Total Price is:" + totalPrice); 46 Console.ReadKey(); 47 } 48 //虚拟数据 49 static IListGetOrderList() 50 { 51 IList orderList = new List (); 52 Order order1 = new Order(); 53 order1.ID = 1; 54 orderList.Add(order1); 55 ............ 56 return orderList; 57 } 58 //虚拟数据 59 static IList GetOrderItem() 60 { 61 IList itemList = new List (); 62 63 OrderItem orderItem1 = new OrderItem(); 64 orderItem1.ID = 1; 65 orderItem1.Goods = "iPhone 4S"; 66 orderItem1.Price = 6700; 67 orderItem1.Count = 2; 68 orderItem1.OrderID = 1; 69 itemList.Add(orderItem1); 70 ........... 71 return itemList; 72 } 73 74 //显示线程池现状 75 static void ThreadPoolMessage(Order order) 76 { 77 int a, b; 78 ThreadPool.GetAvailableThreads(out a, out b); 79 string message = string.Format("OrderID:{0} OrderPrice:{1}\n" + 80 " CurrentThreadId is {2}\n WorkerThreads is:{3}" + 81 " CompletionPortThreads is:{4}\n", 82 order.ID, order.Price, 83 Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString()); 84 85 Console.WriteLine(message); 86 } 87 }
运行结果
7.2.2 任务并行
在TPL当中还可以使用Parallel.Invoke方法触发多个异步任务,其中 actions 中可以包含多个方法或者委托,parallelOptions用于配置Parallel类的操作。
public static void Invoke(Action[] actions )public static void Invoke(ParallelOptions parallelOptions, Action[] actions )下面例子中利用了Parallet.Invoke并行查询多个Person,actions当中可以绑定方法、lambda表达式或者委托,注意绑定方法时必须是返回值为void的无参数方法。1 class Program 2 { 3 static void Main(string[] args) 4 { 5 //设置最大线程数 6 ThreadPool.SetMaxThreads(1000, 1000); 7 8 //任务并行 9 Parallel.Invoke(option, 10 PersonMessage, 11 ()=>ThreadPoolMessage(GetPersonList()[1]), 12 delegate(){ 13 ThreadPoolMessage(GetPersonList()[2]); 14 }); 15 Console.ReadKey(); 16 } 17 18 static void PersonMessage() 19 { 20 ThreadPoolMessage(GetPersonList()[0]); 21 } 22 23 //显示线程池现状 24 static void ThreadPoolMessage(Person person) 25 { 26 int a, b; 27 ThreadPool.GetAvailableThreads(out a, out b); 28 string message = string.Format("Person ID:{0} Name:{1} Age:{2}\n" + 29 " CurrentThreadId is {3}\n WorkerThreads is:{4}" + 30 " CompletionPortThreads is :{5}\n", 31 person.ID, person.Name, person.Age, 32 Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString()); 33 34 Console.WriteLine(message); 35 } 36 37 //模拟源数据 38 static IListGetPersonList() 39 { 40 var personList = new List (); 41 42 var person1 = new Person(); 43 person1.ID = 1; 44 person1.Name = "Leslie"; 45 person1.Age = 30; 46 personList.Add(person1); 47 .......... 48 return personList; 49 } 50 }
运行结果
7.3 Task简介
以Thread创建的线程被默认为前台线程,当然你可以把线程IsBackground属性设置为true,但TPL为此提供了一个更简单的类Task。
Task存在于System.Threading.Tasks命名空间当中,它可以作为异步委托的简单替代品。通过Task的Factory属性将返回TaskFactory类,以TaskFactory.StartNew(Action)方法可以创建一个新线程,所创建的线程默认为后台线程。1 class Program 2 { 3 static void Main(string[] args) 4 { 5 ThreadPool.SetMaxThreads(1000, 1000); 6 Task.Factory.StartNew(() => ThreadPoolMessage()); 7 Console.ReadKey(); 8 } 9 10 //显示线程池现状 11 static void ThreadPoolMessage() 12 { 13 int a, b; 14 ThreadPool.GetAvailableThreads(out a, out b); 15 string message = string.Format("CurrentThreadId is:{0}\n" + 16 "CurrentThread IsBackground:{1}\n" + 17 "WorkerThreads is:{2}\nCompletionPortThreads is:{3}\n", 18 Thread.CurrentThread.ManagedThreadId, 19 Thread.CurrentThread.IsBackground.ToString(), 20 a.ToString(), b.ToString()); 21 Console.WriteLine(message); 22 } 23 }
运行结果
若要取消处理,可以利用CancellationTakenSource对象,在TaskFactory中包含有方法
public Task StartNew( Action action, CancellationToken cancellationToken )在方法中加入CancellationTakenSource对象的CancellationToken属性,可以控制任务的运行,调用CancellationTakenSource.Cancel时任务就会自动停止。下面以图片下载为例子介绍一下TaskFactory的使用。服务器端页面
1 2 357 584 26 27 28
首先在服务器页面中显示多个*.jpg图片,每个图片都有对应的CheckBox检测其选择情况。
所选择图片的路径会记录在Application["Url"]当中传递到Handler.ashx当中。Handler.ashx 处理图片的下载,它从 Application["Url"] 当中获取所选择图片的路径,并把图片转化成byte[]二进制数据。
再把图片的数量,每副图片的二进制数据的长度记录在OutputStream的头部。最后把图片的二进制数据记入 OutputStream 一并输出。1 public class Handler : IHttpHandler 2 { 3 public void ProcessRequest(HttpContext context) 4 { 5 //获取图片名,把图片数量写OutputStream 6 ListurlList = (List )context.Application["Url"]; 7 context.Response.OutputStream.Write(BitConverter.GetBytes(urlList.Count), 0, 4); 8 9 //把图片转换成二进制数据 10 List imageList = GetImages(urlList); 11 12 //把每副图片长度写入OutputStream 13 foreach (string image in imageList) 14 { 15 byte[] imageByte=Convert.FromBase64String(image); 16 context.Response.OutputStream.Write(BitConverter.GetBytes(imageByte.Length),0,4); 17 } 18 19 //把图片写入OutputStream 20 foreach (string image in imageList) 21 { 22 byte[] imageByte = Convert.FromBase64String(image); 23 context.Response.OutputStream.Write(imageByte,0,imageByte.Length); 24 } 25 } 26 27 //获取多个图片的二进制数据 28 private List GetImages(List urlList) 29 { 30 List imageList = new List (); 31 foreach (string url in urlList) 32 imageList.Add(GetImage(url)); 33 return imageList; 34 } 35 36 //获取单副图片的二进制数据 37 private string GetImage(string url) 38 { 39 string path = "E:/My Projects/Example/WebSite/Images/"+url; 40 FileStream stream = new FileStream(path, FileMode.Open, FileAccess.Read); 41 byte[] imgBytes = new byte[10240]; 42 int imgLength = stream.Read(imgBytes, 0, 10240); 43 return Convert.ToBase64String(imgBytes,0,imgLength); 44 } 45 46 public bool IsReusable 47 { 48 get{ return false;} 49 } 50 }
客户端
建立一个WinForm窗口,里面加入一个WebBrowser连接到服务器端的Default.aspx页面。
当按下Download按键时,系统就会利用TaskFactory.StartNew的方法建立异步线程,使用WebRequest方法向Handler.ashx发送请求。接收到回传流时,就会根据头文件的内容判断图片的数量与每副图片的长度,把二进制数据转化为*.jpg文件保存。系统利用TaskFactory.StartNew(action,cancellationToken) 方式异步调用GetImages方法进行图片下载。
当用户按下Cancel按钮时,异步任务就会停止。值得注意的是,在图片下载时调用了CancellationToken.ThrowIfCancellationRequested方法,目的在检查并行任务的运行情况,在并行任务被停止时释放出OperationCanceledException异常,确保用户按下Cancel按钮时,停止所有并行任务。1 public partial class Form1 : Form 2 { 3 private CancellationTokenSource tokenSource = new CancellationTokenSource(); 4 5 public Form1() 6 { 7 InitializeComponent(); 8 ThreadPool.SetMaxThreads(1000, 1000); 9 } 10 11 private void downloadToolStripMenuItem_Click(object sender, EventArgs e) 12 { 13 Task.Factory.StartNew(GetImages,tokenSource.Token); 14 } 15 16 private void cancelToolStripMenuItem_Click(object sender, EventArgs e) 17 { 18 tokenSource.Cancel(); 19 } 20 21 private void GetImages() 22 { 23 //发送请求,获取输出流 24 WebRequest webRequest = HttpWebRequest.Create("Http://localhost:5800/Handler.ashx"); 25 Stream responseStream=webRequest.GetResponse().GetResponseStream(); 26 27 byte[] responseByte = new byte[81960]; 28 IAsyncResult result=responseStream.BeginRead(responseByte,0,81960,null,null); 29 int responseLength = responseStream.EndRead(result); 30 31 //获取图片数量 32 int imageCount = BitConverter.ToInt32(responseByte, 0); 33 34 //获取每副图片的长度 35 int[] lengths = new int[imageCount]; 36 for (int n = 0; n < imageCount; n++) 37 { 38 int length = BitConverter.ToInt32(responseByte, (n + 1) * 4); 39 lengths[n] = length; 40 } 41 try 42 { 43 //保存图片 44 for (int n = 0; n < imageCount; n++) 45 { 46 string path = string.Format("E:/My Projects/Example/Test/Images/pic{0}.jpg", n); 47 FileStream file = new FileStream(path, FileMode.Create, FileAccess.ReadWrite); 48 49 //计算字节偏移量 50 int offset = (imageCount + 1) * 4; 51 for (int a = 0; a < n; a++) 52 offset += lengths[a]; 53 54 file.Write(responseByte, offset, lengths[n]); 55 file.Flush(); 56 57 //模拟操作 58 Thread.Sleep(1000); 59 60 //检测CancellationToken变化 61 tokenSource.Token.ThrowIfCancellationRequested(); 62 } 63 } 64 catch (OperationCanceledException ex) 65 { 66 MessageBox.Show("Download cancel!"); 67 } 68 } 69 }
7.4 并行查询(PLINQ)
并行 LINQ (PLINQ) 是 LINQ 模式的并行实现,主要区别在于 PLINQ 尝试充分利用系统中的所有处理器。 它利用所有处理器的方法,把数据源分成片段,然后在多个处理器上对单独工作线程上的每个片段并行执行查询, 在许多情况下,并行执行意味着查询运行速度显著提高。但这并不说明所有PLINQ都会使用并行方式,当系统测试要并行查询会对系统性能造成损害时,那将自动化地使用同步执行。在System.Linq.ParallelEnumerable类中,包含了并行查询的大部分方法。
方法成员 | 说明 |
AsParallel | PLINQ 的入口点。 指定如果可能,应并行化查询的其余部分。 |
AsSequential(Of TSource) | 指定查询的其余部分应像非并行 LINQ 查询一样按顺序运行。 |
AsOrdered | 指定 PLINQ 应保留查询的其余部分的源序列排序,直到例如通过使用 orderby(在 Visual Basic 中为 Order By)子句更改排序为止。 |
AsUnordered(Of TSource) | 指定查询的其余部分的 PLINQ 不需要保留源序列的排序。 |
WithCancellation(Of TSource) | 指定 PLINQ 应定期监视请求取消时提供的取消标记和取消执行的状态。 |
WithDegreeOfParallelism(Of TSource) | 指定 PLINQ 应当用来并行化查询的处理器的最大数目。 |
WithMergeOptions(Of TSource) | 提供有关 PLINQ 应当如何(如果可能)将并行结果合并回到使用线程上的一个序列的提示。 |
WithExecutionMode(Of TSource) | 指定 PLINQ 应当如何并行化查询(即使默认行为是按顺序运行查询)。 |
ForAll(Of TSource) | 多线程枚举方法,与循环访问查询结果不同,它允许在不首先合并回到使用者线程的情况下并行处理结果。 |
Aggregate 重载 | 对于 PLINQ 唯一的重载,它启用对线程本地分区的中间聚合以及一个用于合并所有分区结果的最终聚合函数。 |
7.4.1 AsParallel
通常想要实现并行查询,只需向数据源添加 AsParallel 查询操作即可。
1 class Program 2 { 3 static void Main(string[] args) 4 { 5 var personList=GetPersonList().AsParallel() 6 .Where(x=>x.Age>30); 7 Console.ReadKey(); 8 } 9 10 //模拟源数据 11 static IListGetPersonList() 12 { 13 var personList = new List (); 14 15 var person1 = new Person(); 16 person1.ID = 1; 17 person1.Name = "Leslie"; 18 person1.Age = 30; 19 personList.Add(person1); 20 ........... 21 return personList; 22 } 23 }
7.4.2 AsOrdered
若要使查询结果必须保留源序列排序方式,可以使用AsOrdered方法。
AsOrdered依然使用并行方式,只是在查询过程加入额外信息,在并行结束后把查询结果再次进行排列。1 class Program 2 { 3 static void Main(string[] args) 4 { 5 var personList=GetPersonList().AsParallel().AsOrdered() 6 .Where(x=>x.Age<30); 7 Console.ReadKey(); 8 } 9 10 static IListGetPersonList() 11 {......} 12 }
默认情况下,PLINQ 使用主机上的所有处理器,这些处理器的数量最多可达 64 个。
通过使用 WithDegreeOfParallelism(Of TSource) 方法,可以指示 PLINQ 使用不多于指定数量的处理器。1 class Program 2 { 3 static void Main(string[] args) 4 { 5 var personList=GetPersonList().AsParallel().WithDegreeOfParallelism(2) 6 .Where(x=>x.Age<30); 7 Console.ReadKey(); 8 } 9 10 static IListGetPersonList() 11 {.........} 12 }
7.4.4 ForAll
如果要对并行查询结果进行操作,一般会在for或foreach中执行,执行枚举操作时会使用同步方式。
有见及此,PLINQ中包含了ForAll方法,它可以使用并行方式对数据集进行操作。1 class Program 2 { 3 static void Main(string[] args) 4 { 5 ThreadPool.SetMaxThreads(1000, 1000); 6 GetPersonList().AsParallel().ForAll(person =>{ 7 ThreadPoolMessage(person); 8 }); 9 Console.ReadKey(); 10 } 11 12 static IListGetPersonList() 13 {.......} 14 15 //显示线程池现状 16 static void ThreadPoolMessage(Person person) 17 { 18 int a, b; 19 ThreadPool.GetAvailableThreads(out a, out b); 20 string message = string.Format("Person ID:{0} Name:{1} Age:{2}\n" + 21 " CurrentThreadId is {3}\n WorkerThreads is:{4}" + 22 " CompletionPortThreads is :{5}\n", 23 person.ID, person.Name, person.Age, 24 Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString()); 25 Console.WriteLine(message); 26 } 27 }
运行结果
7.4.5 WithCancellation
如果需要停止查询,可以使用 WithCancellation(Of TSource) 运算符并提供 CancellationToken 实例作为参数。
与第三节Task的例子相似,如果标记上的 IsCancellationRequested 属性设置为 true,则 PLINQ 将会注意到它,并停止所有线程上的处理,然后引发 OperationCanceledException。这可以保证并行查询能够立即停止。1 class Program 2 { 3 static CancellationTokenSource tokenSource = new CancellationTokenSource(); 4 5 static void Main(string[] args) 6 { 7 Task.Factory.StartNew(Cancel); 8 try 9 { 10 GetPersonList().AsParallel().WithCancellation(tokenSource.Token) 11 .ForAll(person => 12 { 13 ThreadPoolMessage(person); 14 }); 15 } 16 catch (OperationCanceledException ex) 17 { } 18 Console.ReadKey(); 19 } 20 21 //在10~50毫秒内发出停止信号 22 static void Cancel() 23 { 24 Random random = new Random(); 25 Thread.Sleep(random.Next(10,50)); 26 tokenSource.Cancel(); 27 } 28 29 static IListGetPersonList() 30 {......} 31 32 //显示线程池现状 33 static void ThreadPoolMessage(Person person) 34 { 35 int a, b; 36 ThreadPool.GetAvailableThreads(out a, out b); 37 string message = string.Format("Person ID:{0} Name:{1} Age:{2}\n" + 38 " CurrentThreadId is {3}\n WorkerThreads is:{4}" + 39 " CompletionPortThreads is :{5}\n", 40 person.ID, person.Name, person.Age, 41 Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString()); 42 Console.WriteLine(message); 43 } 44 } 45
八、定时器与锁
8.1定时器
若要长期定时进行一些工作,比如像邮箱更新,实时收听信息等等,可以利用定时器Timer进行操作。
在System.Threading命名空间中存在Timer类与对应的TimerCallback委托,它可以在后台线程中执行一些长期的定时操作,使主线程不受干扰。Timer类中最常用的构造函数为 public Timer( timerCallback , object , int , int )timerCallback委托可以绑定执行方法,执行方法必须返回void,它可以是无参数方法,也可以带一个object参数的方法。第二个参数是为 timerCallback 委托输入的参数对象。第三个参数是开始执行前等待的时间。第四个参数是每次执行之间的等待时间。开发实例
1 class Program 2 { 3 static void Main(string[] args) 4 { 5 ThreadPool.SetMaxThreads(1000, 1000); 6 7 TimerCallback callback = new TimerCallback(ThreadPoolMessage); 8 Timer t = new Timer(callback,"Hello Jack! ", 0, 1000); 9 Console.ReadKey(); 10 } 11 12 //显示线程池现状 13 static void ThreadPoolMessage(object data) 14 { 15 int a, b; 16 ThreadPool.GetAvailableThreads(out a, out b); 17 string message = string.Format("{0}\n CurrentThreadId is:{1}\n" + 18 " CurrentThread IsBackground:{2}\n" + 19 " WorkerThreads is:{3}\n CompletionPortThreads is:{4}\n", 20 data + "Time now is " + DateTime.Now.ToLongTimeString(), 21 Thread.CurrentThread.ManagedThreadId, 22 Thread.CurrentThread.IsBackground.ToString(), 23 a.ToString(), b.ToString()); 24 Console.WriteLine(message); 25 } 26 }
注意观察运行结果,每次调用Timer绑定的方法时不一定是使用同一线程,但线程都会是来自工作者线程的后台线程。
8.2 锁在使用多线程开发时,存在一定的共用数据,为了避免多线程同时操作同一数据,.NET提供了lock、Monitor、Interlocked等多个锁定数据的方式。
8.2.1 lock
lock的使用比较简单,如果需要锁定某个对象时,可以直接使用lock(this)的方式。
1 private void Method() 2 { 3 lock(this) 4 { 5 //在此进行的操作能保证在同一时间内只有一个线程对此对象操作 6 } 7 }
如果操作只锁定某段代码,可以事先建立一个object对象,并对此对象进行操作锁定,这也是.net提倡的锁定用法。
1 class Control 2 { 3 private object obj=new object(); 4 5 public void Method() 6 { 7 lock(obj) 8 {.......} 9 } 10 }
8.2.2 Montior
Montior存在于System.Thread命名空间内,相比lock,Montior使用更灵活。
它存在 Enter, Exit 两个方法,它可以对对象进行锁定与解锁,比lock使用更灵活。1 class Control 2 { 3 private object obj=new object(); 4 5 public void Method() 6 { 7 Monitor.Enter(obj); 8 try 9 {......} 10 catch(Excetion ex) 11 {......} 12 finally 13 { 14 Monitor.Exit(obj); 15 } 16 } 17 } 18
使用try的方式,能确保程序不会因死锁而释放出异常!
而且在finally中释放obj对象能够确保无论是否出现死锁状态,系统都会释放obj对象。而且Monitor中还存在Wait方法可以让线程等待一段时间,然后在完成时使用Pulse、PulseAll等方法通知等待线程。
8.2.3 Interlocked
Interlocked存在于System.Thread命名空间内,它的操作比Monitor使用更简单。
它存在CompareExchange、Decrement、Exchange、Increment等常用方法让参数在安全的情况进行数据交换。Increment、Decrement 可以使参数安全地加1或减1并返回递增后的新值。
1 class Example 2 { 3 private int a=1; 4 5 public void AddOne() 6 { 7 int newA=Interlocked.Increment(ref a); 8 } 9 }
Exchange可以安全地变量赋值。
1 public void SetData() 2 { 3 Interlocked.Exchange(ref a,100); 4 }
CompareExchange使用特别方便,它相当于if的用法,当a等于1时,则把100赋值给a。
1 public void CompareAndExchange() 2 { 3 Interlocked.CompareExchange(ref a,100,1); 4 }
熟悉掌握多线程开发,对提高系统工作效率非常有帮助,尤其是回调方法与最近火热的并行编程更应该引起各位的重视。
在下用了较长的时间总结所用过的多线程开发方式,希望本篇文章能对各位的学习研究有所帮助,当中有所错漏的地方敬请点评。对 .NET 开发有兴趣的朋友欢迎加入QQ群: 共同探讨 !
C#综合揭秘
作者:风尘浪子