Reactor

Event-driven applications, such as GUIs or servers, often apply the architecture pattern Reactor. A Reactor can accept multiple requests simultaneously and distribute them to different handlers.

Reactor

The Reactor Pattern is an event-driven framework to concurrently demultiplex and dispatch service requests to various service providers. The requests are processed synchronously.

Reactor

Also known as

  • Dispatcher
  • Notifier

Problem

  • A server should
    • answer several client requests simultaneously
    • be performant, stable, and scalable
    • be extendable to support new or improved services
  • The application should be hidden from multi-threading and synchronization challenges

 Solution

  • Each supported service is encapsulated in a handler
  • The handlers are registered within the Reactor
  • The Reactor uses an event demultiplexer to wait synchronously on all incoming events
  • When the Reactor is notified, it dispatches the service request to the specific handler

Structure

reactorUML

reactorCRCUpdate

Handles

  • The handles identify different event sources, such as network connections, open files, or GUI events.
  • The event source generates events such as connect, read, or write queued on the associated handle.

Synchronous event demultiplexer

  • The synchronous event demultiplexer waits for one or more indication events and blocks until the associated handle can process the event.
  • The system calls select, poll, epoll, kqueue, or WaitForMultipleObjects enable it to wait for indication events.

Event handler

  • The event handler defines the interface for processing the indication events.
  • The event handler defines the supported services of the application.

Concrete event handler

  • The concrete event handler implements the interface of the application defined by the event handler.

Reactor

  • The Reactor supports an interface to register and deregister the concrete event handler using file descriptors.
  • The Reactor uses a synchronous event demultiplexer to wait for indication events. An indication event can be a reading event, a writing event, or an error event.
  • The Reactor maps the events to their concrete event handler.
  • The Reactor manages the lifetime of the event loop.

The Reactor (not the application) waits for the indication events to demultiplex and dispatch the event. The concrete event handlers are registered within the Reactor. The Reactor inverts the flow of control. This inversion of control is often called Hollywood principle.

The dynamic behavior of a Reactor is pretty interesting.

Dynamic Behavior

The following points illustrate the control flow between the Reactor and the event handler.

  • The application registers an event handler for specific events in the Reactor.
  • Each event handler provides its specific handler to the Reactor.
  • The application starts the event loop. The event loop waits for indication events.
  • The event demultiplexer returns to the Reactor when an event source becomes ready.
  • The Reactor dispatches the handles to the corresponding event handler.
  • The event handler processes the event.

Let’s study the Reactor in action.

This example uses the POCO framework. “The POCO C++ Libraries are powerful cross-platform C++ libraries for building network- and internet-based applications that run on desktop, server, mobile, IoT, and embedded systems.”

// reactor.cpp

#include 
#include 

#include "Poco/Net/SocketReactor.h"
#include "Poco/Net/SocketAcceptor.h"
#include "Poco/Net/SocketNotification.h"
#include "Poco/Net/StreamSocket.h"
#include "Poco/Net/ServerSocket.h"
#include "Poco/Observer.h"
#include "Poco/Thread.h"
#include "Poco/Util/ServerApplication.h"

using Poco::Observer;
using Poco::Thread;

using Poco::Net::ReadableNotification;
using Poco::Net::ServerSocket;
using Poco::Net::ShutdownNotification;
using Poco::Net::SocketAcceptor;
using Poco::Net::SocketReactor;
using Poco::Net::StreamSocket;

using Poco::Util::Application;

class EchoHandler {
 public:
  EchoHandler(const StreamSocket& s, SocketReactor& r): socket(s), reactor(r) {           // (11)
    reactor.addEventHandler(socket, 
      Observer<EchoHandler, ReadableNotification>(*this, &EchoHandler::socketReadable));
  }

  void socketReadable(ReadableNotification*) {
    char buffer[8];
    int n = socket.receiveBytes(buffer, sizeof(buffer));
    if (n > 0) {
      socket.sendBytes(buffer, n);                                                       // (13)                                                    
    }
    else {
      reactor.removeEventHandler(socket,                                                 // (12)
	      Observer<EchoHandler, ReadableNotification>(*this, &EchoHandler::socketReadable));
      delete this;
    }
  }

 private:
  StreamSocket socket;
  SocketReactor& reactor;
};

class DataHandler {
 public:

  DataHandler(StreamSocket& s, SocketReactor& r): socket(s), reactor(r), 
                                                  outFile("reactorOutput.txt") {
    reactor.addEventHandler(socket,                                                   // (14) 
      Observer<DataHandler, ReadableNotification>(*this, &DataHandler::socketReadable));
    reactor.addEventHandler(socket,                                                   // (15)
      Observer<DataHandler, ShutdownNotification>(*this, &DataHandler::socketShutdown));
    socket.setBlocking(false);
  }

  ~DataHandler() {                                                                    // (16)
    reactor.removeEventHandler(socket, 
      Observer<DataHandler, ReadableNotification>(*this, &DataHandler::socketReadable));
    reactor.removeEventHandler(socket, 
      Observer<DataHandler, ShutdownNotification>(*this, &DataHandler::socketShutdown));
  }

  void socketReadable(ReadableNotification*) {
    char buffer[64];
    int n = 0;
    do {
      n = socket.receiveBytes(&buffer[0], sizeof(buffer));
      if (n > 0) {
        std::string s(buffer, n);
        outFile << s << std::flush;                                                 // (17)
      }
      else break;
    } while (true);
  }

  void socketShutdown(ShutdownNotification*) {
    delete this;
  }

 private:
  StreamSocket socket;
  SocketReactor& reactor;
  std::ofstream outFile;
};

class Server: public Poco::Util::ServerApplication {

 protected:
  void initialize(Application& self) {                                   // (3)
    ServerApplication::initialize(self);
  }
		
  void uninitialize() {                                                  // (4)
    ServerApplication::uninitialize();
  }

  int main(const std::vector<std::string>&) {                            // (2)
		
    ServerSocket serverSocketEcho(4711);                                 // (5)
    ServerSocket serverSocketData(4712);                                 // (6)
    SocketReactor reactor;
    SocketAcceptor<EchoHandler> acceptorEcho(serverSocketEcho, reactor); // (7)
    SocketAcceptor<DataHandler> acceptorData(serverSocketData, reactor); // (8)
    Thread thread;
    thread.start(reactor);                                               // (9)
    waitForTerminationRequest();
    reactor.stop();                                                      // (10)
    thread.join();
        
    return Application::EXIT_OK;

  }

};

int main(int argc, char** argv) {

  Server app;                                                             // (1)
  return app.run(argc, argv);

}

Line (1) generates the TCP server. The server performs the main function (line 2) and is initialized in line (3) and uninitialized in line (4). The main function of the TCP server creates two server sockets, listening on port 4711 (line 5) and port 4712 (line 6). Lines (7) and (5) generate the server sockets using the EchoHandler and the DataHandler. The SocketAcceptor models the Acceptor component of the Accepter-Connector design pattern. The reactor runs in a separate thread (line 9) until it gets its termination request (line 10). The EchoHandler registers its read handle in the constructor (line 11), and it unregisters its read handle in the member function socketReadable (line 12). The echo service sends the client’s message back (line 13). On the contrary, the DataHandler enables a client to transfer data to the server. The handler registers in its constructor its action for reading events (line 14) and shutdown events (line 15). Both handlers are unregistered in the destructor of DataHandler (line 16). The result of the data transfer is directly written to the file handle outFile (line 17).

The following output shows on the left the server and on the right both clients. A telnet session serves as a client. The first client connects to port 4711: telnet 127.0.0.1 4711. This client connects with the echo server and displays its request. The second client connects to port 4712: telnet 127.0.0.1 4712. The server’s output shows that the client data is transferred to the server.

reactorRun

What are the pros and cons of the Reactor?

Pros and Cons

Pros

  • A clear separation of framework and application logic.
  • The modularity of various concrete event handlers.
  • The Reactor can be ported to various platforms because the underlying event demultiplexing functions such as select, poll, epoll, kqueue, or WaitForMultipleObjects is available on Unix (select, epoll), and Windows platforms (WaitForMultipleObjects).
  • The separation of interface and implementation enables easy adaption or extension of the services.
  • Overall structure supports the concurrent execution.

Cons

  • Requires an event demultiplexing system call.
  • A long-running event handler can block the Reactor.
  • The inversion of control makes testing and debugging more difficult.

What’s Next?

There are many well-established patterns used in the concurrency domain. They deal with synchronization challenges such as sharing and mutation but also with concurrent architectures. In my next post, I will start with the patterns focusing on data sharing.