前言

ReactorDouglas C. Schmidt提出的一种设计模式,翻译为反应器模式,或者分发器模式。很多开源的项目都使用了此设计模式,如muduoRedisNginxNetty

本文假定读者有一定的C/C++的网络编程经验,并且对IO多路复用有一定的了解,如selectpollepoll等函数。

本文主要面向对Reactor有一定了解,但是并不知道C++类应该如何设计的读者。

主要组成

Reactor.png

  • Handlers

    在网络编程中即socket句柄。

  • Synchronous Event Demultiplexer

    同步事件分离器,即IO多路复用函数(如selectpollepoll)的包装类。

  • Initiation Dispatcher

    调度器,控制整体的流程。负责事件的注册、分离、调度。

  • Event Handler

    事件处理程序,一般为一个抽象基类,只定义接口,具体的实现由实现类负责。

  • Concrete Event Handler

    具体的事件处理类,为Event Handler的子类,负责具体实现。如接受连接的AcceptorHandler、进行数据读写的LoggingHandler

交互关系

Reactor.png

  • 应用程序注册具体的事件处理对象到调度器中。当调度器中发生指定的类型的事件时,调用对应的事件处理对象。
  • 调度器获取事件处理对象的socket句柄。
  • 调度器将所有的socket句柄传入同步事件分离器中,即用IO多路复用函数进行阻塞调用。
  • 当有连接、可读事件发生时,IO多路复用函数返回,流程返回至调度器中。
  • 调度器通过事件的类型,调用对应的事件处理对象。

使用场景

以一个日志服务器为例。

客户端连接服务端

客户端至服务器.png

  1. 具体的事件处理类Logging Acceptor向调度器注册,Logging Acceptor负责处理所有客户端的连接事件。
  2. 调度器调用handle_events()函数。
  3. 调度器通过调用同步事件分离器的IO多路复用函数来等待事件的到来,如select()函数。
  4. 客户端通过IP地址和端口连接至服务器。
  5. 调度器调用Logging Acceptorhandle_event()函数,来处理新连接。
  6. Logging Acceptor调用accept()函数来接受新的连接。
  7. Logging Acceptor创建Logging Handler代表一个socket的连接。
  8. Logging Handler向调度器注册,当有新的可读事件发生时,调度器通知Logging Handler

客户端发送数据至服务器

使用场景.png

  1. 客户端调用send()函数发送一条数据至服务端。
  2. 调度器通过句柄索引到Logging Handler,然后调用对应的handle_event()函数。
  3. 通过recv()函数来接收数据(此处应该以非阻塞的方式进行,如果一次没有读取一整条完整的数据包,则再进行步骤2、3,如此循环)。
  4. Logging Handler处理数据,然后调用write()函数进行回复。
  5. 控制流程返回至调度器中。

主要实现类

InitiationDispatcher

//InitiationDispatcher.h

enum Event_Type
    // = TITLE
    //  Types of events handled by the
    //  Initiation_Dispatcher.
    //
    // = DESCRIPTION
    //  These values are powers of two so
    //  their bits can be efficiently ‘‘or’d’’
    //  together to form composite values.
{
    ACCEPT_EVENT = 01,
    READ_EVENT = 02,
    WRITE_EVENT = 04,
    TIMEOUT_EVENT = 010,
    SIGNAL_EVENT = 020,
    CLOSE_EVENT = 040
};

class Initiation_Dispatcher
    // = TITLE
    //  Demultiplex and dispatch Event_Handlers
    //  in response to client requests.
{
public:
    // Register an Event_Handler of a particular
    // Event_Type (e.g., READ_EVENT, ACCEPT_EVENT,
    // etc.).
    int register_handler(Event_Handler * eh,
                         Event_Type et);

    // Remove an Event_Handler of a particular
    // Event_Type.
    int remove_handler(Event_Handler * eh,
                       Event_Type et);

    // Entry point into the reactive event loop.
    int handle_events(Time_Value *timeout = 0);

private:
    map<int, Event_Handler*> handlers_;
};

Event_Handler

// Event_Handler.h

class Event_Handler
    // = TITLE
    //  Abstract base class that serves as the
    //  target of the Initiation_Dispatcher.
{
public:
    // Hook method that is called back by the
    // Initiation_Dispatcher to handle events.
    virtual int handle_event(Event_Type et) = 0;
    // Hook method that returns the underlying
    // I/O Handle.
    virtual Handle get_handle(void) const = 0;
};

Logging_Acceptor

// Logging_Acceptor.h

class Logging_Acceptor : public Event_Handler
    // = TITLE
    //  Handles client connection requests.
{
public:
    // Initialize the acceptor_ endpoint and
    // register with the Initiation Dispatcher.
    Logging_Acceptor(const INET_Addr &addr);
    
    // Factory method that accepts a new
    // SOCK_Stream connection and creates a
    // Logging_Handler object to handle logging
    // records sent using the connection.
    virtual void handle_event(Event_Type et);
    
    // Get the I/O Handle (called by the
    // Initiation Dispatcher when
    // Logging_Acceptor is registered).
    virtual HANDLE get_handle(void) const
    {
        return acceptor_.get_handle();
    }

private:
    // Socket factory that accepts client
    // connections.
    SOCK_Acceptor acceptor_;
};
// Logging_Acceptor.cpp

Logging_Acceptor::Logging_Acceptor(const INET_Addr &addr)
    : acceptor_(addr)
{
    // Register acceptor with the Initiation
    // Dispatcher, which "double dispatches"
    // the Logging_Acceptor::get_handle() method
    // to obtain the HANDLE.
    Initiation_Dispatcher::instance()->register_handler(this, ACCEPT_EVENT);
}

void Logging_Acceptor::handle_event(Event_Type et)
{
    // Can only be called for an ACCEPT event.
    assert (et == ACCEPT_EVENT);
    SOCK_Stream new_connection;

    // Accept the connection.
    acceptor_.accept (new_connection);

    // Create a new Logging Handler.
    Logging_Handler *handler = new Logging_Handler(new_connection);
}

Logging_Handler

// Logging_Handler.h

class Logging_Handler : public Event_Handler
    // = TITLE
    //  Receive and process logging records
    //  sent by a client application.
{
public:
    // Initialize the client stream.
    Logging_Handler(SOCK_Stream &cs);

    // Hook method that handles the reception
    // of logging records from clients.
    virtual void handle_event(Event_Type et);

    // Get the I/O Handle (called by the
    // Initiation Dispatcher when
    // Logging_Handler is registered).
    virtual HANDLE get_handle(void) const
    {
        return peer_stream_.get_handle();
    }

private:
    // Receives logging records from a client.
    SOCK_Stream peer_stream_;
};
// Logging_Handler.cpp

Logging_Handler::Logging_Handler(SOCK_Stream &cs)
    :peer_stream_(cs)
{
    // Register with the dispatcher for
    // READ events.
    Initiation_Dispatcher::instance()->register_handler(this, READ_EVENT);
}

void Logging_Handler::handle_event(Event_Type et)
{
    if (et == READ_EVENT) 
    {
        Log_Record log_record;
        peer_stream_.recv((void *)log_record, sizeof log_record);
        
        // Write logging record to standard output.
        log_record.write(STDOUT);
    }
    else if(et == CLOSE_EVENT) 
    {
        peer_stream_.close();
        delete (void *)this;
    }
}

main

// main.cpp

// Server port number.
const u_short PORT = 10000;

int main (void)
{
    // Logging server port number.
    INET_Addr server_addr (PORT);
    
    // Initialize logging server endpoint and
    // register with the Initiation_Dispatcher.
    Logging_Acceptor la (server_addr);
    
    // Main event loop that handles client
    // logging records and connection requests.
    
    for (;;)
    {
        Initiation_Dispatcher::instance()->handle_events ();
    }
    
    /* NOTREACHED */
    return 0;
}

参考文档

Reactor pattern,如有能力,请阅读原文档。

标签: 设计模式

添加新评论