public class ServerReactor implements Runnable { /** * Selector to tell the IO event that our server socket channel has. */ private Selector selector; /** * Serve the incoming call for building up connection and dispatch * the request. */ p
/**
* Selector to tell the IO event that our server socket channel has.
*/
private Selector selector;
/**
* Serve the incoming call for building up connection and dispatch
* the request.
*/
public void run() {
try {
while (!Thread.interrupted()) {
// select the keys to see the channels that are ready
// for IO operation
selector.select();
// get the selected keys
Set selected = selector.selectedKeys();
Iterator it = selected.iterator();
while (it.hasNext()) {
// dispatch each selection key
dispatch((SelectionKey) (it.next()));
}
// clear the selection keys
selected.clear();
}
} catch (Throwable t) {
t.printStackTrace();
}
}
/**
* Dispatch the selection key.
*/
void dispatch(SelectionKey k) throws CloudwaveException {
// get the attachment of the selection key
Runnable r = (Runnable) (k.attachment());
if (r != null) {
if (r instanceof ServerReceiver) {
ServerReceiver serverReceiver = (ServerReceiver) r;
if (serverReceiver.isClosed()) {
return;
}
}
// run the attachment
r.run();
}
}
public void close() throws IOException {
Thread.currentThread().interrupt();
serverSocketChannel.close();
selector.close();
}}
public abstract class ServerReceiver implements Runnable {
public void run() {
try {
while (true) {
// do a single read
int read = socketChannel.read(singleReadBuffer);
if (read == -1) {
// this means we reached the end of socket stream.
close();
return;
}
if (read > 0) {
readBytesLength += read;
if (readBytesLength > 4) {
// flip input buffer
singleReadBuffer.flip();
// process input buffer
processSingleReadBuffer(read);
// clear input buffer
singleReadBuffer.clear();
}
}
if (inputComplete) {
// reach the end of input request
completeRequest();
inputComplete = false;
readBytesLength = 0;
processPos = 0;
return;
}
if (read == 0) {
// since no more data available, just break to wait for
// more in coming data
break;
}
}
} catch (Throwable t) {
t.printStackTrace();
closeSession();
reinitBuffer();
selectionKey.cancel();
}
}}