tigase源码分析3:SocketThread

发布时间: 5年前 (2020-04-14)浏览: 799评论: 0


原文链接:https://blog.csdn.net/u013934901/java/article/details/84696064 

SocketThread 专用于处理客户端SOCKET的读写事件的线程,当服务器端SOCKET接受到客户socket,就会生成一个与对应的IOService,IOService.socketIO指向SocketIO对象,

SocketIO是对java api中SocketChannel的封装,所以拿到IOService也就等于拿到客户端SocketChannel了。SocketThread 是一个私有类,他在第一次加载的时候,就会创建了3类线程,

socketReadThread():负责读socket的数据;

socketWriteThread():负责写入socket数据;

ResultsListener:负责监视CompletionService执行结果IOService完成情况,判断IOService中的socket连接是否关闭,如没有则继续注册入SocketThread 的Selector中进行事件侦听;

 

    SocketThread ::
    private static SocketThread[] socketReadThread = null;
    private static SocketThread[] socketWriteThread = null;
    private static ThreadPoolExecutor executor = null;
    private static CompletionService<IOService<?>> completionService = null;
    //下面是实例属性
    private Selector clientsSel = null;
    private boolean reading = false;
    private boolean writing = false;
     
    static {
            if (socketReadThread == null) {
                int nThreads = (cpus * DEF_MAX_THREADS_PER_CPU) / 2 + 1;
     
                executor = new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
                        new LinkedBlockingQueue<Runnable>());
                completionService = new ExecutorCompletionService<IOService<?>>(executor);
                            //执行任务的线程池
                socketReadThread = new SocketThread[nThreads]; //一组负责读socket的数据;
                             socketWriteThread = new SocketThread[nThreads]; //一组负责写socket的数据;
                for (int i = 0; i < socketReadThread.length; i++) {
                    socketReadThread[i] = new SocketThread("socketReadThread-" + i);
                    socketReadThread[i].reading = true;
     
                    Thread thrd = new Thread(socketReadThread[i]);
     
                    thrd.setName("socketReadThread-" + i);
                    thrd.start();//启动,会执行run()
                }
     
                log.log(Level.WARNING, "{0} socketReadThreads started.", socketReadThread.length);
     
                for (int i = 0; i < socketWriteThread.length; i++) {
                    socketWriteThread[i] = new SocketThread("socketWriteThread-" + i);
                    socketWriteThread[i].writing = true;
     
                    Thread thrd = new Thread(socketWriteThread[i]);
     
                    thrd.setName("socketWriteThread-" + i);
                    thrd.start();////启动,会执行run()
                }
     
                log.log(Level.WARNING, "{0} socketWriteThreads started.", socketWriteThread.length);
            }    // end of if (acceptThread == null)
        }
     
     
           //生成每一个SocketThread都会有一个对应ResultsListener线程
        private SocketThread(String name) {
            try {
                clientsSel = Selector.open();
            } catch (Exception e) {
                log.log(Level.SEVERE, "Server I/O error, can't continue my work.", e);
                stopping = true;
            }    // end of try-catch
     
            new ResultsListener("ResultsListener-" + name).start();
        }
     
     
     
      public void SocketThread.run() {
            while ( !stopping) {
                try {
                    clientsSel.select();
     
                    if (log.isLoggable(Level.FINEST)) {
                        log.log(Level.FINEST, "Selector AWAKE: {0}", clientsSel);
                    }
                                  //等到已选择的key,证明有数据要处理
                    Set<SelectionKey> selected = clientsSel.selectedKeys();
                    int selectedKeys = selected.size();
     
                    if ((selectedKeys == 0) && (waiting.size() == 0)) {
                        if (log.isLoggable(Level.FINEST)) {
                            log.finest("Selected keys = 0!!! a bug again?");
                        }
     
                        if ((++empty_selections) > MAX_EMPTY_SELECTIONS) {
                            recreateSelector();
                        }
                    } else {
                        empty_selections = 0;
     
                        if (selectedKeys > 0) {
     
                            for (SelectionKey sk : selected) {
                                                    //得到ConnectionListenerImpl.accept()中绑定的ioservice
                                IOService s = (IOService) sk.attachment();
     
                                try {
                                
                                    .....
                                                           //下一次socket从selector监听队列中移除
                                    sk.cancel();
                                                               
                                    forCompletion.add(s);
     
                                    
                                } catch (CancelledKeyException e) {
                                ...
                                }
                            }
                        }
     
                        // Clean-up cancelled keys...
                        clientsSel.selectNow();
                    }
                                  //注册新的socket到selector中进行监听     
                    addAllWaiting();
     
                    IOService serv = null;
     
                    while ((serv = forCompletion.pollFirst()) != null) {
                                           //放线程沲中执行,调用了IOService.call()进行数据处理
                        completionService.submit(serv);
                    }
     
                    // clientsSel.selectNow();
                } catch (CancelledKeyException brokene) {
     
                    。。
                } catch (IOException ioe) {
                               。。
                } catch (Exception exe) {
                ..
                }
            }
        }
     
     
     
    //ResultsListener.run()
        public void ResultsListener.run() {
        for (;;) {
        try {
    //CompletionService的实现是维护一个保存Future对象的BlockingQueue。只有当这个Future对象状态是结束的时候,才会加入到这个Queue中,take()方法得到的对象其实就是IOService。它会从Queue中取出Future对象,如果Queue是空的,就会阻塞在那里,直到有完成的Future对象加入到Queue中。
          //其实这里的设计非常巧妙,当读到要处理事件进来后,把selector中对应的socket移出,当完成socket数据处理后只要连接还开启,再次加入selector中进行监听,所以客户端可以发送一个空字符串来进行心跳处理,维持客户端和服务器进行长连接。
     
        IOService<?> service = completionService.take().get();
            if (service != null) {
            if (service.isConnected()) {//只要连接没关闭
                addSocketService(service);//就再次注册到线程的Selector中
             ............
                      }
               }
            }


原文链接:https://blog.csdn.net/u013934901/java/article/details/84696064

标签:

上一篇: tigase源码分析4:packet处理
下一篇: tigase源码分析2:ConnectionOpenThread 处理服务端socket的线程

相关文章暂无相关
评论列表暂无评论
发表评论
验证码

«   2024年4月   »
1234567
891011121314
15161718192021
22232425262728
2930
控制面板
您好,欢迎到访网站!
  查看权限
网站分类
搜索
最新留言
    文章归档
    网站收藏
    友情链接
    • RainbowSoft Studio Z-Blog
    • 订阅本站的 RSS 2.0 新闻聚合
    ︿
    Top