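Why
Nothing else to do over the New Year's Day holiday.
MQTT
MQTT uses the publish/subscribe pattern over a long-lived TCP connection. Keeping that connection open costs power, so it is generally a poor fit for battery-powered IoT devices, which cannot afford the drain.
Messages are distributed by topic: when a message is published to a topic, every client subscribed to that topic receives it, and the sender does not need to care who consumes it.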
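To make the fan-out idea concrete, here is a minimal in-memory sketch. It is not MQTT and not moquette: the class TinyBroker and the topic name sensors/temp are made up for illustration, and it only matches exact topic names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Toy in-memory broker: illustrates topic fan-out only, not the MQTT protocol. */
public class TinyBroker {
    // topic name -> callbacks of the clients subscribed to it
    private final Map<String, List<Consumer<String>>> subscribers = new HashMap<>();

    /** Register a callback for one exact topic name (no wildcards in this sketch). */
    public void subscribe(String topic, Consumer<String> callback) {
        subscribers.computeIfAbsent(topic, t -> new ArrayList<>()).add(callback);
    }

    /** Deliver the message to every subscriber of the topic; returns how many received it. */
    public int publish(String topic, String message) {
        List<Consumer<String>> targets = subscribers.getOrDefault(topic, new ArrayList<>());
        for (Consumer<String> target : targets) {
            target.accept(message);
        }
        return targets.size();
    }

    public static void main(String[] args) {
        TinyBroker broker = new TinyBroker();
        broker.subscribe("sensors/temp", m -> System.out.println("client A got " + m));
        broker.subscribe("sensors/temp", m -> System.out.println("client B got " + m));
        // The publisher only names the topic; it never references client A or B.
        int delivered = broker.publish("sensors/temp", "21.5");
        System.out.println("delivered to " + delivered + " subscribers");
    }
}
```

The publisher only knows the topic string, which is the decoupling described above; a real broker adds the network layer, sessions, and QoS on top.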
Using moquette as the broker
<!-- https://mvnrepository.com/artifact/io.moquette/moquette-broker -->
<dependency>
<groupId>io.moquette</groupId>
<artifactId>moquette-broker</artifactId>
<version>0.12</version>
</dependency>
Implementation
import io.moquette.broker.Server;
import io.moquette.broker.config.ClasspathResourceLoader;
import io.moquette.broker.config.IConfig;
import io.moquette.broker.config.IResourceLoader;
import io.moquette.broker.config.ResourceLoaderConfig;
import io.moquette.interception.InterceptHandler;
import jia.servier.Auth;
import jia.servier.Handler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class Patt {
    public static final Logger log = LoggerFactory.getLogger(Patt.class);
    public static void main(String[] args) {
        final Server mqttServer = new Server();
        // Load config/moquette.conf from the classpath
        IResourceLoader configFileResourceLoader = new ClasspathResourceLoader("config/moquette.conf");
        IConfig config = new ResourceLoaderConfig(configFileResourceLoader);
        System.out.println(config.toString());
        Auth auth = new Auth(); // authentication
        Handler handler = new Handler(); // message handling
        List<InterceptHandler> lists = new ArrayList<InterceptHandler>();
        lists.add(handler);
        try {
            mqttServer.startServer(config, lists, null, auth, null);
        } catch (IOException e) {
            e.printStackTrace();
        }
        // Stop the broker cleanly when the JVM exits
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                mqttServer.stopServer();
                log.info("Moquette Server stopped");
            }
        });
    }
}
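Starting a Server takes these steps:
- Read the configuration file, located at resources/config/moquette.conf
- Authentication: public class Auth implements IAuthenticator{}. Returning true lets the client through; the application implements its own check.
- Message handling: public class Handler extends AbstractInterceptHandler{}
- Start the broker with mqttServer.startServer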
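The post does not show the body of Auth, so the following is only a sketch of what such a check might look like. It is written as a standalone class so it runs without the moquette dependency; in the real project it would declare implements IAuthenticator. As far as I recall, that interface has a single method checkValid(String clientId, String username, byte[] password), but verify this against the moquette version you use. The user table and the names AuthSketch, device-001 and s3cret are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.HashMap;
import java.util.Map;

/**
 * Standalone sketch of a credential check. In the real project the class would
 * declare "implements IAuthenticator" (from the moquette dependency).
 */
public class AuthSketch {
    // Hypothetical in-memory user table; a real service would query its own store.
    private final Map<String, String> users = new HashMap<>();

    public AuthSketch() {
        users.put("device-001", "s3cret");
    }

    /** Assumed shape of the broker's check on connect: returning true accepts the client. */
    public boolean checkValid(String clientId, String username, byte[] password) {
        if (username == null || password == null) {
            return false; // no credentials supplied
        }
        String expected = users.get(username);
        if (expected == null) {
            return false; // unknown user
        }
        // Constant-time comparison, so timing does not reveal how many bytes matched.
        return MessageDigest.isEqual(expected.getBytes(StandardCharsets.UTF_8), password);
    }

    public static void main(String[] args) {
        AuthSketch auth = new AuthSketch();
        byte[] good = "s3cret".getBytes(StandardCharsets.UTF_8);
        byte[] bad = "wrong".getBytes(StandardCharsets.UTF_8);
        System.out.println(auth.checkValid("c1", "device-001", good)); // true
        System.out.println(auth.checkValid("c1", "device-001", bad));  // false
    }
}
```

The password arrives as raw bytes, so the sketch compares bytes rather than building a String from untrusted input.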
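Handling messages
Extend AbstractInterceptHandler and override the callbacks you need.
Below is a handler for messages reported by devices: once the ByteBuf is converted to a String, you can process it however you like.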
@Override
public void onPublish(InterceptPublishMessage msg) {
    super.onPublish(msg);
    logger.info("publish");
    // Decode only the readable bytes; capacity() can be larger than the payload.
    // toString(Charset) leaves the buffer's reader index untouched.
    // Requires: import java.nio.charset.StandardCharsets;
    String payload = msg.getPayload().toString(StandardCharsets.UTF_8);
    logger.info("payload:{}", payload);
    logger.info("topic:{}", msg.getTopicName());
}
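One pitfall when turning a buffer into a String is sizing the byte array by the buffer's capacity instead of by the bytes actually readable. The sketch below shows the difference using java.nio.ByteBuffer as a stand-in so that it runs without Netty; this is an assumption for illustration only, since the handler above receives a Netty ByteBuf, where the corresponding call is readableBytes(). The class name PayloadDecode is made up.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class PayloadDecode {
    /** Decode only the readable bytes, leaving the caller's buffer position untouched. */
    public static String decode(ByteBuffer payload) {
        ByteBuffer view = payload.duplicate();      // own position/limit, shared content
        byte[] bytes = new byte[view.remaining()];  // readable bytes, not capacity()
        view.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(16);   // capacity is 16 bytes
        buf.put("hi".getBytes(StandardCharsets.UTF_8));
        buf.flip();                                 // only 2 bytes are readable now
        System.out.println(decode(buf).length());   // 2
        System.out.println(buf.capacity());         // 16
    }
}
```

Sizing the array by capacity would either pad the string with zero bytes or fail on the read, depending on the buffer implementation.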
Configuration file
moquette.conf
##############################################
# Moquette configuration file.
#
# The syntax is the same as mosquitto.conf
#
##############################################
port 1883
websocket_port 8080
host 0.0.0.0
password_file config/password_file.conf
#false to accept only client connections with credentials
#true to accept client connections without credentials, validating only the ones that provide them
allow_anonymous true
#false to prohibit clients from connecting without a clientid.
#true to allow clients to connect without a clientid. One will be generated for them.
allow_zero_byte_client_id false
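The file is a list of "key value" lines, with # starting a comment. The sketch below shows roughly how such a format can be read. It is not moquette's own parser, and the class name ConfSketch is made up; it only illustrates how the lines above map to settings.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ConfSketch {
    /** Parse "key value" lines, skipping blank lines and lines that start with '#'. */
    public static Map<String, String> parse(List<String> lines) {
        Map<String, String> props = new LinkedHashMap<>();
        for (String raw : lines) {
            String line = raw.trim();
            if (line.isEmpty() || line.startsWith("#")) {
                continue; // comment or blank line
            }
            String[] parts = line.split("\\s+", 2); // key, then the rest of the line as the value
            props.put(parts[0], parts.length > 1 ? parts[1] : "");
        }
        return props;
    }

    public static void main(String[] args) {
        Map<String, String> props = parse(Arrays.asList(
                "# Moquette configuration file.",
                "port 1883",
                "host 0.0.0.0",
                "allow_anonymous true"));
        int port = Integer.parseInt(props.get("port"));
        boolean anonymous = Boolean.parseBoolean(props.get("allow_anonymous"));
        System.out.println(port + " " + props.get("host") + " " + anonymous);
    }
}
```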
Client
To start with, use a web-based client:
http://www.hivemq.com/demos/websocket-client/