moquette
使用 netty
pipeline.addFirst("idleStateHandler", new IdleStateHandler(nettyChannelTimeoutSeconds, 0, 0));
pipeline.addAfter("idleStateHandler", "idleEventHandler", new IdleHandler());
// pipeline.addLast("logger", new LoggingHandler("Netty", LogLevel.ERROR));
pipeline.addFirst("bytemetrics", new BytesMetricsHandler(m_bytesMetricsCollector));
pipeline.addFirst("cnamedocoder", new CnameHandler());
pipeline.addLast("decoder", new MqttDecoder(BrokerConstants.MAX_MESSAGE_SIZE));
pipeline.addLast("encoder", MqttEncoder.INSTANCE);
pipeline.addLast("metrics", new MessageMetricsHandler(m_metricsCollector));
pipeline.addLast("messageLogger", new MQTTMessageLogger());
if (metrics.isPresent()) {
pipeline.addLast("wizardMetrics", metrics.get());
}
pipeline.addLast("MqttMsgFlowCtrlHandler", new MqttMsgFlowCtrlHandler());
pipeline.addLast("ChannelTrafficControlHandler", new ChannelTrafficControlHandler());
pipeline.addLast("handler", new NettyMQTTHandler());
在一个channel建立后,
- MqttDecoder: 使用nettycodec 来对tcp的playload 进行解码
-
IdleStateHandler: 使用心跳事件IdleStateHandler 来定时触发没有发送心跳的channel,
然后使用idleStateHandler 来关闭channel. 实现mqtt 一段时间无心跳就主动断链的
public class IdleHandler extends ChannelInboundHandlerAdapter {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if(evt instanceof IdleStateEvent){
IdleStateEvent event=(IdleStateEvent) evt;
System.out.println("ddddddddddd");
if(event.state() == IdleState.READER_IDLE){
System.out.println("关闭"+ctx.channel().id());
ctx.close();
ctx.channel().close();
}
}
super.userEventTriggered(ctx, evt);
}
}
- NettyMQTTHandler: 进行mqtt业务处理的
MqttDecoder,MqttEncoder
属于codec框架里面的一个解码器
· 将字节解码为消息 - ByteToMessageDecoder和ReplayingDecoder。
· 将一个消息类型解码为另一个 - MessageToMessageDecoder。
io.netty.handler.codec.mqtt.MqttDecoder 使用的是原生的netty自带的解码器,除此以外netty还有http,
对于这个对tcp的码流进行解码,按照mqtt3.1.1 进行解码,生成MqttMessage,传递下去,
NettyMQTTHandler
对解码后的message进行处理
- CONNECT: 主要是鉴权,保存会话session和初始化心跳handler,
- 鉴权:开放IAuthenticator 接口,初始化的时候添加进去即可
- 保存会话: 鉴权成功后会去查看当前的clientid是否有会话了,bindToExistingSession如果有就会把以前的会话channel断掉,存储新的会话
SessionRegistry 是用于存储会话的类,有两个内部存储的,
>>在 !pool.containsKey(clientId) 后又使用了pool.putIfAbsent 来判断是否已有session,这个就是防止设备多次connect导致session建立过多的情况
ConcurrentMap<String, Session> pool = new ConcurrentHashMap<>();//session保存的 ConcurrentMap<String, Queue<SessionRegistry.EnqueuedMessage>> queues = new ConcurrentHashMap<>();// 保存的推送的消息 void bindToSession(MQTTConnection mqttConnection, MqttConnectMessage msg, String clientId) { boolean isSessionAlreadyStored = false; PostConnectAction postConnectAction = PostConnectAction.NONE; if (!pool.containsKey(clientId)) { // case 1 final Session newSession = createNewSession(mqttConnection, msg, clientId); // publish the session , 这里就是体现了多次控制的, final Session previous = pool.putIfAbsent(clientId, newSession); final boolean success = previous == null; if (success) { LOG.trace("case 1, not existing session with CId {}", clientId); } else { postConnectAction = bindToExistingSession(mqttConnection, msg, clientId, newSession); isSessionAlreadyStored = true; } } else { final Session newSession = createNewSession(mqttConnection, msg, clientId); postConnectAction = bindToExistingSession(mqttConnection, msg, clientId, newSession); isSessionAlreadyStored = true; } final boolean msgCleanSessionFlag = msg.variableHeader().isCleanSession(); boolean isSessionAlreadyPresent = !msgCleanSessionFlag && isSessionAlreadyStored; mqttConnection.sendConnAck(isSessionAlreadyPresent); if (postConnectAction == PostConnectAction.SEND_STORED_MESSAGES) { final Session session = pool.get(clientId); session.sendQueuedMessagesWhileOffline(); } }
- 所以connect 有四种场景,session状态有四种:CONNECTED, CONNECTING, DISCONNECTING, DISCONNECTED
- 会话是新的,老会话不存在: 往map里面加session就行了
- 老回话存在,cleanSession为true且会话已断开: 清除缓存的待发送的消息,清除订阅topic,往旧的session里面塞连接信息,
pool.replace(clientId, oldSession, oldSession);
- 老回话存在,cleanSession为false且会话已断开: 不清理,塞连接信息
- 老回话存在,会话还连着: closeImmediately 强制旧的session的channel断开,替换新的
pool.replace(clientId, oldSession, newSession);
- 添加心跳IdleStateHandler 值是
keepAlive * 1.5f
, channel初始化的时候其实添加了一个默认时间的,建链完成后会删掉以前的,然后添加这个,主要是因为如果建链不成功,也要主动断开channel,防止攻击 - 把channel和clientid绑定
NettyUtils.clientID(channel, clientId);
- PINGREQ: channel的心跳read的定时器就会被刷新,
MqttFixedHeader pingHeader = new MqttFixedHeader(MqttMessageType.PINGRESP, false, AT_MOST_ONCE, false, 0); MqttMessage pingResp = new MqttMessage(pingHeader); channel.writeAndFlush(pingResp).addListener(CLOSE_ON_FAILURE); break
- DISCONNECT: channel.close().addListener(FIRE_EXCEPTION_ON_FAILURE);
发表回复