目标
技术选型
前端
后端java
MAVEN配置
离线消息
坑
相关材料
目标
实现WEB和APP通用的即时通讯功能。一份代码同时支持各种WEB浏览器以及IOS,Android环境下的APP
技术选型
使用基于netty的nettysocketio实现,后台为JS,前台为HTML5页面,部署时采用nginx中转,java部署在tomcat上
前端
var socket
if(/^plus|android/.test(this.agent)) {
//android的默认浏览器内核不支持websocket10草案
socket = socketIO.connect(LContext.imUrl, {
transports: ['polling']
})
} else {
socket = socketIO.connect(LContext.imUrl)
}
socket.on('reconnect_attempt', function() {
socket.io.opts.transports = ['polling']
})
socket.on('connect', function() {
var data = lpage.user.getData()
////console.log('连接成功..')
////console.log('<span class="connect-msg">Client has connected to the server!</span>\n' + JSON.stringify(data))
//登记当前会话关联的用户
socket.emit('startConnection', {
extra: {
id: data.id,
token: data.token
}
})
})
socket.on('startConnection', function() {
////console.log('连接到服务器')
lpage.socket = socket
// document.querySelector('header .mui-title').innerText = lpage.fromUserName
})
//注册事件,服务器通过事件调用本方法
socket.on('push', function(data, ackCallback) {
////console.log('接收消息' + JSON.stringify(data))
lpage.addNews(data)
if(ackCallback) {
////console.log('接收到消息后,返回消息给服务器确认')
ackCallback('返回到服务器的确认消息')
}
})
socket.on('disconnect', function() {
////console.log('断开连接')
lpage.socket = undefined
setTimeout(function() {
lpage.initConnection()
}, 5000)
//5秒后尝试恢复连接
// document.querySelector('header .mui-title').innerText = '未连接(点击重新连接)'
})
// socket.on('error', (error) => {
// ////console.log('error' + JSON.stringify(error))
// })
//
// socket.on('connect_error', (error) => {
// ////console.log('error' + JSON.stringify(error))
// })
//
// socket.on('connect_timeout', (error) => {
// ////console.log('error' + JSON.stringify(error))
// })
awk
后端java
@Service
public class IMHandleService implements ApplicationListener<ContextRefreshedEvent> {
Logger logger = Logger.getLogger(IMHandleService.class);
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
logger.info("运行init event on IMHandleService");
if (event.getApplicationContext().getParent() != null)// root
// application
// context
// 没有parent,他就是老大.
{
// 需要执行的逻辑代码,当spring容器初始化完成后就会执行该方法。
logger.info("\n\n\n\n\n__\n\n\n已经加载了ContextRefreshedEvent\n\n\n\n");
return;
}
// 或者下面这种方式
// if (event.getApplicationContext().getDisplayName().equals("Root
// WebApplicationContext")) {
// System.out.println("\n\n\n\n\n加载一次的 \n\n ____\n\n\n\n");
//
// logger.info("\n\n\n\n\n__\n\n\n加载一次的
// \n\n_____\n\n");
// }
Configuration config = new Configuration();
config.setHostname("localhost");
config.setPort(10086);
config.setUpgradeTimeout(60000000);// 设置websocket过期时间
// config.setFirstDataTimeout(60000000);
// config.setPingInterval(28);
SocketIOServer server = new SocketIOServer(config);
logger.info(server.getConfiguration().getFirstDataTimeout() + " " + server.getConfiguration().getPingInterval()
- " " + server.getConfiguration().getUpgradeTimeout() + " "
- server.getConfiguration().getPingTimeout());
server.addConnectListener(new ConnectListener() {// 添加客户端连接监听器
@Override
public void onConnect(SocketIOClient client) {
logger.info(client.getRemoteAddress().toString() + " " + client.getTransport().getValue() + " "- client.getHandshakeData().getUrl() + " 接入" + client.getSessionId());
// 调用客户端的事件
client.sendEvent("startConnection", "hello");
}
});
server.addDisconnectListener(new DisconnectListener() {
@Override
public void onDisconnect(SocketIOClient client) {
logger.info(client.getRemoteAddress().toString() + " " + client.getTransport().getValue() + " " - client.getHandshakeData().getUrl() + " 断开" + client.getSessionId());
// 清理连接的session
// 检测登录状态,已登录
// 保存当前会话
String userId = sessionUserMap.get("sessionUserMap", client.getSessionId().toString());
if (userId == null) {
return;
}
// 清除在线状态
sessionUserMap.delete("sessionUserMap", client.getSessionId().toString());
userChatSession.remove(SESSION_STORAGE_PREFIX + userId, client.getSessionId().toString());
logger.info("清理session userId= " + userId);
logger.info("session列表长度 " + userChatSession.members(SESSION_STORAGE_PREFIX + userId).toArray().length);
}
});
/**- 监听事件,客户端连接成功后调用本方法
*/
server.addEventListener("startConnection", Operation.class, new DataListener<Operation>() {
@Override
public void onData(final SocketIOClient client, Operation data, AckRequest ackRequest) {
if (data.extra == null || data.extra.id == null || data.extra.token == null
|| !tokenManager.checkToken(new TokenModel(data.extra.id.toString(), data.extra.token))) {
return;
}
// 检测登录状态,已登录
// 保存当前会话
sessionUserMap.put("sessionUserMap", client.getSessionId().toString(), data.extra.id.toString());
userChatSession.add(SESSION_STORAGE_PREFIX + data.extra.id.toString(),
client.getSessionId().toString());
logger.info("确认连接成功,绑定userId= " + data.extra.id.toString() + " sessionId "
- 监听事件,客户端连接成功后调用本方法
- client.getSessionId().toString());
logger.info("session列表长度 " - userChatSession.members(SESSION_STORAGE_PREFIX + data.extra.id.toString()).toArray().length);
// 握手
// if (data.getMessage().equals("hello")) {
// Long userid = data.getUser();
// logger.info(Thread.currentThread().getName() +
// "web读取到的userid:" + userid);
// // send message back to client with ack callback
// // WITH data
// client.sendEvent("push", new
// AckCallback<String>(String.class) {
// @Override
// public void onSuccess(String result) {
// logger.info("ack from client: " + client.getSessionId() + "
// data: " + result);
// }
// }, context.SESSION_TIME);
//
// } else {
// logger.info("行情接收到了不应该有的web客户端请求1111...");
// }
}
});
/**- 监听事件,客户端通过事件调用本方法
*/
server.addEventListener("message", Operation.class, new DataListener<Operation>() {
@Override
public void onData(final SocketIOClient client, Operation data, AckRequest ackRequest) {
// 握手
if (data.getMessage().equals("hello")) {
Long userid = data.getTargetuser();
logger.info(Thread.currentThread().getName() + "web读取到的userid:" + userid);
// send message back to client with ack callback
// WITH data
client.sendEvent("push", new AckCallback<String>(String.class) {
@Override
public void onSuccess(String result) {
logger.info("接收到客户端反馈 " + client.getSessionId() + " data: " + result);
}
});
} else {
logger.info("接收到了不应该有的web客户端请求...");
}
}
});
if (serverBuff == null) {
logger.info("初始化 IMHandleService的 监听server");
try {
server.start();
serverBuff = server;
} catch (Exception e) {
e.printStackTrace();
logger.error("\n\nIM服务器启动失败...\n\n");
}
// try {
// Thread.sleep(Integer.MAX_VALUE);
// } catch (InterruptedException e) {
// // TODO Auto-generated catch block
// e.printStackTrace();
// }
// server.stop();
} else {
logger.info(" IMHandleService的 监听server已启动");
}
}
@PreDestroy
public void closeSocket() {
logger.info(" IMHandleService 关闭中...");
if (serverBuff != null) {
serverBuff.stop();
for (String sessionId : sessionUserMap.keys("sessionUserMap")) {
String userId = sessionUserMap.get("sessionUserMap", sessionId);
userChatSession.remove(SESSION_STORAGE_PREFIX + userId, sessionId);
sessionUserMap.delete("sessionUserMap", sessionId);
logger.info("清理session " + userId + " 当前长度为" + sessionUserMap.keys("sessionUserMap").size() + " "
- 监听事件,客户端通过事件调用本方法
- userChatSession.size(SESSION_STORAGE_PREFIX + userId));
}
logger.info(" IMHandleService 已关闭完成");
}
}
}
http
MAVEN配置
<dependency>
<groupId>com.corundumstudio.socketio</groupId>
<artifactId>netty-socketio</artifactId>
<version>1.7.12</version>
</dependency>
xml
离线消息
对于所有消息,默认都通过在线服务发送,当发送失败时,启用个推的JAVA SDK进行消息推送即可,具体参考个推官网
- client.getHandshakeData().getUrl() + " 接入" + client.getSessionId());
坑
在WEB和IOS设备上都可以正常使用,但是在androidAPP中会出现一分钟左右就断线一次的问题。最后在一篇博客上找到了问题所在,由于android的默认浏览器内核支持的websocket版本为10,而Netty的默认实现目前还只支持到7.6,如果有兴趣自己升级的话,可以参考netty升级websocket草案10,
本文链接:http://blog.betweenfriends.cn/post/imnettysocketio.html
1 个评论
要回复文章请先登录或注册
jonychen1