前言
在物联网(IoT)开发中,MQTT协议因其轻量级、低带宽占用和高可靠性,成为了设备通信的首选协议。在Web端或Node.js环境中,开发者通常直接使用成熟的 mqtt.js 库即可轻松实现连接。然而,当我们将目光转向微信小程序或uni-app小程序端时,情况变得复杂起来。
微信小程序的运行环境(Mini Program Environment)有着严格的限制:
不支持TCP Socket:小程序无法直接建立TCP连接,必须通过WebSocket (wss://) 进行通信。
API差异:小程序使用 uni.connectSocket (或 wx.connectSocket) 而非标准的 WebSocket API,且对二进制数据处理(ArrayBuffer)有特定要求。
依赖限制:mqtt.js 默认依赖 Node.js 的 net 模块或浏览器的标准 WebSocket,直接引入往往会导致打包失败或运行时报错(如 process is not defined 或 WebSocket is not a constructor)。
虽然社区存在一些适配方案(如使用 mqtt/dist/mqtt.min.js 并手动注入 WebSocket 实现),但在处理二进制协议解析、心跳保活以及断线重连时,往往显得臃肿且难以调试。
本文将介绍一种轻量级、原生适配的解决方案:基于 uni-app 的 connectSocket API,从零封装一个专为小程序设计的 MQTT 客户端类 WechatMqttClient。它不依赖任何第三方重型库,完美支持 MQTT 3.1.1 协议,解决了二进制数据收发、UTF-8编码兼容及自动重连等核心痛点。
核心难点分析
在封装之前,我们需要明确小程序端实现 MQTT 的几个关键挑战:
协议包构建:MQTT是基于二进制的协议。我们需要手动构建 Fixed Header(固定头部)、Variable Header(可变头部)和 Payload(载荷)。特别是剩余长度(Remaining Length)的变长编码算法,是容易出错的地方。
字符编码:MQTT协议规定主题(Topic)和客户端ID(ClientID)必须使用 UTF-8 编码。JavaScript 字符串内部是 UTF-16,直接转换字节会导致中文乱码或协议解析失败。我们需要手写 UTF-8 编解码器。
心跳机制:小程序网络环境不稳定,且WebSocket连接在无数据传输时可能被运营商或系统切断。必须在应用层实现基于 PINGREQ/PINGRESP 的心跳检测。
数据流处理:uni.connectSocket 返回的是 ArrayBuffer,需要将其转换为 Uint8Array 进行位运算解析。
解决方案:WechatMqttClient 类设计
我们设计了一个名为 WechatMqttClient 的类,它屏蔽了底层的二进制操作,对外提供标准的 connect, publish, subscribe, on 等事件驱动接口。
- 核心架构
该类主要包含以下模块:
连接管理:封装 uni.connectSocket,处理连接建立、关闭和错误监听。
协议编解码:实现 MQTT 3.1.1 的 CONNECT, PUBLISH, SUBSCRIBE, UNSUBSCRIBE, PINGREQ 等报文的构建与解析。
UTF-8 工具集:独立的 encodeString 和 decodeUtf8 方法,确保多语言字符集正确传输。
心跳与重连:内置指数退避算法的重连机制和定时心跳发送。
事件总线:简单的发布订阅模式,用于通知上层业务逻辑(如 connect, message, error)。 - 关键代码实现解析
A. 建立 WebSocket 连接
不同于标准 WebSocket,我们使用 uni-app 的 API,并指定子协议为 mqtt。
connect() {
// ... 验证URL逻辑 ...
this.socketTask = uni.connectSocket({
url: this.url,
protocols: ['mqtt'], // 重要:告知服务器这是MQTT协议
success: (res) => console.log('WS创建成功', res),
fail: (error) => this.emit('error', error)
});
this.socketTask.onOpen(() => {
// 连接打开后,立即发送 MQTT CONNECT 报文
this.sendMqttConnectPacket();
});
this.socketTask.onMessage((res) => {
// 接收二进制数据并解析
this.handleMqttPacket(res.data);
});
// ... 监听 close 和 error 以触发重连 ...
}
B. 构建 MQTT CONNECT 报文
这是握手的关键。我们需要严格按照 MQTT 3.1.1 规范组装字节流。
sendMqttConnectPacket() {
// 1. 准备 Payload (ClientID, Username, Password)
let payload = [];
const clientId = this.options.clientId || 'client_' + Date.now();
payload = payload.concat(this.encodeString(clientId));
if (this.options.username) payload = payload.concat(this.encodeString(this.options.username));
if (this.options.password) payload = payload.concat(this.encodeString(this.options.password));
// 2. 准备 Variable Header (Protocol Name, Level, Flags, KeepAlive)
let variableHeader = [];
variableHeader = variableHeader.concat(this.encodeString('MQTT'));
variableHeader.push(4); // Protocol Level 3.1.1
// Connect Flags: CleanSession(0x02), Username(0x80), Password(0x40)
let connectFlags = 0x02;
if (this.options.username) connectFlags |= 0x80;
if (this.options.password) connectFlags |= 0x40;
variableHeader.push(connectFlags);
// Keep Alive (2 bytes)
variableHeader.push((this.keepAlive >> 8) & 0xFF);
variableHeader.push(this.keepAlive & 0xFF);
// 3. 计算 Remaining Length 并编码
const remainingLength = variableHeader.length + payload.length;
const remainingLengthBytes = this.encodeRemainingLength(remainingLength);
// 4. 组装 Fixed Header (0x10 表示 CONNECT)
let mqttPacket = [0x10, ...remainingLengthBytes, ...variableHeader, ...payload];
// 5. 发送 ArrayBuffer
this.socketTask.send({
data: new Uint8Array(mqttPacket).buffer
});
}
C. UTF-8 编码处理
这是很多开源库在小程序端失效的原因。JavaScript 的 charCodeAt 返回的是 UTF-16 代码单元,直接转为字节会破坏多字节字符。我们需要手动实现 UTF-8 转换逻辑(代码中已包含完整的 encodeString 和 decodeUtf8 方法,支持代理对 surrogate pairs)。
D. 消息解析与心跳
在 handleMqttPacket 中,我们读取第一个字节判断报文类型。
0x20 (CONNACK): 检查返回码,若为0则标记 connected = true 并启动心跳。
0x30 (PUBLISH): 解析 Topic 长度,提取 Topic 字符串,剩余部分即为 Payload。
0xD0 (PINGRESP): 收到服务端的心跳响应,确认连接健康。
心跳逻辑通过 setInterval 实现,每隔 keepAlive / 2 秒发送一次 PINGREQ。
完整的mqtt客户端代码
// 封装的MQTT客户端
class WechatMqttClient {
constructor(url, options) {
this.url = url;
this.options = options;
this.socketTask = null;
this.connected = false;
this.eventHandlers = {};
this.messageId = 1;
this.reconnectAttempts = 0;
this.maxReconnectAttempts = 5;
this.reconnectTimer = null;
this.shouldReconnect = true;
this.keepAlive = options.keepAlive || 60;
this.heartbeatTimer = null;
}
connect() {
try {
// 重置重连标志
this.shouldReconnect = true;
// 验证URL格式
if (!this.url || (!this.url.startsWith('wss://') && !this.url.startsWith('ws://'))) {
const error = new Error('无效的WebSocket URL: ' + this.url + ',必须使用ws://或wss://协议');
console.error(error.message);
this.emit('error', error);
return;
}
// 先关闭现有连接
if (this.socketTask) {
this.socketTask.close();
this.socketTask = null;
}
// 使用微信小程序的connectSocket - 简化header
this.socketTask = uni.connectSocket({
url: this.url,
protocols: ['mqtt'],
success: (res) => {
console.log('WebSocket连接创建成功:', res);
},
fail: (error) => {
console.error('WebSocket连接创建失败:', error);
const errorMsg = error.errMsg || JSON.stringify(error);
this.emit('error', new Error('WebSocket连接创建失败: ' + errorMsg));
}
});
// 监听连接打开
this.socketTask.onOpen((res) => {
console.log('WebSocket连接已打开:', res);
// 发送MQTT CONNECT协议包
this.sendMqttConnectPacket();
});
// 监听消息接收
this.socketTask.onMessage((res) => {
console.log('收到WebSocket消息:', res.data);
this.handleMqttPacket(res.data);
});
// 监听连接关闭
this.socketTask.onClose((res) => {
console.log('WebSocket连接已关闭:', res);
this.connected = false;
this.emit('close');
this.attemptReconnect();
});
// 监听错误
this.socketTask.onError((error) => {
console.error('WebSocket错误:', error);
this.emit('error', new Error('WebSocket错误: ' + JSON.stringify(error)));
this.attemptReconnect();
});
} catch (error) {
console.error('MQTT连接错误:', error);
this.emit('error', error);
}
}
// 构建并发送标准的MQTT CONNECT协议包
sendMqttConnectPacket() {
if (!this.socketTask) return;
try {
// 构建标准的MQTT 3.1.1 CONNECT协议包
const protocolName = 'MQTT';
const protocolLevel = 4; // MQTT 3.1.1
const cleanSession = true;
// 计算可变头部和载荷长度
let payload = [];
// Client ID
const clientId = this.options.clientId || 'client_' + Date.now();
payload = payload.concat(this.encodeString(clientId));
// Username
if (this.options.username) {
payload = payload.concat(this.encodeString(this.options.username));
}
// Password
if (this.options.password) {
payload = payload.concat(this.encodeString(this.options.password));
}
// 计算可变头部长度
let variableHeader = [];
// Protocol Name
variableHeader = variableHeader.concat(this.encodeString(protocolName));
// Protocol Level
variableHeader.push(protocolLevel);
// Connect Flags
let connectFlags = 0;
if (cleanSession) connectFlags |= 0x02;
if (this.options.username) connectFlags |= 0x80;
if (this.options.password) connectFlags |= 0x40;
variableHeader.push(connectFlags);
// Keep Alive
variableHeader.push((this.keepAlive >> 8) & 0xFF);
variableHeader.push(this.keepAlive & 0xFF);
// 计算剩余长度
const remainingLength = variableHeader.length + payload.length;
const remainingLengthBytes = this.encodeRemainingLength(remainingLength);
// 构建完整的MQTT包
let mqttPacket = [];
// Fixed Header: CONNECT (0x10)
mqttPacket.push(0x10);
// Remaining Length
mqttPacket = mqttPacket.concat(remainingLengthBytes);
// Variable Header
mqttPacket = mqttPacket.concat(variableHeader);
// Payload
mqttPacket = mqttPacket.concat(payload);
// 转换为ArrayBuffer发送
const buffer = new Uint8Array(mqttPacket).buffer;
this.socketTask.send({
data: buffer,
success: () => {
console.log('MQTT CONNECT协议包发送成功');
},
fail: (error) => {
console.error('MQTT CONNECT协议包发送失败:', error);
}
});
} catch (error) {
console.error('发送MQTT CONNECT协议包错误:', error);
}
}
// 编码字符串为MQTT格式 (长度 + UTF-8字节)
encodeString(str) {
// 使用小程序兼容的UTF-8编码
let bytes = [];
for (let i = 0; i < str.length; i++) {
let charCode = str.charCodeAt(i);
if (charCode < 0x80) {
bytes.push(charCode);
} else if (charCode < 0x800) {
bytes.push(0xC0 | (charCode >> 6));
bytes.push(0x80 | (charCode & 0x3F));
} else if (charCode < 0xD800 || charCode >= 0xE000) {
bytes.push(0xE0 | (charCode >> 12));
bytes.push(0x80 | ((charCode >> 6) & 0x3F));
bytes.push(0x80 | (charCode & 0x3F));
} else {
// 处理代理对 (surrogate pair)
i++;
const nextCharCode = str.charCodeAt(i);
const codePoint = ((charCode - 0xD800) << 10) | (nextCharCode - 0xDC00) + 0x10000;
bytes.push(0xF0 | (codePoint >> 18));
bytes.push(0x80 | ((codePoint >> 12) & 0x3F));
bytes.push(0x80 | ((codePoint >> 6) & 0x3F));
bytes.push(0x80 | (codePoint & 0x3F));
}
}
let result = [];
result.push((bytes.length >> 8) & 0xFF);
result.push(bytes.length & 0xFF);
for (let i = 0; i < bytes.length; i++) {
result.push(bytes[i]);
}
return result;
}
// 解码UTF-8字节为字符串
decodeUtf8(bytes) {
let result = '';
let i = 0;
while (i < bytes.length) {
const byte1 = bytes[i];
if (byte1 < 0x80) {
// 单字节
result += String.fromCharCode(byte1);
i++;
} else if (byte1 < 0xE0) {
// 双字节
const byte2 = bytes[i + 1];
const charCode = ((byte1 & 0x1F) << 6) | (byte2 & 0x3F);
result += String.fromCharCode(charCode);
i += 2;
} else if (byte1 < 0xF0) {
// 三字节
const byte2 = bytes[i + 1];
const byte3 = bytes[i + 2];
const charCode = ((byte1 & 0x0F) << 12) | ((byte2 & 0x3F) << 6) | (byte3 & 0x3F);
result += String.fromCharCode(charCode);
i += 3;
} else {
// 四字节
const byte2 = bytes[i + 1];
const byte3 = bytes[i + 2];
const byte4 = bytes[i + 3];
const codePoint = ((byte1 & 0x07) << 18) | ((byte2 & 0x3F) << 12) | ((byte3 & 0x3F) << 6) | (byte4 & 0x3F);
// 处理UTF-16代理对
if (codePoint >= 0x10000) {
const highSurrogate = ((codePoint - 0x10000) >> 10) + 0xD800;
const lowSurrogate = ((codePoint - 0x10000) & 0x3FF) + 0xDC00;
result += String.fromCharCode(highSurrogate, lowSurrogate);
} else {
result += String.fromCharCode(codePoint);
}
i += 4;
}
}
return result;
}
// 将字符串转换为UTF-8字节数组
stringToBytes(str) {
let bytes = [];
for (let i = 0; i < str.length; i++) {
let charCode = str.charCodeAt(i);
if (charCode < 0x80) {
bytes.push(charCode);
} else if (charCode < 0x800) {
bytes.push(0xC0 | (charCode >> 6));
bytes.push(0x80 | (charCode & 0x3F));
} else if (charCode < 0xD800 || charCode >= 0xE000) {
bytes.push(0xE0 | (charCode >> 12));
bytes.push(0x80 | ((charCode >> 6) & 0x3F));
bytes.push(0x80 | (charCode & 0x3F));
} else {
// 处理代理对 (surrogate pair)
i++;
const nextCharCode = str.charCodeAt(i);
const codePoint = ((charCode - 0xD800) << 10) | (nextCharCode - 0xDC00) + 0x10000;
bytes.push(0xF0 | (codePoint >> 18));
bytes.push(0x80 | ((codePoint >> 12) & 0x3F));
bytes.push(0x80 | ((codePoint >> 6) & 0x3F));
bytes.push(0x80 | (codePoint & 0x3F));
}
}
return bytes;
}
// 编码MQTT剩余长度
encodeRemainingLength(length) {
let result = [];
let digit;
do {
digit = length % 128;
length = Math.floor(length / 128);
if (length > 0) {
digit |= 0x80;
}
result.push(digit);
} while (length > 0);
return result;
}
// 处理MQTT协议包
handleMqttPacket(data) {
try {
let buffer;
if (data instanceof ArrayBuffer) {
buffer = new Uint8Array(data);
} else if (typeof data === 'string') {
// 如果是字符串,可能是测试数据
try {
const parsed = JSON.parse(data);
if (parsed.topic && parsed.payload) {
this.emit('message', parsed.topic, parsed.payload);
return;
}
} catch (e) {
// 不是JSON,继续处理
}
this.emit('message', 'unknown', data);
return;
} else {
this.emit('message', 'unknown', data);
return;
}
if (buffer.length < 2) {
console.warn('MQTT包太短');
return;
}
const fixedHeader = buffer[0];
const packetType = (fixedHeader >> 4) & 0x0F;
// CONNACK (0x20) - 连接确认
if (packetType === 2) {
if (buffer.length >= 4) {
const returnCode = buffer[3];
if (returnCode === 0) {
console.log('MQTT连接成功');
this.connected = true;
this.reconnectAttempts = 0;
// 启动心跳机制
this.startHeartbeat();
this.emit('connect');
} else {
console.error('MQTT连接失败,返回码:', returnCode);
this.emit('error', new Error('MQTT连接失败,返回码: ' + returnCode));
}
}
return;
}
// PUBLISH (0x30) - 消息发布
if (packetType === 3) {
// 解析剩余长度
let remainingLength = 0;
let multiplier = 1;
let index = 1;
let digit;
do {
digit = buffer[index++];
remainingLength += (digit & 0x7F) * multiplier;
multiplier *= 128;
} while ((digit & 0x80) !== 0);
// 提取可变头部和载荷
const variableHeaderAndPayload = buffer.subarray(index, index + remainingLength);
// 解析主题名称长度
const topicLength = (variableHeaderAndPayload[0] << 8) | variableHeaderAndPayload[1];
const topicNameBytes = variableHeaderAndPayload.subarray(2, 2 + topicLength);
// 解码主题名称
const topicName = this.decodeUtf8(topicNameBytes);
// 确定QoS级别
const qos = (fixedHeader >> 1) & 0x03;
// 跳过消息ID(如果QoS > 0)
let payloadStart = 2 + topicLength;
if (qos > 0) {
payloadStart += 2;
}
// 提取消息载荷
const payload = variableHeaderAndPayload.subarray(payloadStart);
// 发送消息载荷
this.emit('message', topicName, payload);
return;
}
// SUBACK (0x90) - 订阅确认
if (packetType === 9) {
console.log('收到MQTT SUBACK');
return;
}
// UNSUBACK (0xB0) - 取消订阅确认
if (packetType === 11) {
console.log('收到MQTT UNSUBACK');
return;
}
// PINGRESP (0xD0) - Ping响应
if (packetType === 13) {
console.log('收到MQTT PINGRESP');
return;
}
// 其他类型的包
console.log('收到其他类型的MQTT包:', packetType);
} catch (error) {
console.error('处理MQTT协议包错误:', error);
this.emit('error', error);
}
}
publish(topic, message, callback) {
if (this.connected && this.socketTask) {
try {
let payloadBytes;
// 处理消息载荷
if (typeof message === 'string') {
// 如果是十六进制字符串,转换为字节
if (/^[0-9A-Fa-f\s]+$/.test(message)) {
const cleanHex = message.replace(/\s/g, '');
payloadBytes = [];
for (let i = 0; i < cleanHex.length; i += 2) {
payloadBytes.push(parseInt(cleanHex.substr(i, 2), 16));
}
} else {
// 普通字符串
payloadBytes = this.stringToBytes(message);
}
} else if (message instanceof Uint8Array) {
payloadBytes = Array.from(message);
} else if (message instanceof ArrayBuffer) {
payloadBytes = Array.from(new Uint8Array(message));
} else {
// 其他类型转换为字符串
payloadBytes = this.stringToBytes(JSON.stringify(message));
}
// 构建MQTT PUBLISH协议包
let variableHeader = [];
// Topic
variableHeader = variableHeader.concat(this.encodeString(topic));
// QoS 0,没有messageId
// 计算剩余长度
const remainingLength = variableHeader.length + payloadBytes.length;
const remainingLengthBytes = this.encodeRemainingLength(remainingLength);
// 构建完整的MQTT包
let mqttPacket = [];
// Fixed Header: PUBLISH (0x30) - QoS 0, no retain
mqttPacket.push(0x30);
// Remaining Length
mqttPacket = mqttPacket.concat(remainingLengthBytes);
// Variable Header
mqttPacket = mqttPacket.concat(variableHeader);
// Payload
mqttPacket = mqttPacket.concat(payloadBytes);
// 转换为ArrayBuffer发送
const buffer = new Uint8Array(mqttPacket).buffer;
this.socketTask.send({
data: buffer,
success: () => {
if (callback) callback();
},
fail: (error) => {
console.error('MQTT PUBLISH协议包发送失败:', error);
if (callback) callback(error);
}
});
} catch (error) {
console.error('发布消息错误:', error);
if (callback) callback(error);
}
} else {
console.warn('WebSocket未连接,无法发布消息');
if (callback) callback(new Error('WebSocket未连接'));
}
}
subscribe(topic, callback) {
if (this.connected && this.socketTask) {
try {
// 构建MQTT SUBSCRIBE协议包
let variableHeader = [];
// Message ID
const messageId = this.messageId++;
variableHeader.push((messageId >> 8) & 0xFF);
variableHeader.push(messageId & 0xFF);
// Payload: Topic + QoS
let payload = [];
payload = payload.concat(this.encodeString(topic));
payload.push(0); // QoS 0
// 计算剩余长度
const remainingLength = variableHeader.length + payload.length;
const remainingLengthBytes = this.encodeRemainingLength(remainingLength);
// 构建完整的MQTT包
let mqttPacket = [];
// Fixed Header: SUBSCRIBE (0x82)
mqttPacket.push(0x82);
// Remaining Length
mqttPacket = mqttPacket.concat(remainingLengthBytes);
// Variable Header
mqttPacket = mqttPacket.concat(variableHeader);
// Payload
mqttPacket = mqttPacket.concat(payload);
// 转换为ArrayBuffer发送
const buffer = new Uint8Array(mqttPacket).buffer;
this.socketTask.send({
data: buffer,
success: () => {
if (callback) callback();
},
fail: (error) => {
console.error('MQTT SUBSCRIBE协议包发送失败:', error);
if (callback) callback(error);
}
});
} catch (error) {
console.error('订阅错误:', error);
if (callback) callback(error);
}
} else {
console.warn('WebSocket未连接,无法订阅');
if (callback) callback(new Error('WebSocket未连接'));
}
}
unsubscribe(topic) {
if (this.connected && this.socketTask) {
try {
// 构建MQTT UNSUBSCRIBE协议包
let variableHeader = [];
// Message ID
const messageId = this.messageId++;
variableHeader.push((messageId >> 8) & 0xFF);
variableHeader.push(messageId & 0xFF);
// Payload: Topic
let payload = [];
payload = payload.concat(this.encodeString(topic));
// 计算剩余长度
const remainingLength = variableHeader.length + payload.length;
const remainingLengthBytes = this.encodeRemainingLength(remainingLength);
// 构建完整的MQTT包
let mqttPacket = [];
// Fixed Header: UNSUBSCRIBE (0xA2)
mqttPacket.push(0xA2);
// Remaining Length
mqttPacket = mqttPacket.concat(remainingLengthBytes);
// Variable Header
mqttPacket = mqttPacket.concat(variableHeader);
// Payload
mqttPacket = mqttPacket.concat(payload);
// 转换为ArrayBuffer发送
const buffer = new Uint8Array(mqttPacket).buffer;
this.socketTask.send({
data: buffer,
fail: (error) => {
console.error('MQTT UNSUBSCRIBE协议包发送失败:', error);
}
});
} catch (error) {
console.error('取消订阅错误:', error);
}
}
}
sendPingreq() {
if (this.connected && this.socketTask) {
try {
// 构建MQTT PINGREQ协议包
const pingreqPacket = [0xC0, 0x00]; // PINGREQ (0xC0) + 剩余长度 0
const buffer = new Uint8Array(pingreqPacket).buffer;
console.log('发送MQTT PINGREQ');
this.socketTask.send({
data: buffer,
success: () => {
console.log('MQTT PINGREQ发送成功');
},
fail: (error) => {
console.error('MQTT PINGREQ发送失败:', error);
}
});
} catch (error) {
console.error('发送MQTT PINGREQ错误:', error);
}
}
}
startHeartbeat() {
// 清除现有的心跳定时器
if (this.heartbeatTimer) {
clearInterval(this.heartbeatTimer);
this.heartbeatTimer = null;
}
// 每keepAlive/2秒发送一次PINGREQ
const heartbeatInterval = (this.keepAlive * 1000) / 2;
console.log('启动心跳机制,间隔:', heartbeatInterval, 'ms');
this.heartbeatTimer = setInterval(() => {
this.sendPingreq();
}, heartbeatInterval);
}
stopHeartbeat() {
if (this.heartbeatTimer) {
clearInterval(this.heartbeatTimer);
this.heartbeatTimer = null;
console.log('停止心跳机制');
}
}
end() {
// 停止重连
this.shouldReconnect = false;
// 停止心跳
this.stopHeartbeat();
// 清除重连定时器
if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer);
this.reconnectTimer = null;
}
if (this.socketTask) {
this.socketTask.close({
success: () => {
console.log('WebSocket连接已关闭');
}
});
this.connected = false;
}
}
attemptReconnect() {
// 检查是否应该重连
if (!this.shouldReconnect) {
console.log('已停止重连');
return;
}
// 停止当前心跳
this.stopHeartbeat();
if (this.reconnectAttempts < this.maxReconnectAttempts) {
this.reconnectAttempts++;
const delay = Math.min(1000 * Math.pow(2, this.reconnectAttempts), 30000);
console.log(`尝试重新连接 (${this.reconnectAttempts}/${this.maxReconnectAttempts}),延迟: ${delay}ms`);
this.reconnectTimer = setTimeout(() => {
this.connect();
}, delay);
} else {
console.error('达到最大重连次数,停止重连');
this.emit('error', new Error('达到最大重连次数'));
}
}
on(event, handler) {
if (!this.eventHandlers[event]) {
this.eventHandlers[event] = [];
}
this.eventHandlers[event].push(handler);
}
emit(event, ...args) {
if (this.eventHandlers[event]) {
this.eventHandlers[event].forEach(handler => handler(...args));
}
}
}
// 创建MQTT连接函数
export const createMqttConnection = (url, options) => {
return new WechatMqttClient(url, options);
};
在 uni-app 中使用
下面展示如何在 uni-app (Vue 3 setup 语法糖) 中集成该客户端,实现一个设备状态监控弹窗。
1. 引入与配置
import { createMqttConnection } from '@/utils/miniMqtt.js'; // 引入封装好的类
// 配置项
const mqttUrl = 'wss://your-mqtt-broker.com/mqtt';
const clientId = "emqx_test_" + Math.random().toString(16).substring(2, 8);
const username = "local_cabinet";
const password = "hUwRNRvjcaGwsuU3";
2. 初始化连接与事件监听
在组件挂载或用户操作时初始化:
let client = null;
const initMqtt = () => {
if (client) client.end(); // 清理旧连接
client = createMqttConnection(mqttUrl, {
clientId,
username,
password,
keepAlive: 60
});
// 监听连接成功
client.on('connect', () => {
console.log('MQTT连接成功');
addDebugMessage("连接成功", "success");
// 连接成功后立即订阅主题
client.subscribe('wash/hy/device001/pubmsg', (err) => {
if(!err) addDebugMessage("订阅成功", "success");
});
});
// 监听错误
client.on('error', (err) => {
console.error(err);
addDebugMessage("发生错误:" + err.message, "error");
});
// 监听消息
client.on('message', (topic, message) => {
handleMqttMessage(topic, message);
});
client.connect();
};
3. 消息收发处理
发送消息(支持 Hex 字符串):
很多硬件设备通信使用十六进制指令。我们的封装类在 publish 方法中做了特殊处理:如果检测到传入的是纯十六进制字符串,会自动转换为 Uint8Array 发送。
const publishMessage = (topic, message) => {
if (!client || !client.connected) return;
// 假设 message 是 "01 03 00 00 00 0A C4 0B" 这样的十六进制字符串
// 或者普通文本 "Hello"
client.publish(topic, message, (err) => {
if (err) {
uni.showToast({ title: '发送失败', icon: 'none' });
} else {
addDebugMessage("指令已发送", "primary");
}
});
};
接收消息解析:
接收到的 message 通常是 Uint8Array。我们可以将其转换为十六进制字符串以便调试或发送给后端解析。
const handleMqttMessage = (topic, message) => {
// 将 Uint8Array 转为 Hex 字符串显示
let hexString = "";
if (message instanceof Uint8Array) {
for (let i = 0; i < message.length; i++) {
hexString += message[i].toString(16).toUpperCase().padStart(2, "0") + " ";
}
}
console.log(`收到消息 [${topic}]: ${hexString}`);
addDebugMessage(`收到: ${hexString}`, "default");
// 此处可调用后端API解析具体的业务含义
};
4. 完整页面示例结构
<template>
<view>
<uv-popup ref="popupRef" mode="center" round="10" :closeable="true" @change="change" :safeAreaInsetBottom="false">
<view class="w-670 p-40 text-center flex flex-col">
<view class="fs-36" :style="{ color:theme.primaryFontColor}">通信状态</view>
<scroll-view ref="scrollRef" :scroll-top="scrollTop" scroll-y="true" class="w-full h-420 mt-60">
<view class="scroll-view">
<view v-for="(item,index) in messageInfo" :key="index" class="scroll-item">
<text class="fs-28 text-left" :style="{ color:theme.labelColor}">{{item.time}}</text>
<text class="fs-32 text-left mt-10" :style="{color:item.type}">{{item.message}}</text>
</view>
</view>
</scroll-view>
<view class="mt-60">
<uv-button
:loading="loading"
text="查询设备状态"
@click="sendMsg"
:customStyle="{
width:'590rpx',
height:'112rpx',
borderRadius:'16rpx',
backgroundColor:theme.primaryColor,
color:theme.whiteColor
}"
></uv-button>
</view>
</view>
</uv-popup>
</view>
</template>
<script setup>
import { ref,nextTick,getCurrentInstance } from 'vue'
import { getQueryCommandAPI, getQueryCommandTextAPI } from '@/api/device';
import config from '@/config/config';
import { createMqttConnection } from '@/utils/miniMqtt.js';
import { useAppStore } from '@/store/app.js';
const theme = useAppStore().theme;
const instance = getCurrentInstance();
let popupRef = ref(null)
let scrollRef = ref(null)
let deviceInfo = ref(null)
let loading = ref(false)
let messageInfo = ref([])
let scrollTop = ref(0)
let subscribeTopic = "";
let publishTopic = "";
let currentPublishTopic = "";
let currentSubscribeTopic = "";
let ctimer = null;
let client = null;
// MQTT 配置
const mqttUrl = config.api.wss;
const clientId = "emqx_test_" + Math.random().toString(16).substring(2, 8);
const username = "local_cabinet";
const password = "hUwRNRvjcaGwsuU3";
let open = ()=>{
popupRef.value && popupRef.value.open()
}
let openMqtt = (params)=>{
deviceInfo.value = params
// 生成订阅和发布主题
let prefix = params.transmission == 1 ? "wash/hy/" : "wash/ts/";
subscribeTopic = prefix + params.device_no + "/pubmsg";
publishTopic = prefix + params.device_no + "/submsg";
currentPublishTopic = publishTopic;
open()
initMqtt()
}
// 初始化MQTT
let initMqtt = () => {
loading.value = true;
if (client) {
client.end();
console.log("已断开连接");
}
addDebugMessage("MQTT正在尝试连接...", theme.primaryFontColor);
try {
// 使用封装的MQTT客户端
client = createMqttConnection(mqttUrl, {
clientId,
username,
password,
rejectUnauthorized: false,
});
// 设置事件监听
client.on('connect', () => {
console.log('MQTT连接成功');
addDebugMessage("MQTT连接成功", theme.successColor);
loading.value = false;
// 订阅主题
addSubscribeTopic(subscribeTopic);
});
client.on('reconnect', () => {
console.log("MQTT尝试重连...");
addDebugMessage("MQTT正在重连...", theme.primaryFontColor);
});
client.on('error', (error) => {
console.error("MQTT连接错误:", error);
addDebugMessage("MQTT连接错误:" + error.message, theme.errorColor);
loading.value = false;
});
client.on('message', (topic, message) => {
// 处理接收到的消息
handleMqttMessage(topic, message);
});
// 开始连接
client.connect();
} catch (error) {
console.error("MQTT初始化失败:", error);
addDebugMessage("MQTT初始化失败: " + error.message, theme.errorColor);
loading.value = false;
}
};
// 处理MQTT消息
let handleMqttMessage = (topic, message) => {
try {
// 转换为16进制显示
const hexMessage = messageToHexString(message);
// 显示接收消息
getQueryCommandTextAPI({
command: hexMessage,
agreement: deviceInfo.value.agreement
}).then(res => {
loading.value = false;
if (res.data) {
let data = res.data;
clearTimeout(ctimer);
addDebugMessage(`收到消息:${data.run_status_text ? data.run_status_text : ''} ${data.run_fault == "[]" ? "" : data.run_fault}`, theme.primaryFontColor);
}
}).catch(error => {
clearTimeout(ctimer);
loading.value = false;
addDebugMessage(error.msg, theme.errorColor);
});
} catch (error) {
loading.value = false;
addDebugMessage("消息解析错误: " + error.message, theme.errorColor);
}
};
let sendMsg = () => {
loading.value = true;
getQueryCommandAPI({
deviceNo: deviceInfo.value.deviceNo,
agreement: deviceInfo.value.agreement
}).then(res => {
if (res.data) {
let queryCommand = res.data;
// 验证是否为有效的十六进制字符串
if (isValidHexString(queryCommand)) {
publishMessage(currentPublishTopic, queryCommand);
uni.$uv.toast("查询命令发送成功");
} else {
addDebugMessage("警告: 查询命令不是有效的十六进制格式,按文本发送", theme.errorColor);
publishMessage(currentPublishTopic, queryCommand);
uni.$uv.toast("查询命令发送成功");
}
} else {
uni.$uv.toast("获取查询命令失败或不支持的协议");
}
});
startCoutdown();
};
let startCoutdown = () => {
clearTimeout(ctimer);
ctimer = setTimeout(() => {
loading.value = false;
addDebugMessage("查询失败", theme.errorColor);
}, 5000);
};
let scrollToBottom = () => {
nextTick(() => {
const query = uni.createSelectorQuery().in(instance.proxy);
query.select('.scroll-view').boundingClientRect(res => {
scrollTop.value = res.height
}).exec();
});
};
// 订阅主题
let addSubscribeTopic = topic => {
if (currentSubscribeTopic && client) {
client.unsubscribe(currentSubscribeTopic);
}
if (topic && client && client.connected) {
client.subscribe(topic, (err) => {
if (!err) {
currentSubscribeTopic = topic;
addDebugMessage("订阅成功 ", theme.successColor);
} else {
addDebugMessage("订阅失败: " + err.message, theme.errorColor);
}
});
}
};
// 发布消息
let publishMessage = (topic, message) => {
if (client && client.connected) {
let messageToSend;
// 检查是否为十六进制字符串
if (typeof message === "string" && isValidHexString(message)) {
try {
// 转换为二进制数据
messageToSend = hexStringToBuffer(message);
} catch (error) {
addDebugMessage("十六进制转换失败: " + error.message, theme.errorColor);
return;
}
} else {
// 普通字符串消息
messageToSend = message;
addDebugMessage("发送文本消息: " + message, theme.primaryFontColor);
}
client.publish(topic, messageToSend, (err) => {
if (err) {
addDebugMessage("消息发送失败: " + err.message, theme.errorColor);
} else {
addDebugMessage("消息发送成功", theme.successColor);
}
});
} else {
addDebugMessage("MQTT未连接,无法发送消息", theme.errorColor);
}
};
// 将十六进制字符串转换为二进制数据
let hexStringToBuffer = hexString => {
// 移除空格和非十六进制字符
const cleanHex = hexString.replace(/[^0-9A-Fa-f]/g, "");
// 确保是偶数长度
const evenLengthHex = cleanHex.length % 2 === 0 ? cleanHex : "0" + cleanHex;
// 创建Uint8Array
const bytes = new Uint8Array(evenLengthHex.length / 2);
for (let i = 0; i < evenLengthHex.length; i += 2) {
bytes[i / 2] = parseInt(evenLengthHex.substr(i, 2), 16);
}
return bytes;
};
let addDebugMessage = (message, type) => {
messageInfo.value.push({
message,
type,
time: uni.$uv.date(new Date(),'hh:MM:ss')
});
scrollToBottom();
};
// 验证十六进制字符串格式
let isValidHexString = hexString => {
const cleanHex = hexString.replace(/\s+/g, "");
return /^[0-9A-Fa-f]+$/.test(cleanHex);
};
// 将消息转换为16进制字符串的函数
let messageToHexString = message => {
let hexString = "";
if (message instanceof Uint8Array) {
// 如果是Uint8Array
for (let i = 0; i < message.length; i++) {
hexString += message[i].toString(16).toUpperCase().padStart(2, "0") + " ";
}
} else if (typeof message === "string") {
// 如果是字符串,转换每个字符的字节值
for (let i = 0; i < message.length; i++) {
hexString += message.charCodeAt(i).toString(16).toUpperCase().padStart(2, "0") + " ";
}
} else if (message && typeof message === "object" && message.constructor && message.constructor.name === "Buffer") {
// 如果是Buffer对象(但在浏览器中通常不会遇到)
for (let i = 0; i < message.length; i++) {
hexString += message[i].toString(16).toUpperCase().padStart(2, "0") + " ";
}
} else if (message instanceof ArrayBuffer) {
// 如果是ArrayBuffer
const uint8Array = new Uint8Array(message);
for (let i = 0; i < uint8Array.length; i++) {
hexString += uint8Array[i].toString(16).toUpperCase().padStart(2, "0") + " ";
}
} else {
// 其他情况,尝试转换为字符串处理
const str = message.toString();
for (let i = 0; i < str.length; i++) {
hexString += str.charCodeAt(i).toString(16).toUpperCase().padStart(2, "0") + " ";
}
}
return hexString.trim();
};
let change = (e)=>{
if(!e.show){
if (client) {
// 先取消订阅,再关闭连接
if (currentSubscribeTopic) {
client.unsubscribe(currentSubscribeTopic);
}
client.end();
}
messageInfo.value = [];
currentSubscribeTopic = "";
currentPublishTopic = "";
}
}
defineExpose({
openMqtt
})
</script>
<style scoped>
.scroll-item {
display: flex;
flex-direction: column;
width: 100%;
padding: 30rpx 0;
border-bottom: 2rpx solid #F5F5F5;
}
</style>
0 个评论
要回复文章请先登录或注册