`
feipigwang
  • 浏览: 738985 次
  • 性别: Icon_minigender_2
  • 来自: 北京
文章分类
社区版块
存档分类
最新评论

.NET平台下可复用的Tcp通信层实现

 
阅读更多
2006年已经来临,回首刚走过的2005,心中感慨万千。在人生和生活的目标上,有了清晰明确的定位,终于知道了自己喜欢什么样的生活,喜欢什么样的生活方式;在技术上,成熟了不少,眼界也开阔的不少,从面向对象到组件、从.NetJ2EE、从微软到开源,颇有收获。特别值得一提的是,认识了Rod Johnson这个大牛人,也终于在自己的项目中正式使用Spring.net框架来开发了,这确实是一个优秀的框架。而在已经到来的2006年,我有一个主要目标就是B/S应用开发,来填补自己在企业级开发上的另一半空白。
以前就很想将自己在
Tcp通信层的开发心得、经验共享出来,但一直没有实现,究其原因,还是自己太懒了。今天终于找到一个时机,写下这篇文章,也算是对2005年的另一种形式的回忆吧。

绝大多数C/S(包括多层)结构的系统中,终端与服务器的通信都是通过Tcp进行的(使用Udp的也有一些,但是其相对于Tcp简单许多,所以不在这里的讨论之列)。通常,这样的C/S系统都需要处理极大的并发,也就是说随时都可能有成千上万个用户在线,并且每分钟都可能有数以百计的用户上线/下线。由于每个用户都与服务器存在着一个Tcp连接,如何管理所有这些连接,并使我们的Tcp通信层稳定高效地工作,是我开发的这个“TcpTcp通信层”设计实现的主要目标。
自从20049月开始至今,我就一直负责某C/S系统的服务器端的架构设计,并负责整个通信层的实现,在探索的过程中,逐渐形成了一套可复用的“Tcp通信层框架”(“框架”这个词真的蛮吓人,呵呵),其位于EnterpriseServerBase类库的EnterpriseServerBase.Network命名空间中。现将我在通信层这一块的设计/开发经验记录于此,以便日后回顾。也期大家多多赐教。
我期望的“Tcp通信层”并不只是能接受连接、管理连接、转发用户请求这么简单,为了构建一个高度可复用的、灵活的、可接插的Tcp通信层,需要定义很多的规则、接口、契约,这需要做很多的工作。“Tcp通信层”决不仅仅只是Tcp协议通信,由于通信与消息联系紧密,不可避免的需要将“通信的消息”纳入到我们的分析中来,比如,基于Tcp传输的特性,我们可能需要对接收到的消息进行分裂、重组等(后文中会解释为什么、以及如何做)。请允许我在这里澄清一下,如果只是解决“仅仅”的Tcp通信问题,我只需要介绍Tcp组件就可以了,但是如果要解决“整个Tcp通信层”的问题,并使之可高度复用,那就需要介绍很多额外的东西,比如,上面提到的“消息”,以及“消息”所涉及的通信协议。
在我们应用的通信层中,存在以Tcp组件为核心的多个组件,这些组件相互协作,以构建/实现高度可复用的Tcp通信层。这些组件之间的关系简单图示如下:

我先解释一下上图。当网络(Tcp)组件从某个Tcp连接上接收到一个请求时,会将请求转发给消息分派器,消息分派器通过IDataStreamHelper组件获取请求消息的类型,然后根据此类型要求处理器工厂创建对应类型的请求处理器,请求处理器处理请求并返回结果。接下来再由网络组件把结果返回给终端用户。在消息分派器进行请求消息分派之前,可能涉及一系列的操作,像消息加密/解密、消息分裂/重组、消息验证等。而且,根据不同的应用,可能有其它的消息转换要求,而且这些操作可能是多样化的,为了满足这种多样性和可接插性,这就需要消息分派器提供一个插入点,让我们可以随心所欲地插入自定义的对请求/回复消息的预处理和后处理。
上图中消息分派器中可接插的操作除了消息分裂器(使用实线框)是必须的,消息加密器和消息验证器(使用虚线框)是可选的,应根据你应用的实际情况加以决定是否使用。关于这几个典型的可接插的组件的功能作用会在后文中介绍。在继续介绍Tcp组件的实现之前,有必要先提一下IDataStreamHelper接口的作用,IDataStreamHelper接口用于抽象我们实际的通信协议,并能从任何一请求/回复消息中提取关于本条消息的元数据,比如,消息的长度、类型等信息。具体的应用必须根据自己的消息协议来实现IDataStreamHelper接口。关于该接口的定义也在后文中给出。
关于上图,需要提醒的是,整个消息的流动是由Tcp组件驱动的!这篇文章以Tcp组件和消息分派器组件为索引来组织整个可复用的Tcp通信层的实现。首先,我们来深入到Tcp组件的具体实现中去。

一.Tcp组件

1Tcp组件的主要职责
Tcp组件的主要职责并不是在一个很短的时间内总结出来的,它是逐步完善的(至今可能还不够全面)。为了使Tcp组件具有高度的可复用性,需要考虑很多的需求,而所有这些需求中具有共性的、占主导位置的需求就被纳入到Tcp组件的职责中来了。这个职责的集合如下:
(1) 管理所有的Tcp连接以及连接对应的上下文(Context)。
(2) 当某用户上线或下线时,能发出事件通知。
(3) 当在线用户(连接)的数量发生变化时,能发出事件通知。
(4) 当用户的请求得到回复时,发出事件通知。这一点对于记录用户请求和跟踪用户请求非常有用)
(5) 能及时主动关闭指定连接。比如,当某一非法用户登录后,用户验证组件通知Tcp组件强行关闭该用户对应的连接。
(6)
除了能转发用户请求及对请求的应答(通过消息分派器)外,还能直接对指定的用户发送数据。这也要求我们的Tcp连接是多线程安全的。
(7) 提供绕开Tcp组件直接从Tcp连接同步接收数据的功能。比如,客户端需要上传一个Blob,我们可能希望直接从Tcp连接进行接收数据,这是有好处的,后面可以看到。
这里列出的是Tcp组件的主要职责,还有很多细节性的没有罗列出来,如果一个Tcp组件解决了上述所有问题,对我来说,应该就是一个很好用、很适用的Tcp组件了。

2Tcp组件接口定义
相信很多朋友和我一样,刚接触Tcp服务端开发的时候,通常是当一个Tcp连接建立的时候,就分配一个线程在该连接上监听请求消息,这种方式的缺点有很多,最主要的缺点是效率低、管理复杂。
我的最初的Tcp组件是C++版本的,那时很有幸接触到了windows平台上最高效的Tcp通信模型――完成端口模型,完全理解这个模型需要点时间,但是Win32 多线程程序设计》(侯捷翻译)和《windows网络编程》这两本书可以给你不少帮助。异步机制是完成端口的基础,完成端口模型的本质思想是将"启动异步操作的线程""提供服务的线程"(即工作者线程)拆伙。理解这一点很重要。在.Net中没有对应的组件或类对应于完成端口模型,解决方案有两个:一是通过P/Invoke来实现自己的完成端口组件,另一种方式是通过.Net的现有通信设施来模拟完成端口实现。
本文给出第二种方案的实现说明,另外,我也给出通过“异步+线程池”的方式的Tcp组件实现,这种方式对于大并发量也可以很好的管理。也就是我,我的EnterpriseServerBase类库中,有两种不同方式的Tcp组件实现,一个是模拟完成端口模型,一个是“异步+线程池”方式。无论是哪种方式,它们都实现了相同的接口ITcpITcp这个接口涵盖了上述的Tcp组件的所有职责,这个接口并不复杂,如果理解了,使用起来也非常简单。我们来看看这个接口的定义:

publicinterfaceITcp:INet,ITcpEventList,ITcpClientsController
{
intConnectionCount{get;}//当前连接的数量
}

这个接口继承了另外三个接口,INet ,ITcpEventList ,ITcpClientsControllerINet接口是为了统一基于TcpUdp的通信组件而抽象出来的,它包含了以下内容:

publicinterfaceINet
{
voidInitializeAll(IReqestStreamDispatcheri_dispatcher,intport,booluserValidated);
voidInitializeAll();
voidUnitializeAll();

NetAddinTypeGetProtocalType();
//Udp,Tcp
eventCallBackDynamicMessageDynamicMsgArrived;//通常是通信插件中一些与服务和用户无关的动态信息,如监听线程重启等
voidStart();
voidStop();

IReqestStreamDispatcherDispatcher{
set;}//支持依赖注入
intPort{get;set;}
boolUserValidated{set;}
}

publicenumNetAddinType
{
Tcp,Udp
}

publicdelegatevoidCallBackDynamicMessage(stringmsg);

IReqestStreamDispatcher就是我们上述图中的消息分派器,这是Tcp通信层中的中央,它的重要性已从前面的关系图中可见一斑了。IReqestStreamDispatcher需要在初始化的时候提供,或者通过Dispatcher属性通过IOC容器进行设值法注入。UserValidated属性用于决定当用户的第一个请求不是登录请求时,是否立即关闭Tcp连接。其它的属性已经加上了注释,非常容易理解。
ITcpEventList接口说明了Tcp组件应当发布的事件,主要对应于前述Tcp组件职责的(2)(3)(4)点。其定义如下:
publicinterfaceITcpEventList
{
eventCallBackForTcpUser2SomeOneConnected;//上线
eventCallBackForTcpUser1SomeOneDisConnected;//掉线
eventCallBackForTcpCountConnectionCountChanged;//在线人数变化
eventCallBackForTcpMonitorServiceCommitted;//用户请求的服务的回复信息
eventCallBackForTcpUserUserAction;
}

每一个在线用户都对应着一个Tcp连接,我们使用tcp连接的Hashcode作为ConnectID来标志每一个连接。UserAction将用户与服务器的交互分为三类:登录、退出和标准功能访问,如以下枚举所示。

publicenumTcpUserAction
{
Logon,Exit,FunctionAccess,
//标准的功能访问
}

最后一个接口ITcpClientsController,主要用来完成上述Tcp组件职责的(5)(6)(7)三点。定义如下:

///<summary>
///ITcpController用于服务器主动控制TCP客户的连接
///</summary>
publicinterfaceITcpClientsController
{
//同步接收消息
boolSynRecieveFrom(intConnectID,byte[]buffer,intoffset,intsize,outintreadCount);

//主动给某个客户同步发信息
voidSendData(intConnectID,byte[]data);
voidSendData(intConnectID,byte[]data,intoffset,intsize);

//主动关闭连接
voidDisposeOneConnection(intconnectID,DisconnectedCausecause);
}

这个接口中的方法的含义是一目了然的。
上述的几个接口已经完整的覆盖了前述的Tcp组件的所有职责,在了解了这些接口定义的基础上,大家已经能够使用EnterpriseServerBase类库中的Tcp组件了。如果想复用的不仅仅是Tcp组件,而是整个Tcp通信层,你就需要关注后面的内容。不管怎样,为了文章的完整性,我在这里先给出前面提到的Tcp组件的两种实现。

3Tcp组件基本元素实现
在实现Tcp组件之前,有一些基本元素需要先建立起来,比如安全的网络流、Tcp监听器、用户连接上下文、上下文管理者等。1)安全的网络流SafeNetworkStream
前面已经提到过,为了能在Tcp组件外部 对指定的连接发送数据,必须保证我们的Tcp连接是线程安全的,而System.Net.Sockets.NetworkStream是非线程安全的,我们必须自己对其进行封装,以保证这一点。System.Net.Sockets.NetworkStream的线程安全的封装就是EnterpriseServerBase.Network.SafeNetworkStream类,它继承了ISafeNetworkStream接口:

///<summary>
///ISafeNetworkStream线程安全的网络流。
///注意:如果调用的异步的begin方法,就一定要调用对应的End方法,否则锁将得不到释放。
///作者:朱伟sky.zhuwei@163.com
///</summary>
publicinterfaceISafeNetworkStream:ITcpSender,ITcpReciever
{
voidFlush();
voidClose();
}


//用于在TCP连接上发送数据,支持同步和异步
publicinterfaceITcpSender
{
voidWrite(byte[]buffer,intoffset,intsize);
IAsyncResultBeginWrite(
byte[]buffer,intoffset,intsize,AsyncCallbackcallback,objectstate);
voidEndWrite(IAsyncResultasyncResult);
}

//用于在TCP连接上接收数据,支持同步和异步
publicinterfaceITcpReciever
{
intRead(byte[]buffer,intoffset,intsize);
IAsyncResultBeginRead(
byte[]buffer,intoffset,intsize,AsyncCallbackcallback,objectstate);
intEndRead(IAsyncResultasyncResult);
}

该接口几乎与System.Net.Sockets.NetworkStream提供的方法一样,只不过它们是线程安全的。这样,针对同一个SafeNetworkStream,我们就可以在不同的线程中同时在其上进行数据接收/发送(主要是发送)了。

2Tcp监听器EnterpriseServerBase.Network.XTcpListener
不可否认,System.Net.Sockets.TcpListener只是提供了一些最低阶的工作,为了将监听线程、端口、监听事件整合起来,我引入了EnterpriseServerBase.Network.XTcpListener类,它可以启动和停止,并且当有Tcp连接建立的时候,会触发事件。XTcpListener实现了IXTcpListener接口,其定义如下:

publicinterfaceIXTcpListener
{
voidStart();//开始或启动监听线程
voidStop();//暂停,但不退出监听线程

voidExitListenThread();//退出监听线程

eventCBackUserLogonTcpConnectionEstablished;//新的Tcp连接成功建立
eventCallBackDynamicMsgDynamicMsgArrived;
}

XTcpListener可以在不同的Tcp组件中复用,这是一种更细粒度的复用。

3)用户连接上下文ContextKey
ContextKey用于将所有的与一个用户Tcp连接相关的信息(比如接收缓冲区、连接的状态――空闲还是忙碌、等)封装起来,并且还能保存该用户的请求中上次未处理完的数据,将其放于接收缓冲区的头部,并与后面接收到的数据进行重组。说到这里,你可能不太明白,我需要解释一下。Tcp协议可以保证我们发出的消息完整的、有序的、正确的到达目的地,但是它不能保证,我们一次发送的数据对方也能一次接收完全。比如,我们发送了一个100Bytes的数据,对方可能要接收两次才能完全,先收到60Bytes,再收到40Bytes,这表明我们可能会收到“半条”消息。还有一种情况,你连续发了两条100Bytes的消息,而对方可能一次就接收了160Bytes,所以需要对消息进行分裂,从中分裂出完整的消息然后进行处理。这,就是前面所说的需要对消息进行分裂、重组的原因。知道这点后,IContextKey接口应该比较容易理解了,因为该接口的很多元素的存在都是为了辅助解决这个问题。IContextKey的定义如下:

publicinterfaceIContextKey
{
NetStreamStateStreamState{
get;set;}//网络流的当前状态--空闲、忙碌
ISafeNetworkStreamNetStream{get;set;}

byte[]Buffer{get;set;}//接收缓冲区
intBytesRead{get;set;}//本次接收的字节数
intPreLeftDataLen{get;set;}
boolIsFirstMsg{get;set;}//是否为建立连接后的第一条消息

intStartOffsetForRecieve{get;}
intMaxRecieveCapacity{get;}//本次可以接收的最大字节数
RequestDataRequestData{get;}

voidResetBuffer(byte[]leftData);//leftData表示上次没有处理完的数据,需要与后面来的数据进行重组,然后再次处理
}

对于消息的分裂和重组是由消息分裂器完成的,由于Tcp组件的实现不需要使用消息分裂器,所以消息分裂器的说明将在后面的消息分派器实现中讲解。

4)上下文管理者ContextKeyManager
ContextKeyManager用于管理所有的ContextKey,其实现的接口IContextKeyManager很容易理解:

publicinterfaceIContextKeyManager
{
voidInsertContextKey(ContextKeycontext_key);
voidDisposeAllContextKey();
boolIsAllStreamSafeToStop();//是否可以安全退出
voidRemoveContextKey(intstreamHashCode);
intConnectionCount{get;}
ISafeNetworkStreamGetNetStream(
intstreamHashCode);
eventCallBackCountChangedStreamCountChanged;
}

在上述四个基本元素的支持下,再来实现Tcp组件就方便了许多,无论是以何种方式(如完成端口模型、异步方式)实现Tcp组件,这些基本元素都是可以通用的,所以如果你要实现自己的Tcp组件,也可以考虑复用上述的一些基本元素。复用可以在不同的粒度进行,复用真是无处不在,呵呵。

4.完成端口Tcp组件实现
前面已经提到,完成端口模型本质思想是将"启动异步操作的线程""提供服务的线程"(即工作者线程)拆伙。只要做到这一点,就模拟了完成端口。
分析一下我们需要几种类型的线程,首先我们需要一个线程来接收TCP连接请求,这就是所谓监听线程,当成功的接收到一个连接后,就向连接发送一个异步接收数据的请求,由于是异步操作,所以会立即返回,然后再去接收新的连接请求,如此监听线程就循环运作起来了(已经封装成前述的XTcpListener组件了)。值得提出的是,在异步接收的回调函数中,应该对接收到的数据进行处理,完成端口模型所做的就是将接收到的数据放在了完成端口队列中,注意,是一个队列。第二种线程类型,就是工作者线程。工作者线程的个数有个经验值是( Cpu个数×2 2),当然具体取多少,还要取决于你的应用的要求。工作者线程的任务就是不断地从完成端口队列中取出数据,并处理它,然后如果有回复,再将回复写入对应的连接。
好,让我们来定义接口IRequestQueueManager,用于模拟完成端口的队列,该队列是线程安全的,用于将所有的请求进行排队,然后由工作者线程来轮流处理这些请求。

publicinterfaceIRequestQueueManager:IRequestPusher
{
objectPop();//弹出队列中的下一个请求
voidClear();
intLength{get;}//队列长度
}

publicinterfaceIRequestPusher
{
voidPush(objectpackage);//向队列中压入一个请求
}

IRequestQueueManager的基础上,可以将工作者线程和启动异步操作的线程拆开了。由于工作者线程只与端口队列相关,所以我决定将它们一起封装起来--成为IIOCPManager,用于管理请求队列和工作者线程。

///<summary>
///IIOCPManager完成端口管理者,主要管理工作者线程和完成端口队列。
///</summary>
publicinterfaceIIOCPManager:IRequestPusher
{
voidInitialize(IOCPPackageHandleri_packageHandler,intthreadCount);
voidStart();//启动工作者线程
voidStop();//退出工作者线程

intWorkThreadCount{get;}

eventCallBackPackageHandledPackageHandled;
}

//IOCPPackageHandler用于处理从完成端口队列中取出的package
publicinterfaceIOCPPackageHandler
{
voidHandlerPackage(objectpackage);//一般以同步实现
}

有了IRequestQueueManagerIIOCPManager的支持,实现基于完成端口模型的Tcp组件就非常简单了。当然,你也可以单独使用IIOCPManager。你只要提供一个监听者线程接收连接,并将从连接接收到的数据通过IRequestPusher接口放入端口队列就可以了。 当然,为了处理接收到的数据,我们需要提供一个实现了IOCPPackageHandler接口的对象给IOCPManager。值得提出的是,你可以在数据处理并发送了回复数据后,再次投递一个异步接收请求,以保证能源源不断的从对应的TCP连接接收数据。下面,我们来看基于完成端口模型的Tcp组件的完整实现。

完成端口Tcp组件
<!--<br><br>Code highlighting produced by Actipro CodeHighlighter (freeware)<br>http://www.CodeHighlighter.com/<br><br>-->1/**////<summary>
2///IocpTcp完成端口Tcp组件。
3///</summary>

4publicclassIocpTcp:ITcp,IOCPPackageHandler
5{
6members#regionmembers
7privateconstintBufferSize=1024;
8privateconstintMaxWorkThreadNum=50;
9
10privateIXTcpListenerxtcpListener;
11privateIIOCPManageriocpMgr=null;
12privateITcpReqStreamDispatchermessageDispatcher=null;
13privateContextKeyManagercontextKeyMgr=newContextKeyManager();
14privateboolstateIsStop=true;
15privateboolvalidateRequest=false;
16privateintcurPort=8888;
17#endregion

18
19publicIocpTcp()
20{
21
22}

23ITcp成员#regionITcp成员
24publicintConnectionCount
25{
26get
27{
28returnthis.contextKeyMgr.ConnectionCount;
29}

30}

31
32#endregion

33
34INet成员#regionINet成员
35
36InitializeAll,UnitializeAll#regionInitializeAll,UnitializeAll
37publicvoidInitializeAll(IReqestStreamDispatcheri_dispatcher,intport,booluserValidated)
38{
39this.messageDispatcher=i_dispatcherasITcpReqStreamDispatcher;
40if(this.messageDispatcher==null)
41{
42thrownewException("Can'tconvertIReqestStreamDispatchertoITcpReqStreamDispatcherinCompletePortManager.InitializeAllmethod!");
43}

44
45this.validateRequest=userValidated;
46this.curPort=port;
47
48this.InitializeAll();
49}

50
51publicvoidInitializeAll()
52{
53this.xtcpListener=newXTcpListener(this.curPort);
54this.xtcpListener.TcpConnectionEstablished+=newCBackUserLogon(xtcpListener_TcpConnectionEstablished);
55this.xtcpListener.DynamicMsgArrived+=newCallBackDynamicMsg(this.PutoutDynamicMsg);
56this.contextKeyMgr.StreamCountChanged+=newCallBackCountChanged(this.OnStreamCountChanged);
57
58this.iocpMgr=newIOCPManager();
59this.iocpMgr.Initialize(this,IocpTcp.MaxWorkThreadNum);
60}

61
62publicvoidUnitializeAll()
63{
64this.Stop();
65this.xtcpListener.ExitListenThread();
66
67//将事件容器清空==》防止外部框架再多次初始化的过程中将一个事件预定多次
68this.ConnectionCountChanged=null;
69this.DynamicMsgArrived=null;
70this.ServiceCommitted=null;
71this.SomeOneConnected=null;
72this.SomeOneDisConnected=null;
73this.UserAction=null;
74}

75#endregion

76
77Start,Stop#regionStart,Stop
78publicvoidStart()
79{
80try
81{
82if(this.stateIsStop)
83{
84this.stateIsStop=false;
85this.xtcpListener.Start();
86this.iocpMgr.Start();
87}

88}

89catch(Exceptionee)
90{
91throwee;
92}

93}

94
95publicvoidStop()
96{
97if(this.stateIsStop)
98{
99return;
100}

101
102this.stateIsStop=true;
103this.xtcpListener.Stop();
104this.iocpMgr.Stop();
105
106//关闭所有连接
107intcount=0;
108while(!this.contextKeyMgr.IsAllStreamSafeToStop())//等待所有流到达停止安全点
109{
110Thread.Sleep(200);
111if(10==count++)
112{
113break;
114}

115}

116this.contextKeyMgr.DisposeAllContextKey();
117}

118#endregion

119
120publiceventEnterpriseServerBase.Network.CallBackDynamicMessageDynamicMsgArrived;
121
122publicNetAddinTypeGetProtocalType()
123{
124returnNetAddinType.Tcp;
125}

126
127#endregion

128
129ITcpEventList成员#regionITcpEventList成员
130publiceventEnterpriseServerBase.Network.CallBackForTcpUser2SomeOneConnected;
131
132publiceventEnterpriseServerBase.Network.CallBackForTcpMonitorServiceCommitted;
133
134publiceventEnterpriseServerBase.Network.CallBackForTcpCountConnectionCountChanged;
135
136publiceventEnterpriseServerBase.Network.CallBackForTcpUser1SomeOneDisConnected;
137
138publiceventEnterpriseServerBase.Network.CallBackForTcpUserUserAction;
139
140#endregion

141
142ITcpClientsController成员#regionITcpClientsController成员
143
144publicvoidSendData(intConnectID,byte[]data)
145{
146this.SendData(ConnectID,data,0,data.Length);
147}

148
149publicvoidSendData(intConnectID,byte[]data,intoffset,intsize)
150{
151if((data==null)||(data.Length==0)||(offset<0)||(size<0)||(offset+size>data.Length))
152{
153return;
154}

155
156ISafeNetworkStreamnetStream=this.contextKeyMgr.GetNetStream(ConnectID);
157if(netStream==null)
158{
159return;
160}

161
162netStream.Write(data,offset,size);
163}

164
165publicboolSynRecieveFrom(intConnectID,byte[]buffer,intoffset,intsize,outintreadCount)
166{
167readCount=0;
168ISafeNetworkStreamnetStream=this.contextKeyMgr.GetNetStream(ConnectID);
169if(netStream==null)
170{
171returnfalse;
172}

173
174readCount=netStream.Read(buffer,offset,size);
175
176returntrue;
177}

178
179publicvoidDisposeOneConnection(intconnectID,EnterpriseServerBase.Network.DisconnectedCausecause)
180{
181this.DisposeOneConnection(connectID);
182
183if(this.SomeOneDisConnected!=null)
184{
185this.SomeOneDisConnected(connectID,cause);
186}

187
188this.ActivateUserActionEvent(connectID,TcpUserAction.Exit);
189}

190
191/**////<summary>
192///DisposeOneConnection主要由用户管理模块调用--当无法检测到掉线情况时,该方法保证资源被释放
193///</summary>

194privatevoidDisposeOneConnection(intconnectID)
195{
196this.contextKeyMgr.RemoveContextKey(connectID);
197}

198
199#endregion

200
201private#regionprivate
202BindRequestToQueue#regionBindRequestToQueue
203privatevoidBindRequestToQueue(IAsyncResultar)
204{
205try
206{
207ContextKeykey=(ContextKey)ar.AsyncState;
208key.BytesRead=key.NetStream.EndRead(ar);
209if(!this.CheckData(key))
210{
211return;
212}

213
214this.iocpMgr.Push(key);
215}

216catch(Exceptionee)
217{
218ee=ee;
219}

220}

221
222CheckData#regionCheckData
223privateboolCheckData(ContextKeykey)
224{
225intstreamHashcode=key.NetStream.GetHashCode();
226if(this.stateIsStop)
227{
228this.DisposeOneConnection(streamHashcode,DisconnectedCause.ServerStopped);
229returnfalse;
230}

231
232if(key.BytesRead==0)//表示客户端掉线或非正常关闭连接
233{
234this.DisposeOneConnection(streamHashcode,DisconnectedCause.LineOff);
235returnfalse;
236}

237
238if(key.BytesRead==8)//表示客户端正常关闭连接
239{
240stringss=System.Text.Encoding.BigEndianUnicode.GetString(key.Buffer,0,8);
241this.DisposeOneConnection(streamHashcode,DisconnectedCause.LineOff);
242returnfalse;
243}

244
245returntrue;
246}

247#endregion

248#endregion

249
250xtcpListener_TcpConnectionEstablished#regionxtcpListener_TcpConnectionEstablished
251privatevoidxtcpListener_TcpConnectionEstablished(NetworkStreamstream)
252{
253ISafeNetworkStreamsafeStream=newSafeNetworkStream(stream);
254ContextKeykey=newContextKey(safeStream,IocpTcp.BufferSize);
255key.ResetBuffer(null);
256this.contextKeyMgr.InsertContextKey(key);
257intconnectID=key.NetStream.GetHashCode();
258if(this.SomeOneConnected!=null)
259{
260this.SomeOneConnected(connectID);
261}

262
263this.ActivateUserActionEvent(connectID,TcpUserAction.Logon);
264
265key.IsFirstMsg=true;
266this.RecieveDataFrom(key);
267}

268#endregion

269
270ActivateUserActionEvent#regionActivateUserActionEvent
271privatevoidActivateUserActionEvent(intConnectID,TcpUserActionaction)
272{
273if(this.UserAction!=null)
274{
275this.UserAction(ConnectID,action);
276}

277}

278#endregion

279
280PutoutDynamicMsg#regionPutoutDynamicMsg
281privatevoidPutoutDynamicMsg(stringmsg)
282{
283if(this.DynamicMsgArrived!=null)
284{
285this.DynamicMsgArrived(msg);
286}

287}

288#endregion

289
290OnStreamCountChanged#regionOnStreamCountChanged
291privatevoidOnStreamCountChanged(intcount)
292{
293if(this.ConnectionCountChanged!=null)
294{
295this.ConnectionCountChanged(count);
296}

297}

298#endregion

299
300RecieveDataFrom#regionRecieveDataFrom
301privatevoidRecieveDataFrom(ContextKeykey)
302{
303try
304{
305key.StreamState=NetStreamState.Reading;
306key.NetStream.BeginRead(key.Buffer,key.StartOffsetForRecieve,key.MaxRecieveCapacity,newAsyncCallback(this.BindRequestToQueue),key);
307}

308catch(Exceptionee)
309{
310ee=ee;
311}

312
313}

314#endregion

315#endregion

316
317IOCPPackageHandler成员#regionIOCPPackageHandler成员
318
319publicvoidHandlerPackage(objectpackage)
320{
321ContextKeykey=packageasContextKey;
322if(key==null)
323{
324return;
325}

326
327intstreamHashCode=key.NetStream.GetHashCode();//是SafeNetworkStream的hashcode
328
329//处理请求
330try
331{
332byte[]leftData=null;
333ArrayListrepondList=this.messageDispatcher.DealRequestMessage(key.RequestData,outleftData,refkey.Validation);
334
335if(this.validateRequest)
336{
337if(key.Validation.gotoCloseConnection)
338{
339this.DisposeOneConnection(streamHashCode,key.Validation.cause);
340return;
341}

342}

343
344key.StreamState=NetStreamState.Writing;
345if(repondList!=null&&(repondList.Count!=0))
346{
347foreach(objectobjinrepondList)
348{
349byte[]respond_stream=(byte[])obj;
350key.NetStream.Write(respond_stream,0,respond_stream.Length);
351if(this.ServiceCommitted!=null)
352{
353RespondInformationinfo=newRespondInformation();
354info.ConnectID=streamHashCode;
355info.ServiceKey=this.messageDispatcher.GetServiceKey(respond_stream);
356info.repondData=respond_stream;
357this.ServiceCommitted(info);
358}

359this.ActivateUserActionEvent(streamHashCode,TcpUserAction.FunctionAccess);
360}

361}

362
363if(key.IsFirstMsg)
364{
365if(repondList==null||(repondList.Count==0))//表示第一条消息还未接收完全
366{
367key.IsFirstMsg=true;
368}

369else
370{
371key.IsFirstMsg=false;
372}

373}

374
375key.StreamState=NetStreamState.Idle;
376
377key.ResetBuffer(leftData);
378
379if(!this.stateIsStop)
380{
381//继续接收请求
382this.RecieveDataFrom(key);
383}

384else//停止服务
385{
386this.DisposeOneConnection(streamHashCode,DisconnectedCause.ServerStopped);
387}

388}

389catch(Exceptionee)
390{
391if(eeisSystem.IO.IOException)//正在读写流的时候,连接断开
392{
393this.DisposeOneConnection(streamHashCode,DisconnectedCause.ServerStopped);
394}

395
396ee=ee;
397}

398}

399
400#endregion

401
402INet成员#regionINet成员
403
404publicIReqestStreamDispatcherDispatcher
405{
406set
407{
408this.messageDispatcher=(ITcpReqStreamDispatcher)value;
409}

410}

411
412publicintPort
413{
414set
415{
416this.curPort=value;
417}

418get
419{
420returnthis.curPort;
421}

422}

423
424publicboolUserValidated
425{
426set
427{
428this.validateRequest=value;
429}

430}

431
432#endregion

433}



5.异步Tcp组件实现
这种方式的主要思想是:当一个新的Tcp连接建立时,就在该连接上发送一个异步接收的请求(BeginRead),并在异步回调中处理该请求,当请求处理完毕,再次发送异步接收请求,如此循环下去。异步接收启用的是系统默认线程池中的线程,所以,在异步Tcp组件中不用显式管理工作线程。异步Tcp组件的实现相对于完成端口模型而言简单许多,也单纯一些,不用管理请求队列,不需使用工作者线程等等。但是,相比于完成端口模型,其也有明显的缺陷:一个Tcp连接绑定到了一个线程,即使这个线程是后台线程池中的。如果用户数量巨大,这对性能是极其不利的;而完成端口模型,则可以限定工作者线程的个数,并且可以根据应用的类型进行灵活调节。
异步Tcp组件实现源码。

异步Tcp组件
<!--<br><br>Code highlighting produced by Actipro CodeHighlighter (freeware)<br>http://www.CodeHighlighter.com/<br><br>-->1/**////<summary>
2///AsynTcp异步Tcp组件。
3///</summary>

4publicclassAsynTcp:ITcp
5{
6members#regionmembers
7privateconstintBufferSize=1024;
8
9privateIXTcpListenerxtcpListener=null;
10privateITcpReqStreamDispatchermessageDispatcher=null;
11privateContextKeyManagercontextKeyMgr=newContextKeyManager();
12privateboolstateIsStop=true;
13privateboolvalidateRequest=false;
14privateintcurPort=8888;
15#endregion

16
17
18publicAsynTcp()
19{
20
21}

22
23INet成员#regionINet成员
24
25publiceventCallBackDynamicMessageDynamicMsgArrived;
26
27publicNetAddinTypeGetProtocalType()
28{
29
30returnNetAddinType.Tcp;
31}

32
33InitializeAll,UnitializeAll#regionInitializeAll,UnitializeAll
34publicvoidInitializeAll(IReqestStreamDispatcheri_dispatcher,intport,booluserValidated)
35{
36this.messageDispatcher=i_dispatcherasITcpReqStreamDispatcher;
37if(this.messageDispatcher==null)
38{
39thrownewException("Can'tconvertIReqestStreamDispatchertoITcpReqStreamDispatcherinCompletePortManager.InitializeAllmethod!");
40}

41
42this.curPort=port;
43this.validateRequest=userValidated;
44
45this.InitializeAll();
46}

47
48publicvoidInitializeAll()
49{
50this.xtcpListener=newXTcpListener(this.curPort);
51this.xtcpListener.TcpConnectionEstablished+=newCBackUserLogon(xtcpListener_TcpConnectionEstablished);
52this.xtcpListener.DynamicMsgArrived+=newCallBackDynamicMsg(this.PutoutDynamicMsg);
53this.contextKeyMgr.StreamCountChanged+=newCallBackCountChanged(this.OnStreamCountChanged);
54}

55
56publicvoidUnitializeAll()
57{
58this.Stop();
59this.xtcpListener.ExitListenThread();
60
61//将事件容器清空==》防止外部框架再多次初始化的过程中将一个事件预定多次
62this.ConnectionCountChanged=null;
63this.DynamicMsgArrived=null;
64this.ServiceCommitted=null;
65this.SomeOneConnected=null;
66this.SomeOneDisConnected=null;
67this.UserAction=null;
68}

69
70#endregion

71
72Start,Stop#regionStart,Stop
73publicvoidStart()
74{
75if(this.stateIsStop)
76{
77this.xtcpListener.Start();
78this.stateIsStop=false;
79}

80}

81
82publicvoidStop()
83{
84if(this.stateIsStop)
85{
86return;
87}

88
89this.stateIsStop=true;
90this.xtcpListener.Stop();
91
92//关闭所有连接
93intcount=0;
94while(!this.contextKeyMgr.IsAllStreamSafeToStop())//等待所有流到达停止安全点
95{
96Thread.Sleep(200);
97if(10==count++)
98{
99break;
100}

101}

102this.contextKeyMgr.DisposeAllContextKey();
103}

104#endregion

105
106#endregion

107
108ITcpEventList成员#regionITcpEventList成员
109
110publiceventEnterpriseServerBase.Network.CallBackForTcpUser2SomeOneConnected;
111
112publiceventEnterpriseServerBase.Network.CallBackForTcpMonitorServiceCommitted;
113
114publiceventEnterpriseServerBase.Network.CallBackForTcpCountConnectionCountChanged;
115
116publiceventEnterpriseServerBase.Network.CallBackForTcpUser1SomeOneDisConnected;
117
118publiceventEnterpriseServerBase.Network.CallBackForTcpUserUserAction;
119
120#endregion

121
122ITcpClientsController成员#regionITcpClientsController成员
123
124publicboolSynRecieveFrom(intConnectID,byte[]buffer,intoffset,intsize,outintreadCount)
125{
126readCount=0;
127ISafeNetworkStreamnetStream=this.contextKeyMgr.GetNetStream(ConnectID);
128if(netStream==null)
129{
130returnfalse;
131}

132
133readCount=netStream.Read(buffer,offset,size);
134
135returntrue;
136}

137
138publicvoidSendData(intConnectID,byte[]data)
139{
140this.SendData(ConnectID,data,0,data.Length);
141}

142
143publicvoidSendData(intConnectID,byte[]data,intoffset,intsize)
144{
145if((data==null)||(data.Length==0)||(offset<0)||(size<0)||(offset+size>data.Length))
146{
147return;
148}

149
150ISafeNetworkStreamnetStream=this.contextKeyMgr.GetNetStream(ConnectID);
151if(netStream==null)
152{
153return;
154}

155
156netStream.Write(data,offset,size);
157}

158
159publicvoidDisposeOneConnection(intconnectID,DisconnectedCausecause)
160{
161this.DisposeOneConnection(connectID);
162
163if(this.SomeOneDisConnected!=null)
164{
165this.SomeOneDisConnected(connectID,cause);
166}

167
168this.ActivateUserActionEvent(connectID,TcpUserAction.Exit);
169}

170
171#endregion

172
173ITcp成员#regionITcp成员
174publicintConnectionCount
175{
176get
177{
178returnthis.contextKeyMgr.ConnectionCount;
179}

180}

181
182#endregion

183
184private#regionprivate
185
186ActivateUserActionEvent#regionActivateUserActionEvent
187privatevoidActivateUserActionEvent(intConnectID,TcpUserActionaction)
188{
189if(this.UserAction!=null)
190{
191this.UserAction(ConnectID,action);
192}

193}

194#endregion

195
196DisposeOneConnection#regionDisposeOneConnection
197/**////<summary>
198///DisposeOneConnection主要由用户管理模块调用--当无法检测到掉线情况时,该方法保证资源被释放
199///</summary>

200privatevoidDisposeOneConnection(intconnectID)
201{
202this.contextKeyMgr.RemoveContextKey(connectID);
203}

204#endregion

205
206xtcpListener_TcpConnectionEstablished#regionxtcpListener_TcpConnectionEstablished
207privatevoidxtcpListener_TcpConnectionEstablished(NetworkStreamstream)
208{
209ISafeNetworkStreamsafeStream=newSafeNetworkStream(stream);
210
211ContextKeykey=newContextKey(safeStream,AsynTcp.BufferSize);
212key.ResetBuffer(null);
213this.contextKeyMgr.InsertContextKey(key);
214intconnectID=key.NetStream.GetHashCode();
215
216if(this.SomeOneConnected!=null)
217{
218this.SomeOneConnected(connectID);
219}

220this.ActivateUserActionEvent(connectID,TcpUserAction.Logon);
221
222key.IsFirstMsg=true;
223this.RecieveDataFrom(key);
224}

225#endregion

226
227PutoutDynamicMsg#regionPutoutDynamicMsg
228privatevoidPutoutDynamicMsg(stringmsg)
229{
230if(this.DynamicMsgArrived!=null)
231{
232this.DynamicMsgArrived(msg);
233}

234}

235#endregion

236
237OnStreamCountChanged#regionOnStreamCountChanged
238privatevoidOnStreamCountChanged(intcount)
239{
240if(this.ConnectionCountChanged!=null)
241{
242this.ConnectionCountChanged(count);
243}

244}

245#endregion

246
247RecieveDataFrom#regionRecieveDataFrom
248privatevoidRecieveDataFrom(ContextKeykey)
249{
250key.StreamState=NetStreamState.Reading;
251key.NetStream.BeginRead(key.Buffer,key.StartOffsetForRecieve,key.MaxRecieveCapacity,newAsyncCallback(this.ServeOverLap),key);
252
253}

254#endregion

255
256ServeOverLap#regionServeOverLap
257privatevoidServeOverLap(IAsyncResultar)
258{
259ContextKeykey=(ContextKey)ar.AsyncState;
260intstreamHashCode=key.NetStream.GetHashCode();//是SafeNetworkStream的hashcode
261
262try
263{
264key.BytesRead=key.NetStream.EndRead(ar);
265
266if(!this.CheckData(key))
267{
268return;
269}

270
271//处理请求
272byte[]leftData=null;
273ArrayListrepondList=this.messageDispatcher.DealRequestMessage(key.RequestData,outleftData,refkey.Validation);
274
275if(this.validateRequest)
276{
277if(key.Validation.gotoCloseConnection)
278{
279this.DisposeOneConnection(streamHashCode,key.Validation.cause);
280}

281}

282
283key.StreamState=NetStreamState.Writing;
284if(repondList!=null&&(repondList.Count!=0))
285{
286foreach(objectobjinrepondList)
287{
288byte[]respond_stream=(byte[])obj;
289key.NetStream.Write(respond_stream,0,respond_stream.Length);
290if(this.ServiceCommitted!=null)
291{
292RespondInformationinfo=newRespondInformation();
293info.ConnectID=streamHashCode;
294info.ServiceKey=this.messageDispatcher.GetServiceKey(respond_stream);
295info.repondData=respond_stream;
296this.ServiceCommitted(info);
297}

298
299this.ActivateUserActionEvent(streamHashCode,TcpUserAction.FunctionAccess);
300}

301}

302
303if(key.IsFirstMsg)
304{
305if(repondList==null||(repondList.Count==0))//表示第一条消息还未接收完全
306{
307key.IsFirstMsg=true;
308}

309else
310{
311key.IsFirstMsg=false;
312}

313}

314
315key.StreamState=NetStreamState.Idle;
316
317key.ResetBuffer(leftData);
318
319if(!this.stateIsStop)
320{
321//继续接收请求
322this.RecieveDataFrom(key);
323}

324else//停止服务
325{
326this.DisposeOneConnection(streamHashCode,DisconnectedCause.ServerStopped);
327}

328}

329catch(Exceptionee)
330{
331if(eeisSystem.IO.IOException)//正在读写流的时候,连接断开
332{
333this.DisposeOneConnection(streamHashCode,DisconnectedCause.ServerStopped);
334}

335
336ee=ee;
337}

338}

339#endregion

340
341CheckData#regionCheckData
342privateboolCheckData(ContextKeykey)
343{
344intstreamHashcode=key.NetStream.GetHashCode();
345if(this.stateIsStop)
346{
347this.DisposeOneConnection(streamHashcode,DisconnectedCause.ServerStopped);
348returnfalse;
349}

350
351if(key.BytesRead==0)//表示客户端掉线或非正常关闭连接
352{
353this.DisposeOneConnection(streamHashcode,DisconnectedCause.LineOff);
354returnfalse;
355}

356
357if(key.BytesRead==8)//表示客户端正常关闭连接
358{
359stringss=System.Text.Encoding.BigEndianUnicode.GetString(key.Buffer,0,8);
360this.DisposeOneConnection(streamHashcode,DisconnectedCause.LineOff);
361returnfalse;
362}

363
364returntrue;
365}

366#endregion

367#endregion

368
369INet成员#regionINet成员
370
371publicIReqestStreamDispatcherDispatcher
372{
373set
374{
375this.messageDispatcher=(ITcpReqStreamDispatcher)value;
376}

377}

378
379publicintPort
380{
381set
382{
383this.curPort=value;
384}

385get
386{
387returnthis.curPort;
388}

389}

390
391publicboolUserValidated
392{
393set
394{
395this.validateRequest=value;
396}

397}

398
399#endregion

400}


今天介绍了Tcp通信层中的核心――Tcp组件,仅仅复用Tcp组件已经能为我们省去很多麻烦了,如果想进行更高层次的复用――整个Tcp通信层的复用,请关注本篇的续文。







分享到:
评论

相关推荐

    NET平台下可复用的Tcp通信层实现

    以前就很想将自己在Tcp通信层的开发心得、经验共享出来,但一直没有实现,究其原因,还是自己太懒了。今天终于找到一个时机,写下这篇文章,也算是对2005年的另一种形式的回忆吧。 绝大多数C/S(包括多层)结构的...

    亮剑.NET深入体验与实战精要2

    10.2 异步Socket通信——实现MSN机器人 390 10.2.1 机器人服务端 390 10.2.2 客户端实现步骤 395 10.3 基于TCP协议的客户端和服务端 398 10.3.1 TcpListener 实现网络服务端 398 10.3.2 TcpClient实现网络客户端 399...

    亮剑.NET深入体验与实战精要3

    10.2 异步Socket通信——实现MSN机器人 390 10.2.1 机器人服务端 390 10.2.2 客户端实现步骤 395 10.3 基于TCP协议的客户端和服务端 398 10.3.1 TcpListener 实现网络服务端 398 10.3.2 TcpClient实现网络客户端 399...

    ESFramework .net框架

    后来我将EnterpriseServerBase中的Network部分及建立于之上的应用抽象重新整理为ESFramework框架,这是一套完全可复用的、灵活的、单纯的、支持N层C/S架构的轻量级通信框架,内置了对Tcp和Udp协议的支持。...

    C# TCP异步数据传输

    TCP的接收要发送功能都已经封状成类,大家可以尽量少的代码就可以复用。实现的功能也很简 单,只有一个接收和一个发送。代码中注释满满,大家一起研究吧。希望大家通过这个小程序 熟悉TCP实的基本过程和要求。 VS...

    Herm(一套快速开发高性能的网络应用的C++库)

    用Socket实现TCP Server更灵活,但实现者要做一些额外的工作,比如tcp stream解析,缓冲队列处理等等。 首先实现一个AcceptHandler,处理Client连接, class AcceptHandler : public Herm::EventHandler { ...

    2018吉林大学c++课设源代码压缩包

    (5)实现QQ的点对点的TCP通信的收发功能。(选做)提示: a)需要加载ws2_32.lib静态库,打开头文件winsock.h。 b)百度IP地址、端口等概念; c)百度socket编程,关注bind、listen、accept、connect、send、...

    内网穿透FRP工具 windows 客户端和服务端 V0.46

    客户端服务端通信支持 TCP、KCP 以及 Websocket 等多种协议。 采用 TCP 连接流式复用,在单个连接间承载更多请求,节省连接建立时间。 代理组间的负载均衡。 端口复用,多个服务通过同一个服务端端口暴露。 多个原生...

    某通信公司内部培训资料.doc

    系统中的每一 层叫做一个域,每个域用一个点分开。所谓域名服务器(即Domain Name Server,简称 Name Server)实际上就是装有域名系统的主机。它是一种能够实现名字解析(name re solution)的分层结构数据库。 ...

    JAVA上百实例源码以及开源项目

    百度云盘分享 ... Java实现的FTP连接与数据浏览程序,实现实例化可操作的窗口。  部分源代码摘录:  ftpClient = new FtpClient(); //实例化FtpClient对象  String serverAddr=jtfServer.getText();...

    JAVA上百实例源码以及开源项目源代码

     Java实现的FTP连接与数据浏览程序,实现实例化可操作的窗口。  部分源代码摘录:  ftpClient = new FtpClient(); //实例化FtpClient对象  String serverAddr=jtfServer.getText(); //得到服务器地址  ...

    java自学之道

    5.2 创建可复用类的步骤简要说明 5.3 包的导入 5.4 包的可见性 接口 6.1 接口的概念 6.2 接口的声明 6.3 接口的实现 四、IO流及异常处理 1、流和文件 1.1 流 1.2 文件 2、常用流类 2.1 字节流 2.1.1 ...

    RED HAT LINUX 6大全

    10.1.3 下载新闻组的可选方法 187 10.2 INN硬件与软件要求 188 10.3 INN介绍 188 10.3.1 安装INN 188 10.3.2 INN启动文件 189 10.3.3 配置INN 193 10.4 NNTPCache介绍 194 10.4.1 NNTPCache如何工作 194 10.4.2 下载...

    计算机网络技术基础

    1.2.5 面向终端的计算机通信网络 .............................................................................. 4 1.2.6 以共享资源为目标的计算机网络 .......................................................

Global site tag (gtag.js) - Google Analytics