tigase源码分析4:packet处理

发布时间: 5年前 (2020-04-14)浏览: 1040评论: 0

原文链接: https://blog.csdn.net/u013934901/article/details/84696050

这节主要讲数据包packet 的流转过程,如图大概说明packet被处理的流程,但实际上packet最终的处理者是插件,这个过程是在packet流转到SM中被分发到对它感兴趣的processor中处理的,下节将会详细说明packet被SM处理情况:

 图片.png

被处理的包packet,一旦被会话管理器和处理器插件(session manager and processor plugins)处理完成,数据包会被摧毁。因此一个处理器将数据包转发到目的地前必须创建一个包的副本,并设置所有属性才返回处理结果。当然处理器可以生成任意数量的数据包。

所以你会看到上图显示2个用户userA 和userB之间的通信的数据包送到最终目的地前被复制两次,messagerouter可以看作为一个包的路由器,他根据packet中的to属性值选择相应的处理组件进行投递到对方的处理队列里,这个包将被下一个组件执行processPacket(packet)进行处理

 

在启动章分析到XMPPServer中执行 MessageRouter.start();开启多线程处理in out packet.
图片.png 

        public void MessageRouter.start() {
            super.start();
        }
     

   从以上的继承图得知道,将执行AbstractMessageReceiver.start()

        @Override
        public void AbstractMessageReceiver.start() {
            startThreads();
        }
     
    private ArrayDeque<QueueListener>  threadsQueue=null;
    private void startThreads() {
      if (threadsQueue == null) {
        threadsQueue = new ArrayDeque<QueueListener>(8);
        for (int i = 0; i < in_queues_size; i++) {
        QueueListener in_thread = new QueueListener(in_queues.get(i), QueueType.IN_QUEUE);
     
            in_thread.setName("in_" + i + "-" + getName());
            in_thread.start();
            threadsQueue.add(in_thread);
        }
      for (int i = 0; i < out_queues_size; i++) {
        QueueListener out_thread = new QueueListener(out_queues.get(i), QueueType
                            .OUT_QUEUE);
     
                out_thread.setName("out_" + i + "-" + getName());
            out_thread.start();
            threadsQueue.add(out_thread);
        }
        }    // end of if (thread == null || ! thread.isAlive())
    ......
    }

每个MessageReceiver组件都 有多个线程分别处理各自packet,来一个简单的模型图表明
图片.png 

 

     QueueListener是AbstractMessageReceiver的内部类,所以QueueListener内部能直接访问到AbstractMessageReceiver对象的方法。由此可见,象ClientConnectionManager,S2SConnectionManager

,MessageRouter,SessionManager 这些子类都有各自的in out线程负责处理投递到他们节点上的packet,

 

 

    private QueueType    type          = null;
    private boolean      threadStopped = false;
    private PriorityQueueAbstract<Packet> queue;
    private QueueListener(PriorityQueueAbstract<Packet> q, QueueType type) {
        this.queue = q;
        this.type  = type;
        compName   = AbstractMessageReceiver.this.getName();
    }
     
    @Override
    public void QueueListener.run() {
                
     
        Packet        packet  = null;
        Queue<Packet> results = new ArrayDeque<Packet>(2);
     
        while (!threadStopped) {
            try {
     
            packet = queue.take();//阻塞方法
            ++packetCounter;
            switch (type) {
             case IN_QUEUE :
                   long startPPT = System.currentTimeMillis();
     
                PacketReceiverTask task = null;
     
                if (packet.getTo() != null) {
                String id = packet.getTo().toString() + packet.getStanzaId();
     
                   task = waitingTasks.remove(id);
                            }
                if (task != null) {
                 task.handleResponse(packet);
                } else {
            boolean processed = false;
             if (packet.isCommand() && (packet.getStanzaTo() != null) && compName.equals(
                      packet.getStanzaTo().getLocalpart()) && isLocalDomain(packet
                    .getStanzaTo().getDomain())) {
                    processed = processScriptCommand(packet, results);
            if (processed) {
             Packet result = null;
              while ((result = results.poll()) != null) {
                addOutPacket(result);
            }
         }
           }
           if (!processed && ((packet = filterPacket(packet, incoming_filters)) !=null)) {
            processPacket(packet);//执行具体实现类的处理方法
        }
     
        int idx = pptIdx;
     
        pptIdx = (pptIdx + 1) % processPacketTimings.length;
     
        long timing = System.currentTimeMillis() - startPPT;
     
        processPacketTimings[idx] = timing;
        }
     
        break;
     
         case OUT_QUEUE :
     
                 if ((packet = filterPacket(packet, outgoing_filters)) != null) {
                processOutPacket(packet);//执行具体的实现类的处理方法
              }
     
                break;
            default :
                break;
            }    // end of switch (qel.type)
            } catch (InterruptedException e) {
                                         .....
            }      // end of while (! threadStopped)
            }

 

    private MessageReceiver     parent  = null;
    //在初始化时,parent被赋值为MessageRouter对象。
    public void AbstractMessageReceiver.processOutPacket(Packet packet) {
            if (parent != null) {
              parent.addPacket(packet);//过渡到MessageRouter对象进行处理
            } else {
             addPacketNB(packet);
            }    // end of else
        }

 

 

 

 父类实现默认addPacketNB()添加packet到队列的方法,如果子类没有重写该方法则使用父类的这个方法加入packet到他们各自的in 或out的处理队列里

 

    public boolean AbstractMessageReceiver.addPacketNB(Packet packet) {
       int queueIdx = Math.abs(hashCodeForPacket(packet) % in_queues_size);//得到一个hash值
       //根据那个hash值加入到对应的in队列里,run()里会监听到阻塞队列有处理packet,则处理之
       boolean result = in_queues.get(queueIdx).offer(packet, packet.getPriority().ordinal());
     
            if (result) {
                ++statReceivedPacketsOk;
            } else {
     
                // Queue overflow!
                ++statReceivedPacketsEr;
            }
     
            return result;
        }

 

   同上,只不过这个是放入out处理队列,由out线程监听处理之

        protected boolean AbstractMessageReceiver.addOutPacketNB(Packet packet) {
            int queueIdx = Math.abs(hashCodeForPacket(packet) % out_queues_size);
            boolean result = false;
            //放到相应的out队列里等待处理
        result = out_queues.get(queueIdx).offer(packet, packet.getPriority().ordinal());
            if (result) {
                ++statSentPacketsOk;
            } else {
     
                ++statSentPacketsEr;
     
            }
     
            return result;
        }

 

 

 当我们知道如果要用哪一个AbstractMessageReceiver的实现类来处理packet的时候,我只需要把packet投递到对应的实现类(如SessionManager)里的in_queue或out_queue里就可以了,因为这些实现类开启了多条in 和out线程在等待处理in out队列里的packet,所以当客户端和服务器端建立连接后,发来有效的packet,而这个packet被投递的入口要从下面分析说起了。

 

 

    public void ConnectionManager.accept(SocketChannel sc) {
    IO serv = getXMPPIOServiceInstance(); //每一个接受到的新socket都有一个与之对应的ioservice
    //这个ioservice设置监听器ConnectionManager.this可能是(ClientConnectionManager,
    //S2SConnectionManager,BoshConnectionManager,WebSocketClientConnectionManager)的对象,主要是看是这些客户端socket是从哪个服务端socket对象监听的端口进来的
    serv.setIOServiceListener(ConnectionManager.this);
    }
     
    //前面ConnectionOpenThread分析的章节已经详细分析过了,当有数据包来的时间,客户端socket是由服务端创建的一个对应的IoService作为处理类的。在SocketThread中监听到的socket有可以处理的数据时completionService.submit(serv);用线程池里的线程来执行IOService.call()方法,开始进入数据处理
    private IOServiceListener<IOService<RefObject>> serviceListener  = null;
    public IOService<?> IOService.call() throws IOException {
            writeData(null);
     
            boolean readLock = true;
     
            if (stopping) {
                stop();
            } else {
                readLock = readInProgress.tryLock();
                if (readLock) {
                    try {
                        processSocketData();//执行具体的子类的处理数据方法
                                                               该方法包括解析的数据封装成packet
                    if ((receivedPackets() > 0) && (serviceListener != null)) {
                           //由前面分析可知道执行ConnectionManager子类的方法开始处                                     理数据包packet
                                        serviceListener.packetsReady(this);
                        }    // end of if (receivedPackets.size() > 0)
                    } finally {
                        readInProgress.unlock();
                        if (!isConnected()) {
     
                            forceStop();
                        }
                    }
                }
            }
     
            return readLock
                    ? this
                    : null;
        }

 

 

    public void ConnectionManager.packetsReady(IO serv) throws IOException {
     
            if (checkTrafficLimits(serv)) {
                    //processSocketData(serv)是读入方向的packet处理入口
                    //writePacketsToSocket 是写出方向的packet处理方法
            writePacketsToSocket(serv, processSocketData(serv));
            }
        }

 processSocketData可能的继承实现结构,由图可见,执行processSocketData(IO serv)由继承ConnectionManager的子类的重写方法。
图片.png

    //拿实现类ClientConnectionManager来分析
    public Queue<Packet> ClientConnectionManager.processSocketData(XMPPIOService<Object> serv) {
     
            JID id = serv.getConnectionId();
     
                     Packet p = null;
     
            while ((p = serv.getReceivedPackets().poll()) != null) {
                   if (p.getAttributeStaticStr(Packet.XMLNS_ATT) == null) {
                    p.setXMLNS(XMLNS);
                }
                
     
                if (p.getStanzaFrom() != null) {
                    p.initVars(null, p.getStanzaTo());
                }
                            //设置包的from值
                p.setPacketFrom(id);
     
                JID receiver = serv.getDataReceiver();
                            //设置包的to值
                if (receiver != null) {
                       p.setPacketTo(serv.getDataReceiver());
                            //投递到(如ClientConnectionManager类)中的out_queue队列中,
                              由run()处理,这个包直到处理完成经过多次投递,
                   addOutPacket(p);
                } else {
     
                }
     
            }    // end of while ()
     
            return null;
        }
     
          //投递packet到相应的out_queue列队中
          protected boolean addOutPacket(Packet packet) {
        int queueIdx = Math.abs(hashCodeForPacket(packet) % out_queues_size);
     
        try {
            out_queues.get(queueIdx).put(packet, packet.getPriority().ordinal());
            ++statSentPacketsOk;
         } catch (InterruptedException e) {
            ..
         }    // end of try-catch
     
            return true;
        }
————————————————
版权声明:本文为CSDN博主「jianfulovee」的原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/u013934901/java/article/details/84696050

标签:

上一篇: tigase源码分析6:了解xmpp协议
下一篇: tigase源码分析3:SocketThread

相关文章暂无相关
评论列表暂无评论
发表评论
验证码

«   2024年4月   »
1234567
891011121314
15161718192021
22232425262728
2930
控制面板
您好,欢迎到访网站!
  查看权限
网站分类
搜索
最新留言
    文章归档
    网站收藏
    友情链接
    • RainbowSoft Studio Z-Blog
    • 订阅本站的 RSS 2.0 新闻聚合
    ︿
    Top