moquette 下面的netty

moquette

使用 netty

pipeline.addFirst("idleStateHandler", new IdleStateHandler(nettyChannelTimeoutSeconds, 0, 0));
pipeline.addAfter("idleStateHandler", "idleEventHandler", new IdleHandler());
// pipeline.addLast("logger", new LoggingHandler("Netty", LogLevel.ERROR));
pipeline.addFirst("bytemetrics", new BytesMetricsHandler(m_bytesMetricsCollector));
pipeline.addFirst("cnamedocoder", new CnameHandler());
pipeline.addLast("decoder", new MqttDecoder(BrokerConstants.MAX_MESSAGE_SIZE));
pipeline.addLast("encoder", MqttEncoder.INSTANCE);
pipeline.addLast("metrics", new MessageMetricsHandler(m_metricsCollector));
pipeline.addLast("messageLogger", new MQTTMessageLogger());
if (metrics.isPresent()) {
    pipeline.addLast("wizardMetrics", metrics.get());
}
pipeline.addLast("MqttMsgFlowCtrlHandler", new MqttMsgFlowCtrlHandler());
pipeline.addLast("ChannelTrafficControlHandler", new ChannelTrafficControlHandler());
pipeline.addLast("handler", new NettyMQTTHandler());

在一个channel建立后,

  • MqttDecoder: 使用nettycodec 来对tcp的playload 进行解码

  • IdleStateHandler: 使用心跳事件IdleStateHandler 来定时触发没有发送心跳的channel,
    然后使用idleStateHandler 来关闭channel. 实现mqtt 一段时间无心跳就主动断链的

public class IdleHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if(evt instanceof IdleStateEvent){
            IdleStateEvent event=(IdleStateEvent) evt;
            System.out.println("ddddddddddd");
            if(event.state() == IdleState.READER_IDLE){
                System.out.println("关闭"+ctx.channel().id());
                ctx.close();
                ctx.channel().close();
            }
        }
        super.userEventTriggered(ctx, evt);
    }
}
  • NettyMQTTHandler: 进行mqtt业务处理的

MqttDecoder,MqttEncoder

属于codec框架里面的一个解码器

    · 将字节解码为消息 - ByteToMessageDecoder和ReplayingDecoder。

    · 将一个消息类型解码为另一个 - MessageToMessageDecoder。

io.netty.handler.codec.mqtt.MqttDecoder 使用的是原生的netty自带的解码器,除此以外netty还有http,

对于这个对tcp的码流进行解码,按照mqtt3.1.1 进行解码,生成MqttMessage,传递下去,

NettyMQTTHandler

对解码后的message进行处理

  • CONNECT: 主要是鉴权,保存会话session和初始化心跳handler,
    • 鉴权:开放IAuthenticator 接口,初始化的时候添加进去即可
    • 保存会话: 鉴权成功后会去查看当前的clientid是否有会话了,bindToExistingSession如果有就会把以前的会话channel断掉,存储新的会话
      SessionRegistry 是用于存储会话的类,有两个内部存储的,
      >>在 !pool.containsKey(clientId) 后又使用了pool.putIfAbsent 来判断是否已有session,这个就是防止设备多次connect导致session建立过多的情况
    ConcurrentMap<String, Session> pool = new ConcurrentHashMap<>();//session保存的
    ConcurrentMap<String, Queue<SessionRegistry.EnqueuedMessage>> queues = new ConcurrentHashMap<>();// 保存的推送的消息
    
    void bindToSession(MQTTConnection mqttConnection, MqttConnectMessage msg, String clientId) {
           boolean isSessionAlreadyStored = false;
           PostConnectAction postConnectAction = PostConnectAction.NONE;
           if (!pool.containsKey(clientId)) {
               // case 1
               final Session newSession = createNewSession(mqttConnection, msg, clientId);
    
               // publish the session , 这里就是体现了多次控制的,
               final Session previous = pool.putIfAbsent(clientId, newSession);
               final boolean success = previous == null;
    
               if (success) {
                   LOG.trace("case 1, not existing session with CId {}", clientId);
               } else {
                   postConnectAction = bindToExistingSession(mqttConnection, msg, clientId, newSession);
                   isSessionAlreadyStored = true;
               }
           } else {
               final Session newSession = createNewSession(mqttConnection, msg, clientId);
               postConnectAction = bindToExistingSession(mqttConnection, msg, clientId, newSession);
               isSessionAlreadyStored = true;
           }
           final boolean msgCleanSessionFlag = msg.variableHeader().isCleanSession();
           boolean isSessionAlreadyPresent = !msgCleanSessionFlag && isSessionAlreadyStored;
           mqttConnection.sendConnAck(isSessionAlreadyPresent);
    
           if (postConnectAction == PostConnectAction.SEND_STORED_MESSAGES) {
               final Session session = pool.get(clientId);
               session.sendQueuedMessagesWhileOffline();
           }
       }
    
    • 所以connect 有四种场景,session状态有四种:CONNECTED, CONNECTING, DISCONNECTING, DISCONNECTED
    1. 会话是新的,老会话不存在: 往map里面加session就行了
    2. 老回话存在,cleanSession为true且会话已断开: 清除缓存的待发送的消息,清除订阅topic,往旧的session里面塞连接信息,pool.replace(clientId, oldSession, oldSession);
    3. 老回话存在,cleanSession为false且会话已断开: 不清理,塞连接信息
    4. 老回话存在,会话还连着: closeImmediately 强制旧的session的channel断开,替换新的pool.replace(clientId, oldSession, newSession);
    • 添加心跳IdleStateHandler 值是keepAlive * 1.5f, channel初始化的时候其实添加了一个默认时间的,建链完成后会删掉以前的,然后添加这个,主要是因为如果建链不成功,也要主动断开channel,防止攻击
    • 把channel和clientid绑定 NettyUtils.clientID(channel, clientId);
  • PINGREQ: channel的心跳read的定时器就会被刷新,
    MqttFixedHeader pingHeader = new MqttFixedHeader(MqttMessageType.PINGRESP, false, AT_MOST_ONCE,
                                                    false, 0);
    MqttMessage pingResp = new MqttMessage(pingHeader);
    channel.writeAndFlush(pingResp).addListener(CLOSE_ON_FAILURE);
    break
    
  • DISCONNECT: channel.close().addListener(FIRE_EXCEPTION_ON_FAILURE);

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注