二八中医
二八中医
  • 发布:2017-08-28 14:46
  • 更新:2021-07-28 09:58
  • 阅读:10630

个推+nettysocketio实现IM即时通讯

分类:MUI

目标
技术选型
前端
后端java
MAVEN配置
离线消息

相关材料
目标
实现WEB和APP通用的即时通讯功能。一份代码同时支持各种WEB浏览器以及IOS,Android环境下的APP

技术选型
使用基于netty的nettysocketio实现,后台为JS,前台为HTML5页面,部署时采用nginx中转,java部署在tomcat上

前端
var socket
if(/^plus|android/.test(this.agent)) {
//android的默认浏览器内核不支持websocket10草案
socket = socketIO.connect(LContext.imUrl, {
transports: ['polling']
})
} else {
socket = socketIO.connect(LContext.imUrl)
}
socket.on('reconnect_attempt', function() {
socket.io.opts.transports = ['polling']
})
socket.on('connect', function() {
var data = lpage.user.getData()
////console.log('连接成功..')
////console.log('<span class="connect-msg">Client has connected to the server!</span>\n' + JSON.stringify(data))
//登记当前会话关联的用户
socket.emit('startConnection', {
extra: {
id: data.id,
token: data.token
}
})
})
socket.on('startConnection', function() {
////console.log('连接到服务器')
lpage.socket = socket
// document.querySelector('header .mui-title').innerText = lpage.fromUserName
})
//注册事件,服务器通过事件调用本方法
socket.on('push', function(data, ackCallback) {
////console.log('接收消息' + JSON.stringify(data))
lpage.addNews(data)
if(ackCallback) {
////console.log('接收到消息后,返回消息给服务器确认')
ackCallback('返回到服务器的确认消息')
}
})
socket.on('disconnect', function() {
////console.log('断开连接')
lpage.socket = undefined
setTimeout(function() {
lpage.initConnection()
}, 5000)
//5秒后尝试恢复连接
// document.querySelector('header .mui-title').innerText = '未连接(点击重新连接)'
})
// socket.on('error', (error) => {
// ////console.log('error' + JSON.stringify(error))
// })
//
// socket.on('connect_error', (error) => {
// ////console.log('error' + JSON.stringify(error))
// })
//
// socket.on('connect_timeout', (error) => {
// ////console.log('error' + JSON.stringify(error))
// })
awk
后端java
@Service
public class IMHandleService implements ApplicationListener<ContextRefreshedEvent> {
Logger logger = Logger.getLogger(IMHandleService.class);
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
logger.info("运行init event on IMHandleService");
if (event.getApplicationContext().getParent() != null)// root
// application
// context
// 没有parent,他就是老大.
{
// 需要执行的逻辑代码,当spring容器初始化完成后就会执行该方法。
logger.info("\n\n\n\n\n__\n\n\n已经加载了ContextRefreshedEvent\n\n\n\n");
return;
}
// 或者下面这种方式
// if (event.getApplicationContext().getDisplayName().equals("Root
// WebApplicationContext")) {
// System.out.println("\n\n\n
\n\n加载一次的 \n\n ____\n\n\n\n");
//
// logger.info("\n\n\n\n\n__\n\n\n加载一次的
// \n\n_____\n\n");
// }
Configuration config = new Configuration();
config.setHostname("localhost");
config.setPort(10086);
config.setUpgradeTimeout(60000000);// 设置websocket过期时间
// config.setFirstDataTimeout(60000000);
// config.setPingInterval(28);
SocketIOServer server = new SocketIOServer(config);
logger.info(server.getConfiguration().getFirstDataTimeout() + " " + server.getConfiguration().getPingInterval()

  • " " + server.getConfiguration().getUpgradeTimeout() + " "
  • server.getConfiguration().getPingTimeout());
    server.addConnectListener(new ConnectListener() {// 添加客户端连接监听器
    @Override
    public void onConnect(SocketIOClient client) {
    logger.info(client.getRemoteAddress().toString() + " " + client.getTransport().getValue() + " "
    • client.getHandshakeData().getUrl() + " 接入" + client.getSessionId());
      // 调用客户端的事件
      client.sendEvent("startConnection", "hello");
      }
      });
      server.addDisconnectListener(new DisconnectListener() {
      @Override
      public void onDisconnect(SocketIOClient client) {
      logger.info(client.getRemoteAddress().toString() + " " + client.getTransport().getValue() + " "
    • client.getHandshakeData().getUrl() + " 断开" + client.getSessionId());
      // 清理连接的session
      // 检测登录状态,已登录
      // 保存当前会话
      String userId = sessionUserMap.get("sessionUserMap", client.getSessionId().toString());
      if (userId == null) {
      return;
      }
      // 清除在线状态
      sessionUserMap.delete("sessionUserMap", client.getSessionId().toString());
      userChatSession.remove(SESSION_STORAGE_PREFIX + userId, client.getSessionId().toString());
      logger.info("清理session userId= " + userId);
      logger.info("session列表长度 " + userChatSession.members(SESSION_STORAGE_PREFIX + userId).toArray().length);
      }
      });
      /**
      • 监听事件,客户端连接成功后调用本方法
        */
        server.addEventListener("startConnection", Operation.class, new DataListener<Operation>() {
        @Override
        public void onData(final SocketIOClient client, Operation data, AckRequest ackRequest) {
        if (data.extra == null || data.extra.id == null || data.extra.token == null
        || !tokenManager.checkToken(new TokenModel(data.extra.id.toString(), data.extra.token))) {
        return;
        }
        // 检测登录状态,已登录
        // 保存当前会话
        sessionUserMap.put("sessionUserMap", client.getSessionId().toString(), data.extra.id.toString());
        userChatSession.add(SESSION_STORAGE_PREFIX + data.extra.id.toString(),
        client.getSessionId().toString());
        logger.info("确认连接成功,绑定userId= " + data.extra.id.toString() + " sessionId "
    • client.getSessionId().toString());
      logger.info("session列表长度 "
    • userChatSession.members(SESSION_STORAGE_PREFIX + data.extra.id.toString()).toArray().length);
      // 握手
      // if (data.getMessage().equals("hello")) {
      // Long userid = data.getUser();
      // logger.info(Thread.currentThread().getName() +
      // "web读取到的userid:" + userid);
      // // send message back to client with ack callback
      // // WITH data
      // client.sendEvent("push", new
      // AckCallback<String>(String.class) {
      // @Override
      // public void onSuccess(String result) {
      // logger.info("ack from client: " + client.getSessionId() + "
      // data: " + result);
      // }
      // }, context.SESSION_TIME);
      //
      // } else {
      // logger.info("行情接收到了不应该有的web客户端请求1111...");
      // }
      }
      });
      /**
      • 监听事件,客户端通过事件调用本方法
        */
        server.addEventListener("message", Operation.class, new DataListener<Operation>() {
        @Override
        public void onData(final SocketIOClient client, Operation data, AckRequest ackRequest) {
        // 握手
        if (data.getMessage().equals("hello")) {
        Long userid = data.getTargetuser();
        logger.info(Thread.currentThread().getName() + "web读取到的userid:" + userid);
        // send message back to client with ack callback
        // WITH data
        client.sendEvent("push", new AckCallback<String>(String.class) {
        @Override
        public void onSuccess(String result) {
        logger.info("接收到客户端反馈 " + client.getSessionId() + " data: " + result);
        }
        });
        } else {
        logger.info("接收到了不应该有的web客户端请求...");
        }
        }
        });
        if (serverBuff == null) {
        logger.info("初始化 IMHandleService的 监听server");
        try {
        server.start();
        serverBuff = server;
        } catch (Exception e) {
        e.printStackTrace();
        logger.error("\n\nIM服务器启动失败...\n\n");
        }
        // try {
        // Thread.sleep(Integer.MAX_VALUE);
        // } catch (InterruptedException e) {
        // // TODO Auto-generated catch block
        // e.printStackTrace();
        // }
        // server.stop();
        } else {
        logger.info(" IMHandleService的 监听server已启动");
        }
        }
        @PreDestroy
        public void closeSocket() {
        logger.info(" IMHandleService 关闭中...");
        if (serverBuff != null) {
        serverBuff.stop();
        for (String sessionId : sessionUserMap.keys("sessionUserMap")) {
        String userId = sessionUserMap.get("sessionUserMap", sessionId);
        userChatSession.remove(SESSION_STORAGE_PREFIX + userId, sessionId);
        sessionUserMap.delete("sessionUserMap", sessionId);
        logger.info("清理session " + userId + " 当前长度为" + sessionUserMap.keys("sessionUserMap").size() + " "
    • userChatSession.size(SESSION_STORAGE_PREFIX + userId));
      }
      logger.info(" IMHandleService 已关闭完成");
      }
      }
      }
      http
      MAVEN配置
      <dependency>
      <groupId>com.corundumstudio.socketio</groupId>
      <artifactId>netty-socketio</artifactId>
      <version>1.7.12</version>
      </dependency>
      xml
      离线消息
      对于所有消息,默认都通过在线服务发送,当发送失败时,启用个推的JAVA SDK进行消息推送即可,具体参考个推官网


在WEB和IOS设备上都可以正常使用,但是在androidAPP中会出现一分钟左右就断线一次的问题。最后在一篇博客上找到了问题所在,由于android的默认浏览器内核支持的websocket版本为10,而Netty的默认实现目前还只支持到7.6,如果有兴趣自己升级的话,可以参考netty升级websocket草案10,

本文链接:http://blog.betweenfriends.cn/post/imnettysocketio.html

0 关注 分享

要回复文章请先登录注册

jonychen1

jonychen1

应用市场的这款im即时通讯插件感觉还不错,支持编译为安卓、ios的APP,还有微信小程序,我们使用下来感觉还不错,你也可以试试,插件市场地址:[https://ext.dcloud.net.cn/plugin?id=5177](https://ext.dcloud.net.cn/plugin?id=5177)
2021-07-28 09:58