Android平台MQTT使用并实现简单聊天室

上一篇文章介绍了MQTT的原理,以及如何在NodeMCU上运用它实现一些简单的物联网应用:
http://www.jianshu.com/p/701f4d31029f
但没有讲Android App端如何实现。这篇文章将介绍MQTT协议在Android平台上的运用:除了可以用来控制智能硬件,它还常用于实现消息推送和即时通讯(IM)。下面的例子用它来实现一个简单的聊天室。

Android中使用MQTT协议

添加依赖

在gradle中添加依赖,引入相应的库

1
2
3
4
5
dependencies {
...
// Apache Commons Codec library
// NOTE(review): newer Gradle versions replace the `compile` configuration with `implementation` — confirm against the project's Gradle version
compile 'commons-codec:commons-codec:1.5'
// Eclipse Paho MQTT v3 Java client (provides MqttAsyncClient used below)
compile 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.0.2'
}

权限声明

1
2
3
<!-- Permissions declared for the MQTT client; the article notes none of them is a dangerous permission, so no runtime request is needed -->
<uses-permission android:name="android.permission.INTERNET"/>
<uses-permission android:name="android.permission.WAKE_LOCK"/>
<uses-permission android:name="android.permission.ACCESS_NETWORK_STATE"/>

这些权限不是危险权限,不用运行时申请

MQTT支持类

MqttAsyncClient是MQTT支持类,创建时要传入服务器host、port以及设备标识clientID(不同设备的id不能相同)

1
2
// Broker address in URI form, built from the configured host and port.
final String serverUri = "tcp://" + this.host + ":" + this.port;
// Device identifier sent to the broker; it must differ between devices.
final String uniqueClientId = "ClientID" + this.clientID;
MqttAsyncClient mqttClient = new MqttAsyncClient(serverUri, uniqueClientId, new MemoryPersistence());

接着连接服务器,并对连接状态进行监听

1
2
3
4
5
6
7
8
9
10
11
12
// Note: these callbacks are invoked off the main thread, so do not touch the UI directly from them
mqttClient.connect(getOptions(), null, new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
// connection established
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
// connection attempt failed
}
});

getOptions()方法用于配置连接信息,如用户名、密码(服务器一般不设置)等

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
 * Builds the options used when connecting to the MQTT broker.
 *
 * @return options with a clean session, a connection timeout of 10, a keep-alive
 *         interval of 30 and, only when both are non-empty, the configured
 *         user name and password
 */
private MqttConnectOptions getOptions(){
    final MqttConnectOptions connectOptions = new MqttConnectOptions();

    // Do not keep session state across reconnects.
    connectOptions.setCleanSession(true);

    // Credentials are applied only as a pair; a missing half disables both.
    final boolean hasUser = this.userID != null && !this.userID.isEmpty();
    final boolean hasPassword = this.passWord != null && !this.passWord.isEmpty();
    if (hasUser && hasPassword) {
        connectOptions.setUserName(this.userID);
        connectOptions.setPassword(this.passWord.toCharArray());
    }

    // Connection timeout.
    connectOptions.setConnectionTimeout(10);
    // Keep-alive interval: with no traffic for this long, a ping is sent to check the link.
    connectOptions.setKeepAliveInterval(30);
    return connectOptions;
}

最后对消息及连接进行监听

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// Note: these callbacks are invoked off the main thread, so do not touch the UI directly from them
mqttClient.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
// the connection to the server was lost
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
// a pushed message was received
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
// a published message was delivered successfully
}
});

获取并配置好MqttAsyncClient 后就可以调用相应方法订阅Topic接收消息
subscribe(String topicFilter, int qos)
发送某个Topic的消息
publish(String topic, byte[] payload, int qos, boolean retained)

封装

为了方便使用,对相关操作进行封装

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
/**
 * Author:LvQingYang
 * Date:2017/8/29
 * Email:biloba12345@gamil.com
 * Github:https://github.com/biloba123
 * Info: MQTT helper class.
 *
 * <p>Wraps a Paho {@code MqttAsyncClient}: it connects, publishes and subscribes, and
 * forwards connection and message events to a single {@link MqttListener} through a
 * {@code Handler}, because the Paho callbacks themselves arrive on non-main threads.
 *
 * <p>NOTE(review): the handler is created without an explicit looper, so listener
 * callbacks are delivered on the thread that constructs this object — construct it on
 * the main thread if the listener touches the UI.
 */
public class MyMqtt {
    private static final String TAG = "MyMqtt";

    /** MQTT configuration, shared by every instance; replace via {@link #setMqttSetting}. */
    private static String host = "****************";
    private static String port = "*****";
    private static String userID = "";
    private static String passWord = "";
    private static String clientID = UUID.randomUUID().toString();

    /** Connection state; volatile because it is written from Paho callback threads. */
    private volatile boolean isConnect = false;

    /** Underlying Paho client; {@code null} until {@link #connectMqtt()} and after {@link #disConnectMqtt()}. */
    private volatile MqttAsyncClient mqttClient = null;

    private MqttListener mMqttListener;

    /** Re-dispatches state changes, keyed by {@code arg1}, to the listener on the handler's thread. */
    private Handler mHandler = new Handler(new Handler.Callback() {
        @Override
        public boolean handleMessage(Message message) {
            MqttListener listener = mMqttListener;
            if (listener == null) {
                // Nobody to notify; consume the message instead of crashing.
                return true;
            }
            switch (message.arg1) {
                case MqttTag.MQTT_STATE_CONNECTED:
                    if (BuildConfig.DEBUG) {
                        Log.d(TAG, "handleMessage: connected");
                    }
                    listener.onConnected();
                    break;
                case MqttTag.MQTT_STATE_FAIL:
                    if (BuildConfig.DEBUG) {
                        Log.d(TAG, "handleMessage: fail");
                    }
                    listener.onFail();
                    break;
                case MqttTag.MQTT_STATE_LOST:
                    if (BuildConfig.DEBUG) {
                        Log.d(TAG, "handleMessage: lost");
                    }
                    listener.onLost();
                    break;
                case MqttTag.MQTT_STATE_RECEIVE:
                    if (BuildConfig.DEBUG) {
                        Log.d(TAG, "handleMessage: receive");
                    }
                    listener.onReceive((String) message.obj);
                    break;
                case MqttTag.MQTT_STATE_SEND_SUCC:
                    if (BuildConfig.DEBUG) {
                        Log.d(TAG, "handleMessage: send");
                    }
                    listener.onSendSucc();
                    break;
                default:
                    break;
            }
            return true;
        }
    });

    /** Listener for the outcome of the connect request. */
    private IMqttActionListener mIMqttActionListener = new IMqttActionListener() {
        @Override
        public void onSuccess(IMqttToken asyncActionToken) {
            isConnect = true;
            postState(MqttTag.MQTT_STATE_CONNECTED, null);
        }

        @Override
        public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
            isConnect = false;
            postState(MqttTag.MQTT_STATE_FAIL, null);
        }
    };

    /** Callback for connection loss, incoming messages and completed deliveries. */
    private MqttCallback mMqttCallback = new MqttCallback() {
        @Override
        public void connectionLost(Throwable cause) {
            isConnect = false;
            postState(MqttTag.MQTT_STATE_LOST, null);
        }

        @Override
        public void messageArrived(String topic, MqttMessage message) throws Exception {
            // Decode explicitly as UTF-8 so the text does not depend on the platform default charset.
            String text = new String(message.getPayload(), java.nio.charset.StandardCharsets.UTF_8);
            postState(MqttTag.MQTT_STATE_RECEIVE, text);
        }

        @Override
        public void deliveryComplete(IMqttDeliveryToken token) {
            postState(MqttTag.MQTT_STATE_SEND_SUCC, null);
        }
    };

    /**
     * @param lis listener that receives connection and message events; when {@code null},
     *            events are silently dropped
     */
    public MyMqtt(MqttListener lis) {
        mMqttListener = lis;
    }

    /**
     * Replaces the shared connection settings. They are read each time
     * {@link #connectMqtt()} runs, so an already created client is not affected.
     *
     * @param host     server host
     * @param port     server port
     * @param userID   user name; credentials are sent only when both user name and password are non-empty
     * @param passWord password
     * @param clientID device identifier; "ClientID" is prepended to it when connecting
     */
    public static void setMqttSetting(String host, String port, String userID, String passWord, String clientID) {
        MyMqtt.host = host;
        MyMqtt.port = port;
        MyMqtt.userID = userID;
        MyMqtt.passWord = passWord;
        MyMqtt.clientID = clientID;
    }

    /**
     * Starts an asynchronous connection to the configured server. The result is reported
     * to the listener as {@code onConnected()} or {@code onFail()}.
     *
     * <p>Does nothing when the current client is already connected. A previous client that
     * is no longer connected is closed before it is replaced, so repeated reconnect
     * attempts do not pile up abandoned clients.
     */
    public void connectMqtt() {
        MqttAsyncClient previous = mqttClient;
        if (previous != null) {
            if (previous.isConnected()) {
                Log.d(TAG, "connectMqtt: already connected");
                return;
            }
            closeQuietly(previous);
        }
        try {
            MqttAsyncClient client = new MqttAsyncClient("tcp://" + host + ":" + port,
                    "ClientID" + clientID, new MemoryPersistence());
            // Register the callback before connecting so no early event is missed.
            client.setCallback(mMqttCallback);
            mqttClient = client;
            client.connect(getOptions(), null, mIMqttActionListener);
        } catch (MqttException e) {
            Log.e(TAG, "connectMqtt: unable to start the MQTT connection", e);
        }
    }

    /**
     * Disconnects and then connects again.
     */
    public void reStartMqtt() {
        disConnectMqtt();
        connectMqtt();
    }

    /**
     * Disconnects from the server. Safe to call when no connection was ever made; the
     * local state is reset even if the disconnect request itself fails.
     */
    public void disConnectMqtt() {
        MqttAsyncClient client = mqttClient;
        mqttClient = null;
        isConnect = false;
        if (client == null) {
            return;
        }
        try {
            client.disconnect();
        } catch (MqttException e) {
            Log.w(TAG, "disConnectMqtt: disconnect failed", e);
        }
    }

    /**
     * Publishes a text message, encoded as UTF-8 and not retained.
     *
     * @param Topic topic to publish to
     * @param Msg   message text; a {@code null} message is ignored
     * @param Qos   quality of service level
     */
    public void pubMsg(String Topic, String Msg, int Qos) {
        if (Msg == null) {
            Log.w(TAG, "pubMsg: null message ignored");
            return;
        }
        pubMsg(Topic, Msg.getBytes(java.nio.charset.StandardCharsets.UTF_8), Qos);
    }

    /**
     * Publishes a binary message, not retained. Does nothing but log when not connected.
     *
     * @param Topic topic to publish to
     * @param Msg   payload bytes
     * @param Qos   quality of service level
     */
    public void pubMsg(String Topic, byte[] Msg, int Qos) {
        MqttAsyncClient client = mqttClient;
        if (!isConnect || client == null) {
            Log.d(TAG,"Mqtt连接未打开");
            return;
        }
        try {
            // Arguments: topic, payload, qos, retained
            client.publish(Topic, Msg, Qos, false);
        } catch (MqttException e) {
            Log.e(TAG, "pubMsg: publish failed", e);
        }
    }

    /**
     * Subscribes to a topic. Does nothing but log when not connected.
     *
     * @param Topic topic filter to subscribe to
     * @param Qos   maximum quality of service level for the subscription
     */
    public void subTopic(String Topic, int Qos) {
        MqttAsyncClient client = mqttClient;
        if (!isConnect || client == null) {
            Log.d(TAG,"Mqtt连接未打开");
            return;
        }
        try {
            client.subscribe(Topic, Qos);
        } catch (MqttException e) {
            Log.e(TAG, "subTopic: subscribe failed", e);
        }
    }

    /**
     * Builds the connection options from the shared settings.
     */
    private MqttConnectOptions getOptions() {
        MqttConnectOptions options = new MqttConnectOptions();
        // Do not keep session state across reconnects.
        options.setCleanSession(true);
        if (userID != null && userID.length() > 0 && passWord != null && passWord.length() > 0) {
            // Server account credentials, applied only when both are present.
            options.setUserName(userID);
            options.setPassword(passWord.toCharArray());
        }
        // Connection timeout.
        options.setConnectionTimeout(10);
        // Keep-alive interval: with no traffic for this long, a ping is sent to check the link.
        options.setKeepAliveInterval(30);
        return options;
    }

    /**
     * @return {@code true} after a successful connect and until the connection is lost,
     *         fails or is closed
     */
    public boolean isConnect() {
        return isConnect;
    }

    /** Sends a state change, with an optional payload, to the handler. */
    private void postState(int state, Object payload) {
        Message msg = Message.obtain();
        msg.arg1 = state;
        msg.obj = payload;
        mHandler.sendMessage(msg);
    }

    /** Closes a client that is being replaced; a failure is logged, not propagated. */
    private void closeQuietly(MqttAsyncClient client) {
        try {
            client.close();
        } catch (MqttException e) {
            Log.w(TAG, "closeQuietly: could not close the previous client", e);
        }
    }
}

相关标识MqttTag

1
2
3
4
5
6
7
/**
 * Integer tags that identify the MQTT events MyMqtt passes through its handler.
 * This class only holds constants and cannot be instantiated.
 */
public final class MqttTag {
    /** The connect request succeeded. */
    public static final int MQTT_STATE_CONNECTED = 1;
    /** An established connection was lost. */
    public static final int MQTT_STATE_LOST = 2;
    /** The connect request failed. */
    public static final int MQTT_STATE_FAIL = 3;
    /** A message arrived. */
    public static final int MQTT_STATE_RECEIVE = 4;
    /** A published message was delivered. */
    public static final int MQTT_STATE_SEND_SUCC = 5;

    private MqttTag() {
        // Constants holder; not meant to be instantiated.
    }
}

接口MqttListener

1
2
3
4
5
6
7
/**
 * Receives MQTT connection and message events.
 */
interface MqttListener {
/** Called when the connection succeeds. */
void onConnected();
/** Called when the connection attempt fails. */
void onFail();
/** Called when an established connection is lost. */
void onLost();
/**
 * Called when a message is received.
 *
 * @param message the received message text
 */
void onReceive(String message);
/** Called when a message was sent successfully. */
void onSendSucc();
}

运用观察者模式,创建一个Service在后台监听相关状态(写得不太规范…)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
/**
 * Started service that owns the shared {@link MyMqtt} connection and fans its events out
 * to every registered {@link MqttListener} (observer pattern).
 *
 * <p>The connection is opened in {@link #onCreate()} and closed in {@link #onDestroy()}.
 * When the connection fails or is lost, a reconnect is attempted after a short delay.
 */
public class MqttService extends Service implements MqttListener {
    /** Pause before a reconnect attempt, so a dead server is not hammered in a tight loop. */
    private static final long RECONNECT_DELAY_MS = 2000;

    /** Shared connection; {@code null} while the service is not running. Read by reconnect threads. */
    private static volatile MyMqtt mMyMqtt;

    /** Observers; copy-on-write so listeners may register or unregister during a dispatch. */
    private static final java.util.concurrent.CopyOnWriteArrayList<MqttListener> mMqttListenerList =
            new java.util.concurrent.CopyOnWriteArrayList<>();

    /**
     * Starts the service.
     *
     * @param context context used to start the service
     */
    public static void start(Context context) {
        Intent starter = new Intent(context, MqttService.class);
        context.startService(starter);
    }

    /**
     * Stops the service.
     *
     * @param context context used to stop the service
     */
    public static void stop(Context context) {
        Intent starter = new Intent(context, MqttService.class);
        context.stopService(starter);
    }

    @Override
    public void onCreate() {
        super.onCreate();
        MyMqtt mqtt = mMyMqtt;
        if (mqtt == null) {
            mqtt = new MyMqtt(this);
            mMyMqtt = mqtt;
        }
        mqtt.connectMqtt();
    }

    /**
     * Binding is not supported; this service is used through {@link #start(Context)} and
     * the static accessors only.
     *
     * @return always {@code null}
     */
    @Override
    public IBinder onBind(Intent intent) {
        return null;
    }

    @Override
    public void onDestroy() {
        super.onDestroy();
        MyMqtt mqtt = mMyMqtt;
        // Clear the shared reference first so pending reconnect threads see the service is gone.
        mMyMqtt = null;
        if (mqtt != null) {
            mqtt.disConnectMqtt();
        }
        mMqttListenerList.clear();
    }

    /**
     * @return the shared connection, or {@code null} when the service is not running
     */
    public static MyMqtt getMyMqtt() {
        return mMyMqtt;
    }

    /**
     * Registers an observer; a listener that is already registered is not added twice.
     *
     * @param listener observer to add; {@code null} is ignored
     */
    public static void addMqttListener(MqttListener listener) {
        if (listener != null) {
            mMqttListenerList.addIfAbsent(listener);
        }
    }

    /**
     * Unregisters an observer.
     *
     * @param listener observer to remove
     */
    public static void removeMqttListener(MqttListener listener) {
        mMqttListenerList.remove(listener);
    }

    @Override
    public void onConnected() {
        for (MqttListener mqttListener : mMqttListenerList) {
            mqttListener.onConnected();
        }
    }

    @Override
    public void onFail() {
        scheduleReconnect();
        for (MqttListener mqttListener : mMqttListenerList) {
            mqttListener.onFail();
        }
    }

    @Override
    public void onLost() {
        scheduleReconnect();
        for (MqttListener mqttListener : mMqttListenerList) {
            mqttListener.onLost();
        }
    }

    @Override
    public void onReceive(String message) {
        for (MqttListener mqttListener : mMqttListenerList) {
            mqttListener.onReceive(message);
        }
    }

    @Override
    public void onSendSucc() {
        for (MqttListener mqttListener : mMqttListenerList) {
            mqttListener.onSendSucc();
        }
    }

    /**
     * Reconnects on a background thread after {@link #RECONNECT_DELAY_MS}. The attempt is
     * skipped when the thread is interrupted or the service was destroyed in the meantime.
     */
    private void scheduleReconnect() {
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    // Wait BEFORE reconnecting; sleeping afterwards would give no back-off at all.
                    Thread.sleep(RECONNECT_DELAY_MS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                MyMqtt mqtt = mMyMqtt;
                if (mqtt != null) {
                    mqtt.connectMqtt();
                }
            }
        }).start();
    }
}

简单的聊天室

源码https://github.com/biloba123/ChatRoom
开源仓库里还有些其他mqtt示例

如果文章对你有帮助,给我些鼓励吧!