Emqq发布/订阅消息,连接集群多个节点

1、添加依赖

<dependency>
   <groupId>org.eclipse.paho</groupId>
   <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
   <version>1.1.0</version>
</dependency>

2、具体demo,里面使用了一个callback

public class EmqqClient {
    public static void main(String[] args) {
        EmqqClient.sendMsg("client/demo", "来自demo");
    }

    /**
     *
     * @param sendTopic 话题
     * @param content 发送的内容
     * @return
     */
    public static Boolean sendMsg(String sendTopic, String content) {
        int qos = 2;
        String serverHost = "tcp://45.78.4.000:1883"; //服务器主机
       //服务器主机列表,可以是集群中所有节点
        String hosts []= {"tcp://45.78.4.000:1883","tcp://emq.vsalw.com:1883"}; 
        MemoryPersistence persistence = new MemoryPersistence();
        try {
            MqttClient sampleClient = new MqttClient(serverHost, "server", persistence);
            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setCleanSession(true);//会话保持,是否以新用户身份链接
            connOpts.setUserName("xxx");//客户端用户名
            connOpts.setPassword("xxxx".toCharArray());
            //设置会话心跳时间 秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
            connOpts.setKeepAliveInterval(20);
            connOpts.setWill("client/close", "已经关闭".getBytes(), 0, true);//断开链接的时候发消息通知到某个topic
           //此处设置了连接多个节点,一个节点挂掉会自动切换到其他节点,当节点恢复又会恢复连接原节点
           //注意在程序启动的时候,指定的一个serverHosts必须指定一个节点,如果此节点不可用,会自动连接其他可用节点            
            connOpts.setServerURLs(hosts);
            sampleClient.connect(connOpts);
            //自己订阅自己 可以订阅多个
            sampleClient.subscribe("client/#");//订阅的topic
            MqttMessage message = new MqttMessage(content.getBytes());
            message.setQos(qos);
            //只能接收到最新消息,如果需要接收全部消息,发生到 clinet/update/msg/系统时间 
            //接收端接收 clinet/update/msg/# 通配符接收所有
            message.setRetained(true); //离线期间的消息,上线后可以再次接收到,只接收最新的一条信息
            sampleClient.publish(sendTopic, message); //发布消息
            sampleClient.setCallback(new PushCallback());//设置接受到消息处理类
             } catch (MqttException me) {
            System.out.println("reason " + me.getReasonCode());
            System.out.println("msg " + me.getMessage());
            System.out.println("loc " + me.getLocalizedMessage());
            System.out.println("cause " + me.getCause());
            System.out.println("excep " + me);
            me.printStackTrace();
            return false;
        }
        return true;
    }

    /**
     * 三种消息传输方式QoS:
     0代表“至多一次”,消息发布完全依赖底层 TCP/IP 协议。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,
     丢失一次读记录无所谓,因为不久后还会有第二次发送。
     1代表“至少一次”,确保消息到达,但消息重复可能会发生。
     2代表“只有一次”,确保消息到达一次。这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果。
     备注:由于服务端采用Mosca实现,Mosca目前只支持到QoS 1
     */
}




//下面是一个类,PushCallback ,定义了几种对消息的处理,比如接收到消息,发送成功后,还是连接丢失时候。
public class PushCallback implements MqttCallback {
    @Override
    public void connectionLost(Throwable throwable) {
        System.out.println("链接丢失时候调用");
    }

    @Override
    public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
        System.out.println("接收到消息主题:"+s);
        System.out.println("接收到消息内容:"+new String(mqttMessage.getPayload()));
        System.out.println("接收到消息是否保留消息:"+mqttMessage.isRetained());
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        System.out.println("消息发生成功后调用");
    }
}

demo下载:  configProperties

小站主要是个人在开发过程中遇到的问题,解决方案的记录,与君分享。
vsalw技术博客 » Emqq发布/订阅消息,连接集群多个节点

提供最优质的资源集合

立即查看 了解详情