MQTT与树莓派连接

为啥

元旦没事做

mqtt

发布订阅的模式,以tcp长连接进行。一般不能使用带电池的物联网设备,否则耗不起。
以topic的形式作为数据的分发体,将一条消息发布到一个主题(Topic)上,那所有订阅到该主题的客户端都会获取此消息,对于发送者,不用关注此topic被谁消费。

以moquette来做broke

<!-- https://mvnrepository.com/artifact/io.moquette/moquette-broker -->
<dependency>
    <groupId>io.moquette</groupId>
    <artifactId>moquette-broker</artifactId>
    <version>0.12</version>
</dependency>

实现代码

import io.moquette.broker.Server;
import io.moquette.broker.config.ClasspathResourceLoader;
import io.moquette.broker.config.IConfig;

import io.moquette.broker.config.IResourceLoader;
import io.moquette.broker.config.ResourceLoaderConfig;
import io.moquette.interception.InterceptHandler;
import jia.servier.Auth;
import jia.servier.Handler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class Patt {
    public static final Logger log=LoggerFactory.getLogger(Patt.class);
    public static void main(String[] args) {

        final Server mqttServer=new Server();

        IResourceLoader configFileResourceLoader = new ClasspathResourceLoader("config/moquette.conf");
        IConfig config=new ResourceLoaderConfig(configFileResourceLoader);

        System.out.println(config.toString());

        Auth auth=new Auth(); // 鉴权
        Handler handler=new Handler(); // 处理数据
        List<InterceptHandler> lists=new ArrayList<InterceptHandler>();
        lists.add(handler);
        try {
            mqttServer.startServer(config,lists,null,auth,null);
        } catch (IOException e) {
            e.printStackTrace();
        }

        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                mqttServer.stopServer();
                log.info("Moquette Server stopped");
            }
        });
    }
}

起一个Server,

  • 读取配置文件,在resources/config/moquette.conf 中
  • 鉴权: public class Auth implements IAuthenticator{},返回true表示通过,业务侧自己实现鉴权
  • 处理消息:public class Handler extends AbstractInterceptHandler{}

  • 启动mqttServer.startServer

处理消息

继承AbstractInterceptHandler类后实现了几个方法即可,
下面是对设备上报消息的一个处理,把bytebuf 转string 后可以自己随便处理。

@Override
    public void onPublish(InterceptPublishMessage msg) {
        super.onPublish(msg);
        logger.info("pubhlish");
        byte[] msbyte = new byte[msg.getPayload().capacity()];
        msg.getPayload().readBytes(msbyte);
        String msgg=new String(msbyte , Charset.forName("UTF-8"));
        logger.info("payload:{}",msgg);
        logger.info("topic:{}",msg.getTopicName());
    }

配置文件

moquette.conf

##############################################
#  Moquette configuration file.
#
#  The synthax is equals to mosquitto.conf
#
##############################################

port 1883

websocket_port 8080

host 0.0.0.0

password_file config/password_file.conf

#false to accept only client connetions with credentials
#true to accept client connection without credentails, validating only the one that provides
allow_anonymous true

#false to prohibit clients from connecting without a clientid.
#true to allow clients to connect without a clientid. One will be generated for them.
allow_zero_byte_client_id false

客户端

先用个网页版的
http://www.hivemq.com/demos/websocket-client/

服务端自己打了一些日志


评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注