Tutorial - Connection-Oriented Network Applications

This is a beginner tutorial about writing connection-oriented network applications using xSocket 2.0. If you need more help, don't hesitate to use xSocket's support forum.

Core functionality

The major abstraction supporting stream-oriented communication is the connection. Data is read and written using an IBlockingConnection or an INonBlockingConnection object. A connection object provides several convenience methods to read and write specific data types in record or bulk mode.

A connection also implements the GatheringByteChannel and WritableByteChannel interfaces of the java.nio.channels package. If an InputStream or an OutputStream is required, the java.nio.channels.Channels.newInputStream(<readableChannel>) and java.nio.channels.Channels.newOutputStream(<writableChannel>) methods can be used to wrap the channel. Because the classic streams have blocking behaviour, only the blocking IBlockingConnection should be mapped to a classic InputStream.

Other methods control the connection behaviour or retrieve information about the connection state. For instance, the remote endpoint address can be retrieved, or the flush behaviour of the connection can be controlled. The methods of both connection classes are not thread-safe.

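
The channel-to-stream wrapping mentioned above can be tried with the JDK alone. The following is a minimal sketch, not xSocket code: the class name ChannelStreamSketch and the helper readAll are hypothetical, and an in-memory channel stands in for a blocking connection.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: wraps a blocking ReadableByteChannel as a classic InputStream.
class ChannelStreamSketch {

    // Reads the channel to its end through the stream view and decodes it as UTF-8.
    static String readAll(ReadableByteChannel channel) {
        try (InputStream in = Channels.newInputStream(channel)) {
            return new String(in.readAllBytes(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Assumption: an in-memory channel replaces the network connection.
        byte[] bytes = "Hello server\r\n".getBytes(StandardCharsets.UTF_8);
        ReadableByteChannel channel = Channels.newChannel(new ByteArrayInputStream(bytes));
        System.out.print(readAll(channel));
    }
}
```

With xSocket, the channel argument would be the blocking connection itself. A non-blocking channel is a poor fit here because the stream's read calls expect to block until data arrives.
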
1. A simple TCP/delimiter-based server example

First, define the handler class, which implements the desired handler interface, e.g. IDataHandler, IConnectHandler, IIdleTimeoutHandler or IConnectionTimeoutHandler. The data handler is called when data has been received for a connection; the connection is passed to the handler.

    class EchoHandler implements IDataHandler {

        public boolean onData(INonBlockingConnection nbc)
                throws IOException, BufferUnderflowException, MaxReadSizeExceededException {
            String data = nbc.readStringByDelimiter("\r\n");
            nbc.write(data + "\r\n");
            return true;
        }
    }

Then create an instance of the server and assign the handler above.

    // creates the server by passing over the port number & handler
    IServer srv = new Server(8090, new EchoHandler());

    // run it within the current thread.
    srv.run();   // the call will not return

    // ... or start it by using a dedicated thread
    srv.start(); // returns after the server has been started

That's it. In contrast to the run() method, the server's start() method creates a dedicated thread to run the server. The start() method blocks internally until the server has been started. This is the preferred way to ensure that the server is running before performing further activities. To shut down the server gracefully, call the server's close() method. Like most connection-oriented artefacts, the server implements the java.io.Closeable interface.

2. Semantics of the DataHandler's onData method

The IDataHandler's callback method onData is called immediately when data fragments have been received. Please note that on the network level data can be fragmented into several TCP segments as well as bundled into one TCP segment. Performing a write operation like connection.write("hello it's me. What I have to say is ...") on the client side doesn't mean that exactly one TCP segment arrives on the server side. xSocket buffers the received network data in an internal read buffer to hide this network-caused behaviour. For more information about the TCP protocol family, see TCP/IP Illustrated, Volume 1: The Protocols.

Because of this fragmentation, each NonBlockingConnection read method throws a BufferUnderflowException if not enough data is available. Depending on the execution mode, no further data will be received on the network level within a pending onData method call. (In NONTHREADED mode, xSocket's internal network I/O thread performs the onData method and is therefore not able to read more data on the network level at the same time. See the executing and synchronizing chapter for more information.)

It is a common pattern not to handle the BufferUnderflowException. If the onData call returns by throwing a BufferUnderflowException, xSocket swallows the exception.

    class EchoHandler implements IDataHandler {

        // this method will be called each time data fragments have been received
        public boolean onData(INonBlockingConnection nbc)
                throws IOException, ClosedChannelException, BufferUnderflowException, MaxReadSizeExceededException {

            // don't handle the BufferUnderflowException, xSocket will swallow it
            byte[] bytes = nbc.readBytesByLength(500);
            //...

            return true;
        }
    }

The onData method will be called again as long as unread data is available in xSocket's internal read buffer.
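
The buffering and re-invocation behaviour described above can be modelled in a few lines of plain Java. This is only a sketch under stated assumptions: ReadBufferSketch, receive and the private onData are hypothetical stand-ins, not xSocket's implementation, and a StringBuilder replaces the internal read buffer.

```java
import java.nio.BufferUnderflowException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a buffered connection with a delimiter-based read.
class ReadBufferSketch {
    private final StringBuilder buffer = new StringBuilder();
    private final List<String> records = new ArrayList<>();

    // Returns one record, or throws if the delimiter has not arrived yet.
    String readStringByDelimiter(String delimiter) {
        int idx = buffer.indexOf(delimiter);
        if (idx < 0) {
            throw new BufferUnderflowException(); // record not complete yet
        }
        String record = buffer.substring(0, idx);
        buffer.delete(0, idx + delimiter.length());
        return record;
    }

    // Stand-in for a handler's onData(): reads exactly one record per call.
    private void onData() {
        records.add(readStringByDelimiter("\r\n"));
    }

    // A network fragment arrives: buffer it, then call the handler again
    // and again while unread data remains.
    void receive(String fragment) {
        buffer.append(fragment);
        try {
            while (buffer.length() > 0) {
                onData();
            }
        } catch (BufferUnderflowException swallowed) {
            // incomplete record: keep the bytes and wait for the next fragment
        }
    }

    List<String> records() { return records; }

    int unread() { return buffer.length(); }

    public static void main(String[] args) {
        ReadBufferSketch connection = new ReadBufferSketch();
        connection.receive("hel");                // no delimiter yet, nothing is read
        connection.receive("lo\r\nfoo\r\nba");    // two complete records, "ba" stays buffered
        System.out.println(connection.records() + " unread=" + connection.unread());
        // prints "[hello, foo] unread=2"
    }
}
```

The second fragment triggers two handler calls although only one "packet" arrived, which is the effect the tutorial describes.
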
The onData method can therefore also be called without receiving new network packets, as long as xSocket's internal read buffer is not empty. This loop stops only when the internal read buffer is empty or when the onData implementation reads no data. After each onData call, xSocket checks the internal read buffer for modifications (new network data has been added or data has been read) to decide whether the onData method should be called again.

The onData method is also called when the connection is closed. As with buffer underflow handling, it is a common pattern not to handle the ClosedChannelException and to implement the IDisconnectHandler interface to detect the disconnect.

3. Writing a blocking client

For the client, an IBlockingConnection can be used to simplify the socket handling. In contrast to the NonBlockingConnection, a BlockingConnection doesn't support callback handlers.

    IBlockingConnection bc = new BlockingConnection(host, port);

    String req = "Hello server";
    bc.write(req + "\r\n");

    // read the whole logical part by waiting (blocking) until
    // the required data has been received
    String res = bc.readStringByDelimiter("\r\n");

    assert (req.equals(res));

4. Writing a non-blocking client

A BlockingConnection's read method blocks until data has been received or a timeout has occurred. To avoid this blocking behaviour, a NonBlockingConnection can be used on the client. To do this, define a client handler that is notified when network events occur (it implements IDataHandler and/or IDisconnectHandler and/or ...).

    // defining the client handler (here as an anonymous inner class)
    IDataHandler clientHandler = new IDataHandler() {

        public boolean onData(INonBlockingConnection nbc)
                throws IOException, BufferUnderflowException, MaxReadSizeExceededException {

            // read the whole logical part or throw a BufferUnderflowException
            String res = nbc.readStringByDelimiter("\r\n");
            //...

            return true;
        }
    };

    // opening the connection
    INonBlockingConnection nbc = new NonBlockingConnection(host, port, clientHandler);
    nbc.write("Hello");

    // do something else. When data is received, the client
    // handler's onData method is called (within a dedicated thread)
    //...

5. Wrap a non-blocking connection by a blocking connection

xSocket's implementation of the BlockingConnection internally uses an INonBlockingConnection instance. That means a BlockingConnection is a wrapper which adds blocking behaviour to the read methods. For this reason, a NonBlockingConnection can become a BlockingConnection at any time.

    INonBlockingConnection nbc = ...

    // wrapping an existing non-blocking connection by a BlockingConnection
    IBlockingConnection bc = new BlockingConnection(nbc);
    bc.readInt();
    //...

6. Wrap a non-blocking connection on the server side

In most cases NonBlockingConnections are used on the server side. If a BlockingConnection is required on the server side, it is created by wrapping the NonBlockingConnection. Wrapping a NonBlockingConnection means that the handler assigned to the NonBlockingConnection is replaced by an internal handler of the BlockingConnection. Typically the onConnect() method is used to create a BlockingConnection on the server side.

    ...
    class ConnectHandler implements IConnectHandler {

        public boolean onConnect(INonBlockingConnection nbc) throws IOException {
            IBlockingConnection bc = new BlockingConnection(nbc);
            //...
            return true;
        }
        ...

7. Handling connect

If the handler implements the IConnectHandler interface, the onConnect callback method is performed immediately when a new connection is established. This method is called only once. Often this callback method is used to perform (security) checks based on the new connection, to modify connection properties, to prepare resources, to set up backend connections or to write greeting messages.
It is uncommon to read data within the onConnect() method, because it is not predictable whether data has already been received.

    class Handler implements IDataHandler, IConnectHandler {

        public boolean onConnect(INonBlockingConnection nbc) throws IOException {
            //... e.g. open resources
            return true;
        }

        public boolean onData(INonBlockingConnection nbc) throws IOException {
            //...
            return true;
        }
    }

8. Handling disconnect

If the handler implements the IDisconnectHandler interface, the onDisconnect callback method is performed when the connection has been terminated (regardless of whether the current process or the remote peer terminates the connection). Even though the connection is passed to the onDisconnect method, modifying or writing operations can't be performed on it, because the connection is already closed.

A disconnect occurs in three ways:

    class Handler implements IDataHandler, IDisconnectHandler {

        public boolean onDisconnect(INonBlockingConnection nbc) throws IOException {
            //... e.g. closing open resources
            return true;
        }

        public boolean onData(INonBlockingConnection nbc) throws IOException {
            ByteBuffer[] data = nbc.readByteBufferByDelimiter("\r\n");

            // print out the received data (on duplicates, so that the
            // original buffers are not consumed)
            ByteBuffer[] copy = new ByteBuffer[data.length];
            for (int i = 0; i < data.length; i++) {
                copy[i] = data[i].duplicate();
            }
            System.out.println(DataConverter.toString(copy));
            //...

            return true;
        }
    }

In the case of a disconnect, the IDataHandler is also called (before the IDisconnectHandler is performed). Similar to native socket reads, the INonBlockingConnection's available() method returns -1 if the connection is closed. Performing a read method leads to a ClosedChannelException, which is swallowed by xSocket if it is not handled.

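
The handler above prints duplicates of the received buffers instead of the buffers themselves. The reason can be shown with the JDK alone: a duplicate shares the bytes but has its own position, so reading it leaves the original untouched. The class DuplicateSketch and its peek method are hypothetical names, and the sketch assumes that no multi-byte character is split across two buffers.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: decodes buffers for logging without consuming them.
class DuplicateSketch {

    // Each duplicate shares the content of its original but tracks its own
    // position and limit, so the originals stay fully readable afterwards.
    static String peek(ByteBuffer[] data) {
        StringBuilder text = new StringBuilder();
        for (ByteBuffer buffer : data) {
            ByteBuffer copy = buffer.duplicate();
            text.append(StandardCharsets.UTF_8.decode(copy));
        }
        return text.toString();
    }

    public static void main(String[] args) {
        ByteBuffer[] data = {
            ByteBuffer.wrap("ping".getBytes(StandardCharsets.UTF_8)),
            ByteBuffer.wrap("pong".getBytes(StandardCharsets.UTF_8))
        };
        System.out.println(peek(data));           // prints "pingpong"
        System.out.println(data[0].remaining());  // prints "4": nothing was consumed
    }
}
```
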
9. Asynchronous connect

In general, creating a new BlockingConnection or NonBlockingConnection is a synchronous operation. This means write or read methods can be called immediately after creating the connection. However, some constructors of the client-side NonBlockingConnection include a parameter waitForConnect. If this parameter is set to false, the constructor returns immediately without waiting until the connection is established.

The listing below shows how a connection can be set up asynchronously. The handler also implements the IConnectExceptionHandler to handle a connect exception (connection failed, timeout exception, ...).

    class Handler implements IConnectHandler, IDataHandler, IConnectExceptionHandler {

        public boolean onConnect(INonBlockingConnection nbc) throws IOException {
            nbc.write("hello server\r\n");
            //...
            return true;
        }

        public boolean onConnectException(INonBlockingConnection nbc, IOException ioe) throws IOException {
            //...
            return true;
        }

        public boolean onData(INonBlockingConnection nbc) throws IOException {
            //...
            return true;
        }
    }

    Handler hdl = new Handler();
    INonBlockingConnection nbc = new NonBlockingConnection(InetAddress.getByName(host), port, hdl, false, 2000);
    //...

    // write operations are only valid after the connection has been
    // established, which is indicated by the onConnect() callback

If you call a write method immediately after returning from the NonBlockingConnection constructor, a race condition exists, because it is not predictable whether the connection is already established. The callback method onConnect() indicates that the connection is established.

10. Handling timeouts

xSocket supports detecting connection timeouts and idle timeouts. The connection timeout defines the maximum lifetime of a connection. Independent of the traffic, the connection is closed when the connection timeout is exceeded. The idle timeout defines the maximum idle time during which no data has been received. If the idle time is exceeded, the connection is closed. To avoid wasting resources on unresponsive pending connections, it is highly recommended to set a proper idle timeout. By default, the idle timeout is set to the maximum value (~ 24 days).

    // the handler
    class Handler implements IDataHandler, IIdleTimeoutHandler, IConnectionTimeoutHandler {

        public boolean onConnectionTimeout(INonBlockingConnection nbc) throws IOException {
            nbc.write("bye bye");
            nbc.close();
            return true;  // prevents xSocket from also closing the connection
        }

        public boolean onIdleTimeout(INonBlockingConnection nbc) throws IOException {
            nbc.write("What's going on? Why don't you send data?");
            nbc.setIdleTimeoutMillis(30 * 1000);  // resets the timeout counter
            return true;  // prevents xSocket from closing the connection
        }

        public boolean onData(INonBlockingConnection nbc) throws IOException {
            //...
            return true;
        }
    }

    // and the server
    IServer server = new Server(8090, new Handler());
    server.setIdleTimeoutMillis(30 * 1000);  // set the default idle timeout for server-side connections
    server.run();

11. Synchronization of the callback methods

The execution of callback methods such as onConnect(...) or onData(...) is synchronized based on the connection. This means that, for the same connection, the callback methods are executed one after another. If you perform a long-running or never-ending task within your onConnect method, the onData() method for the same connection will never be called.

    class Handler implements IConnectHandler, IDataHandler {

        public boolean onConnect(INonBlockingConnection nbc) throws IOException {
            nbc.write("hello server\r\n");
            //...

            // DO NOT DO THIS!
            // this causes onData() never to be called, because the execution of
            // callback methods is synchronized based on the connection
            while (true) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ignore) { }
                nbc.write("I am alive\r\n");
            }

            // You could define a TimerTask and run it within a Timer (thread)
            // to implement the "alive" issue

            return true;
        }

        public boolean onData(INonBlockingConnection nbc) throws IOException {
            String msg = nbc.readStringByDelimiter("\r\n");
            //...
            return true;
        }
    }

12. Defining a connection-scoped handler

By default, handlers are instance-scoped.
That means the same handler instance is used for each new incoming connection. A handler becomes connection-scoped by implementing the IConnectionScoped interface. This interface requires a clone method, which is used to create a dedicated instance of the given handler for each new connection. To avoid side effects, the clone method should perform a deep clone.

    class SmtpHandler implements IDataHandler, IConnectionScoped {

        private int handledMessages = 0;
        private SessionData sessionData = new SessionData();

        public boolean onData(INonBlockingConnection nbc) throws IOException {
            //...
            return true;
        }

        // deep clone: all attributes besides primitives, immutables or
        // global manager/service references also have to be cloned
        public Object clone() throws CloneNotSupportedException {
            SmtpHandler copy = (SmtpHandler) super.clone();
            copy.sessionData = (SessionData) this.sessionData.clone();
            return copy;
        }
    }

By declaring a handler as connection-scoped, the handler's variables automatically become connection-specific (if a deep clone is supported). Besides this implicit way of attaching data to a connection, an explicit way is also supported by the attachment methods.

13. Attaching session-specific data to a connection

An alternative approach to assigning session data to the connection is attaching the data. Often this is the preferred method. A connection supports attaching connection-specific session data through the setAttachment(object) and getAttachment() methods.

    class SmtpHandler implements IConnectHandler, IDataHandler {

        public boolean onConnect(INonBlockingConnection nbc) throws IOException {
            nbc.setAttachment(new SessionData());
            return true;
        }

        public boolean onData(INonBlockingConnection nbc) throws IOException {
            SessionData sessionData = (SessionData) nbc.getAttachment();
            //...
            return true;
        }
    }

14. Replacing a handler at runtime

xSocket supports replacing a handler at runtime.
This can be done for a specific connection or, in the case of server-side handlers, for all new incoming connections. If the connection already exists, the handler is replaced by calling the <connection>.setHandler(...) method. This replaces the handler of the current connection only.

    class ServerHandlerA implements IDataHandler {

        public boolean onData(INonBlockingConnection nbc) throws IOException {
            String cmd = nbc.readStringByDelimiter("\r\n");
            if (cmd.equals("switch")) {
                nbc.setHandler(new ServerHandlerB());
            } else {
                nbc.write("A" + cmd + "\r\n");
            }
            return true;
        }
    }

On the server side, a handler is assigned to the server. This handler (or a clone of it, in the case of a connection-scoped handler) is assigned to each new incoming connection when it is accepted. You can replace the server's handler by calling the <server>.setHandler(...) method. The server reference is injected if you annotate the field with @Resource.

    class ServerSideHandler implements IDataHandler {

        @Resource
        private Server server;

        public boolean onData(INonBlockingConnection connection) throws IOException {
            String cmd = connection.readStringByDelimiter("\r\n");
            if (cmd.equals("switch")) {
                server.setHandler(new ServerHandlerB());
                connection.write("switched\r\n");
            } else {
                connection.write("A" + cmd + "\r\n");
            }
            return true;
        }
    }

15. Example: a simple length field-based handler

To apply a length field-based communication pattern, the connection's mark support can be used. In this case the client first writes an "empty" length field. After writing the content data, the write pointer is moved back to the length field in order to overwrite it.

    IBlockingConnection bc = new BlockingConnection(host, port);
    bc.setAutoflush(false);      // mark support requires deactivated autoflush!

    bc.markWritePosition();      // mark current position
    bc.write((int) 0);           // write "empty" length field

    int written = bc.write("hello world");
    written += bc.write(" it's nice to be here");
    //...

    bc.resetToWriteMark();       // return to length field position
    bc.write(written);           // and update it

    bc.flush();                  // flush (marker will be removed implicitly)

16. ... and the server

On the server side, a utility method of ConnectionUtils can be used.

    public boolean onData(INonBlockingConnection nbc) throws IOException, BufferUnderflowException {

        // validate that enough data is available (if
        // not, a BufferUnderflowException will be thrown)
        int length = ConnectionUtils.validateSufficientDatasizeByIntLengthField(nbc);
        String text = nbc.readStringByLength(length);

        nbc.write(text);
        return true;
    }

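
The placeholder-then-patch idea from sections 15 and 16 can be reproduced on a plain java.nio.ByteBuffer. The following sketch is not xSocket code: LengthFieldSketch, encode and decode are hypothetical names, and the fixed 256-byte capacity is an arbitrary assumption for the example.

```java
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical length-field framing on a ByteBuffer (stand-in for a connection).
class LengthFieldSketch {

    // Writes an empty length field, then the parts, then patches the field.
    static ByteBuffer encode(String... parts) {
        ByteBuffer out = ByteBuffer.allocate(256);  // assumed to be large enough
        int lengthPos = out.position();             // remember ("mark") the field position
        out.putInt(0);                              // "empty" length field
        int written = 0;
        for (String part : parts) {
            byte[] bytes = part.getBytes(StandardCharsets.UTF_8);
            out.put(bytes);
            written += bytes.length;
        }
        out.putInt(lengthPos, written);             // absolute put: position is not moved
        out.flip();
        return out;
    }

    // Reads one record; throws without consuming anything if data is missing.
    static String decode(ByteBuffer in) {
        if (in.remaining() < Integer.BYTES) {
            throw new BufferUnderflowException();
        }
        int length = in.getInt(in.position());      // peek at the length field
        if (length < 0) {
            throw new IllegalArgumentException("negative length field");
        }
        if (in.remaining() < Integer.BYTES + length) {
            throw new BufferUnderflowException();
        }
        in.getInt();                                // now consume the length field
        byte[] payload = new byte[length];
        in.get(payload);
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        ByteBuffer record = encode("hello world", " it's nice to be here");
        System.out.println(record.getInt(0));  // prints "32"
        System.out.println(decode(record));    // prints "hello world it's nice to be here"
    }
}
```

The decode side checks the available size before consuming anything, which is the role the tutorial assigns to the ConnectionUtils validation call.
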
17. A more complex example: multipart data

Often a data record consists of multiple parts. For instance, a data record can start with some header data fields followed by the content data. Typically the header also contains a content length field.

    IBlockingConnection bc = new BlockingConnection(host, port);
    bc.setAutoflush(false);

    bc.write(RECORD_TYPE_A);   // record type
    bc.write((int) 2);         // version
    bc.write(sign);            // signature
    bc.write(length);          // data length
    bc.flush();

    bc.write(data);            // data
    bc.flush();

    String status = bc.readStringByDelimiter("\r\n");
    //...

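
The header layout above (record type, version, signature, data length) can be sketched with a plain ByteBuffer, including an all-or-nothing decode based on the buffer's own mark()/reset(). The names RecordHeaderSketch and tryDecode are hypothetical, and the field sizes (one byte plus three ints) are an assumption taken from the server listing in the next section.

```java
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;

// Hypothetical header record: 1 byte type + 3 ints (version, signature, data length).
class RecordHeaderSketch {
    static final int HEADER_SIZE = 1 + 3 * Integer.BYTES;

    final byte type;
    final int version;
    final int signature;
    final int dataLength;

    RecordHeaderSketch(byte type, int version, int signature, int dataLength) {
        this.type = type;
        this.version = version;
        this.signature = signature;
        this.dataLength = dataLength;
    }

    // Serializes the header fields in order.
    ByteBuffer encode() {
        ByteBuffer out = ByteBuffer.allocate(HEADER_SIZE);
        out.put(type).putInt(version).putInt(signature).putInt(dataLength);
        out.flip();
        return out;
    }

    // Reads all four fields or none: on underflow the position is restored
    // to the mark and null is returned, so the caller can retry later.
    static RecordHeaderSketch tryDecode(ByteBuffer in) {
        in.mark();
        try {
            return new RecordHeaderSketch(in.get(), in.getInt(), in.getInt(), in.getInt());
        } catch (BufferUnderflowException incomplete) {
            in.reset();
            return null;
        }
    }

    public static void main(String[] args) {
        ByteBuffer full = new RecordHeaderSketch((byte) 1, 2, 99, 500).encode();

        ByteBuffer partial = full.duplicate();
        partial.limit(6);                                  // only part of the header arrived
        System.out.println(tryDecode(partial));            // prints "null"
        System.out.println(partial.position());            // prints "0"

        System.out.println(tryDecode(full).dataLength);    // prints "500"
    }
}
```
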
18. ... and the server ("transactional" read & dynamic handler replacing)

As discussed above, data is segmented on the network level. When the onData() method is called, it is not predictable how many bytes have been received. Calling a read method such as readInt() or readStringByDelimiter() can therefore cause a BufferUnderflowException. To handle this, the read mark support can be used.

    class ProtocolHandler implements IDataHandler {

        public boolean onData(INonBlockingConnection connection) throws IOException {
            byte type = -1;
            int version = -1;
            int signature = -1;
            int dataLength = 0;

            ////////////
            // "transaction" start
            //

            // mark read position
            connection.markReadPosition();

            try {
                type = connection.readByte();
                version = connection.readInt();
                signature = connection.readInt();
                dataLength = connection.readInt();

                connection.removeReadMark();

            } catch (BufferUnderflowException bue) {
                connection.resetToReadMark();
                return true;
            }

            //
            // "transaction" end
            ///////////////

            if (type == 1) {
                connection.setHandler(new ContentHandler(this, dataLength, signature));
            } else {
                //...
            }
            return true;
        }
    }

    class ContentHandler implements IDataHandler {

        private int remaining = 0;
        private ProtocolHandler hdl = null;

        public ContentHandler(ProtocolHandler hdl, int dataLength, int signature) {
            this.hdl = hdl;
            remaining = dataLength;
            //...
        }

        public boolean onData(INonBlockingConnection nbc) throws IOException {
            int available = nbc.available();

            int lengthToRead = remaining;
            if (available < remaining) {
                lengthToRead = available;
            }

            ByteBuffer[] buffers = nbc.readByteBufferByLength(lengthToRead);
            remaining -= lengthToRead;

            // processing the data
            // ...

            if (remaining == 0) {
                nbc.setHandler(hdl);   // switch back to the protocol handler
                nbc.write("ACCEPTED\r\n");
            }
            return true;
        }
    }

The listing above shows a simple multipart protocol. When data is received, the onData() method is called. The read position reference points to the head of the internal read buffer.
Calling the markReadPosition() method marks the current read position. Once the read position is marked, the result data of all following read operations is duplicated and buffered internally in the read mark buffer. Each read method moves the read position to the next unread data. If not enough data is available when a read method is performed, a BufferUnderflowException is thrown. In the example code this exception is caught, the read position is reset to the read mark and the onData() method is exited.
If more data is received, the onData() method is called again.
Within the onData() method the read methods are performed again. Removing the read mark completes the "transaction". The removeReadMark() method cleans the read mark buffer.

The read mark support can be used to implement lightweight "transaction" (read all or nothing) support. The read mark support is not required if only a single read operation is called within the onData method.

In the example above, a dedicated handler is used to read the content data. To do this, the current protocol handler is replaced at runtime by the dedicated content handler.

19. Unread support

An alternative approach to marking is the unread support. Based on the unread support, a read operation can be undone. The unread(...) method pushes the data back to the top of the internal read buffer.

    IBlockingConnection con = new BlockingConnection(host, port);

    String txt = con.readStringByLength(15);

    // check if the record is the new AA type
    if (txt.indexOf("type=AA") != -1) {
        int length = con.readInt();
        // ...

    // .. no, then it has to be an old CA record
    } else {
        // perform unread and check for type CA
        con.unread(txt);

        // read CA record
        String caHeader = con.readStringByDelimiter(":");
        if (!caHeader.equalsIgnoreCase("CA record")) {
            // error
        }
        // ..
    }

20. Concurrent access of a connection outside the callback thread

Inside the callback threads, accessing the connection is thread-safe. If the connection is accessed concurrently outside the callback thread, the access has to be synchronized. The class ConnectionUtils provides a synchronizedConnection(...) method, which wraps the connection in a synchronized delegator class. Such a thread-safe wrapper class synchronizes each method based on the underlying connection.

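
The synchronized delegator idea can be illustrated without xSocket. In this minimal sketch all names (SynchronizedSinkSketch, Sink, PlainSink, synchronizedSink) are hypothetical; a non-thread-safe sink stands in for the connection, and the wrapper locks on the wrapped object for every call.

```java
// Hypothetical model of a synchronized delegator around a non-thread-safe object.
class SynchronizedSinkSketch {

    interface Sink {
        void write(String s);
        int length();
    }

    // Not thread-safe: StringBuilder makes no guarantees under concurrent use.
    static final class PlainSink implements Sink {
        private final StringBuilder content = new StringBuilder();
        public void write(String s) { content.append(s); }
        public int length() { return content.length(); }
    }

    // Every method synchronizes on the underlying delegate, so several
    // wrappers around the same delegate still share one lock.
    static Sink synchronizedSink(final Sink delegate) {
        return new Sink() {
            public void write(String s) {
                synchronized (delegate) { delegate.write(s); }
            }
            public int length() {
                synchronized (delegate) { return delegate.length(); }
            }
        };
    }

    // Writes one character per call from several threads and returns the final length.
    static int concurrentWrites(Sink sink, int threads, int writesPerThread) {
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int n = 0; n < writesPerThread; n++) {
                    sink.write("x");
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return sink.length();
    }

    public static void main(String[] args) {
        Sink safe = synchronizedSink(new PlainSink());
        System.out.println(concurrentWrites(safe, 4, 1000));  // prints "4000"
    }
}
```

Locking on the delegate rather than on the wrapper is what allows a connection to be wrapped again in each callback, as the handler below does, without losing mutual exclusion.
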
    public class ServerHandler implements IConnectHandler, IDataHandler, IDisconnectHandler {

        private final Timer timer = new Timer(true);

        public boolean onConnect(INonBlockingConnection connection) throws IOException {
            connection = ConnectionUtils.synchronizedConnection(connection);

            TimeNotifier notifier = new TimeNotifier(connection);
            timer.schedule(notifier, 10, 10);
            connection.setAttachment(notifier);
            return true;
        }

        public boolean onDisconnect(INonBlockingConnection connection) throws IOException {
            connection = ConnectionUtils.synchronizedConnection(connection);

            TimeNotifier notifier = (TimeNotifier) connection.getAttachment();
            if (notifier != null) {
                notifier.cancel();
            }
            return true;
        }

        public boolean onData(INonBlockingConnection connection) throws IOException {
            connection = ConnectionUtils.synchronizedConnection(connection);

            String cmd = connection.readStringByDelimiter("\r\n");
            connection.write(cmd + ":4545\r\n");
            return true;
        }
    }

    public class TimeNotifier extends TimerTask {

        private final INonBlockingConnection connection;

        public TimeNotifier(INonBlockingConnection connection) {
            this.connection = ConnectionUtils.synchronizedConnection(connection);
        }

        @Override
        public void run() {
            try {
                connection.write("CMD_TIME:" + System.currentTimeMillis() + "\r\n");
            } catch (Exception e) {
                //...
            }
        }
    }

    Server server = new Server(new ServerHandler());
    server.start();

21. Using xSocket together with Spring

The xSocket artefacts are plain Java classes. For this reason it is very easy to use xSocket together with Spring.
In the example below, a server is created which listens on port 8787 and uses the declared EchoHandler.

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">

        <bean id="server" class="org.xsocket.connection.Server" scope="singleton">
            <constructor-arg type="int" value="8787"/>
            <constructor-arg type="org.xsocket.connection.IHandler" ref="Handler"/>
        </bean>

        <bean id="Handler" class="org.xsocket.connection.EchoHandler" scope="prototype"/>
    </beans>

The server bean can be retrieved using the standard Spring approach.

    BeanFactory beanFactory = new XmlBeanFactory(new FileSystemResource(file));
    IServer server = (IServer) beanFactory.getBean("server");
    server.start();
    //...

To start the server automatically within the Spring configuration, the Spring bean attributes init-method and destroy-method can be used.

    ...
    <bean id="server" class="org.xsocket.connection.Server" scope="singleton"
          init-method="start" destroy-method="close">
        <constructor-arg type="int" value="8787"/>
        <constructor-arg type="org.xsocket.connection.IHandler" ref="Handler"/>
    </bean>
    ...

In this case the server has already been started when it is retrieved.

    BeanFactory beanFactory = new XmlBeanFactory(new FileSystemResource(file));
    IServer server = (IServer) beanFactory.getBean("server");
    // server is already started

22. Scripting support

xSocket has also been tested with scripting languages such as JRuby, Groovy and Jython. Known restrictions are (caused by the script language environment):
For example code see the following table.
See also the article Scripting on the Java platform to see how xSocket can be accessed inside scripting languages.

23. Connection pooling

Client-side connection pools can improve performance by avoiding the re-creation of connections. Typically, a connection pool is used on the client side if connections to the same server (address) are created serially within a short period of time. By pooling such connections, the overhead of establishing a connection is avoided.

A connection pool class exists for BlockingConnection as well as for NonBlockingConnection. Both pools also support SSL connections.

    // create a limited connection pool
    BlockingConnectionPool pool = new BlockingConnectionPool();
    pool.setMaxActive(10);

    IBlockingConnection bc = null;

    try {
        // retrieve a connection (if no connection
        // is in the pool, a new one will be created)
        bc = pool.getBlockingConnection(host, port);
        bc.write("Hello");
        //...

        // always close the connection! (the connection will be
        // returned to the connection pool)
        bc.close();

    } catch (IOException ioe) {
        if (bc != null) {
            try {
                // if the connection is invalid -> destroy it
                // (it will not return to the pool)
                pool.destroy(bc);
            } catch (Exception ignore) { }
        }
    }

24. SSL support (static & on-demand)

To run a server in SSL mode, an appropriate SSLContext object has to be passed to the constructor. If the sslOn parameter is set to true, the server starts up in SSL mode.

    SSLContext sslContext = SSLContext.getDefault();  // getDefault is a JSE 1.6 method

    IServer sslTestServer = new Server(port, new EchoHandler(), sslContext, true);
    sslTestServer.start();
    // manual SSL activation (connection.activateSecuredMode) is not necessary!

Client-side example

    IBlockingConnection bc = new BlockingConnection(host, port, sslContext, true);
    //...

If SSL mode should be started later, on demand, the sslOn parameter has to be set to false. The same is true for client-side connections.

Client-side example (on-demand activation)

    IBlockingConnection bc = new BlockingConnection(host, port, sslContext, false);
    //...

    bc.activateSecuredMode();
    //...

Server-side example (on-demand activation)

    IServer sslTestServer = new Server(port, new EchoHandler(), sslContext, false);
    sslTestServer.start();

    // handler implementation
    class EchoHandler implements IDataHandler {

        public boolean onData(INonBlockingConnection nbc) throws IOException {
            //...

            // activate SSL mode on demand (-> only necessary if the
            // server was started with flag false)
            nbc.activateSecuredMode();
            //...

            return true;
        }
    }

Often upgrading to secured mode is combined with sending a plain response message to confirm that the connection will be upgraded. In this case it has to be ensured that no messages are received between sending the plain confirmation message and activating the secured mode. There are two approaches to do this.

Control the flush behaviour on the user level:

    // safe SSL upgrade by using user-level flushing
    class MyHandler implements IDataHandler {

        public boolean onData(INonBlockingConnection nbc) throws IOException {
            nbc.write(SECURED_MODE_ACTIVATED);  // (prepare) sending the plain confirmation message
            nbc.activateSecuredMode();

            // xSocket sends the message in plain mode, because it has
            // been written before the secured mode has been activated
            nbc.flush();
            return true;
        }
    }

Or suspend and resume reading:

    // safe SSL upgrade by suspend and resume (and default autoflush)
    class MyHandler implements IDataHandler {

        public boolean onData(INonBlockingConnection nbc) throws IOException {
            nbc.suspendRead();
            nbc.write(SECURED_MODE_ACTIVATED);
            nbc.activateSecuredMode();
            nbc.resumeRead();
            return true;
        }
    }

25. Flushing

By default, autoflush is activated. That means calling a write method transfers the data to the underlying subsystem (the OS-internal socket write buffer) immediately. After calling the connection's setAutoflush(false) method, the flush behaviour is controlled manually. For server-side handlers this is typically done within the onConnect callback method or by calling the server's setAutoflush(false) method.

Autoflush example (default behaviour)

    // client-side
    IBlockingConnection bc = new BlockingConnection(host, port);
    bc.write(text + "\n");
    String response = bc.readStringByDelimiter("\n");

    // server-side
    class ServerHandler implements IDataHandler {

        public boolean onData(INonBlockingConnection nbc) throws IOException {
            String text = nbc.readStringByDelimiter("\n");
            nbc.write(text);
            nbc.write("\n");
            return true;
        }
    }

User-managed flush example

    // client-side
    IBlockingConnection bc = new BlockingConnection(host, port);
    bc.setAutoflush(false);

    bc.write(text + "\n");
    bc.flush();
    String response = bc.readStringByDelimiter("\n");

    // server-side
    class ServerHandler implements IConnectHandler, IDataHandler {

        // set autoflush to false (this behaviour can also
        // be controlled by server settings)
        public boolean onConnect(INonBlockingConnection nbc) throws IOException {
            nbc.setAutoflush(false);
            return true;
        }

        public boolean onData(INonBlockingConnection nbc) throws IOException {
            String text = nbc.readStringByDelimiter("\n");
            //...
            nbc.write(text);
            nbc.write("\n");
            //...

            nbc.flush();
            return true;
        }
    }

While autoflush determines whether data is flushed automatically by each write operation, the FlushMode controls how the flush itself is performed. If the FlushMode is set to ASYNC (default is SYNC), the data is transferred to the underlying connection asynchronously.

Some restrictions exist when using the WritableByteChannel interface methods write(ByteBuffer) and write(ByteBuffer[]).
Calling such a write method in ASYNC mode means that the byte buffer will be read asynchronously by the internal I/O thread. If the byte buffer is accessed (reused) after calling the write method, race conditions will occur. The write(ByteBuffer) and write(ByteBuffer[]) methods should only be called in ASYNC mode if the byte buffer will not be accessed (reused) after the write operation. In the example below a ByteBuffer is reused several times to copy data. In this case the ASYNC mode cannot be used to write the data, because race conditions would occur.

...
File file = new File(filename);
RandomAccessFile raf = new RandomAccessFile(file, "r");
ReadableByteChannel fc = raf.getChannel();

IBlockingConnection connection = new BlockingConnection(host, port);

// using a copy buffer (which will be reused for the read operations)
// requires FlushMode SYNC which is default (for writing)!
ByteBuffer copyBuffer = ByteBuffer.allocate(4096);

int read = 0;
while (read >= 0) {
    // read channel
    read = fc.read(copyBuffer);
    copyBuffer.flip();

    if (read > 0) {
        // write channel
        connection.write(copyBuffer);
        if (copyBuffer.hasRemaining()) {
            copyBuffer.compact();
        } else {
            copyBuffer.clear();
        }
    }
}
...

When using FlushMode ASYNC you have to ensure that the ByteBuffer will not be modified or reused after passing it to the write method.

26. Performance recommendations

[BlockingConnection & NonBlockingConnection] Using direct ByteBuffers to read the incoming socket data can improve performance. Direct ByteBuffers are activated by setting the system property usedirect to true:
By default these properties are set to false because some VM implementations seem to have problems garbage-collecting direct buffers.

[BlockingConnection & NonBlockingConnection] By setting autoflush to false (default is true), flushing can be controlled manually. Because a flush performs a write operation and synchronization on the underlying connection, the flush operation is expensive. Especially in the case of multiple writes within one "transaction", user-managed flushing can improve performance. Some restrictions exist when using the connection's write(ByteBuffer) and write(ByteBuffer[]) methods. Please see the API doc for more information.

IBlockingConnection bc = ...
bc.setAutoflush(false);

bc.write(header);
bc.write(fragment1);
bc.write(fragment2);
//...
bc.flush();

[NonBlockingConnection only] By setting the flush mode to ASYNC (default is SYNC) the data will be transferred to the underlying OS-internal socket send buffer in an asynchronous way. In ASYNC mode the worker thread will not be synchronized with the xSocket-internal I/O thread. Please take care when setting the flush mode to ASYNC. If you access the buffer after writing it, race conditions will occur.

Setting the flush mode to SYNC doesn't mean that the peer has received the data. It only means that the data has been transferred to the OS-internal socket send buffer. If not enough space is available in the OS-internal socket send buffer and the SYNC mode is set, the flush operation will block until socket buffer space becomes free. The OS will only write data on the network level if the TCP flow control system (sliding window mechanism) signals that the receiving peer is able to receive more data.

In ASYNC mode, the outgoing data will be buffered in the (unbounded) xSocket-internal write buffer. This means that the flush operation will never block in ASYNC mode, regardless of whether the receiving peer is able to receive data.

INonBlockingConnection nbc = ...
nbc.setFlushmode(FlushMode.ASYNC);

The FileChannel supports the performance-optimized transfer methods transferFrom(...) and transferTo(...), which implicitly rely on synchronous flushing. When transferring file channel data to the connection, the connection's transferFrom(...) method should be used instead of the file channel's transferTo(...) method. Based on the current flush mode of the connection, the connection's transferFrom(...) method decides internally whether the file channel's transferTo(...) should be used (flush mode = SYNC) or not. The connection's transferFrom(...) method will choose the best approach.
In general you get better performance by using the ASYNC flush mode.

INonBlockingConnection nbc = ...
FileChannel fc = ...

// using
nbc.transferFrom(fc);

// instead of
fc.transferTo(0, fc.size(), nbc);
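For readers less familiar with the java.nio side of this comparison, the sketch below shows the plain FileChannel.transferTo(...) call against a generic WritableByteChannel. It uses only JDK classes. The class name TransferToSketch and the in-memory target channel are assumptions made for illustration and merely stand in for a connection, so xSocket's own transferFrom(...) is not shown here.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

class TransferToSketch {

    // Copies the whole file into the target channel and returns the number of bytes copied.
    static long copy(Path source, WritableByteChannel target) throws IOException {
        try (FileChannel fc = FileChannel.open(source, StandardOpenOption.READ)) {
            long position = 0;
            long size = fc.size();
            // transferTo may move fewer bytes than requested, so loop until done
            while (position < size) {
                position += fc.transferTo(position, size - position, target);
            }
            return position;
        }
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("transfer", ".txt");
        try {
            Files.write(tmp, "hello xSocket\r\n".getBytes(StandardCharsets.US_ASCII));
            // in-memory stand-in for a connection (illustration only)
            ByteArrayOutputStream sink = new ByteArrayOutputStream();
            long copied = copy(tmp, Channels.newChannel(sink));
            System.out.println(copied + " bytes copied");
        } finally {
            Files.deleteIfExists(tmp);
        }
    }
}
```

The loop matters because a single transferTo(...) call is allowed to transfer only part of the requested range.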
The same applies in the other direction, when transferring connection data to a file channel:

INonBlockingConnection nbc = ...
FileChannel fc = ...

// using
nbc.transferTo(fc, length);

// instead of
fc.transferFrom(nbc, 0, length);

Choose the proper worker pool type (bounded pool, unbounded pool, ...) and size. The proper worker pool size depends on the concrete handler implementation. If the handler performs long-running blocking calls such as file I/O or network calls, which spend much of their time waiting and idling, a larger worker pool size should be set. If the worker pool size has been configured too small, call backs of concurrent connections cannot be performed because they have to wait for free worker threads. In this case efficiency is poor: the occupied worker threads spend most of their time waiting, while system resources like CPUs are partly unused. On the other hand, if the worker pool size is configured too high, the operating system will spend much of its time switching between threads rather than executing them.

27. Receive and send buffer size (network flow control)

The operating system uses socket buffers (SO_RCVBUF, SO_SNDBUF) to buffer incoming and outgoing network data. These buffers define the TCP receive window, which specifies the amount of data that can be sent without interrupting the data exchange. For instance, if the high water mark of the receive socket buffer is reached, the sender will stop sending network data. The mechanism that controls data transfer interruptions is referred to as flow control. The socket buffer size depends on the operating system (configuration) and can be modified at runtime by the IConnection’s setOption() method. Normally the default buffer size is 8 KB.
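To see which buffer sizes a given platform actually grants, the two socket options can be inspected with the JDK alone. The sketch below uses java.net.Socket rather than xSocket's setOption(), whose option names are not shown in this tutorial. The class name SocketBufferSketch and the requested size of 64 KB are assumptions for illustration.

```java
import java.io.IOException;
import java.net.Socket;

class SocketBufferSketch {

    // Requests the given buffer sizes on an unconnected socket and returns
    // the sizes the OS reports back as {receive, send}.
    static int[] requestBufferSizes(int rcvBytes, int sndBytes) throws IOException {
        try (Socket socket = new Socket()) {        // unconnected, no network traffic
            socket.setReceiveBufferSize(rcvBytes);  // SO_RCVBUF (only a hint to the OS)
            socket.setSendBufferSize(sndBytes);     // SO_SNDBUF (only a hint to the OS)
            return new int[] { socket.getReceiveBufferSize(), socket.getSendBufferSize() };
        }
    }

    public static void main(String[] args) throws IOException {
        int[] granted = requestBufferSizes(64 * 1024, 64 * 1024);
        System.out.println("SO_RCVBUF=" + granted[0] + ", SO_SNDBUF=" + granted[1]);
    }
}
```

The requested values are hints, so the sizes reported back may differ from what was asked for, depending on the operating system configuration.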
On top of the operating system-level receive and send buffers, a second-level receive and send buffer exists on the application level. If data is received, the network data (reference) will be copied from the operating system’s SO_RCVBUF buffer into xSocket's second-level buffer. By default the second-level buffer is unlimited. In this case the TCP flow control will never stop the sender from sending data. To avoid being attacked by a malicious sender, the second-level buffer can be limited by the maxReceiveBufferThreshold threshold -> setMaxReadBufferThreshold(size). The threshold is checked immediately after copying the SO_RCVBUF data into the application-level receive buffer. If the application-level receive buffer exceeds the threshold, the network read operation will be suspended. The network read operation will be resumed automatically when the application-level receive buffer falls below the maxReceiveBufferThreshold.

28. Avoiding large send buffers

A slow receiving peer causes the outgoing data to be buffered in xSocket's second-level send buffer. To avoid large second-level send buffers caused by a slow receiving peer, the SYNC flush mode (which is the default) can be used. In this case the write method returns after the data has been written to the OS-level SO_SNDBUF.

IBlockingConnection con = new BlockingConnection(host, port);
// or
// INonBlockingConnection con = new NonBlockingConnection(host, port);

// con.setFlushmode(FlushMode.SYNC); not necessary because it is default

RandomAccessFile file = new RandomAccessFile(filename, "r");
FileChannel channel = file.getChannel();
ByteBuffer transferBuffer = ByteBuffer.allocate(4096);

int read = 0;
do {
    transferBuffer.clear();
    read = channel.read(transferBuffer);
    transferBuffer.flip();

    if (read > 0) {
        // blocks until data is written into the os-level's SO_SNDBUF
        con.write(transferBuffer);
    }
} while (read > 0);

channel.close();
file.close();
//...

This will avoid large second-level send buffers.
However, because of the TCP flow control the write (flush) method will block until the receiving peer consumes the data. The current thread will be suspended. INonBlockingConnection and IBlockingConnection also support the IWriteCompletionHandler, which helps to avoid blocking threads. The IWriteCompletionHandler defines call back methods which will be called when the data has been written to the OS-level SO_SNDBUF or when an error occurs. Normally the IWriteCompletionHandler is only used for connections which are in ASYNC flush mode. In ASYNC flush mode the write method returns immediately. When the data has been written, or an exception has occurred, the call back method onWritten(…) or onException(…) will be called. The example below shows how an IWriteCompletionHandler can be used to send data in ASYNC mode while avoiding large send buffers.

// the AsyncWriter below is declared against INonBlockingConnection
INonBlockingConnection con = new NonBlockingConnection(host, port);

AsyncWriter asyncWriter = new AsyncWriter(filename, con);
asyncWriter.write();
// ..
class AsyncWriter implements IWriteCompletionHandler {

    private final INonBlockingConnection con;
    private final ReadableByteChannel channel;
    private final RandomAccessFile file;

    private AtomicBoolean isComplete = new AtomicBoolean(false);
    private ByteBuffer transferBuffer = ByteBuffer.allocate(4096);
    private IOException ioe = null;

    AsyncWriter(String filename, INonBlockingConnection con) throws IOException {
        this.con = con;
        file = new RandomAccessFile(filename, "r");
        channel = file.getChannel();
        con.setFlushmode(FlushMode.ASYNC);
    }

    void write() throws IOException {
        writeChunk();
    }

    private void writeChunk() throws IOException {
        transferBuffer.clear();
        int read = channel.read(transferBuffer);
        transferBuffer.flip();

        if (read > 0) {
            con.write(transferBuffer, this);
        } else {
            con.close();
            channel.close();
            file.close();
            isComplete.set(true);
        }
    }

    public void onWritten(int written) throws IOException {
        writeChunk();
    }

    public void onException(IOException ioe) {
        this.ioe = ioe;
        isComplete.set(true);
    }

    boolean isComplete() throws IOException {
        if (ioe != null) {
            throw ioe;
        }
        return isComplete.get();
    }
}
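The reason this chaining keeps the send buffer small is that the next chunk is only handed over from within the completion call back, so at most one chunk is pending at any time. The following library-independent sketch makes that visible. WriteCallback, FakeAsyncSink and ChunkWriter are hypothetical stand-ins invented for this illustration. They are not xSocket types and do not model xSocket's threading.

```java
import java.util.ArrayDeque;
import java.util.Queue;

class ChainedWriteSketch {

    // Hypothetical stand-in for a write completion call back (not the xSocket interface).
    interface WriteCallback {
        void onWritten(int written);
    }

    // Hypothetical stand-in for a connection in ASYNC mode: write() only queues the
    // chunk and returns immediately; drain() plays the role of the I/O thread.
    static class FakeAsyncSink {
        private final Queue<Runnable> pending = new ArrayDeque<>();
        int maxQueued = 0;      // largest number of chunks ever waiting at once
        long totalWritten = 0;  // bytes "written" so far

        void write(byte[] chunk, WriteCallback callback) {
            pending.add(() -> {
                totalWritten += chunk.length;
                callback.onWritten(chunk.length);
            });
            maxQueued = Math.max(maxQueued, pending.size());
        }

        void drain() {
            Runnable next;
            while ((next = pending.poll()) != null) {
                next.run();
            }
        }
    }

    // Sends 'total' bytes in chunks; a new chunk is only queued from the call back.
    static class ChunkWriter implements WriteCallback {
        private final FakeAsyncSink sink;
        private final int chunkSize;
        private long remaining;

        ChunkWriter(FakeAsyncSink sink, long total, int chunkSize) {
            this.sink = sink;
            this.remaining = total;
            this.chunkSize = chunkSize;
        }

        void start() {
            writeChunk();
        }

        private void writeChunk() {
            if (remaining <= 0) {
                return; // everything has been handed over
            }
            int size = (int) Math.min(chunkSize, remaining);
            remaining -= size;
            sink.write(new byte[size], this);
        }

        @Override
        public void onWritten(int written) {
            writeChunk();
        }
    }

    public static void main(String[] args) {
        FakeAsyncSink sink = new FakeAsyncSink();
        new ChunkWriter(sink, 10_000, 4096).start();
        sink.drain();
        System.out.println("written=" + sink.totalWritten + ", maxQueued=" + sink.maxQueued);
    }
}
```

Writing all chunks in a plain loop instead would queue every chunk before the first one is drained, which is exactly the large send buffer this section tries to avoid.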
29. Configure the server

You may want to configure the server. Dynamic configuration parameters can be set by calling the appropriate setter.

// define the timeout, in which data have to be received
srv.setIdleTimeoutMillis(2 * 60 * 1000);

// define the timeout for the connection
srv.setConnectionTimeoutMillis(30 * 60 * 1000);

// set the default autoflush behaviour of the
// server-side created connections
srv.setAutoflush(false);

You can also configure the worker pool. The worker pool is used to perform the handler's call back methods such as onData or onConnect. After reading the data from the socket, the xSocket-internal Dispatcher starts a pooled worker thread to perform the proper call back method. The Dispatcher (I/O thread) is responsible for performing the socket read & write I/O operations and for delegating the call back handling. By default, number of CPUs + 1 dispatchers will be created. A connection is bound to one dispatcher for its entire lifetime.
xSocket uses the worker pool only to perform the handler's call back methods. The pool can be set by calling the appropriate setter method. A worker pool has to implement the java.util.concurrent.Executor interface. If no worker pool is set, a FixedThreadPool is used (see java.util.concurrent.Executors).

// setting a fixed worker pool with 20 threads
srv.setWorkerpool(Executors.newFixedThreadPool(20));
//...

For a deeper look into multithreaded architectures see Architecture of a Highly Scalable NIO-Based Server.

Static configuration parameters are set by the appropriate system properties. E.g.
The following system properties are supported:
30. Executing and synchronizing call back handler methods

The call back handler methods are called in a synchronized context. Calls are synchronized on the related connection. That means the handler call back methods will always be called in a serialized manner for the same connection. Furthermore, call back methods will always be executed in the right order (onConnect, onData, ..., onDisconnect).

By default worker threads are used to perform the call back methods. By annotating the handler with the execution mode NONTHREADED, the handler call back methods will be performed by the xSocket-internal I/O thread. This can increase performance and scalability, because no thread context switches are required. Doing this is only reasonable if the handler implementation performs nothing but non-blocking, CPU-bound operations. If the handler performs I/O operations such as file or network operations, the current thread can be suspended within the operation call. That means the xSocket-internal single I/O thread will be suspended, and the server is blocked! Please consider that some library calls perform I/O-bound or synchronized operations implicitly. For instance, a logging framework which writes its output to a file could perform blocking I/O operations internally.

The flush mode should be set to ASYNC if the execution mode is NONTHREADED. The combination of flush mode SYNC and execution mode NONTHREADED can cause deadlocks.
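The guarantee that call backs of one connection run serialized and in order, even though a shared worker pool executes them, can be illustrated with plain java.util.concurrent. The sketch below follows the well-known serial executor pattern from the Executor Javadoc. It is an illustration of the idea only and not xSocket's internal implementation, and the names SerializedCallbackSketch and SerialExecutor are assumptions.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class SerializedCallbackSketch {

    // Runs submitted tasks one at a time, in submission order, on a shared pool.
    static class SerialExecutor implements Executor {
        private final Queue<Runnable> tasks = new ArrayDeque<>();
        private final Executor pool;
        private Runnable active;

        SerialExecutor(Executor pool) {
            this.pool = pool;
        }

        @Override
        public synchronized void execute(Runnable task) {
            tasks.add(() -> {
                try {
                    task.run();
                } finally {
                    scheduleNext(); // hand the next queued task to the pool
                }
            });
            if (active == null) {
                scheduleNext();
            }
        }

        private synchronized void scheduleNext() {
            active = tasks.poll();
            if (active != null) {
                pool.execute(active);
            }
        }
    }

    // Simulates the call backs of a single connection and returns the observed order.
    static List<String> run() throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        SerialExecutor perConnection = new SerialExecutor(pool); // one per connection
        List<String> calls = Collections.synchronizedList(new ArrayList<>());

        String[] events = { "onConnect", "onData", "onData", "onDisconnect" };
        CountDownLatch done = new CountDownLatch(events.length);
        for (String event : events) {
            perConnection.execute(() -> {
                calls.add(event);
                done.countDown();
            });
        }

        done.await(5, TimeUnit.SECONDS); // wait before shutting the pool down
        pool.shutdown();
        return new ArrayList<>(calls);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run());
    }
}
```

Different connections would each get their own SerialExecutor on the same pool, so they still run concurrently with each other while every single connection sees its call backs one after another.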
@Execution(Execution.NONTHREADED)
class NonThreadedHandler implements IConnectHandler, IDataHandler {

    @Execution(Execution.MULTITHREADED) // overrides the class-level mode
    public boolean onConnect(INonBlockingConnection nbc) throws IOException {
        IBlockingConnection bc = new BlockingConnection(host, port);
        bc.write("CMD_GET_CURRENT_RATE\r\n");
        int rate = bc.readInt();
        // ...
        return true;
    }

    // inherits the class-level mode NONTHREADED
    public boolean onData(INonBlockingConnection nbc) throws IOException {
        int y = nbc.readInt();
        int x = ...; // compute something without I/O
        nbc.write(x);
        return true;
    }
}

31. Troubleshooting (logging, common problems, ...)

- Logging
xSocket uses the built-in logging capability of Java SE. To configure the logging behaviour with the logging.properties file, see Configuring Logger Default Values with a Properties File. You can also set the logging configuration programmatically.

// activate xSocket logging (for namespace org.xsocket.connection)
Logger logger = Logger.getLogger("org.xsocket.connection");
logger.setLevel(Level.FINE);

ConsoleHandler ch = new ConsoleHandler();
ch.setLevel(Level.FINE);
logger.addHandler(ch);

- Too many open files
A common problem when opening and closing many connections within a short time is the Too Many Open Files error, caused by (the configuration of) the underlying operating system. The Too Many Open Files support pattern describes this problem and how to handle it. When using Windows, see also Tuning Windows
- ASYNC Flushmode
Setting the flushmode to ASYNC can improve performance. However, some strong restrictions exist when using the ASYNC flushmode. The restrictions are discussed above. Take care when setting the flushmode to ASYNC.

- Thread safety
Neither the NonBlockingConnection nor the BlockingConnection is thread-safe. Please also note that using the mark support changes the internal state of the connection, which has to be synchronized at the application level.

// can be called by concurrent threads
private void sendMessage(String msg) {
    for (INonBlockingConnection nbc : connections) {
        // synchronizing by the connection instance
        synchronized (nbc) {
            nbc.markWritePosition();    // mark current position
            nbc.write((int) 0);         // write "empty" length field

            int written = nbc.write(System.currentTimeMillis());
            written += nbc.write(msg);

            nbc.resetToWriteMark();     // return to length field position
            nbc.write(written);         // and update it

            nbc.flush();
        }
    }
}
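The technique used in this example (reserve a length field, write the payload, then go back and fill in the length) can be tried out in isolation with a plain java.nio.ByteBuffer. The sketch below is an assumption-labelled illustration that does not use xSocket's mark support. The class name LengthFieldSketch and the record layout are made up for the example.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class LengthFieldSketch {

    // Encodes [int length][long timestamp][message bytes], where length counts
    // the bytes that follow the length field. The layout is illustrative only.
    static ByteBuffer encode(long timestamp, String msg) {
        byte[] body = msg.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES + Long.BYTES + body.length);

        int lengthPos = buf.position(); // remember where the length field lives
        buf.putInt(0);                  // write "empty" length field
        int start = buf.position();

        buf.putLong(timestamp);
        buf.put(body);
        int written = buf.position() - start;

        buf.putInt(lengthPos, written); // absolute put: update the length field in place
        buf.flip();                     // make the record readable from the start
        return buf;
    }

    public static void main(String[] args) {
        ByteBuffer record = encode(System.currentTimeMillis(), "hello");
        System.out.println("length field=" + record.getInt(0)
                + ", record size=" + record.remaining());
    }
}
```

Because the position of the length field is shared state between the first and the last step, two threads interleaving these steps on the same buffer would corrupt the record, which is why the connection example synchronizes on the connection instance.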
32. JMX support

The ConnectionUtils class provides methods to register the related MBeans of an IServer or IConnectionPool implementation on the MBeanServer.

// JMX support for a Server
IServer srv = new Server(8090, new Handler());

// registers the server's mbeans on the platform MBeanServer
ConnectionUtils.registerMBean(srv);
srv.start();

// JMX support for a ConnectionPool
BlockingConnectionPool pool = new BlockingConnectionPool();

// register the pool mbeans on the platform MBeanServer
ConnectionUtils.registerMBean(pool);

When the server is registered, xSocket also creates an MBean for the assigned handler. By doing this, all getter and setter methods of the handler which are not private will be exported.

After registering the artefacts, tools like JConsole can be used to
monitor |