android MQTT使用示例

打印 上一主题 下一主题

主题 906|帖子 906|积分 2718

1、导入依赖库:
  1. implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'
  2. implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.5'
复制代码
2、功能封装:
  1. package com.example.mqttdemo;
  2. import static android.app.Service.START_NOT_STICKY;
  3. import android.app.Service;
  4. import android.content.Context;
  5. import android.content.Intent;
  6. import android.net.ConnectivityManager;
  7. import android.net.NetworkInfo;
  8. import android.os.Handler;
  9. import android.os.IBinder;
  10. import android.util.Log;
  11. import androidx.annotation.Nullable;
  12. import org.eclipse.paho.android.service.MqttAndroidClient;
  13. import org.eclipse.paho.client.mqttv3.*;
  14. public class MyMqttService extends Service {
  15.     public final static String TAG = MyMqttService.class.getSimpleName();
  16.     public static MqttAndroidClient mqttAndroidClient;
  17.     private static MqttConnectOptions mMqttConnectOptions;
  18.     public static String HOST = "tcp://broker.hivemq.com:1883";//服务器地址(协议+地址+端口号)
  19.     public String USERNAME = "";//用户名
  20.     public String PASSWORD = "";//密码
  21.     public static String PUBLISH_TOPIC = "/d";//发布主题
  22.     public static String RESPONSE_TOPIC = "v";//响应主题
  23.     public String CLIENTID = "CLIENTID";//设备唯一标识
  24.     @Override
  25.     public int onStartCommand(Intent intent, int flags, int startId) {
  26.         init();
  27.         return START_NOT_STICKY;//非粘性的 service强制杀死后,不会尝试重新启动service
  28.     }
  29.     @Nullable
  30.     @Override
  31.     public IBinder onBind(Intent intent) {
  32.         return null;
  33.     }
  34.     /**
  35.      * 开启服务
  36.      */
  37.     public static void startService(Context mContext) {
  38.         mContext.startService(new Intent(mContext, MyMqttService.class));
  39.     }
  40.     /**
  41.      * 发布 (模拟其他客户端发布消息)
  42.      *
  43.      * @param message 消息
  44.      */
  45.     public static void publish(String message) {
  46.         String topic = PUBLISH_TOPIC;
  47.         Integer qos = 1;
  48.         Boolean retained = false;
  49.         try {
  50.             //参数分别为:主题、消息的字节数组、服务质量、是否在服务器保留断开连接后的最后一条消息
  51.             mqttAndroidClient.publish(topic, message.getBytes(), qos.intValue(), retained.booleanValue());
  52.         } catch (MqttException e) {
  53.             e.printStackTrace();
  54.         }
  55.     }
  56.     /**
  57.      * 响应 (收到其他客户端的消息后,响应给对方告知消息已到达或者消息有问题等)
  58.      *
  59.      * @param message 消息
  60.      */
  61.     public static void response(String message) {
  62.         String topic = RESPONSE_TOPIC;
  63.         Integer qos = 1;
  64.         Boolean retained = false;
  65.         try {
  66.             //参数分别为:主题、消息的字节数组、服务质量、是否在服务器保留断开连接后的最后一条消息
  67.             mqttAndroidClient.publish(topic, message.getBytes(), qos.intValue(), retained.booleanValue());
  68.         } catch (MqttException e) {
  69.             e.printStackTrace();
  70.         }
  71.     }
  72.     /**
  73.      * 初始化
  74.      */
  75.     private void init() {
  76.         String serverURI = HOST; //服务器地址(协议+地址+端口号)
  77.         Log.i(TAG, "初始化MQ" + serverURI);
  78.         if (mqttAndroidClient == null) {
  79.             mqttAndroidClient = new MqttAndroidClient(this, serverURI, CLIENTID);
  80.             mqttAndroidClient.setCallback(mqttCallback); //设置监听订阅消息的回调
  81.         }
  82.         if (mMqttConnectOptions == null) {
  83.             mMqttConnectOptions = new MqttConnectOptions();
  84.             mMqttConnectOptions.setCleanSession(true); //设置是否清除缓存
  85.             mMqttConnectOptions.setConnectionTimeout(10); //设置超时时间,单位:秒
  86.             mMqttConnectOptions.setKeepAliveInterval(20); //设置心跳包发送间隔,单位:秒
  87.             mMqttConnectOptions.setUserName(USERNAME); //设置用户名
  88.             mMqttConnectOptions.setPassword(PASSWORD.toCharArray()); //设置密码
  89.         }
  90.         // last will message
  91.         boolean doConnect = true;
  92.         String message = "{"terminal_uid":"" + CLIENTID + ""}";
  93.         String topic = PUBLISH_TOPIC;
  94.         Integer qos = 1;
  95.         Boolean retained = true;
  96.         if ((!message.equals("")) || (!topic.equals(""))) {
  97.             // 最后的遗嘱
  98.             try {
  99.                 mMqttConnectOptions.setWill(topic, message.getBytes(), qos.intValue(), retained.booleanValue());
  100.             } catch (Exception e) {
  101.                 Log.i(TAG, "Exception Occured");
  102.                 doConnect = false;
  103.                 iMqttActionListener.onFailure(null, e);
  104.             }
  105.         }
  106.         if (doConnect) {
  107.             doClientConnection();
  108.         }
  109.     }
  110.     /**
  111.      * 连接MQTT服务器
  112.      */
  113.     private static void doClientConnection() {
  114.         try {
  115.             if (!mqttAndroidClient.isConnected() && isConnectIsNomarl()) {
  116.                 Log.i(TAG, "连接MQTT服务器" + HOST);
  117.                 mqttAndroidClient.connect(mMqttConnectOptions, null, iMqttActionListener);
  118.             }
  119.         } catch (Exception e) {
  120.             e.printStackTrace();
  121.         }
  122.     }
  123.     /**
  124.      * 判断网络是否连接
  125.      */
  126.     private static boolean isConnectIsNomarl() {
  127.         return true;
  128. //        ConnectivityManager connectivityManager = (ConnectivityManager) BaseApplication.getInstance().getSystemService(Context.CONNECTIVITY_SERVICE);
  129. //        NetworkInfo info = connectivityManager.getActiveNetworkInfo();
  130. //        if (info != null && info.isAvailable()) {
  131. //            String name = info.getTypeName();
  132. //            Log.i(TAG, "当前网络名称:" + name);
  133. //            return true;
  134. //        } else {
  135. //            Log.i(TAG, "没有可用网络");
  136. //            /*没有可用网络的时候,延迟3秒再尝试重连*/
  137. //            new Handler().postDelayed(new Runnable() {
  138. //                @Override
  139. //                public void run() {
  140. //                    Log.i(TAG, "没有可用网络doClientConnection");
  141. //                    doClientConnection();
  142. //                }
  143. //            }, 3000);
  144. //            return false;
  145. //        }
  146.     }
  147.     //MQTT是否连接成功的监听
  148.     private static IMqttActionListener iMqttActionListener = new IMqttActionListener() {
  149.         @Override
  150.         public void onSuccess(IMqttToken arg0) {
  151.             Log.i(TAG, "连接成功 " + HOST);
  152.             try {
  153.                 if (mqttAndroidClient != null) {
  154.                     mqttAndroidClient.subscribe(PUBLISH_TOPIC, 1);//订阅主题,参数:主题、服务质量
  155.                 }
  156.             } catch (Exception e) {
  157.                 e.printStackTrace();
  158.             }
  159.         }
  160.         @Override
  161.         public void onFailure(IMqttToken arg0, Throwable arg1) {
  162.             arg1.printStackTrace();
  163.             Log.i(TAG, "连接失败 ");
  164.             doClientConnection();//连接失败,重连(可关闭服务器进行模拟)
  165.         }
  166.     };
  167.     //订阅主题的回调
  168.     private MqttCallback mqttCallback = new MqttCallback() {
  169.         @Override
  170.         public void messageArrived(String topic, MqttMessage msgStr) throws Exception {
  171.             try {
  172.                 String enCodeMsg = new String(msgStr.getPayload());
  173.                 Log.i(TAG, "收到消息: " + enCodeMsg);
  174.                 //收到消息,这里弹出Toast表示。如果需要更新UI,可以使用广播或者EventBus进行发送
  175.                 //收到其他客户端的消息后,响应给对方告知消息已到达或者消息有问题等
  176.                 response("message arrived");
  177.             } catch (Exception e) {
  178.                 e.printStackTrace();
  179.             }
  180.         }
  181.         @Override
  182.         public void deliveryComplete(IMqttDeliveryToken arg0) {
  183.         }
  184.         @Override
  185.         public void connectionLost(Throwable arg0) {
  186.             Log.i(TAG, "连接断开 ");
  187.             doClientConnection();//连接断开,重连
  188.         }
  189.     };
  190.     public static void disconnect(Context context) {
  191.         try {
  192.             if (mqttAndroidClient != null && mqttAndroidClient.isConnected()) {
  193.                 mqttAndroidClient.unsubscribe(PUBLISH_TOPIC);
  194.                 mqttAndroidClient.unregisterResources();
  195.                 mqttAndroidClient.disconnect(0); //断开连接
  196.                 mqttAndroidClient = null;
  197.                 context.stopService(new Intent(context, MyMqttService.class));
  198.             }
  199.         } catch (Exception e) {
  200.             e.printStackTrace();
  201.         }
  202.     }
  203. }
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

我可以不吃啊

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表