Connection-Oriented Network Applications (covers V1.2.x)

This is a small tutorial about how to start writing stream-oriented network applications by using xSocket. If you need more help, don’t hesitate to use xSocket’s support forum.

 

The major abstraction of stream-oriented communication is the Connection.

 

Data will be written and read by using a IBlockingConnection or a INonblockingConnection. In contrary to the IBlockingConnection the INonBlockingConnection returns immediately by calling the read method. To be notified about new incoming data a IDataHandler can be assigned to a INonBlockingConnection. The call back method of the handler will be performed if the corresponding event occurs. Beside the IDataHandler additional handlers like IConnectHandler exists.

 

The server-side (MultithreadedServer) always works on INonblockingConnection to handle the incoming connections.

 

1. a simple tcp/delimiter-based server example
First, define the handler class which implements the desired handler interface e.g. IDataHandler, IConnectHandler or ITimeoutHandler. The DataHandler will be called if data has been received for a connection (which will be passed over).

class EchoHandler implements IDataHandler {


  public boolean onData(INonBlockingConnection con) throws IOException {
     String data = con.readStringByDelimiter("\r\n");

     con.write(data + "\r\n");

     return true;

  }

}

                                                                                                                  


Than create a instance of the server (
MultithreadedServer) and assign the above handler

// creates the server by passing over the port number and the handler
IMultithreadedServer srv = new MultithreadedServer(8090, new EchoHandler());

 

// run in the context of the current thread

srv.run();

that's it

 

2. semantic of the DataHandler’s onData method
The IDataHandler’s call back method onData will be called if data has been received. Please note, that on network level data can be fragmented on several TCP packets as well as data can be bundled into one TCP packet. Performing a write operation like connection.write(“hello it’s me. What I have to say is….”) on the client side doesn’t mean that exact one onData call occurs on the server side. A common pattern to solve this is to identify logical parts by a delimiter or a leading length field. xSocket provides methods to support this pattern.

  public boolean onData(INonBlockingConnection con) throws IOException, BufferUnderflowException {
     // read only the whole logical part

     // or throwing a BufferUnderflowException

     String data = con.readStringByDelimiter("\r\n");

     con.write(data + "\r\n");

     return true;

  }


The INonBlockingConnection‘s readStringByDelimiter(String) method only returns a record if the whole part (marked by the delimiter) has been received, or if not, a BufferUnderflowException will be thrown. In contrast to IBlockingConnection, a INonBlockingConnection read method always returns immediately and could thrown a BufferUnderflowException.

The BufferUnderflowException will be swallowed by xSocket, if the DataHandler implementation doesn’t catch this exception. It is a common pattern not to handle such an exception by the DataHandler.

The same is true for the INonBlockingConnection‘s readStringByLength(int) method.

 

 

3. writing a blocking client 
For the client a IBlockingConnection can be used to simplify the socket handling

IBlockingConnection con = new BlockingConnection(host, port);
 
String req = "Hello server";
con.write(req + "\r\n");
 
String res = con.readStringByDelimiter("\r\n");
assert (request.equals(res));

 

 

4. writing a non blocking client
By performing the a BlockingConnection’s read method, the method call blocks until data has been received or a time out has been occurred. To avoid this blocking behaviour the NonBlockingConnection can be used the client. To do this, a client handler has to be defined which will be notified if network events occur (which implements IDataHandler and/or IDisconnectHandler and/or…) 

 

// defining the client handler (here as a anonymous inner class)

IDataHandler clientHandler = new IDataHandler() {

  public boolean onData(INonBlockingConnection connection) throws IOException, BufferUnderflowException, MaxReadSizeExceededException {

    String response = connection.readStringByDelimiter("\r\n");

   

    return true;

  }

};

 

// opening the connection

INonBlockingConnection connection = new NonBlockingConnection(host, port, clientHandler);

connection.write(…);

 

// do something else. Receiving data causes that the client handler’s onData method will be called (within a dedicated thread)

 

 

5. configure the server & worker pool
May be you want to configure the server. You can do this by calling the appropriated setter

// define the timeout, in which data has to be received
srv.setIdleTimeoutSec(2 * 60);

// define the timeout for the connection
srv.setConnectionTimeoutSec(30 * 60);

 

You could also configure the work pool. The worker pool is used to perform the handler’s call back methods such as onData or onConnect. xSocket internal tasks like socket read/write or background activities will be performed by xSocket internal threads. These threads can’t be set or modified from the outside.

xSocket uses the worker pool only to perform the handler’s call back method. The pool can be set within the server’s constructor. A worker pool has to implement the java.util.concurrent.Executor interface. If a server constructor will be used, which doesn’t include a worker pool argument, a FixedThreadPool will be created by default (see java.util.concurrent.Executors). If the worker pool is set with null, the multithreading modus is switched off. That means the call back methods will be performed by the xSocket internal dispatcher thread, which is responsible to perform the socket read/write operations.

 

// setting a fixed worker pool with 20 threads

IMultithreadedServer srv = new MultithreadedServer(8090, new EchoHandler(), Executors. newFixedThreadPool(20));

 

 

6. define a connection-scoped handler
By default handlers are instance-scoped. That means, the same handler instance will be used for each new incoming connection. A handler becomes connection-scoped by implementing the IConnectionScoped interface. This interface requires a clone method, which will be used to create a dedicated instance of the given handler for each new connection. To avoid side-effects, the clone method has to perform a deep clone.

class SmtpHandler implements IDataHandler, IConnectionScoped {

  public boolean onData(INonBlockingConnection con) throws IOException {
   ...
  }

  public Object clone() throws CloneNotSupportedException {
   return super.clone();
  }
}

 

7. attaching session-specific data to a connection
A connection allows to attach (connection-specific session) data by using the setAttachment(attch), getAttachment() methods:

class SmtpHandler implements IConnectHandler, IDataHandler {

  public boolean onConnect(INonBlockingConnection con) throws IOException {
     con.setAttachment(new MySessionData());

     return true; ...
  }

  public boolean onData(INonBlockingConnection con) throws IOException {
     MySessionData sessionData = (MySessionData) con.getAttachment();

     ….
  }
}

 

 

8. another example: a simple tcp/length field-based handler
To apply a length field based communication pattern, the connection’s mark-support can be used. In this case the client will first write an “empty” length field. After writing the content data, the write pointer will be moved back to the length field, to override the length field.

IBlockingConnection con = new BlockingConnection(host, port);
con.setAutoflush(false); // mark support requires deactivated autoflush!

con.markWritePosition();  // mark current position
con.write((int) 0);       // write "emtpy" length field

int written = con.write("hello world");
written += con.write(" it’s nice to be here");

con.resetToWriteMark();  // return to length field position
con.write(written);      // and update it

con.flush(); // flush (marker will be removed implicit)

 

9. … and the server
For the server-side a utility method of StreamUtils can be used.

 

public boolean onData(INonBlockingConnection connection) throws IOException, BufferUnderflowException {
   // validate that enough data is available (if not an BufferUnderflowException will be thrown)

   int length = StreamUtils.validateSufficientDatasizeByIntLengthField(connection);

   String text = connection.readStringByLength(length);

 

   connection.write(text);

   return true;

}

 

10. connection pooling
Client-side connection pools can improve the performance by avoiding the re-creation of connections. Typically, a connection pool will be used on the client-side, if connections for the same server (address) will be created in a serial manner in a sort period of time. By pooling such connections the overhead of establishing a connection will be avoided.

// create a unlimited connection pool with idle timeout 1 min

BlockingConnectionPool pool = new BlockingConnectionPool(60 * 1000);

 

IBlockingConnection con = null;

 

try {

   // retrieve a connection (if no connection is in pool, a new one will be created) 

   con = pool.getBlockingConnection(host, port);

   con.write("Hello");

   ...

  

   // always close the connection! (the connection will be returned into the connection pool)

   con.close();

  

} catch (IOException) {

   if (con != null) {

     try {

       // if the connection is invalid -> destroy it (it will not return into the pool)

       pool.destroyConnection(con);

     } catch (Exception ignore) { }

   }

}

 

 

 

11. flushing
By default the autoflush is activated. That means calling a write method will transfer the data to the underlying subsystem, immediately. By calling the setAutoflush(false) method, the flush behaviour will be controlled manually. For server-side handler this will typically done within the onConnect call back method.

 

autoflush example (default):

 //client-side
 IBlockingConnection connection = new BlockingConnection(host, port);
 connection.write(text + “\n”);

 String text = connection.readStringByDelimiter(“\n”);



 //server-side
 class ServerHandler implements IDataHandler {

   public boolean onData(INonBlockingConnection connection) throws IOException {

      String text = connection.readStringByDelimiter(“\n”);
      connection.write(text)
      connection.write(“\n”);
      return true;

   }
 }

 

user-managed flush example:
 //client-side
 IBlockingConnection connection = new BlockingConnection(host, port);
 connection.setAutoflush(false);

 connection.write(text + “\n”);

 connection.flush();
 String text = connection.readStringByDelimiter(“\n”);



 
//server-side
 class ServerHandler implements IConnectHandler, IDataHandler {

   public boolean onConnect(INonBlockingConnection connection) throws IOException {

      connection.setAutoflush(false);

      return true;

   }

   public boolean onData(INonBlockingConnection connection) throws IOException {

      String text = connection.readStringByDelimiter(“\n”);
     
      connection.write(text)
      connection.write(“\n”);
     
      connection.flush();

      return true;

   }
 }


 

12. handler and synchronization
By default the handler is synchronized on the connection-level. That means, for the same connection the handler call back methods will be called always in a serialized manner. Further more call back methods will always be executed in the right order (onConnect, onData, …, onDisconnect).

Sometime it is desirable that the synchronization should be done by the handler implementation itself. The synchronization can be deactivated by annotate the handler as non-synchronized. In this case the case, the order of the call back method calls isn’t guaranteed. E.g. under some circumstance it could happen, that a onData method will be performed before onConnect.


@Synchronized(Synchronized.Mode.OFF)

private final class NonSynchronizedHandler implements IDataHandler {

 

   public boolean onData(INonBlockingConnection connection) throws IOException{

     

   }

}

 

 

13. performance recommendations
[Connection] By setting autoflush with false (default is true), the flushing can be controlled manually. Because the flush performs a write operation and synchronization on the underlying connection, the flush operation is quite expensive. Especially in the case of multiple writes within one “transaction”, user managed flush can improve the performance. By using the connection’s write(ByteBuffer) and write(ByteBuffer[]) methods some restriction exits. Please see the API doc for more information.

 

   connection.setAutoflush(false);

   connection.write(header);
   connection.write(fragment1);

   connection.write(fragement2);

  

   connection.flush();

 

[NonBlockingConnection only] By setting the flushmode with ASYNC (default is SYNC) the data will be transferred to the underlying connection in an asynchronous way. Often the performance improvements can be very high. By using the connection’s write(ByteBuffer) and write(ByteBuffer[])methods some restriction exits. Please see the API doc for more information.

 

   nonblockingConnection.setFlushmode(FlushMode.ASYNC);

Choose the proper worker pool type (bound pool, unbound pool, …) and sizing. The proper worker pool size depends on the concrete handler implementation. If the handler performs long running blocking calls like file I/O or network calls which consume much time by waiting and idling, a larger worker pool size should be set. If the worker pool size has been configured to small, call backs of concurrent connections couldn’t performed by waiting for free worker threads. In this case the efficiency is poor, because the taken worker threads spend the most time by waiting. Concurrent call backs can’t be performed caused by missing worker threads and system resources like CPUs are partly unused. On the other hand, if the worker pool size is configured to high, the operating system will spend much time switching between threads, rather than execute them.

 

To configuring the socket parameters by the <IConnection>.setOption(…) method see also Boost socket performance on Linux.

 

 

14. SSL support (static & on-demand)
To run a server in a ssl mode, just a appropriated SSLContext object has to pass over within the constructor (see SSLTestContextFactory as an example to create such an SSLContext object). By setting the sslOn parameter with true, the server will be start up in the ssl mode. If the ssl mode should be started later - on-demand - the parameter has to be set with false.

The same is true for client-side connections.

 

 

server-side example:

IMultithreadedServer sslTestServer = new MultithreadedServer(port, new EchoHandler(), true, sslContext);

StreamUtils.start(sslTestServer);

 

// manually ssl activation (connection.activateSecuredMode) is not necessary!

 

 

server-side example: (on-demand activation)

IMultithreadedServer sslTestServer = new MultithreadedServer(port, new EchoHandler(), false, sslContext);

StreamUtils.start(sslTestServer);

 

class EchoHandler implements IDataHandler {


  public boolean onData(INonBlockingConnection con) throws IOException {
    

     con.activateSecuredMode();  // activate ssl mode on demand (-> only necessary if server started with flag false)

    

     return true;

  }

}

 

 

client-side example:

IBlockingConnection connection = new BlockingConnection(port, sslContext, true);

 

 

client-side example (on-demand activation):

IBlockingConnection connection = new BlockingConnection(port, sslContext, false);

connection.activateSecuredMode();

 

 

15. script language support
xSocket have also been tested with script languages like JRuby, Groovy or jython. Known restrictions (caused by the script language environment):

 

See the examples for more information.

 

 

16. troubleshooting (logging, common problems, …)
xSocket uses the built-in logging capability of Java SE. To configure the logging behaviour by the logging.properties file see Configuring Logger Default Values with a Properties File. You can also set the logging configuration programmatically:

 

// activate xSocket logging (for namespace org.xsocket.stream)

Logger logger = Logger.getLogger("org.xsocket.stream");

logger.setLevel(Level.FINE);

                

ConsoleHandler ch = new ConsoleHandler();

ch.setLevel(Level.FINE);

logger.addHandler(ch);

 

 

A common problem by opening/closing plenty of connections within a short time is the Too Many Open Files error, caused by (the configuration of) the underlying operation system. The Too Many Open Files support pattern describes this problem and how to handle it. By using Windows see also Tuning Windows.

 

 

17. JMX and xSocket
To support monitoring by JMX, xSocket includes a MultithreadedServerMBeanProxyFactory class. By calling the createAndRegister method the major components of a xSocket stream server will be registered as MBeans.

 
public void launch(int port) {
  // start the jmx environment (server)
  startJmxServer("test", 1099);

  // create the server, start it
  MultithreadedServer server = new MultithreadedServer(port, new MyHandler());
  StreamUtils.start(server);
 
  // … and register the server mbeans on the PlatformMBeanServer (jmx environment)
  MultithreadedServerMBeanProxyFactory.createAndRegister(server, "MyTestServer");
 
}

// help method to start the jmx server
private static void startJmxServer(String name, int port) {
  // start the rmi registry
  try {
   Registry registry = LocateRegistry.createRegistry(port);
   registry.unbind(name);
  } catch (Exception ignore) {  }

  // start the jmx server
  try {
    JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://" + InetAddress.getLocalHost().getHostName() + ":" + port + "/" + name);
    MBeanServer mbeanSrv = ManagementFactory.getPlatformMBeanServer();
    JMXConnectorServer server = JMXConnectorServerFactory.newJMXConnectorServer(url, null, mbeanSrv);
    server.start();
    System.out.println("JMX RMI Agent has been bound on address");
    System.out.println(url);
  } catch (Exception e) { }
}


By running jconsole you will get: