Reactor模式
前言
Reactor
为Douglas C. Schmidt
提出的一种设计模式,翻译为反应器模式,或者分发器模式。很多开源的项目都使用了此设计模式,如muduo
、Redis
、Nginx
、Netty
。
本文假定读者有一定的C/C++的网络编程经验,并且对IO多路复用有一定的了解,如select
、poll
、epoll
等函数。
本文主要面向对Reactor
有一定了解,但是并不知道C++类应该如何设计的读者。
主要组成
Handlers
在网络编程中即socket句柄。
Synchronous Event Demultiplexer
同步事件分离器,即IO多路复用函数(如
select
、poll
、epoll
)的包装类。Initiation Dispatcher
调度器,控制整体的流程。负责事件的注册、分离、调度。
Event Handler
事件处理程序,一般为一个抽象基类,只定义接口,具体的实现由实现类负责。
Concrete Event Handler
具体的事件处理类,为
Event Handler
的子类,负责具体实现。如接受连接的AcceptorHandler
、进行数据读写的LoggingHandler
。
交互关系
- 应用程序注册具体的事件处理对象到调度器中。当调度器中发生指定的类型的事件时,调用对应的事件处理对象。
- 调度器获取事件处理对象的socket句柄。
- 调度器将所有的socket句柄传入同步事件分离器中,即用IO多路复用函数进行阻塞调用。
- 当有连接、可读事件发生时,IO多路复用函数返回,流程返回至调度器中。
- 调度器通过事件的类型,调用对应的事件处理对象。
使用场景
以一个日志服务器为例。
客户端连接服务端
- 具体的事件处理类
Logging Acceptor
向调度器注册,Logging Acceptor
负责处理所有客户端的连接事件。 - 调度器调用
handle_events()
函数。 - 调度器通过调用同步事件分离器的IO多路复用函数来等待事件的到来,如
select()
函数。 - 客户端通过IP地址和端口连接至服务器。
- 调度器调用
Logging Acceptor
的handle_event()
函数,来处理新连接。 Logging Acceptor
调用accept()
函数来接受新的连接。Logging Acceptor
创建Logging Handler
代表一个socket的连接。Logging Handler
向调度器注册,当有新的可读事件发生时,调度器通知Logging Handler
。
客户端发送数据至服务器
使用场景.png
- 客户端调用
send()
函数发送一条数据至服务端。 - 调度器通过句柄索引到
Logging Handler
,然后调用对应的handle_event()
函数。 - 通过
recv()
函数来接收数据(此处应该以非阻塞的方式进行,如果一次没有读取一整条完整的数据包,则再进行步骤2、3,如此循环)。 Logging Handler
处理数据,然后调用write()
函数进行回复。- 控制流程返回至调度器中。
主要实现类
InitiationDispatcher
//InitiationDispatcher.h
enum Event_Type
// = TITLE
// Types of events handled by the
// Initiation_Dispatcher.
//
// = DESCRIPTION
// These values are powers of two so
// their bits can be efficiently ‘‘or’d’’
// together to form composite values.
{
ACCEPT_EVENT = 01,
READ_EVENT = 02,
WRITE_EVENT = 04,
TIMEOUT_EVENT = 010,
SIGNAL_EVENT = 020,
CLOSE_EVENT = 040
};
class Initiation_Dispatcher
// = TITLE
// Demultiplex and dispatch Event_Handlers
// in response to client requests.
{
public:
// Register an Event_Handler of a particular
// Event_Type (e.g., READ_EVENT, ACCEPT_EVENT,
// etc.).
int register_handler(Event_Handler * eh,
Event_Type et);
// Remove an Event_Handler of a particular
// Event_Type.
int remove_handler(Event_Handler * eh,
Event_Type et);
// Entry point into the reactive event loop.
int handle_events(Time_Value *timeout = 0);
private:
map<int, Event_Handler*> handlers_;
};
Event_Handler
// Event_Handler.h
class Event_Handler
// = TITLE
// Abstract base class that serves as the
// target of the Initiation_Dispatcher.
{
public:
// Hook method that is called back by the
// Initiation_Dispatcher to handle events.
virtual int handle_event(Event_Type et) = 0;
// Hook method that returns the underlying
// I/O Handle.
virtual Handle get_handle(void) const = 0;
};
Logging_Acceptor
// Logging_Acceptor.h
class Logging_Acceptor : public Event_Handler
// = TITLE
// Handles client connection requests.
{
public:
// Initialize the acceptor_ endpoint and
// register with the Initiation Dispatcher.
Logging_Acceptor(const INET_Addr &addr);
// Factory method that accepts a new
// SOCK_Stream connection and creates a
// Logging_Handler object to handle logging
// records sent using the connection.
virtual void handle_event(Event_Type et);
// Get the I/O Handle (called by the
// Initiation Dispatcher when
// Logging_Acceptor is registered).
virtual HANDLE get_handle(void) const
{
return acceptor_.get_handle();
}
private:
// Socket factory that accepts client
// connections.
SOCK_Acceptor acceptor_;
};
// Logging_Acceptor.cpp
Logging_Acceptor::Logging_Acceptor(const INET_Addr &addr)
: acceptor_(addr)
{
// Register acceptor with the Initiation
// Dispatcher, which "double dispatches"
// the Logging_Acceptor::get_handle() method
// to obtain the HANDLE.
Initiation_Dispatcher::instance()->register_handler(this, ACCEPT_EVENT);
}
void Logging_Acceptor::handle_event(Event_Type et)
{
// Can only be called for an ACCEPT event.
assert (et == ACCEPT_EVENT);
SOCK_Stream new_connection;
// Accept the connection.
acceptor_.accept (new_connection);
// Create a new Logging Handler.
Logging_Handler *handler = new Logging_Handler(new_connection);
}
Logging_Handler
// Logging_Handler.h
class Logging_Handler : public Event_Handler
// = TITLE
// Receive and process logging records
// sent by a client application.
{
public:
// Initialize the client stream.
Logging_Handler(SOCK_Stream &cs);
// Hook method that handles the reception
// of logging records from clients.
virtual void handle_event(Event_Type et);
// Get the I/O Handle (called by the
// Initiation Dispatcher when
// Logging_Handler is registered).
virtual HANDLE get_handle(void) const
{
return peer_stream_.get_handle();
}
private:
// Receives logging records from a client.
SOCK_Stream peer_stream_;
};
// Logging_Handler.cpp
Logging_Handler::Logging_Handler(SOCK_Stream &cs)
:peer_stream_(cs)
{
// Register with the dispatcher for
// READ events.
Initiation_Dispatcher::instance()->register_handler(this, READ_EVENT);
}
void Logging_Handler::handle_event(Event_Type et)
{
if (et == READ_EVENT)
{
Log_Record log_record;
peer_stream_.recv((void *)log_record, sizeof log_record);
// Write logging record to standard output.
log_record.write(STDOUT);
}
else if(et == CLOSE_EVENT)
{
peer_stream_.close();
delete (void *)this;
}
}
main
// main.cpp
// Server port number.
const u_short PORT = 10000;
int main (void)
{
// Logging server port number.
INET_Addr server_addr (PORT);
// Initialize logging server endpoint and
// register with the Initiation_Dispatcher.
Logging_Acceptor la (server_addr);
// Main event loop that handles client
// logging records and connection requests.
for (;;)
{
Initiation_Dispatcher::instance()->handle_events ();
}
/* NOTREACHED */
return 0;
}
参考文档
Reactor pattern,如有能力,请阅读原文档。