tigase源码分析3:SocketThread
原文链接:https://blog.csdn.net/u013934901/java/article/details/84696064
SocketThread 专用于处理客户端SOCKET的读写事件的线程,当服务器端SOCKET接受到客户socket,就会生成一个与对应的IOService,IOService.socketIO指向SocketIO对象,
SocketIO是对java api中SocketChannel的封装,所以拿到IOService也就等于拿到客户端SocketChannel了。SocketThread 是一个私有类,他在第一次加载的时候,就会创建了3类线程,
socketReadThread():负责读socket的数据;
socketWriteThread():负责写入socket数据;
ResultsListener:负责监视CompletionService执行结果IOService完成情况,判断IOService中的socket连接是否关闭,如没有则继续注册入SocketThread 的Selector中进行事件侦听;
SocketThread ::
private static SocketThread[] socketReadThread = null;
private static SocketThread[] socketWriteThread = null;
private static ThreadPoolExecutor executor = null;
private static CompletionService<IOService<?>> completionService = null;
//下面是实例属性
private Selector clientsSel = null;
private boolean reading = false;
private boolean writing = false;
static {
if (socketReadThread == null) {
int nThreads = (cpus * DEF_MAX_THREADS_PER_CPU) / 2 + 1;
executor = new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
completionService = new ExecutorCompletionService<IOService<?>>(executor);
//执行任务的线程池
socketReadThread = new SocketThread[nThreads]; //一组负责读socket的数据;
socketWriteThread = new SocketThread[nThreads]; //一组负责写socket的数据;
for (int i = 0; i < socketReadThread.length; i++) {
socketReadThread[i] = new SocketThread("socketReadThread-" + i);
socketReadThread[i].reading = true;
Thread thrd = new Thread(socketReadThread[i]);
thrd.setName("socketReadThread-" + i);
thrd.start();//启动,会执行run()
}
log.log(Level.WARNING, "{0} socketReadThreads started.", socketReadThread.length);
for (int i = 0; i < socketWriteThread.length; i++) {
socketWriteThread[i] = new SocketThread("socketWriteThread-" + i);
socketWriteThread[i].writing = true;
Thread thrd = new Thread(socketWriteThread[i]);
thrd.setName("socketWriteThread-" + i);
thrd.start();////启动,会执行run()
}
log.log(Level.WARNING, "{0} socketWriteThreads started.", socketWriteThread.length);
} // end of if (acceptThread == null)
}
//生成每一个SocketThread都会有一个对应ResultsListener线程
private SocketThread(String name) {
try {
clientsSel = Selector.open();
} catch (Exception e) {
log.log(Level.SEVERE, "Server I/O error, can't continue my work.", e);
stopping = true;
} // end of try-catch
new ResultsListener("ResultsListener-" + name).start();
}
public void SocketThread.run() {
while ( !stopping) {
try {
clientsSel.select();
if (log.isLoggable(Level.FINEST)) {
log.log(Level.FINEST, "Selector AWAKE: {0}", clientsSel);
}
//等到已选择的key,证明有数据要处理
Set<SelectionKey> selected = clientsSel.selectedKeys();
int selectedKeys = selected.size();
if ((selectedKeys == 0) && (waiting.size() == 0)) {
if (log.isLoggable(Level.FINEST)) {
log.finest("Selected keys = 0!!! a bug again?");
}
if ((++empty_selections) > MAX_EMPTY_SELECTIONS) {
recreateSelector();
}
} else {
empty_selections = 0;
if (selectedKeys > 0) {
for (SelectionKey sk : selected) {
//得到ConnectionListenerImpl.accept()中绑定的ioservice
IOService s = (IOService) sk.attachment();
try {
.....
//下一次socket从selector监听队列中移除
sk.cancel();
forCompletion.add(s);
} catch (CancelledKeyException e) {
...
}
}
}
// Clean-up cancelled keys...
clientsSel.selectNow();
}
//注册新的socket到selector中进行监听
addAllWaiting();
IOService serv = null;
while ((serv = forCompletion.pollFirst()) != null) {
//放线程沲中执行,调用了IOService.call()进行数据处理
completionService.submit(serv);
}
// clientsSel.selectNow();
} catch (CancelledKeyException brokene) {
。。
} catch (IOException ioe) {
。。
} catch (Exception exe) {
..
}
}
}
//ResultsListener.run()
public void ResultsListener.run() {
for (;;) {
try {
//CompletionService的实现是维护一个保存Future对象的BlockingQueue。只有当这个Future对象状态是结束的时候,才会加入到这个Queue中,take()方法得到的对象其实就是IOService。它会从Queue中取出Future对象,如果Queue是空的,就会阻塞在那里,直到有完成的Future对象加入到Queue中。
//其实这里的设计非常巧妙,当读到要处理事件进来后,把selector中对应的socket移出,当完成socket数据处理后只要连接还开启,再次加入selector中进行监听,所以客户端可以发送一个空字符串来进行心跳处理,维持客户端和服务器进行长连接。
IOService<?> service = completionService.take().get();
if (service != null) {
if (service.isConnected()) {//只要连接没关闭
addSocketService(service);//就再次注册到线程的Selector中
............
}
}
}
原文链接:https://blog.csdn.net/u013934901/java/article/details/84696064
标签:
上一篇: tigase源码分析4:packet处理
下一篇: tigase源码分析2:ConnectionOpenThread 处理服务端socket的线程