tutorial - multiplexed streaming supportThis is a beginner tutorial about writing stream-oriented muliplexed network applications by using xSocket. If you need more help, don't hesitate to use xSocket's support forum. Multiplexed connection support (xSocket extension)xSocket supports writing multiplexed connections. By doing this a single (expensive) physical connection instance will be used to transfer independent logical data on multiple streams. Here, these streams are called Pipeplines. The pipline's data stream will be multiplexed and demultiplexed by the Multiplexer. xSocket supports a simple and performant multiplexer by default. By implementing the IMultiplexer interface a custom multiplexer can be realized.The MultiplexConnection implementation uses internally a INonBlockingConnection instance to handle the network communication. Therefore a NonBlockingConnection can become a MultiplexedConnection at any time. This is true for the client-side as well as for the server-side. The integration of the multiplexed connection is highly optimized by reducing the overhead of layering. ![]() To create a MultiplexConnection first a NonBlockingConnection has to be established which will be passed over within the MultiplexConnection constructor call. A Multiplexed connection offers operations to manage the Pipelines. A pipeline will be created by the createPipeline() method. This returns a unique pipeline id. Based on this pipelineId a INonBlockingPipeline or an IBlockingPipeline handle can be retrieved. The IBlockingPipeline/INonBlockingPipeline interface extends the IBockingConnection/INonBlockingConnection interface by adding the getMultiplexedConnection() method. Beside this additional functionality, a pipeline is like a ordinary IBlocking/INonBlockingConnection. For this reason a pipeline also implements the GatheringByteChannel and WritableByteChannel interface of the java.nio package. 1. Blocking client-side example// creating a client-side multiplexed connection INonBlockingConnection nativeCon = NonBlockingConnection(hostname, 9011); IMultiplexedConnection multiplexedCon = new MultiplexedConnection(nativeCon); // create a new pipeline String controlPipelineId = multiplexedCon.createPipeline(); IBlockingPipeline controlPipeline = multiplexedCon.getBlockingPipeline(controlPipelineId); // open pipelines, send and received data, and close the pipelines ... controlPipeline.write("NEW_MESSAGE_PIPELINE\r\n"); String messagePipelineId = commandPipeline.readStringByDelimiter("\r\n"); IBlockingPipeline messagePipeline = multiplexedCon.getBlockingPipeline(messagePipelineId); messagePipeline.write(messsagepart_1); messagePipeline.write(messsagepart_2); ... controlPipeline.write("CLOSE_MESSAGE_PIPELINE" + messagePipelineId + "\r\n"); messagePipeline.close(); ... 2. Server-side handlingA MultiplexedConnection uses a INonBlockingConnection instance to handle the network communication. On the server-side each new incoming connection will be a INonBlockingConnection. By accepting a connection, this has to be wrapped by the MultiplexedConnection immediately. This will be done within the onConnect callback method by creating a new MultiplexedConnection based on the new INonBlockingConnection object. xSocket provides the MultiplexedProtocolAdapter to do this. The MultiplexedProtocolAdapter is an ordinary IConnectHandler implementation which creates a Multiplexed connection based on each incoming connection.![]() By creating a MultiplexedProtocolAdapter a
IPipelineHandler has to be passed over. Each
new pipeline will be set with this handler. Beside the specific
pipeline handlers, standard handlers like IDataHandler, ITimeoutHandler
or IDisconnectHandler are supported as well as the IConnectionScoped
(which means here pipeline scoped) interface. IServer server = new Server(9011, new MultiplexedProtocolAdapter(new CommandPipelineHandler())); ConnectionUtils.start(server); // pipeline data handler which is assigned to each new pipeline class CommandPipelineHandler implements IPipelineDataHandler { public boolean onData(INonBlockingPipeline pipeline) throws IOException { String cmd = pipeline.readStringByDelimiter("\r\n") if (cmd.equals("NEW_MESSAGE_PIPELINE")) { IMultiplexedConnection mplCon = pipeline.getMultiplexedConnection(); String msgPipelineId = mplCon.createPipeline(); INonBlockingPipeline msgPipeline = mplCon.getNonBlockingPipeline(msgPipelineId); // replace the CommandPipelineHandler of the new pipeline // by a pipeline-specific data handler msgPipeline.setHandler(new DataHandler()); pipeline.write(msgPipelineId + "\r\n"); } ... ... } } // A pipeline handler could also be a ordinary handler like IDataHandler class DataHandler implements IDataHandler { private FileChannel fc = null; DataHandler() { File file = ... fc = new RandomAccessFile(file, "rw").getChannel(); ... } public boolean onData(INonBlockingConnection pipeline) throws IOException { pipeline.transferTo(fc, pipeline.available()); ... return true; } } 3. Defining a custom MultiplexerTo define a custom multiplex logic a IMultiplexer implementation has to be provided. This custom multiplexer will be passed over within the constructors.// client-side custom multiplexed handling IMultiplexedConnection multiplexedCon = new MultiplexedConnection(nativeCon, new MyMultiplexer()); // server-side custom multiplexed handling IHandler adpater = new MultiplexedProtocolAdapter(new CommandPipelineHandler(), new MyMultiplexer())) IServer server = new Server(9011, adpater); To define a custom multiplex logic a IMultiplexer implementation has to be provided. This custom multiplexer will be passed over within the constructors. class MyMultiplexer implements IMultiplexer { public String openPipeline(INonBlockingConnection nbc) throws IOException, ClosedException { ... create a unique pipelineId ... create a network packet ... and send it to the peer return pipelineId; } public void closePipeline(INonBlockingConnection nbc, String pipelineId) throws IOException, ClosedException { ... create a network packet ... and send it to the peer } public void multiplex(INonBlockingConnection nbc, String pipelineId, ByteBuffer[] dataToWrite) throws IOException, ClosedException { ... create a network packet based on the pipeline data ... and send it to the peer } public void multiplex(INonBlockingConnection nbc, String pipelineId, ByteBuffer[] dataToWrite, IWriteCompletionHandler completionHandler) throws IOException, ClosedChannelException { ... create a network packet based on the pipeline data ... and send it to the peer } public void demultiplex(INonBlockingConnection nbc, IDemultiplexResultHandler rsltHdl) throws IOException, ClosedException { ... read the network packet ... notify the specific pipeline message } } |