This is a short tutorial on how to start writing stream-oriented network applications with xSocket. If you need more help, don’t hesitate to use xSocket’s support forum.
The major abstraction of stream-oriented communication is the Connection. Data is written and read by using an IBlockingConnection or an INonBlockingConnection. In contrast to an IBlockingConnection, the read methods of an INonBlockingConnection return immediately. To be notified about new incoming data, an IDataHandler can be assigned to an INonBlockingConnection. The call back method of the handler is invoked when the corresponding event occurs. Besides the IDataHandler, additional handlers such as IConnectHandler exist.
The server side (MultithreadedServer) always works on INonBlockingConnection instances to handle the incoming connections.
1. a simple tcp/delimiter-based server example
First, define the handler class, which implements the desired handler interface, e.g. IDataHandler, IConnectHandler or ITimeoutHandler. The IDataHandler is called when data has been received for a connection; the connection is passed to the handler as an argument.
class EchoHandler implements IDataHandler {

    public boolean onData(INonBlockingConnection con) throws IOException {
        String data = con.readStringByDelimiter("\r\n");
        con.write(data + "\r\n");
        return true;
    }
}
Then create an instance of the server (MultithreadedServer) and assign the above handler:
// create the server by passing over the port number and the handler
IMultithreadedServer srv = new MultithreadedServer(8090, new EchoHandler());

// run in the context of the current thread
srv.run();
… that's it
2. semantics of the IDataHandler’s onData method
The IDataHandler’s call back method onData is called when data has been received. Please note that, on the network level, data can be fragmented across several TCP packets, and several writes can be bundled into one TCP packet. Performing a write operation such as connection.write("hello it’s me. What I have to say is….") on the client side doesn’t mean that exactly one onData call occurs on the server side. A common pattern to solve this is to identify logical parts by a delimiter or a leading length field. xSocket provides methods to support this pattern.
public boolean onData(INonBlockingConnection con) throws IOException, BufferUnderflowException {
    // read the whole logical part, or throw a BufferUnderflowException
    String data = con.readStringByDelimiter("\r\n");
    con.write(data + "\r\n");
    return true;
}
The INonBlockingConnection’s readStringByDelimiter(String) method only returns a record if the whole part (terminated by the delimiter) has been received; otherwise a BufferUnderflowException is thrown. In contrast to an IBlockingConnection, an INonBlockingConnection read method always returns immediately and may throw a BufferUnderflowException. The BufferUnderflowException is swallowed by xSocket if the IDataHandler implementation doesn’t catch it. It is a common pattern not to handle this exception within the handler.
The same is true for the INonBlockingConnection’s readStringByLength(int) method.
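The record semantics described above can be illustrated without xSocket. The following is a minimal sketch in plain Java, assuming a hypothetical DelimiterFramer class (not part of the xSocket API) that collects incoming fragments and only returns a record once its delimiter has arrived:

```java
import java.nio.BufferUnderflowException;

// Hypothetical helper, not part of xSocket: collects received fragments and
// hands out one delimiter-terminated record at a time.
final class DelimiterFramer {
    private final StringBuilder buffer = new StringBuilder();

    // Called whenever a fragment of data arrives from the network.
    void append(String fragment) {
        buffer.append(fragment);
    }

    // Returns the next complete record without its delimiter, or throws a
    // BufferUnderflowException if the delimiter has not been received yet.
    String readStringByDelimiter(String delimiter) {
        int end = buffer.indexOf(delimiter);
        if (end < 0) {
            throw new BufferUnderflowException();
        }
        String record = buffer.substring(0, end);
        buffer.delete(0, end + delimiter.length());
        return record;
    }
}
```

If the fragments "hel" and "lo\r\nwor" arrive one after the other, the first read attempt fails with a BufferUnderflowException, the second one returns "hello", and "wor" stays buffered until its delimiter arrives.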
3. writing a blocking client
For the client, an IBlockingConnection can be used to simplify the socket handling:
IBlockingConnection con = new BlockingConnection(host, port);
String req = "Hello server";
con.write(req + "\r\n");
String res = con.readStringByDelimiter("\r\n");
assert (req.equals(res));
4. writing a non-blocking client
A BlockingConnection’s read method blocks until data has been received or a timeout has occurred. To avoid this blocking behaviour, a NonBlockingConnection can be used on the client side. To do this, a client handler has to be defined, which will be notified when network events occur (it implements IDataHandler and/or IDisconnectHandler and/or…):
// define the client handler (here as an anonymous inner class)
IDataHandler clientHandler = new IDataHandler() {

    public boolean onData(INonBlockingConnection connection) throws IOException, BufferUnderflowException, MaxReadSizeExceededException {
        String response = connection.readStringByDelimiter("\r\n");
        …
        return true;
    }
};

// open the connection
INonBlockingConnection connection = new NonBlockingConnection(host, port, clientHandler);
connection.write(…);

// do something else. When data is received, the client handler’s onData method is called (within a dedicated thread)
…
5. configure the server & worker pool
Maybe you want to configure the server. You can do this by calling the appropriate setter:
// define the timeout within which data has to be received
srv.setIdleTimeoutSec(2 * 60);

// define the timeout for the connection
srv.setConnectionTimeoutSec(30 * 60);
You can also configure the worker pool. The worker pool is used to perform the handler’s call back methods such as onData or onConnect. xSocket-internal tasks such as socket read/write or background activities are performed by xSocket-internal threads. These threads can’t be set or modified from the outside.
xSocket uses the worker pool only to perform the handler’s call back methods. The pool can be set within the server’s constructor. A worker pool has to implement the java.util.concurrent.Executor interface. If a server constructor is used which doesn’t include a worker pool argument, a FixedThreadPool is created by default (see java.util.concurrent.Executors). If the worker pool is set to null, multithreading is switched off. That means the call back methods are performed by the xSocket-internal dispatcher thread, which is also responsible for performing the socket read/write operations.
// set a fixed worker pool with 20 threads
IMultithreadedServer srv = new MultithreadedServer(8090, new EchoHandler(), Executors.newFixedThreadPool(20));
…
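To make the effect of the worker pool argument concrete, here is a minimal sketch in plain Java. The CallbackDispatcher class is hypothetical and only mimics the behaviour described above (worker pool versus dispatcher thread); it is not xSocket’s actual implementation:

```java
import java.util.concurrent.Executor;

// Hypothetical dispatcher, for illustration only; xSocket's internals may differ.
final class CallbackDispatcher {
    private final Executor workerPool; // may be null

    CallbackDispatcher(Executor workerPool) {
        this.workerPool = workerPool;
    }

    // Runs the call back on the worker pool, or directly in the calling
    // (dispatcher) thread if no worker pool is set.
    void dispatch(Runnable callback) {
        if (workerPool == null) {
            callback.run();
        } else {
            workerPool.execute(callback);
        }
    }
}
```

With a null pool, a slow call back blocks the thread that also performs the socket I/O, so this mode only fits handlers that return quickly.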
6. define a connection-scoped handler
By default, handlers are instance-scoped. That means the same handler instance is used for each new incoming connection. A handler becomes connection-scoped by implementing the IConnectionScoped interface. This interface requires a clone method, which is used to create a dedicated instance of the given handler for each new connection. To avoid side effects, the clone method has to perform a deep clone.
class SmtpHandler implements IDataHandler, IConnectionScoped {

    public boolean onData(INonBlockingConnection con) throws IOException {
        ...
    }

    public Object clone() throws CloneNotSupportedException {
        return super.clone();
    }
}
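Note that super.clone() on its own only produces a shallow copy, which is sufficient as long as the handler holds no mutable state. As a sketch of what a deep clone could look like, the following plain Java class (LineCollector is a hypothetical example, independent of xSocket) copies its mutable list so that each clone gets its own state:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical handler state, for illustration only: each connection needs its
// own list of received lines, so clone() must copy the list as well.
final class LineCollector implements Cloneable {
    private List<String> lines = new ArrayList<>();

    void add(String line) {
        lines.add(line);
    }

    int size() {
        return lines.size();
    }

    @Override
    public LineCollector clone() {
        try {
            LineCollector copy = (LineCollector) super.clone(); // shallow copy
            copy.lines = new ArrayList<>(lines);                 // copy the mutable state
            return copy;
        } catch (CloneNotSupportedException e) {
            throw new AssertionError(e); // cannot happen, the class is Cloneable
        }
    }
}
```

Because strings are immutable, copying the list is enough here; fields that reference mutable objects would need to be cloned element by element.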
7. attaching session-specific data to a connection
A connection allows (connection-specific) session data to be attached by using the setAttachment(attch) and getAttachment() methods:
class SmtpHandler implements IConnectHandler, IDataHandler {

    public boolean onConnect(INonBlockingConnection con) throws IOException {
        con.setAttachment(new MySessionData());
        return true; ...
    }

    public boolean onData(INonBlockingConnection con) throws IOException {
        MySessionData sessionData = (MySessionData) con.getAttachment();
        ….
    }
}
8. another example: a simple tcp/length field-based handler
To apply a length field-based communication pattern, the connection’s mark support can be used. In this case the client first writes an “empty” length field. After writing the content data, the write pointer is moved back to the length field in order to overwrite it with the actual length.
IBlockingConnection con = new BlockingConnection(host, port);
con.setAutoflush(false);   // mark support requires deactivated autoflush!

con.markWritePosition();   // mark current position
con.write((int) 0);        // write "empty" length field

int written = con.write("hello world");
written += con.write(" it’s nice to be here");
…

con.resetToWriteMark();    // return to length field position
con.write(written);        // and update it

con.flush();               // flush (marker will be removed implicitly)
…
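The same "write a placeholder, then go back and patch it" idea can be sketched with a plain java.nio.ByteBuffer. The LengthFieldWriter class below is a hypothetical illustration and does not use xSocket; the fixed buffer size of 1024 bytes is an assumption made to keep the sketch short:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, for illustration only.
final class LengthFieldWriter {

    // Encodes the parts as one record: a 4-byte length field followed by the content.
    static ByteBuffer encode(String... parts) {
        ByteBuffer buf = ByteBuffer.allocate(1024); // assumption: content fits into 1020 bytes
        int lengthPos = buf.position();  // remember ("mark") the length field position
        buf.putInt(0);                   // write an "empty" length field
        int written = 0;
        for (String part : parts) {
            byte[] bytes = part.getBytes(StandardCharsets.UTF_8);
            buf.put(bytes);
            written += bytes.length;
        }
        buf.putInt(lengthPos, written);  // go back and overwrite the length field
        buf.flip();                      // make the record readable, comparable to a flush
        return buf;
    }
}
```

The absolute putInt(index, value) call plays the role of resetToWriteMark() followed by write(written): it patches the length field without disturbing the content that follows it.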
9. … and the server
On the server side, a utility method of StreamUtils can be used:
public boolean onData(INonBlockingConnection connection) throws IOException, BufferUnderflowException {
    // validate that enough data is available (if not, a BufferUnderflowException will be thrown)
    int length = StreamUtils.validateSufficientDatasizeByIntLengthField(connection);
    String text = connection.readStringByLength(length);

    connection.write(text);
    return true;
}
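What such a validation step has to do can be sketched in plain Java. The LengthFieldReader class below is hypothetical and works on a java.nio.ByteBuffer instead of an xSocket connection; how StreamUtils implements the check internally is not shown in this tutorial, so this is only an assumption about the general technique:

```java
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, for illustration only.
final class LengthFieldReader {

    // Reads one length-prefixed record. If the record is not complete yet, a
    // BufferUnderflowException is thrown and the buffer position is left untouched.
    static String readRecord(ByteBuffer in) {
        if (in.remaining() < Integer.BYTES) {
            throw new BufferUnderflowException();
        }
        int length = in.getInt(in.position()); // peek, does not move the position
        if (length < 0) {
            throw new IllegalArgumentException("negative length field: " + length);
        }
        if (in.remaining() - Integer.BYTES < length) {
            throw new BufferUnderflowException();
        }
        in.getInt();                           // now consume the length field
        byte[] content = new byte[length];
        in.get(content);
        return new String(content, StandardCharsets.UTF_8);
    }
}
```

Peeking at the length field before consuming anything matters: if the check fails, the next call can start again at the same position once more data has arrived.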
10. connection pooling
Client-side connection pools can improve performance by avoiding the re-creation of connections. Typically, a connection pool is used on the client side if connections to the same server (address) are created serially within a short period of time. By pooling such connections, the overhead of establishing a connection is avoided.
// create an unlimited connection pool with an idle timeout of 1 min
BlockingConnectionPool pool = new BlockingConnectionPool(60 * 1000);

IBlockingConnection con = null;

try {
    // retrieve a connection (if no connection is in the pool, a new one will be created)
    con = pool.getBlockingConnection(host, port);
    con.write("Hello");
    ...

    // always close the connection! (the connection will be returned to the connection pool)
    con.close();

} catch (IOException e) {
    if (con != null) {
        try {
            // if the connection is invalid -> destroy it (it will not be returned to the pool)
            pool.destroyConnection(con);
        } catch (Exception ignore) {
        }
    }
}
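The reuse behaviour behind such a pool can be sketched in a few lines of plain Java. The SimplePool class below is a hypothetical, heavily simplified illustration (no idle timeout, no size limit) and does not reflect how BlockingConnectionPool is implemented:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical, simplified pool; for illustration only.
final class SimplePool<C> {
    private final Map<String, Deque<C>> idle = new HashMap<>();
    private final Function<String, C> factory;

    SimplePool(Function<String, C> factory) {
        this.factory = factory;
    }

    // Reuses an idle connection for the address, or creates a new one.
    synchronized C acquire(String address) {
        Deque<C> queue = idle.get(address);
        if (queue != null && !queue.isEmpty()) {
            return queue.pop();
        }
        return factory.apply(address);
    }

    // Returns a healthy connection to the pool for later reuse.
    synchronized void release(String address, C connection) {
        idle.computeIfAbsent(address, key -> new ArrayDeque<>()).push(connection);
    }
}
```

In this sketch, release corresponds to closing a pooled connection, while an invalid connection is simply never released, so it cannot be handed out again.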
11. flushing
By default, autoflush is activated. That means calling a write method transfers the data to the underlying subsystem immediately. After calling setAutoflush(false), flushing has to be controlled manually. For a server-side handler this is typically done within the onConnect call back method.
autoflush example (default):
// client-side
IBlockingConnection connection = new BlockingConnection(host, port);

connection.write(text + "\n");
String response = connection.readStringByDelimiter("\n");

// server-side
class ServerHandler implements IDataHandler {

    public boolean onData(INonBlockingConnection connection) throws IOException {
        String text = connection.readStringByDelimiter("\n");
        connection.write(text);
        connection.write("\n");
        return true;
    }
}
user-managed flush example:
// client-side
IBlockingConnection connection = new BlockingConnection(host, port);
connection.setAutoflush(false);

connection.write(text + "\n");
connection.flush();
String response = connection.readStringByDelimiter("\n");

// server-side
class ServerHandler implements IConnectHandler, IDataHandler {

    public boolean onConnect(INonBlockingConnection connection) throws IOException {
        connection.setAutoflush(false);
        return true;
    }

    public boolean onData(INonBlockingConnection connection) throws IOException {
        String text = connection.readStringByDelimiter("\n");
        …
        connection.write(text);
        connection.write("\n");
        …
        connection.flush();
        return true;
    }
}
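The difference between the two modes is the number of transfers to the underlying subsystem. The following plain Java sketch makes this countable; FlushSketch is a hypothetical stand-in for a connection and only models the behaviour described in this section:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a connection, for illustration only.
final class FlushSketch {
    private final StringBuilder pending = new StringBuilder();
    private final List<String> transfers = new ArrayList<>(); // one entry per transfer
    private boolean autoflush = true;                          // default, as described above

    void setAutoflush(boolean autoflush) {
        this.autoflush = autoflush;
    }

    // Buffers the data and, in autoflush mode, transfers it right away.
    void write(String data) {
        pending.append(data);
        if (autoflush) {
            flush();
        }
    }

    // Transfers everything written since the last flush in one piece.
    void flush() {
        if (pending.length() > 0) {
            transfers.add(pending.toString());
            pending.setLength(0);
        }
    }

    List<String> transfers() {
        return transfers;
    }
}
```

Writing the text and the "\n" separately results in two transfers with autoflush, but in a single transfer when autoflush is deactivated and flush() is called once at the end.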
12. handler and synchronization
By default, the handler is synchronized at the connection level. That means that, for the same connection, the handler call back methods are always called in a serialized manner. Furthermore, the call back methods are always executed in the right order (onConnect, onData, …, onDisconnect).
Sometimes it is desirable that the synchronization is done by the handler implementation itself. The synchronization can be deactivated by annotating the handler as non-synchronized. In this case, the order of the call back method calls isn’t guaranteed. E.g. under some circumstances it could happen that an onData method is performed before onConnect.
@Synchronized(Synchronized.Mode.OFF)
private final class NonSynchronizedHandler implements IDataHandler {

    public boolean onData(INonBlockingConnection connection) throws IOException {
        …
    }
}
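For comparison, the default connection-level serialization can be pictured as a per-connection queue in front of the shared worker pool. The sketch below is modelled on the SerialExecutor example from the java.util.concurrent.Executor documentation; the SerializedCallbacks class is hypothetical and not taken from xSocket:

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Executor;

// Hypothetical per-connection wrapper, for illustration only: call backs are
// executed one at a time and in submission order, even on a multi-threaded pool.
final class SerializedCallbacks implements Executor {
    private final Queue<Runnable> tasks = new ArrayDeque<>();
    private final Executor workerPool;
    private Runnable active;

    SerializedCallbacks(Executor workerPool) {
        this.workerPool = workerPool;
    }

    @Override
    public synchronized void execute(Runnable callback) {
        tasks.add(() -> {
            try {
                callback.run();
            } finally {
                scheduleNext(); // start the next call back only after this one has finished
            }
        });
        if (active == null) {
            scheduleNext();
        }
    }

    private synchronized void scheduleNext() {
        active = tasks.poll();
        if (active != null) {
            workerPool.execute(active);
        }
    }
}
```

With one such wrapper per connection, call backs of different connections still run concurrently, while the call backs of a single connection keep their order.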
13. performance recommendations
[Connection] By setting autoflush to false (default is true), flushing can be controlled manually. Because a flush performs a write operation and synchronization on the underlying connection, the flush operation is quite expensive. Especially in the case of multiple writes within one “transaction”, user-managed flushing can improve performance. When using the connection’s write(ByteBuffer) and write(ByteBuffer[]) methods, some restrictions exist. Please see the API doc for more information.
connection.setAutoflush(false);

connection.write(header);
connection.write(fragment1);
connection.write(fragment2);
…
connection.flush();
[NonBlockingConnection only] By setting the flush mode to ASYNC (default is SYNC), the data is transferred to the underlying connection asynchronously. Often the performance improvement can be very high. When using the connection’s write(ByteBuffer) and write(ByteBuffer[]) methods, some restrictions exist. Please see the API doc for more information.
nonblockingConnection.setFlushmode(FlushMode.ASYNC);
Choose the proper worker pool type (bound pool, unbound pool, …) and size. The proper worker pool size depends on the concrete handler implementation. If the handler performs long-running blocking calls such as file I/O or network calls, which spend much time waiting and idling, a larger worker pool size should be set. If the worker pool size is configured too small, call backs of concurrent connections can’t be performed because they have to wait for free worker threads. In this case the efficiency is poor: the occupied worker threads spend most of their time waiting, concurrent call backs can’t be performed because of the missing worker threads, and system resources such as CPUs are partly unused. On the other hand, if the worker pool size is configured too high, the operating system will spend much time switching between threads rather than executing them.
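As an illustration of a bound pool, the following sketch builds one with the JDK’s ThreadPoolExecutor. The WorkerPools helper and its parameter values are assumptions for this example; suitable numbers depend on the handler and should be measured:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, for illustration only.
final class WorkerPools {

    // Bound pool: at most maxThreads workers and a bounded task queue. When both
    // are exhausted, the submitting thread runs the task itself (CallerRunsPolicy),
    // which slows down submission instead of rejecting call backs.
    static ThreadPoolExecutor bounded(int maxThreads, int queueCapacity) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                maxThreads, maxThreads,
                60, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(queueCapacity),
                new ThreadPoolExecutor.CallerRunsPolicy());
        pool.allowCoreThreadTimeOut(true); // release idle worker threads after 60 seconds
        return pool;
    }
}
```

Since ThreadPoolExecutor implements java.util.concurrent.Executor, a pool such as WorkerPools.bounded(20, 100) could be passed wherever a worker pool is expected. Whether running a call back in the submitting thread is acceptable depends on which thread submits it, so the rejection policy is a choice to review rather than a recommendation.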
For configuring the socket parameters with the <IConnection>.setOption(…) method, see also Boost socket performance on Linux.
14. SSL support (static & on-demand)
To run a server in SSL mode, an appropriate SSLContext object just has to be passed to the constructor (see SSLTestContextFactory as an example of how to create such an SSLContext object). By setting the sslOn parameter to true, the server starts up in SSL mode. If SSL mode should be started later (on demand), the parameter has to be set to false.
The same is true for client-side connections.
server-side example:
IMultithreadedServer sslTestServer = new MultithreadedServer(port, new EchoHandler(), true, sslContext);
StreamUtils.start(sslTestServer);
// manual ssl activation (connection.activateSecuredMode) is not necessary!
server-side example (on-demand activation):
IMultithreadedServer sslTestServer = new MultithreadedServer(port, new EchoHandler(), false, sslContext);
StreamUtils.start(sslTestServer);

class EchoHandler implements IDataHandler {

    public boolean onData(INonBlockingConnection con) throws IOException {
        …
        con.activateSecuredMode();   // activate ssl mode on demand (only necessary if the server was started with flag false)
        …
        return true;
    }
}
client-side example:
IBlockingConnection connection = new BlockingConnection(port, sslContext, true);
client-side example (on-demand activation):
IBlockingConnection connection = new BlockingConnection(port, sslContext, false);
…
connection.activateSecuredMode();
…
15. script language support
xSocket has also been tested with script languages such as JRuby, Groovy or Jython. Known restrictions (caused by the script language environment):
See the examples for more information.
16. troubleshooting (logging, common problems, …)
xSocket uses the built-in logging capability of Java SE. To configure the logging behaviour with the logging.properties file, see Configuring Logger Default Values with a Properties File. You can also set the logging configuration programmatically:
// activate xSocket logging (for namespace org.xsocket.stream)
Logger logger = Logger.getLogger("org.xsocket.stream");
logger.setLevel(Level.FINE);
ConsoleHandler ch = new ConsoleHandler();
ch.setLevel(Level.FINE);
logger.addHandler(ch);
A common problem when opening/closing plenty of connections within a short time is the Too Many Open Files error, caused by (the configuration of) the underlying operating system. The Too Many Open Files support pattern describes this problem and how to handle it. When using Windows, see also Tuning Windows.
17. JMX and xSocket
To support monitoring by JMX, xSocket includes a MultithreadedServerMBeanProxyFactory class. By calling the createAndRegister method, the major components of an xSocket stream server are registered as MBeans.
public void launch(int port) {
    // start the jmx environment (server)
    startJmxServer("test", 1099);

    // create the server and start it
    MultithreadedServer server = new MultithreadedServer(port, new MyHandler());
    StreamUtils.start(server);

    // … and register the server mbeans on the PlatformMBeanServer (jmx environment)
    MultithreadedServerMBeanProxyFactory.createAndRegister(server, "MyTestServer");
    …
}

// helper method to start the jmx server
private static void startJmxServer(String name, int port) {
    // start the rmi registry
    try {
        Registry registry = LocateRegistry.createRegistry(port);
        registry.unbind(name);
    } catch (Exception ignore) { }

    // start the jmx server
    try {
        JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://" + InetAddress.getLocalHost().getHostName() + ":" + port + "/" + name);
        MBeanServer mbeanSrv = ManagementFactory.getPlatformMBeanServer();
        JMXConnectorServer server = JMXConnectorServerFactory.newJMXConnectorServer(url, null, mbeanSrv);
        server.start();
        System.out.println("JMX RMI Agent has been bound on address");
        System.out.println(url);
    } catch (Exception e) { }
}
By running jconsole you will get: