tigase源码分析2:ConnectionOpenThread 处理服务端socket的线程

发布时间: 5年前 (2020-04-14)浏览: 956评论: 0

原文链接:https://blog.csdn.net/u013934901/java/article/details/84693760 

一、    ConnectionOpenThread 使用单例模式,他是负责建立服务端SOCKET和 接收连接客户端socket 线程。

在初始化ConnectionManager的时候ConnectionManager.connectThread 属性所引用的服务端SOCKET连接线程ConnectionOpenThread 就被初始化了

    ConnectionManager::
    private static ConnectionOpenThread connectThread = ConnectionOpenThread.getInstance();

 

 

ConnectionOpenThread .getInstance()的实现  

    private Selector  selector= null;
     
    public static ConnectionOpenThread getInstance() {
     
            if (acceptThread == null) {
                acceptThread = new ConnectionOpenThread();
     
                Thread thrd = new Thread(acceptThread);
     
                thrd.setName("ConnectionOpenThread");
                thrd.start(); //启动ConnectionOpenThread线程,则this.run()方法将被被执行
                if (log.isLoggable(Level.FINER)) {
                    log.finer("ConnectionOpenThread started.");
                }
            }    // end of if (acceptThread == null)
     
            return acceptThread;
        }
     
     
    private ConnectionOpenThread() {
       .......
            try {
                selector = Selector.open();//得到一个选择器,可以去了解下nio api
            } catch (Exception e) {
                log.log(Level.SEVERE, "Server I/O error, can't continue my work.", e);
                stopping = true;
            }    // end of try-catch
        }

 

 

 ConnectionOpenThread .run()的实现  

在该方法中,selector管理的都是服务端SOCKET

    public void run() {
            while (!stopping) {
                try {
                    selector.select();
                      //此方法为阻塞方法,当选择器管理channel(也就是向selector注册的channel)                    中发生读、写或异常事件时,select()将会被触发会往下执行
     
                    // Set<SelectionKey> selected_keys = selector.selectedKeys();
                    // for (SelectionKey sk : selected_keys) {
                                   //返回已此通道已准备就绪的键集,已选择始终是键集的一个子集。
                    //begin iterator
            for (Iterator i = selector.selectedKeys().iterator(); i.hasNext(); ) {
                        SelectionKey sk = (SelectionKey) i.next();
     
                        i.remove();
     
                        SocketChannel sc        = null;
                        boolean       throttled = false;
                        int           port_no   = 0;
     
                    if ((sk.readyOps() & SelectionKey.OP_ACCEPT) != 0) {                     //在此是否为被动SOCKET也就是服务端SOCKET,是则接受客户端socket
                ServerSocketChannel nextReady = (ServerSocketChannel) sk.channel();
                        port_no = nextReady.socket().getLocalPort();
                        sc = nextReady.accept();//得到一个客户端SOCKET
                        ...
                    }    // end of if (sk.readyOps() & SelectionKey.OP_ACCEPT)
                 if ((sk.readyOps() & SelectionKey.OP_CONNECT) != 0) {
                    sk.cancel();  // 从Selector中删除指定的SelectionKey  
                                    //所以这个普通的conect socket只会处理一次侦听到的发生事件
                    sc = (SocketChannel) sk.channel();//得到connect SOCKET
     
                    }    // end of if (sk.readyOps() & SelectionKey.OP_ACCEPT)
                        if (sc != null) { //设置接收到的SOCKET的一些信息
                            try {
                    sc.configureBlocking(false);//将客户端通道设置为非阻塞
                                sc.socket().setSoLinger(false, 0);
                                sc.socket().setReuseAddress(true);
            
                                                  //每个ServerSocketChannel在创建注册到selector                                              时就被绑定了一个ConnectionOpenListener对象,                                               用这个对象来处理该接受到的socket,该注册过程                                               在addAllWaiting()中进行
                ConnectionOpenListener al = (ConnectionOpenListener) sk.attachment();
     
                sc.socket().setTrafficClass(al.getTrafficClass());
                    sc.socket().setReceiveBufferSize(al.getReceiveBufferSize());
                   al.accept(sc);//此方法 为建立连接socket的进行后续处理的设定
                } catch (java.net.SocketException e) {
     
            ConnectionOpenListener al = (ConnectionOpenListener) sk.attachment();
                          al.accept(sc);
                            }
                        } else {
                            log.log(Level.INFO,
                                    "Can not obtain socket channel from selection key, throttling activated = {0}, for port: {1}",
                                    new Object[] { throttled, port_no });
                        }    // end of if (sc != null) else
                        ++accept_counter;
                    }
                  //end of iterator
         addAllWaiting();//加载要注册到selector中的ServerSocketChannel或connect socket
                } catch (IOException e) {
                    log.log(Level.SEVERE, "Server I/O error.", e);
     
                    // stopping = true;
                }        // end of catch
                        catch (Exception e) {
                    log.log(Level.SEVERE, "Other service exception.", e);
     
                    // stopping = true;
                }        // end of catch
            }
        }

 

 

 在说解到addAllWaiting();加载要注册到selector中的ServerSocketChannel时,先看下源码:

在waiting队列中如果有等待处理的ConnectionOpenListener对象,则创建一个对应的ServerSocketChannel

    private void addAllWaiting() throws IOException {
            ConnectionOpenListener al = null;
     
            while ((al = waiting.poll()) != null) {
                try {
                    addPort(al);//绑定相关的端口进行监听
                } catch (Exception e) {
                    log.log(Level.WARNING, "Error: creating connection for: " + al, e);
                    al.accept(null);
                }    // end of try-catch
            }      // end of for ()
        }
     
     
     
     
    private void addPort(ConnectionOpenListener al) throws IOException {
        if ((al.getConnectionType() == ConnectionType.connect) && (al.getRemoteAddress() !=
                    null)) {
         addISA(al.getRemoteAddress(), al);
        } else if ((al.getIfcs() == null) || (al.getIfcs().length == 0) || al.getIfcs()[0]
                    .equals("ifc") || al.getIfcs()[0].equals("*")) {
       addISA(new InetSocketAddress(al.getPort()), al);//绑定到InetSocketAddress进行监听服务
        } else {
                for (String ifc : al.getIfcs()) {
                    addISA(new InetSocketAddress(ifc, al.getPort()), al);
                }    // end of for ()
        }      // end of if (ip == null || ip.equals("")) else
        }
     
     
      //addISA(..)这才是真正创建ServerSocketChannel方法,绑定到服务器某一个端口上进行监听服务,
      //开启了服务端socket
      private void addISA(InetSocketAddress isa, ConnectionOpenListener al)throws IOException {
            switch (al.getConnectionType()) {
            case accept :
                ...
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.socket().setReceiveBufferSize(al.getReceiveBufferSize());
            ssc.configureBlocking(false);//服务端socket也是非阻塞方法
        ssc.socket().bind(isa, (int) (port_throttling)); //绑定到相关地址的某一个端口上
        ssc.register(selector, SelectionKey.OP_ACCEPT, al);//注册服务端socket到selector中,        并且附带绑定一个ConnectionOpenListener对象,该对象为服务端socket接收到新来的socket         进行后续处理。所以selector能监听这些已注册socket的事件发生
     
                break;
     
            case connect :
            ...
                          //服务器socket之间要进行通讯,则先要连接
                SocketChannel sc = SocketChannel.open();
                sc.socket().setReceiveBufferSize(al.getReceiveBufferSize());
                sc.socket().setTrafficClass(al.getTrafficClass());
                sc.configureBlocking(false);
                sc.connect(isa);
          sc.register(selector, SelectionKey.OP_CONNECT, al);
              //在此也注册到ConnectionOpenThread.selector中
     
                break;
     
            default :
                ..
                break;
            }    
        }

 

   二、从以上addAllWaiting();分析中看到处理的都是waiting队列里的ConnectionOpenListener对象,那这个ConnectionOpenListener对象是什么时候就会被放到waiting队列的呢,这得从ConnectionManager.initializationCompleted()中说起,在启动章节中分析到MessageRouter.setProperties(map)负责加载了其它的组件最后对每一个组件都执行了初始化完成动作。从而ConnectionManager.initializationCompleted()将会被执行

 

    MessageRouter::
    for (ServerComponent comp : components.values()) {
            comp.initializationCompleted();
            }

 

 

 ConnectionManager.initializationCompleted()源码如下

 

    public void initializationCompleted() {
            if (isInitializationComplete()) {
                // Do we really need to do this again?
                return;
            }
            super.initializationCompleted();
            initializationCompleted = true;
                //加载组件中的服务配置
            for (Map<String, Object> params : waitingTasks) {
           //启动一个定时任务,设置准备加入ConnectionOpenThread.waiting的ConnectionListener对象
             reconnectService(params, connectionDelay);
            }
            waitingTasks.clear();
            if ( null != watchdog ){
                watchdog.start();
            }
        }

 
 2.1 也许看到上面的waitingTask你在想他是什么样的配置信息呢,其实他就是启动服务器监听端口的配置,系统 默认的有如下几种,bosh,c2s,s2s,ws2s组件的服务器配置,waitingTask装的是每一个组件的端口配置信息

    c2s/connections/5222/type[S]=accept
    c2s/connections/5222/socket[S]=plain
    c2s/connections/5222/ifc[s]=*
    c2s/connections/5222/remote-host[S]=localhost
    c2s/connections/5222/connections/tls/required[B]=false
    c2s/connections/5223/type[S]=accept
    c2s/connections/5223/socket[S]=ssl
    c2s/connections/5223/ifc[s]=*
    c2s/connections/5223/remote-host[S]=localhost
    c2s/connections/5223/connections/tls/required[B]=false
    c2s/connections/ports[i]=5222, 5223
     
    bosh/connections/5280/type[S]=accept
    bosh/connections/5280/socket[S]=plain
    bosh/connections/5280/ifc[s]=*
    bosh/connections/5280/remote-host[S]=localhost
    bosh/connections/5280/connections/tls/required[B]=false
    bosh/connections/ports[i]=5280
     
    s2s/connections/5269/type[S]=accept
    s2s/connections/5269/socket[S]=plain
    s2s/connections/5269/ifc[s]=*
    s2s/connections/5269/remote-host[S]=localhost
    s2s/connections/5269/connections/tls/required[B]=false
    s2s/connections/ports[i]=5269
     
    ws2s/connections/5290/type[S]=accept
    ws2s/connections/5290/socket[S]=plain
    ws2s/connections/5290/ifc[s]=*
    ws2s/connections/5290/remote-host[S]=localhost
    ws2s/connections/5290/connections/tls/required[B]=false
    ws2s/connections/ports[i]=5290

 
 
 
    2.1.1 ConnectionManager.setProperties(.)中对上面的配置信息作了解析,放到map里保存着

    ConnectionManager.setProperties(Map<String, Object> props){
    ...
    for (int i = 0; i < ports.length; i++) {
    addWaitingTask(port_props);
    }
    }
     
    //conn信息加入waitingTasks队列
    protected void addWaitingTask(Map<String, Object> conn) {
            if (initializationCompleted) {
                reconnectService(conn, connectionDelay);
            } else {
                waitingTasks.add(conn);
            }
        }

 
 

 2.2  然后回来ConnectionManager对象继续分析ConnectionOpenThread.waiting队列是怎么样增加了数据的。

 

     ConnectionManager::
     
     //启动一个定时任务,设置准备加入ConnectionOpenThread.waiting的ConnectionListener对象
    private void reconnectService(final Map<String, Object> port_props, long delay) {
            ...
            addTimerTask(new tigase.util.TimerTask() {
                @Override
                public void run() {
              startService(port_props);
                }
            }, delay);
        }
     
     
       //ConnectionOpenThread是单例模式,在此该对象只被初始化一次
        private static ConnectionOpenThread connectThread = ConnectionOpenThread.getInstance();
     
       private void startService(Map<String, Object> port_props) {
            if (port_props == null) {
                throw new NullPointerException("port_props cannot be null.");
            }
                   //根据组件的配置信息生成一个相关的ConnectionListener对象
            ConnectionListenerImpl cli = new ConnectionListenerImpl(port_props);
     
            if (cli.getConnectionType() == ConnectionType.accept) {
                pending_open.add(cli);
            }
                    //将ConnectionListener对象加入ConnectionOpenThread.waiting队列中
            connectThread.addConnectionOpenListener(cli);
        }

 

 

  最后在ConnectionOpenThread中可以看到ConnectionListener是怎么样加入waiting队列的了

     ConnectionOpenThread::
       public void addConnectionOpenListener(ConnectionOpenListener al) {
            waiting.offer(al);
            selector.wakeup();
        }
 
原文链接:https://blog.csdn.net/u013934901/java/article/details/84693760

标签:

上一篇: tigase源码分析3:SocketThread
下一篇: tigase源码分析1:启动

相关文章暂无相关
评论列表暂无评论
发表评论
验证码

«   2024年4月   »
1234567
891011121314
15161718192021
22232425262728
2930
控制面板
您好,欢迎到访网站!
  查看权限
网站分类
搜索
最新留言
    文章归档
    网站收藏
    友情链接
    • RainbowSoft Studio Z-Blog
    • 订阅本站的 RSS 2.0 新闻聚合
    ︿
    Top