1***@qq.com
1***@qq.com
  • 发布:2026-02-26 17:25
  • 更新:2026-02-26 17:25
  • 阅读:17

uniapp基于websocket封装的MQTT客户端,解决使用mqtt.js库在小程序端的兼容问题

分类:uni-app

前言

在物联网(IoT)开发中,MQTT协议因其轻量级、低带宽占用和高可靠性,成为了设备通信的首选协议。在Web端或Node.js环境中,开发者通常直接使用成熟的 mqtt.js 库即可轻松实现连接。然而,当我们将目光转向微信小程序或uni-app小程序端时,情况变得复杂起来。
微信小程序的运行环境(Mini Program Environment)有着严格的限制:
不支持TCP Socket:小程序无法直接建立TCP连接,必须通过WebSocket (wss://) 进行通信。
API差异:小程序使用 uni.connectSocket (或 wx.connectSocket) 而非标准的 WebSocket API,且对二进制数据处理(ArrayBuffer)有特定要求。
依赖限制:mqtt.js 默认依赖 Node.js 的 net 模块或浏览器的标准 WebSocket,直接引入往往会导致打包失败或运行时报错(如 process is not defined 或 WebSocket is not a constructor)。
虽然社区存在一些适配方案(如使用 mqtt/dist/mqtt.min.js 并手动注入 WebSocket 实现),但在处理二进制协议解析、心跳保活以及断线重连时,往往显得臃肿且难以调试。
本文将介绍一种轻量级、原生适配的解决方案:基于 uni-app 的 connectSocket API,从零封装一个专为小程序设计的 MQTT 客户端类 WechatMqttClient。它不依赖任何第三方重型库,完美支持 MQTT 3.1.1 协议,解决了二进制数据收发、UTF-8编码兼容及自动重连等核心痛点。

核心难点分析

在封装之前,我们需要明确小程序端实现 MQTT 的几个关键挑战:
协议包构建:MQTT是基于二进制的协议。我们需要手动构建 Fixed Header(固定头部)、Variable Header(可变头部)和 Payload(载荷)。特别是剩余长度(Remaining Length)的变长编码算法,是容易出错的地方。
字符编码:MQTT协议规定主题(Topic)和客户端ID(ClientID)必须使用 UTF-8 编码。JavaScript 字符串内部是 UTF-16,直接转换字节会导致中文乱码或协议解析失败。我们需要手写 UTF-8 编解码器。
心跳机制:小程序网络环境不稳定,且WebSocket连接在无数据传输时可能被运营商或系统切断。必须在应用层实现基于 PINGREQ/PINGRESP 的心跳检测。
数据流处理:uni.connectSocket 返回的是 ArrayBuffer,需要将其转换为 Uint8Array 进行位运算解析。

解决方案:WechatMqttClient 类设计

我们设计了一个名为 WechatMqttClient 的类,它屏蔽了底层的二进制操作,对外提供标准的 connect, publish, subscribe, on 等事件驱动接口。

  1. 核心架构
    该类主要包含以下模块:
    连接管理:封装 uni.connectSocket,处理连接建立、关闭和错误监听。
    协议编解码:实现 MQTT 3.1.1 的 CONNECT, PUBLISH, SUBSCRIBE, UNSUBSCRIBE, PINGREQ 等报文的构建与解析。
    UTF-8 工具集:独立的 encodeString 和 decodeUtf8 方法,确保多语言字符集正确传输。
    心跳与重连:内置指数退避算法的重连机制和定时心跳发送。
    事件总线:简单的发布订阅模式,用于通知上层业务逻辑(如 connect, message, error)。
  2. 关键代码实现解析

A. 建立 WebSocket 连接

不同于标准 WebSocket,我们使用 uni-app 的 API,并指定子协议为 mqtt。

connect() {  
  // ... 验证URL逻辑 ...  

  this.socketTask = uni.connectSocket({  
    url: this.url,  
    protocols: ['mqtt'], // 重要:告知服务器这是MQTT协议  
    success: (res) => console.log('WS创建成功', res),  
    fail: (error) => this.emit('error', error)  
  });  

  this.socketTask.onOpen(() => {  
    // 连接打开后,立即发送 MQTT CONNECT 报文  
    this.sendMqttConnectPacket();  
  });  

  this.socketTask.onMessage((res) => {  
    // 接收二进制数据并解析  
    this.handleMqttPacket(res.data);  
  });  

  // ... 监听 close 和 error 以触发重连 ...  
}

B. 构建 MQTT CONNECT 报文

这是握手的关键。我们需要严格按照 MQTT 3.1.1 规范组装字节流。

sendMqttConnectPacket() {  
  // 1. 准备 Payload (ClientID, Username, Password)  
  let payload = [];  
  const clientId = this.options.clientId || 'client_' + Date.now();  
  payload = payload.concat(this.encodeString(clientId));  

  if (this.options.username) payload = payload.concat(this.encodeString(this.options.username));  
  if (this.options.password) payload = payload.concat(this.encodeString(this.options.password));  

  // 2. 准备 Variable Header (Protocol Name, Level, Flags, KeepAlive)  
  let variableHeader = [];  
  variableHeader = variableHeader.concat(this.encodeString('MQTT'));  
  variableHeader.push(4); // Protocol Level 3.1.1  

  // Connect Flags: CleanSession(0x02), Username(0x80), Password(0x40)  
  let connectFlags = 0x02;   
  if (this.options.username) connectFlags |= 0x80;  
  if (this.options.password) connectFlags |= 0x40;  
  variableHeader.push(connectFlags);  

  // Keep Alive (2 bytes)  
  variableHeader.push((this.keepAlive >> 8) & 0xFF);  
  variableHeader.push(this.keepAlive & 0xFF);  

  // 3. 计算 Remaining Length 并编码  
  const remainingLength = variableHeader.length + payload.length;  
  const remainingLengthBytes = this.encodeRemainingLength(remainingLength);  

  // 4. 组装 Fixed Header (0x10 表示 CONNECT)  
  let mqttPacket = [0x10, ...remainingLengthBytes, ...variableHeader, ...payload];  

  // 5. 发送 ArrayBuffer  
  this.socketTask.send({  
    data: new Uint8Array(mqttPacket).buffer  
  });  
}

C. UTF-8 编码处理

这是很多开源库在小程序端失效的原因。JavaScript 的 charCodeAt 返回的是 UTF-16 代码单元,直接转为字节会破坏多字节字符。我们需要手动实现 UTF-8 转换逻辑(代码中已包含完整的 encodeString 和 decodeUtf8 方法,支持代理对 surrogate pairs)。

D. 消息解析与心跳

在 handleMqttPacket 中,我们读取第一个字节判断报文类型。
0x20 (CONNACK): 检查返回码,若为0则标记 connected = true 并启动心跳。
0x30 (PUBLISH): 解析 Topic 长度,提取 Topic 字符串,剩余部分即为 Payload。
0xD0 (PINGRESP): 收到服务端的心跳响应,确认连接健康。
心跳逻辑通过 setInterval 实现,每隔 keepAlive / 2 秒发送一次 PINGREQ。

完整的mqtt客户端代码

// 封装的MQTT客户端  
class WechatMqttClient {  
  constructor(url, options) {  
    this.url = url;  
    this.options = options;  
    this.socketTask = null;  
    this.connected = false;  
    this.eventHandlers = {};  
    this.messageId = 1;  
    this.reconnectAttempts = 0;  
    this.maxReconnectAttempts = 5;  
    this.reconnectTimer = null;  
    this.shouldReconnect = true;  
    this.keepAlive = options.keepAlive || 60;  
    this.heartbeatTimer = null;  
  }  

  connect() {  
    try {  
      // 重置重连标志  
      this.shouldReconnect = true;  

      // 验证URL格式  
      if (!this.url || (!this.url.startsWith('wss://') && !this.url.startsWith('ws://'))) {  
        const error = new Error('无效的WebSocket URL: ' + this.url + ',必须使用ws://或wss://协议');  
        console.error(error.message);  
        this.emit('error', error);  
        return;  
      }  

      // 先关闭现有连接  
      if (this.socketTask) {  
        this.socketTask.close();  
        this.socketTask = null;  
      }  

      // 使用微信小程序的connectSocket - 简化header  
      this.socketTask = uni.connectSocket({  
        url: this.url,  
        protocols: ['mqtt'],  
        success: (res) => {  
          console.log('WebSocket连接创建成功:', res);  
        },  
        fail: (error) => {  
          console.error('WebSocket连接创建失败:', error);  
          const errorMsg = error.errMsg || JSON.stringify(error);  
          this.emit('error', new Error('WebSocket连接创建失败: ' + errorMsg));  
        }  
      });  

      // 监听连接打开  
      this.socketTask.onOpen((res) => {  
        console.log('WebSocket连接已打开:', res);  
        // 发送MQTT CONNECT协议包  
        this.sendMqttConnectPacket();  
      });  

      // 监听消息接收  
      this.socketTask.onMessage((res) => {  
        console.log('收到WebSocket消息:', res.data);  
        this.handleMqttPacket(res.data);  
      });  

      // 监听连接关闭  
      this.socketTask.onClose((res) => {  
        console.log('WebSocket连接已关闭:', res);  
        this.connected = false;  
        this.emit('close');  
        this.attemptReconnect();  
      });  

      // 监听错误  
      this.socketTask.onError((error) => {  
        console.error('WebSocket错误:', error);  
        this.emit('error', new Error('WebSocket错误: ' + JSON.stringify(error)));  
        this.attemptReconnect();  
      });  

    } catch (error) {  
      console.error('MQTT连接错误:', error);  
      this.emit('error', error);  
    }  
  }  

  // 构建并发送标准的MQTT CONNECT协议包  
  sendMqttConnectPacket() {  
    if (!this.socketTask) return;  

    try {  
      // 构建标准的MQTT 3.1.1 CONNECT协议包  
      const protocolName = 'MQTT';  
      const protocolLevel = 4; // MQTT 3.1.1  
      const cleanSession = true;  

      // 计算可变头部和载荷长度  
      let payload = [];  

      // Client ID  
      const clientId = this.options.clientId || 'client_' + Date.now();  
      payload = payload.concat(this.encodeString(clientId));  

      // Username  
      if (this.options.username) {  
        payload = payload.concat(this.encodeString(this.options.username));  
      }  

      // Password  
      if (this.options.password) {  
        payload = payload.concat(this.encodeString(this.options.password));  
      }  

      // 计算可变头部长度  
      let variableHeader = [];  

      // Protocol Name  
      variableHeader = variableHeader.concat(this.encodeString(protocolName));  

      // Protocol Level  
      variableHeader.push(protocolLevel);  

      // Connect Flags  
      let connectFlags = 0;  
      if (cleanSession) connectFlags |= 0x02;  
      if (this.options.username) connectFlags |= 0x80;  
      if (this.options.password) connectFlags |= 0x40;  
      variableHeader.push(connectFlags);  

      // Keep Alive  
      variableHeader.push((this.keepAlive >> 8) & 0xFF);  
      variableHeader.push(this.keepAlive & 0xFF);  

      // 计算剩余长度  
      const remainingLength = variableHeader.length + payload.length;  
      const remainingLengthBytes = this.encodeRemainingLength(remainingLength);  

      // 构建完整的MQTT包  
      let mqttPacket = [];  

      // Fixed Header: CONNECT (0x10)  
      mqttPacket.push(0x10);  

      // Remaining Length  
      mqttPacket = mqttPacket.concat(remainingLengthBytes);  

      // Variable Header  
      mqttPacket = mqttPacket.concat(variableHeader);  

      // Payload  
      mqttPacket = mqttPacket.concat(payload);  

      // 转换为ArrayBuffer发送  
      const buffer = new Uint8Array(mqttPacket).buffer;  

      this.socketTask.send({  
        data: buffer,  
        success: () => {  
          console.log('MQTT CONNECT协议包发送成功');  
        },  
        fail: (error) => {  
          console.error('MQTT CONNECT协议包发送失败:', error);  
        }  
      });  
    } catch (error) {  
      console.error('发送MQTT CONNECT协议包错误:', error);  
    }  
  }  

  // 编码字符串为MQTT格式 (长度 + UTF-8字节)  
  encodeString(str) {  
    // 使用小程序兼容的UTF-8编码  
    let bytes = [];  
    for (let i = 0; i < str.length; i++) {  
      let charCode = str.charCodeAt(i);  
      if (charCode < 0x80) {  
        bytes.push(charCode);  
      } else if (charCode < 0x800) {  
        bytes.push(0xC0 | (charCode >> 6));  
        bytes.push(0x80 | (charCode & 0x3F));  
      } else if (charCode < 0xD800 || charCode >= 0xE000) {  
        bytes.push(0xE0 | (charCode >> 12));  
        bytes.push(0x80 | ((charCode >> 6) & 0x3F));  
        bytes.push(0x80 | (charCode & 0x3F));  
      } else {  
        // 处理代理对 (surrogate pair)  
        i++;  
        const nextCharCode = str.charCodeAt(i);  
        const codePoint = ((charCode - 0xD800) << 10) | (nextCharCode - 0xDC00) + 0x10000;  
        bytes.push(0xF0 | (codePoint >> 18));  
        bytes.push(0x80 | ((codePoint >> 12) & 0x3F));  
        bytes.push(0x80 | ((codePoint >> 6) & 0x3F));  
        bytes.push(0x80 | (codePoint & 0x3F));  
      }  
    }  

    let result = [];  
    result.push((bytes.length >> 8) & 0xFF);  
    result.push(bytes.length & 0xFF);  
    for (let i = 0; i < bytes.length; i++) {  
      result.push(bytes[i]);  
    }  
    return result;  
  }  

  // 解码UTF-8字节为字符串  
  decodeUtf8(bytes) {  
    let result = '';  
    let i = 0;  

    while (i < bytes.length) {  
      const byte1 = bytes[i];  

      if (byte1 < 0x80) {  
        // 单字节  
        result += String.fromCharCode(byte1);  
        i++;  
      } else if (byte1 < 0xE0) {  
        // 双字节  
        const byte2 = bytes[i + 1];  
        const charCode = ((byte1 & 0x1F) << 6) | (byte2 & 0x3F);  
        result += String.fromCharCode(charCode);  
        i += 2;  
      } else if (byte1 < 0xF0) {  
        // 三字节  
        const byte2 = bytes[i + 1];  
        const byte3 = bytes[i + 2];  
        const charCode = ((byte1 & 0x0F) << 12) | ((byte2 & 0x3F) << 6) | (byte3 & 0x3F);  
        result += String.fromCharCode(charCode);  
        i += 3;  
      } else {  
        // 四字节  
        const byte2 = bytes[i + 1];  
        const byte3 = bytes[i + 2];  
        const byte4 = bytes[i + 3];  
        const codePoint = ((byte1 & 0x07) << 18) | ((byte2 & 0x3F) << 12) | ((byte3 & 0x3F) << 6) | (byte4 & 0x3F);  

        // 处理UTF-16代理对  
        if (codePoint >= 0x10000) {  
          const highSurrogate = ((codePoint - 0x10000) >> 10) + 0xD800;  
          const lowSurrogate = ((codePoint - 0x10000) & 0x3FF) + 0xDC00;  
          result += String.fromCharCode(highSurrogate, lowSurrogate);  
        } else {  
          result += String.fromCharCode(codePoint);  
        }  

        i += 4;  
      }  
    }  

    return result;  
  }  

  // 将字符串转换为UTF-8字节数组  
  stringToBytes(str) {  
    let bytes = [];  
    for (let i = 0; i < str.length; i++) {  
      let charCode = str.charCodeAt(i);  
      if (charCode < 0x80) {  
        bytes.push(charCode);  
      } else if (charCode < 0x800) {  
        bytes.push(0xC0 | (charCode >> 6));  
        bytes.push(0x80 | (charCode & 0x3F));  
      } else if (charCode < 0xD800 || charCode >= 0xE000) {  
        bytes.push(0xE0 | (charCode >> 12));  
        bytes.push(0x80 | ((charCode >> 6) & 0x3F));  
        bytes.push(0x80 | (charCode & 0x3F));  
      } else {  
        // 处理代理对 (surrogate pair)  
        i++;  
        const nextCharCode = str.charCodeAt(i);  
        const codePoint = ((charCode - 0xD800) << 10) | (nextCharCode - 0xDC00) + 0x10000;  
        bytes.push(0xF0 | (codePoint >> 18));  
        bytes.push(0x80 | ((codePoint >> 12) & 0x3F));  
        bytes.push(0x80 | ((codePoint >> 6) & 0x3F));  
        bytes.push(0x80 | (codePoint & 0x3F));  
      }  
    }  
    return bytes;  
  }  

  // 编码MQTT剩余长度  
  encodeRemainingLength(length) {  
    let result = [];  
    let digit;  
    do {  
      digit = length % 128;  
      length = Math.floor(length / 128);  
      if (length > 0) {  
        digit |= 0x80;  
      }  
      result.push(digit);  
    } while (length > 0);  
    return result;  
  }  

  // 处理MQTT协议包  
  handleMqttPacket(data) {  
    try {  
      let buffer;  
      if (data instanceof ArrayBuffer) {  
        buffer = new Uint8Array(data);  
      } else if (typeof data === 'string') {  
        // 如果是字符串,可能是测试数据  
        try {  
          const parsed = JSON.parse(data);  
          if (parsed.topic && parsed.payload) {  
            this.emit('message', parsed.topic, parsed.payload);  
            return;  
          }  
        } catch (e) {  
          // 不是JSON,继续处理  
        }  
        this.emit('message', 'unknown', data);  
        return;  
      } else {  
        this.emit('message', 'unknown', data);  
        return;  
      }  

      if (buffer.length < 2) {  
        console.warn('MQTT包太短');  
        return;  
      }  

      const fixedHeader = buffer[0];  
      const packetType = (fixedHeader >> 4) & 0x0F;  

      // CONNACK (0x20) - 连接确认  
      if (packetType === 2) {  
        if (buffer.length >= 4) {  
          const returnCode = buffer[3];  
          if (returnCode === 0) {  
            console.log('MQTT连接成功');  
            this.connected = true;  
            this.reconnectAttempts = 0;  
            // 启动心跳机制  
            this.startHeartbeat();  
            this.emit('connect');  
          } else {  
            console.error('MQTT连接失败,返回码:', returnCode);  
            this.emit('error', new Error('MQTT连接失败,返回码: ' + returnCode));  
          }  
        }  
        return;  
      }  

      // PUBLISH (0x30) - 消息发布  
      if (packetType === 3) {  
        // 解析剩余长度  
        let remainingLength = 0;  
        let multiplier = 1;  
        let index = 1;  
        let digit;  

        do {  
          digit = buffer[index++];  
          remainingLength += (digit & 0x7F) * multiplier;  
          multiplier *= 128;  
        } while ((digit & 0x80) !== 0);  

        // 提取可变头部和载荷  
        const variableHeaderAndPayload = buffer.subarray(index, index + remainingLength);  

        // 解析主题名称长度  
        const topicLength = (variableHeaderAndPayload[0] << 8) | variableHeaderAndPayload[1];  
        const topicNameBytes = variableHeaderAndPayload.subarray(2, 2 + topicLength);  

        // 解码主题名称  
        const topicName = this.decodeUtf8(topicNameBytes);  

        // 确定QoS级别  
        const qos = (fixedHeader >> 1) & 0x03;  

        // 跳过消息ID(如果QoS > 0)  
        let payloadStart = 2 + topicLength;  
        if (qos > 0) {  
          payloadStart += 2;  
        }  

        // 提取消息载荷  
        const payload = variableHeaderAndPayload.subarray(payloadStart);  

        // 发送消息载荷  
        this.emit('message', topicName, payload);  
        return;  
      }  

      // SUBACK (0x90) - 订阅确认  
      if (packetType === 9) {  
        console.log('收到MQTT SUBACK');  
        return;  
      }  

      // UNSUBACK (0xB0) - 取消订阅确认  
      if (packetType === 11) {  
        console.log('收到MQTT UNSUBACK');  
        return;  
      }  

      // PINGRESP (0xD0) - Ping响应  
      if (packetType === 13) {  
        console.log('收到MQTT PINGRESP');  
        return;  
      }  

      // 其他类型的包  
      console.log('收到其他类型的MQTT包:', packetType);  

    } catch (error) {  
      console.error('处理MQTT协议包错误:', error);  
      this.emit('error', error);  
    }  
  }  

  publish(topic, message, callback) {  
    if (this.connected && this.socketTask) {  
      try {  
        let payloadBytes;  

        // 处理消息载荷  
        if (typeof message === 'string') {  
          // 如果是十六进制字符串,转换为字节  
          if (/^[0-9A-Fa-f\s]+$/.test(message)) {  
            const cleanHex = message.replace(/\s/g, '');  
            payloadBytes = [];  
            for (let i = 0; i < cleanHex.length; i += 2) {  
              payloadBytes.push(parseInt(cleanHex.substr(i, 2), 16));  
            }  
          } else {  
            // 普通字符串  
            payloadBytes = this.stringToBytes(message);  
          }  
        } else if (message instanceof Uint8Array) {  
          payloadBytes = Array.from(message);  
        } else if (message instanceof ArrayBuffer) {  
          payloadBytes = Array.from(new Uint8Array(message));  
        } else {  
          // 其他类型转换为字符串  
          payloadBytes = this.stringToBytes(JSON.stringify(message));  
        }  

        // 构建MQTT PUBLISH协议包  
        let variableHeader = [];  

        // Topic  
        variableHeader = variableHeader.concat(this.encodeString(topic));  

        // QoS 0,没有messageId  

        // 计算剩余长度  
        const remainingLength = variableHeader.length + payloadBytes.length;  
        const remainingLengthBytes = this.encodeRemainingLength(remainingLength);  

        // 构建完整的MQTT包  
        let mqttPacket = [];  

        // Fixed Header: PUBLISH (0x30) - QoS 0, no retain  
        mqttPacket.push(0x30);  

        // Remaining Length  
        mqttPacket = mqttPacket.concat(remainingLengthBytes);  

        // Variable Header  
        mqttPacket = mqttPacket.concat(variableHeader);  

        // Payload  
        mqttPacket = mqttPacket.concat(payloadBytes);  

        // 转换为ArrayBuffer发送  
        const buffer = new Uint8Array(mqttPacket).buffer;  

        this.socketTask.send({  
          data: buffer,  
          success: () => {  
            if (callback) callback();  
          },  
          fail: (error) => {  
            console.error('MQTT PUBLISH协议包发送失败:', error);  
            if (callback) callback(error);  
          }  
        });  
      } catch (error) {  
        console.error('发布消息错误:', error);  
        if (callback) callback(error);  
      }  
    } else {  
      console.warn('WebSocket未连接,无法发布消息');  
      if (callback) callback(new Error('WebSocket未连接'));  
    }  
  }  

  subscribe(topic, callback) {  
    if (this.connected && this.socketTask) {  
      try {  
        // 构建MQTT SUBSCRIBE协议包  
        let variableHeader = [];  

        // Message ID  
        const messageId = this.messageId++;  
        variableHeader.push((messageId >> 8) & 0xFF);  
        variableHeader.push(messageId & 0xFF);  

        // Payload: Topic + QoS  
        let payload = [];  
        payload = payload.concat(this.encodeString(topic));  
        payload.push(0); // QoS 0  

        // 计算剩余长度  
        const remainingLength = variableHeader.length + payload.length;  
        const remainingLengthBytes = this.encodeRemainingLength(remainingLength);  

        // 构建完整的MQTT包  
        let mqttPacket = [];  

        // Fixed Header: SUBSCRIBE (0x82)  
        mqttPacket.push(0x82);  

        // Remaining Length  
        mqttPacket = mqttPacket.concat(remainingLengthBytes);  

        // Variable Header  
        mqttPacket = mqttPacket.concat(variableHeader);  

        // Payload  
        mqttPacket = mqttPacket.concat(payload);  

        // 转换为ArrayBuffer发送  
        const buffer = new Uint8Array(mqttPacket).buffer;  

        this.socketTask.send({  
          data: buffer,  
          success: () => {  
            if (callback) callback();  
          },  
          fail: (error) => {  
            console.error('MQTT SUBSCRIBE协议包发送失败:', error);  
            if (callback) callback(error);  
          }  
        });  
      } catch (error) {  
        console.error('订阅错误:', error);  
        if (callback) callback(error);  
      }  
    } else {  
      console.warn('WebSocket未连接,无法订阅');  
      if (callback) callback(new Error('WebSocket未连接'));  
    }  
  }  

  unsubscribe(topic) {  
    if (this.connected && this.socketTask) {  
      try {  
        // 构建MQTT UNSUBSCRIBE协议包  
        let variableHeader = [];  

        // Message ID  
        const messageId = this.messageId++;  
        variableHeader.push((messageId >> 8) & 0xFF);  
        variableHeader.push(messageId & 0xFF);  

        // Payload: Topic  
        let payload = [];  
        payload = payload.concat(this.encodeString(topic));  

        // 计算剩余长度  
        const remainingLength = variableHeader.length + payload.length;  
        const remainingLengthBytes = this.encodeRemainingLength(remainingLength);  

        // 构建完整的MQTT包  
        let mqttPacket = [];  

        // Fixed Header: UNSUBSCRIBE (0xA2)  
        mqttPacket.push(0xA2);  

        // Remaining Length  
        mqttPacket = mqttPacket.concat(remainingLengthBytes);  

        // Variable Header  
        mqttPacket = mqttPacket.concat(variableHeader);  

        // Payload  
        mqttPacket = mqttPacket.concat(payload);  

        // 转换为ArrayBuffer发送  
        const buffer = new Uint8Array(mqttPacket).buffer;  

        this.socketTask.send({  
          data: buffer,  
          fail: (error) => {  
            console.error('MQTT UNSUBSCRIBE协议包发送失败:', error);  
          }  
        });  
      } catch (error) {  
        console.error('取消订阅错误:', error);  
      }  
    }  
  }  

  sendPingreq() {  
    if (this.connected && this.socketTask) {  
      try {  
        // 构建MQTT PINGREQ协议包  
        const pingreqPacket = [0xC0, 0x00]; // PINGREQ (0xC0) + 剩余长度 0  

        const buffer = new Uint8Array(pingreqPacket).buffer;  

        console.log('发送MQTT PINGREQ');  

        this.socketTask.send({  
          data: buffer,  
          success: () => {  
            console.log('MQTT PINGREQ发送成功');  
          },  
          fail: (error) => {  
            console.error('MQTT PINGREQ发送失败:', error);  
          }  
        });  
      } catch (error) {  
        console.error('发送MQTT PINGREQ错误:', error);  
      }  
    }  
  }  

  startHeartbeat() {  
    // 清除现有的心跳定时器  
    if (this.heartbeatTimer) {  
      clearInterval(this.heartbeatTimer);  
      this.heartbeatTimer = null;  
    }  

    // 每keepAlive/2秒发送一次PINGREQ  
    const heartbeatInterval = (this.keepAlive * 1000) / 2;  

    console.log('启动心跳机制,间隔:', heartbeatInterval, 'ms');  

    this.heartbeatTimer = setInterval(() => {  
      this.sendPingreq();  
    }, heartbeatInterval);  
  }  

  stopHeartbeat() {  
    if (this.heartbeatTimer) {  
      clearInterval(this.heartbeatTimer);  
      this.heartbeatTimer = null;  
      console.log('停止心跳机制');  
    }  
  }  

  end() {  
    // 停止重连  
    this.shouldReconnect = false;  

    // 停止心跳  
    this.stopHeartbeat();  

    // 清除重连定时器  
    if (this.reconnectTimer) {  
      clearTimeout(this.reconnectTimer);  
      this.reconnectTimer = null;  
    }  

    if (this.socketTask) {  
      this.socketTask.close({  
        success: () => {  
          console.log('WebSocket连接已关闭');  
        }  
      });  
      this.connected = false;  
    }  
  }  

  attemptReconnect() {  
    // 检查是否应该重连  
    if (!this.shouldReconnect) {  
      console.log('已停止重连');  
      return;  
    }  

    // 停止当前心跳  
    this.stopHeartbeat();  

    if (this.reconnectAttempts < this.maxReconnectAttempts) {  
      this.reconnectAttempts++;  
      const delay = Math.min(1000 * Math.pow(2, this.reconnectAttempts), 30000);  

      console.log(`尝试重新连接 (${this.reconnectAttempts}/${this.maxReconnectAttempts}),延迟: ${delay}ms`);  

      this.reconnectTimer = setTimeout(() => {  
        this.connect();  
      }, delay);  
    } else {  
      console.error('达到最大重连次数,停止重连');  
      this.emit('error', new Error('达到最大重连次数'));  
    }  
  }  

  on(event, handler) {  
    if (!this.eventHandlers[event]) {  
      this.eventHandlers[event] = [];  
    }  
    this.eventHandlers[event].push(handler);  
  }  

  emit(event, ...args) {  
    if (this.eventHandlers[event]) {  
      this.eventHandlers[event].forEach(handler => handler(...args));  
    }  
  }  
}  

// 创建MQTT连接函数  
export const createMqttConnection = (url, options) => {  
  return new WechatMqttClient(url, options);  
};

在 uni-app 中使用

下面展示如何在 uni-app (Vue 3 setup 语法糖) 中集成该客户端,实现一个设备状态监控弹窗。

1. 引入与配置

import { createMqttConnection } from '@/utils/miniMqtt.js'; // 引入封装好的类  

// 配置项  
const mqttUrl = 'wss://your-mqtt-broker.com/mqtt';  
const clientId = "emqx_test_" + Math.random().toString(16).substring(2, 8);  
const username = "local_cabinet";  
const password = "hUwRNRvjcaGwsuU3";

2. 初始化连接与事件监听

在组件挂载或用户操作时初始化:

let client = null;  

const initMqtt = () => {  
  if (client) client.end(); // 清理旧连接  

  client = createMqttConnection(mqttUrl, {  
    clientId,  
    username,  
    password,  
    keepAlive: 60  
  });  

  // 监听连接成功  
  client.on('connect', () => {  
    console.log('MQTT连接成功');  
    addDebugMessage("连接成功", "success");  
    // 连接成功后立即订阅主题  
    client.subscribe('wash/hy/device001/pubmsg', (err) => {  
      if(!err) addDebugMessage("订阅成功", "success");  
    });  
  });  

  // 监听错误  
  client.on('error', (err) => {  
    console.error(err);  
    addDebugMessage("发生错误:" + err.message, "error");  
  });  

  // 监听消息  
  client.on('message', (topic, message) => {  
    handleMqttMessage(topic, message);  
  });  

  client.connect();  
};

3. 消息收发处理

发送消息(支持 Hex 字符串):
很多硬件设备通信使用十六进制指令。我们的封装类在 publish 方法中做了特殊处理:如果检测到传入的是纯十六进制字符串,会自动转换为 Uint8Array 发送。

const publishMessage = (topic, message) => {  
  if (!client || !client.connected) return;  

  // 假设 message 是 "01 03 00 00 00 0A C4 0B" 这样的十六进制字符串  
  // 或者普通文本 "Hello"  
  client.publish(topic, message, (err) => {  
    if (err) {  
      uni.showToast({ title: '发送失败', icon: 'none' });  
    } else {  
      addDebugMessage("指令已发送", "primary");  
    }  
  });  
};

接收消息解析:

接收到的 message 通常是 Uint8Array。我们可以将其转换为十六进制字符串以便调试或发送给后端解析。

const handleMqttMessage = (topic, message) => {  
  // 将 Uint8Array 转为 Hex 字符串显示  
  let hexString = "";  
  if (message instanceof Uint8Array) {  
    for (let i = 0; i < message.length; i++) {  
      hexString += message[i].toString(16).toUpperCase().padStart(2, "0") + " ";  
    }  
  }  

  console.log(`收到消息 [${topic}]: ${hexString}`);  
  addDebugMessage(`收到: ${hexString}`, "default");  

  // 此处可调用后端API解析具体的业务含义  
};

4. 完整页面示例结构

<template>  
    <view>  
        <uv-popup ref="popupRef" mode="center" round="10" :closeable="true" @change="change" :safeAreaInsetBottom="false">  
            <view class="w-670 p-40 text-center flex flex-col">  
                <view class="fs-36" :style="{ color:theme.primaryFontColor}">通信状态</view>  
                <scroll-view ref="scrollRef" :scroll-top="scrollTop" scroll-y="true" class="w-full h-420 mt-60">  
                    <view class="scroll-view">  
                         <view v-for="(item,index) in messageInfo" :key="index" class="scroll-item">  
                             <text class="fs-28 text-left" :style="{ color:theme.labelColor}">{{item.time}}</text>  
                             <text class="fs-32 text-left mt-10" :style="{color:item.type}">{{item.message}}</text>  
                         </view>  
                    </view>  
                </scroll-view>  
                <view class="mt-60">  
                    <uv-button   
                        :loading="loading"   
                        text="查询设备状态"   
                        @click="sendMsg"  
                        :customStyle="{  
                            width:'590rpx',  
                            height:'112rpx',  
                            borderRadius:'16rpx',  
                            backgroundColor:theme.primaryColor,  
                            color:theme.whiteColor  
                        }"  
                    ></uv-button>  
                </view>  
            </view>  
        </uv-popup>  
    </view>  
</template>  

<script setup>  
import { ref,nextTick,getCurrentInstance } from 'vue'  
import { getQueryCommandAPI, getQueryCommandTextAPI } from '@/api/device';  
import config from '@/config/config';  
import { createMqttConnection } from '@/utils/miniMqtt.js';  
import { useAppStore } from '@/store/app.js';  

const theme = useAppStore().theme;  
const instance = getCurrentInstance();  

let popupRef = ref(null)  
let scrollRef = ref(null)  
let deviceInfo = ref(null)  
let loading = ref(false)  
let messageInfo = ref([])  
let scrollTop = ref(0)  
let subscribeTopic = "";  
let publishTopic = "";  
let currentPublishTopic = "";  
let currentSubscribeTopic = "";  
let ctimer = null;  
let client = null;  
// MQTT 配置  
const mqttUrl = config.api.wss;  
const clientId = "emqx_test_" + Math.random().toString(16).substring(2, 8);  
const username = "local_cabinet";  
const password = "hUwRNRvjcaGwsuU3";  

let open = ()=>{  
    popupRef.value && popupRef.value.open()  
}  
let openMqtt = (params)=>{  
    deviceInfo.value = params  
     // 生成订阅和发布主题  
    let prefix = params.transmission == 1 ? "wash/hy/" : "wash/ts/";  
    subscribeTopic = prefix + params.device_no + "/pubmsg";  
    publishTopic = prefix + params.device_no + "/submsg";  
    currentPublishTopic = publishTopic;  
    open()  
    initMqtt()  
}  

// 初始化MQTT  
let initMqtt = () => {  
  loading.value = true;  

  if (client) {  
    client.end();  
    console.log("已断开连接");  
  }  

  addDebugMessage("MQTT正在尝试连接...", theme.primaryFontColor);  

  try {  
    // 使用封装的MQTT客户端  
    client = createMqttConnection(mqttUrl, {  
      clientId,  
      username,  
      password,  
      rejectUnauthorized: false,  
    });  

    // 设置事件监听  
    client.on('connect', () => {  
      console.log('MQTT连接成功');  
      addDebugMessage("MQTT连接成功", theme.successColor);  
      loading.value = false;  
      // 订阅主题  
      addSubscribeTopic(subscribeTopic);  
    });  

    client.on('reconnect', () => {  
      console.log("MQTT尝试重连...");  
      addDebugMessage("MQTT正在重连...", theme.primaryFontColor);  
    });  

    client.on('error', (error) => {  
      console.error("MQTT连接错误:", error);  
      addDebugMessage("MQTT连接错误:" + error.message, theme.errorColor);  
      loading.value = false;  
    });  

    client.on('message', (topic, message) => {  
      // 处理接收到的消息  
      handleMqttMessage(topic, message);  
    });  

    // 开始连接  
    client.connect();  

  } catch (error) {  
    console.error("MQTT初始化失败:", error);  
    addDebugMessage("MQTT初始化失败: " + error.message, theme.errorColor);  
    loading.value = false;  
  }  
};  

// 处理MQTT消息  
let handleMqttMessage = (topic, message) => {  
  try {  

    // 转换为16进制显示  
    const hexMessage = messageToHexString(message);  

    // 显示接收消息  
    getQueryCommandTextAPI({  
      command: hexMessage,  
      agreement: deviceInfo.value.agreement  
    }).then(res => {  
      loading.value = false;  
      if (res.data) {  
        let data = res.data;  
        clearTimeout(ctimer);  
        addDebugMessage(`收到消息:${data.run_status_text ? data.run_status_text : ''} ${data.run_fault == "[]" ? "" : data.run_fault}`, theme.primaryFontColor);  
      }  
    }).catch(error => {  
      clearTimeout(ctimer);  
      loading.value = false;  
      addDebugMessage(error.msg, theme.errorColor);  
    });  
  } catch (error) {  
    loading.value = false;  
    addDebugMessage("消息解析错误: " + error.message, theme.errorColor);  
  }  
};  

let sendMsg = () => {  
  loading.value = true;  
  getQueryCommandAPI({  
    deviceNo: deviceInfo.value.deviceNo,  
    agreement: deviceInfo.value.agreement  
  }).then(res => {  
    if (res.data) {  
      let queryCommand = res.data;  
      // 验证是否为有效的十六进制字符串  
      if (isValidHexString(queryCommand)) {  
        publishMessage(currentPublishTopic, queryCommand);  
        uni.$uv.toast("查询命令发送成功");  
      } else {  
        addDebugMessage("警告: 查询命令不是有效的十六进制格式,按文本发送", theme.errorColor);  
        publishMessage(currentPublishTopic, queryCommand);  
        uni.$uv.toast("查询命令发送成功");  
      }  
    } else {  
      uni.$uv.toast("获取查询命令失败或不支持的协议");  
    }  
  });  
  startCoutdown();  
};  

let startCoutdown = () => {  
  clearTimeout(ctimer);  
  ctimer = setTimeout(() => {  
    loading.value = false;  
    addDebugMessage("查询失败", theme.errorColor);  
  }, 5000);  
};  

let scrollToBottom = () => {  
  nextTick(() => {  
         const query = uni.createSelectorQuery().in(instance.proxy);  
            query.select('.scroll-view').boundingClientRect(res => {  
                scrollTop.value = res.height  
        }).exec();  
  });  
};  

// 订阅主题  
let addSubscribeTopic = topic => {  
  if (currentSubscribeTopic && client) {  
    client.unsubscribe(currentSubscribeTopic);  
  }  

  if (topic && client && client.connected) {  
    client.subscribe(topic, (err) => {  
      if (!err) {  
        currentSubscribeTopic = topic;  
        addDebugMessage("订阅成功 ", theme.successColor);  
      } else {  
        addDebugMessage("订阅失败: " + err.message, theme.errorColor);  
      }  
    });  
  }  
};  

// 发布消息  
let publishMessage = (topic, message) => {  
  if (client && client.connected) {  
    let messageToSend;  

    // 检查是否为十六进制字符串  
    if (typeof message === "string" && isValidHexString(message)) {  
      try {  
        // 转换为二进制数据  
        messageToSend = hexStringToBuffer(message);  
      } catch (error) {  
        addDebugMessage("十六进制转换失败: " + error.message, theme.errorColor);  
        return;  
      }  
    } else {  
      // 普通字符串消息  
      messageToSend = message;  
      addDebugMessage("发送文本消息: " + message, theme.primaryFontColor);  
    }  

    client.publish(topic, messageToSend, (err) => {  
      if (err) {  
        addDebugMessage("消息发送失败: " + err.message, theme.errorColor);  
      } else {  
        addDebugMessage("消息发送成功", theme.successColor);  
      }  
    });  
  } else {  
    addDebugMessage("MQTT未连接,无法发送消息", theme.errorColor);  
  }  
};  

// 将十六进制字符串转换为二进制数据  
let hexStringToBuffer = hexString => {  
  // 移除空格和非十六进制字符  
  const cleanHex = hexString.replace(/[^0-9A-Fa-f]/g, "");  

  // 确保是偶数长度  
  const evenLengthHex = cleanHex.length % 2 === 0 ? cleanHex : "0" + cleanHex;  

  // 创建Uint8Array  
  const bytes = new Uint8Array(evenLengthHex.length / 2);  

  for (let i = 0; i < evenLengthHex.length; i += 2) {  
    bytes[i / 2] = parseInt(evenLengthHex.substr(i, 2), 16);  
  }  

  return bytes;  
};  

let addDebugMessage = (message, type) => {  
  messageInfo.value.push({  
    message,  
    type,  
    time: uni.$uv.date(new Date(),'hh:MM:ss')  
  });  
  scrollToBottom();  
};  

// 验证十六进制字符串格式  
let isValidHexString = hexString => {  
  const cleanHex = hexString.replace(/\s+/g, "");  
  return /^[0-9A-Fa-f]+$/.test(cleanHex);  
};  

// 将消息转换为16进制字符串的函数  
let messageToHexString = message => {  
  let hexString = "";  
  if (message instanceof Uint8Array) {  
    // 如果是Uint8Array  
    for (let i = 0; i < message.length; i++) {  
      hexString += message[i].toString(16).toUpperCase().padStart(2, "0") + " ";  
    }  
  } else if (typeof message === "string") {  
    // 如果是字符串,转换每个字符的字节值  
    for (let i = 0; i < message.length; i++) {  
      hexString += message.charCodeAt(i).toString(16).toUpperCase().padStart(2, "0") + " ";  
    }  
  } else if (message && typeof message === "object" && message.constructor && message.constructor.name === "Buffer") {  
    // 如果是Buffer对象(但在浏览器中通常不会遇到)  
    for (let i = 0; i < message.length; i++) {  
      hexString += message[i].toString(16).toUpperCase().padStart(2, "0") + " ";  
    }  
  } else if (message instanceof ArrayBuffer) {  
    // 如果是ArrayBuffer  
    const uint8Array = new Uint8Array(message);  
    for (let i = 0; i < uint8Array.length; i++) {  
      hexString += uint8Array[i].toString(16).toUpperCase().padStart(2, "0") + " ";  
    }  
  } else {  
    // 其他情况,尝试转换为字符串处理  
    const str = message.toString();  
    for (let i = 0; i < str.length; i++) {  
      hexString += str.charCodeAt(i).toString(16).toUpperCase().padStart(2, "0") + " ";  
    }  
  }  

  return hexString.trim();  
};  

let change = (e)=>{  
    if(!e.show){  
         if (client) {  
            // 先取消订阅,再关闭连接  
            if (currentSubscribeTopic) {  
              client.unsubscribe(currentSubscribeTopic);  
            }  
            client.end();  
          }  
          messageInfo.value = [];  
          currentSubscribeTopic = "";  
          currentPublishTopic = "";  
    }  
}  

defineExpose({  
    openMqtt  
})  

</script>  

<style scoped>  
    .scroll-item {  
      display: flex;  
      flex-direction: column;  
      width: 100%;  
      padding: 30rpx 0;  
      border-bottom: 2rpx solid #F5F5F5;  
    }  
</style>
0 关注 分享

要回复文章请先登录注册