Netty-实现基于WebSocket+Protobuf的消息推送系统
参考文献
项目结构
- 该项目主要分为两个部分: message-server 和 message-client
- message-client: 基于Netty实现的WebSocket客户端,具有与message-server交互的功能.
- message-server: 基于SpringBoot结合Netty实现的WebSocket服务端.
核心代码
消息格式定义
1 | syntax = "proto3"; |
服务端
-
WebSocketServerInitializer
服务端handler
初始化类1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
package com.holelin.messageserver.initializer;
import com.google.protobuf.MessageLite;
import com.google.protobuf.MessageLiteOrBuilder;
import com.holelin.messageserver.handler.WebSocketServerHandler;
import com.holelin.messageserver.protocol.MessageProtocol;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.handler.codec.MessageToMessageEncoder;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static io.netty.buffer.Unpooled.wrappedBuffer;
/**
* @Description:
* @Author: HoleLin
* @CreateDate: 2022/12/16 10:26
* @UpdateUser: HoleLin
* @UpdateDate: 2022/12/16 10:26
* @UpdateRemark: 修改内容
* @Version: 1.0
*/
public class WebSocketServerInitializer extends ChannelInitializer<SocketChannel> {
private WebSocketServerHandler webSocketServerHandler;
private String workPath;
/**
 * Assembles the handler pipeline for a newly accepted WebSocket connection.
 *
 * <p>Inbound order: idle detection, HTTP codec and aggregation, WebSocket compression and
 * handshake, WebSocket-frame unwrapping, protobuf decoding, then the business handler.
 * Outbound protobuf messages are serialized and wrapped into binary WebSocket frames by
 * the encoder registered last.
 *
 * @param ch the channel whose pipeline is being set up
 * @throws Exception if the pipeline cannot be assembled
 */
@Override
protected void initChannel(SocketChannel ch) throws Exception {
    final ChannelPipeline pipeline = ch.pipeline();
    // IdleStateHandler does not send anything itself: it raises an IdleStateEvent when the
    // channel has seen no read, no write, or neither for the configured time (6 hours each).
    // The event reaches the following handlers through userEventTriggered. Idle detection
    // complements handlerRemoved, which does not always notice a dead connection.
    pipeline.addLast(new IdleStateHandler(6, 6, 6, TimeUnit.HOURS));
    pipeline.addLast("http-server-codec", new HttpServerCodec());
    pipeline.addLast("chunk-write", new ChunkedWriteHandler());
    // Aggregate HTTP message parts into one full message, capped at 64 KiB.
    pipeline.addLast("http-aggregator", new HttpObjectAggregator(64 * 1024));
    // WebSocket data compression.
    pipeline.addLast(new WebSocketServerCompressionHandler());
    // WebSocket protocol handling on workPath: no sub-protocols, extensions allowed.
    pipeline.addLast(new WebSocketServerProtocolHandler(workPath, null, true));
    // Inbound: unwrap the frame payload so the protobuf decoder receives a plain ByteBuf.
    pipeline.addLast(new MessageToMessageDecoder<WebSocketFrame>() {
        @Override
        protected void decode(ChannelHandlerContext ctx, WebSocketFrame frame, List<Object> out) throws Exception {
            // The payload outlives this call (it is handed to the next handler), so take an
            // extra reference; Netty releases the frame once decode returns.
            out.add(frame.content().retain());
        }
    });
    pipeline.addLast("protobuf-decoder", new ProtobufDecoder(MessageProtocol.MessageProto.getDefaultInstance()));
    pipeline.addLast(webSocketServerHandler);
    // Outbound: serialize the protobuf message (same logic as Netty's TCP ProtobufEncoder)
    // and wrap it in a binary WebSocket frame, because WebSocket clients cannot consume the
    // raw protobuf byte stream directly.
    pipeline.addLast("custom-encoder", new MessageToMessageEncoder<MessageLiteOrBuilder>() {
        @Override
        protected void encode(ChannelHandlerContext ctx, MessageLiteOrBuilder msg, List<Object> out) throws Exception {
            final byte[] payload;
            if (msg instanceof MessageLite) {
                payload = ((MessageLite) msg).toByteArray();
            } else if (msg instanceof MessageLite.Builder) {
                payload = ((MessageLite.Builder) msg).build().toByteArray();
            } else {
                // Explicit failure instead of `assert`, which is disabled unless the JVM runs with -ea.
                throw new IllegalArgumentException(
                        "Unsupported protobuf message type: " + msg.getClass().getName());
            }
            out.add(new BinaryWebSocketFrame(wrappedBuffer(payload)));
        }
    });
}
}
-
业务处理
handler
:WebSocketServerHandler
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
package com.holelin.messageserver.handler;
import com.holelin.messageserver.consts.StringConstants;
import com.holelin.messageserver.enums.ChatTypeEnum;
import com.holelin.messageserver.enums.MessageTypeEnum;
import com.holelin.messageserver.protocol.MessageProtocol;
import com.holelin.messageserver.utils.ServerChannelCache;
import com.holelin.messageserver.utils.SnowflakeUtil;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.handler.timeout.IdleStateEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Objects;
/**
* @Description:
* @Author: HoleLin
* @CreateDate: 2022/12/16 10:41
* @UpdateUser: HoleLin
* @UpdateDate: 2022/12/16 10:41
* @UpdateRemark: 修改内容
* @Version: 1.0
*/
.Sharable
public class WebSocketServerHandler extends SimpleChannelInboundHandler<MessageProtocol.MessageProto> {
private final static Logger LOGGER = LoggerFactory.getLogger(WebSocketServerHandler.class);
private ServerChannelCache channelCache;
public static final String SYSTEM_USE_ID = "SYSTEM";
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
//将 evt 向下转型 IdleStateEvent
IdleStateEvent event = (IdleStateEvent) evt;
String eventType = null;
switch (event.state()) {
case ALL_IDLE:
eventType = "读写空闲";
final Channel channel = ctx.channel();
channelCache.loginOut(channel);
channel.close();
break;
}
}
}
protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol.MessageProto message) {
final Channel channel = ctx.channel();
final int messageType = message.getMessageType();
final int chatType = message.getChatType();
// 处理系统基本功能
if (MessageTypeEnum.PING.code == messageType
&& ChatTypeEnum.SYSTEM.code == chatType) {
handlePing(message, channel);
}
// 处理登录消息
if (MessageTypeEnum.LOGIN.code == messageType
&& ChatTypeEnum.SYSTEM.code == chatType) {
handleLogin(channel);
}
// 处理创建聊天组
if (MessageTypeEnum.CREATE_AND_JOIN_GROUP.code == messageType
&& ChatTypeEnum.SYSTEM.code == chatType) {
handleCreateGroup(message, channel);
}
// 处理离开聊天组
if (MessageTypeEnum.LEAVE_GROUP.code == messageType
&& ChatTypeEnum.SYSTEM.code == chatType) {
handleLevelGroup(message, channel);
}
// 处理聊天
// 处理单聊
if (MessageTypeEnum.SEND_MESSAGE.code == messageType
&& ChatTypeEnum.SINGLE.code == chatType) {
handleSingleChat(message, channel);
}
if (MessageTypeEnum.SEND_MESSAGE.code == messageType
&& ChatTypeEnum.GROUP.code == chatType) {
handleGroupChat(message, channel);
}
}
private void handleLevelGroup(MessageProtocol.MessageProto message, Channel channel) {
final String fromUid = message.getFromUid();
final String groupId = message.getSendUid();
final Channel fromChannel = channelCache.getChannelByClientId(fromUid);
final MessageProtocol.MessageProto.Builder builder = MessageProtocol.MessageProto.newBuilder()
.setMessageType(MessageTypeEnum.LEAVE_GROUP_RESPONSE.code)
.setFromUid(SYSTEM_USE_ID)
.setSendUid(fromUid);
if (Objects.nonNull(fromChannel)) {
if (channelCache.existsGroupId(groupId)) {
channelCache.leaveGroup(groupId, channel);
fromChannel.writeAndFlush(builder.setContent(StringConstants.TRUE).build());
LOGGER.info("客户端ID:{}离开客户端组:{}", fromUid, groupId);
} else {
fromChannel.writeAndFlush(builder.setContent(StringConstants.FALSE).build());
LOGGER.warn("客户端ID:{}离开客户端组:{}失败,租户端组不存在", fromUid, groupId);
}
}
}
private void handleGroupChat(MessageProtocol.MessageProto message, Channel currentChannel) {
final String groupId = message.getSendUid();
final String fromUid = message.getFromUid();
final ChannelGroup channelGroup = channelCache.getChannelGroup(groupId);
final MessageProtocol.MessageProto.Builder builder = MessageProtocol.MessageProto.newBuilder()
.setMessageType(MessageTypeEnum.SEND_MESSAGE_RESPONSE.code)
.setChatType(ChatTypeEnum.GROUP.code)
.setFromUid(SYSTEM_USE_ID)
.setSendUid(fromUid);
if (Objects.nonNull(channelGroup) && Objects.nonNull(channelGroup.find(currentChannel.id()))) {
channelGroup.writeAndFlush(message);
LOGGER.info("发送客户端组消息成功,消息为:{}", message);
// 返回值
currentChannel.writeAndFlush(builder.setContent(StringConstants.TRUE).build());
} else {
currentChannel.writeAndFlush(builder.setContent(StringConstants.FALSE).build());
LOGGER.warn("{}组不存在", groupId);
}
}
/**
* 处理创建聊天组
*
* @param message
* @param channel
*/
private void handleCreateGroup(MessageProtocol.MessageProto message, Channel channel) {
final String groupId = message.getSendUid();
final String fromUid = message.getFromUid();
final Channel fromChannel = channelCache.getChannelByClientId(fromUid);
final MessageProtocol.MessageProto.Builder builder = MessageProtocol.MessageProto.newBuilder()
.setMessageType(MessageTypeEnum.CREATE_AND_JOIN_GROUP_RESPONSE.code)
.setFromUid(SYSTEM_USE_ID)
.setSendUid(fromUid);
channelCache.createAndJoinGroup(groupId, channel);
if (Objects.nonNull(fromChannel)) {
fromChannel.writeAndFlush(builder.setContent(StringConstants.TRUE).build());
LOGGER.info("用户:{}加入用户组:{}成功", fromUid, groupId);
}
}
/**
* 处理单聊消息
*
* @param message
*/
private void handleSingleChat(MessageProtocol.MessageProto message, Channel currentChannel) {
final String sendUid = message.getSendUid();
final String fromUid = message.getFromUid();
final Channel channel = channelCache.getChannelByClientId(sendUid);
final MessageProtocol.MessageProto.Builder builder = MessageProtocol.MessageProto.newBuilder()
.setMessageType(MessageTypeEnum.SEND_MESSAGE_RESPONSE.code)
.setChatType(ChatTypeEnum.SINGLE.code)
.setFromUid(SYSTEM_USE_ID)
.setSendUid(fromUid);
if (Objects.nonNull(channel)) {
channel.writeAndFlush(message);
currentChannel.writeAndFlush(builder.setContent(StringConstants.TRUE).build());
LOGGER.info("发送客户端消息成功,消息为:{}", message);
} else {
currentChannel.writeAndFlush(builder.setContent(StringConstants.FALSE).build());
LOGGER.warn("用户:{},不在线", sendUid);
}
}
/**
* 处理登录消息
*/
private void handleLogin(Channel channel) {
final String clientId = String.valueOf(SnowflakeUtil.genId());
channelCache.login(clientId, channel);
final MessageProtocol.MessageProto responseMessage = MessageProtocol.MessageProto.newBuilder()
.setMessageType(MessageTypeEnum.LOGIN_RESPONSE.code)
.setFromUid(clientId)
.setSendUid(SYSTEM_USE_ID)
.setContent(clientId)
.build();
channel.writeAndFlush(responseMessage);
LOGGER.info("用户:{}上线了...", clientId);
}
/**
* 处理PING消息
*
* @param message
* @param channel
*/
private void handlePing(MessageProtocol.MessageProto message, Channel channel) {
final String fromUid = message.getFromUid();
// 构建PONG消息
final MessageProtocol.MessageProto pongMessage = MessageProtocol.MessageProto.newBuilder()
.setMessageType(MessageTypeEnum.PONG.code)
.setFromUid(SYSTEM_USE_ID)
.setSendUid(fromUid)
.setContent(MessageTypeEnum.PONG.toString())
.build();
channel.writeAndFlush(pongMessage);
}
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
客户端
-
WebSocketClient
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
package com.holelin.messageclient.client;
import com.google.protobuf.MessageLite;
import com.google.protobuf.MessageLiteOrBuilder;
import com.holelin.messageclient.handler.WebSocketClientHandler;
import com.holelin.messageclient.handler.WebSocketMessageClientHandler;
import com.holelin.messageclient.protocol.MessageProtocol;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.handler.codec.MessageToMessageEncoder;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketVersion;
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketClientCompressionHandler;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.util.concurrent.DefaultThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.Proxy;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static io.netty.buffer.Unpooled.wrappedBuffer;
/**
* @Description:
* @Author: HoleLin
* @CreateDate: 2023/1/3 16:09
* @UpdateUser: HoleLin
* @UpdateDate: 2023/1/3 16:09
* @UpdateRemark: 修改内容
* @Version: 1.0
*/
public class WebSocketClient implements Client {
private final static Logger LOGGER = LoggerFactory.getLogger(WebSocketClient.class);
private static ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
public static final String CLIENT_HANDLER_NAME = "client-handler";
/**
* 服务端访问地址
* 例: ws://localhost:8080/websocketPath
*/
private URI uri;
private Channel channel;
private WebSocketMessageClientHandler messageClientHandler;
/**
 * Creates a client for the given server address. Only the address is parsed and stored
 * here; no connection is opened.
 *
 * @param uri server address, e.g. {@code ws://localhost:8080/websocketPath}
 * @throws URISyntaxException if {@code uri} cannot be parsed as a URI
 */
public WebSocketClient(String uri) throws URISyntaxException {
this.uri = new URI(uri);
}
/**
 * Connects to the server given in the constructor URI, performs the WebSocket handshake
 * and blocks until the handshake has completed.
 *
 * <p>The event-loop group created here is shut down when the channel closes, or at once
 * if connecting or handshaking fails. The message handler is only published to
 * {@code messageClientHandler} after a successful handshake, so a failed attempt leaves
 * the client in its "not initialised" state.
 *
 * @throws RuntimeException wrapping {@link InterruptedException} if the calling thread is
 *                          interrupted while waiting; the interrupt flag is restored
 */
public void initClient() {
    final NioEventLoopGroup group = new NioEventLoopGroup(new DefaultThreadFactory("WebSocketClient"));
    final WebSocketMessageClientHandler handler = new WebSocketMessageClientHandler();
    final Bootstrap bootstrap = new Bootstrap()
            .group(group)
            .channel(NioSocketChannel.class)
            .option(ChannelOption.TCP_NODELAY, true)
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    final ChannelPipeline pipeline = ch.pipeline();
                    pipeline.addLast("http-codec", new HttpClientCodec());
                    pipeline.addLast("http-aggregator", new HttpObjectAggregator(65536));
                    pipeline.addLast(WebSocketClientCompressionHandler.INSTANCE);
                    // Inbound handlers, executed in registration order.
                    // Unwrap the frame payload for the protobuf decoder.
                    pipeline.addLast("websocket-decoder", new MessageToMessageDecoder<WebSocketFrame>() {
                        @Override
                        protected void decode(ChannelHandlerContext ctx, WebSocketFrame frame, List<Object> out) throws Exception {
                            // The payload is handed to the next handler, so take an extra reference.
                            out.add(frame.content().retain());
                        }
                    });
                    pipeline.addLast("protobuf-decoder", new ProtobufDecoder(MessageProtocol.MessageProto.getDefaultInstance()));
                    pipeline.addLast("message-handler", handler);
                    pipeline.addLast(CLIENT_HANDLER_NAME, new WebSocketClientHandler());
                    // Outbound handlers, executed in reverse registration order.
                    // Serialize the protobuf message (same logic as Netty's TCP ProtobufEncoder)
                    // and wrap it in a binary WebSocket frame.
                    pipeline.addLast(new MessageToMessageEncoder<MessageLiteOrBuilder>() {
                        @Override
                        protected void encode(ChannelHandlerContext ctx, MessageLiteOrBuilder msg, List<Object> out) throws Exception {
                            final byte[] payload;
                            if (msg instanceof MessageLite) {
                                payload = ((MessageLite) msg).toByteArray();
                            } else if (msg instanceof MessageLite.Builder) {
                                payload = ((MessageLite.Builder) msg).build().toByteArray();
                            } else {
                                // Explicit failure instead of `assert`, which is off unless the JVM runs with -ea.
                                throw new IllegalArgumentException(
                                        "Unsupported protobuf message type: " + msg.getClass().getName());
                            }
                            out.add(new BinaryWebSocketFrame(wrappedBuffer(payload)));
                        }
                    });
                }
            });

    final String host = uri.getHost();
    // URI.getPort() is -1 when the address carries no explicit port; fall back to the scheme default.
    final int port = uri.getPort() >= 0
            ? uri.getPort()
            : ("wss".equalsIgnoreCase(uri.getScheme()) ? 443 : 80);

    Channel connected = null;
    boolean ready = false;
    try {
        // sync() throws when the connect attempt fails, so reaching the next line means success.
        connected = bootstrap.connect(host, port).sync().channel();
        channel = connected;
        // Release the event-loop threads once the connection is gone.
        connected.closeFuture().addListener(future -> group.shutdownGracefully());
        LOGGER.debug("客户端连接host:{},port:{}成功", host, port);

        // The handshaker builds the client's upgrade request; WebSocketClientHandler is given
        // the handshaker and exposes the future that completes when the handshake is done.
        final WebSocketClientHandshaker handShaker = WebSocketClientHandshakerFactory.newHandshaker(
                uri, WebSocketVersion.V13, null, true, new DefaultHttpHeaders());
        final WebSocketClientHandler clientHandler =
                (WebSocketClientHandler) connected.pipeline().get(CLIENT_HANDLER_NAME);
        clientHandler.setHandShaker(handShaker);
        handShaker.handshake(connected);
        // Block until the handshake has finished; sync() throws if it failed.
        clientHandler.handshakeFuture().sync();
        LOGGER.debug("WebSocket 握手成功");

        messageClientHandler = handler;
        ready = true;
    } catch (InterruptedException e) {
        // Preserve the interrupt for callers further up the stack.
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
    } finally {
        if (!ready) {
            if (connected == null) {
                LOGGER.error("客户端连接host:{},port:{}失败", host, port);
            } else {
                LOGGER.error("WebSocket 握手失败");
                connected.close();
            }
            group.shutdownGracefully();
        }
    }
}
/**
 * Creates a dynamic proxy for {@code serviceClass}. Each call on the proxy sends its first
 * argument (a {@code MessageProto}) to the server and blocks until the message handler
 * returns a reply. The connection is opened lazily on the first call.
 *
 * <p>{@code equals}, {@code hashCode} and {@code toString} are answered locally. They
 * arrive with a null or non-message argument array and must not be sent to the server.
 *
 * @param serviceClass the interface the proxy should implement
 * @return a proxy implementing {@code serviceClass}
 */
public Object getBean(final Class<?> serviceClass) {
    return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
            new Class<?>[]{serviceClass},
            (proxy, method, args) -> {
                if (method.getDeclaringClass() == Object.class) {
                    // Only equals/hashCode/toString of Object are routed through the handler.
                    switch (method.getName()) {
                        case "equals":
                            return proxy == args[0];
                        case "hashCode":
                            return System.identityHashCode(proxy);
                        default:
                            return serviceClass.getName() + "$Proxy@"
                                    + Integer.toHexString(System.identityHashCode(proxy));
                    }
                }
                if (args == null || args.length == 0
                        || !(args[0] instanceof MessageProtocol.MessageProto)) {
                    throw new IllegalArgumentException(
                            method.getName() + " must be called with a MessageProto as its first argument");
                }
                if (Objects.isNull(messageClientHandler)) {
                    initClient();
                }
                messageClientHandler.setMessage((MessageProtocol.MessageProto) args[0]);
                return executor.submit(messageClientHandler).get();
            });
}
/**
 * Closes the connection to the server, if one was opened. Safe to call on a client that
 * never connected (the connection is only opened by {@code initClient()}).
 */
public void close() {
    if (Objects.nonNull(channel)) {
        channel.close();
    }
}
}
-
业务处理类:
WebSocketMessageClientHandler
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
package com.holelin.messageclient.handler;
import com.holelin.messageclient.protocol.MessageProtocol;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;
import java.util.Objects;
import java.util.concurrent.Callable;
/**
* @Description: WebSocket客户端消息处理类
* @Author: HoleLin
* @CreateDate: 2023/1/5 13:40
* @UpdateUser: HoleLin
* @UpdateDate: 2023/1/5 13:40
* @UpdateRemark: 修改内容
* @Version: 1.0
*/
public class WebSocketMessageClientHandler extends SimpleChannelInboundHandler<MessageProtocol.MessageProto> implements Callable {
/**
* 上下文
*/
private ChannelHandlerContext context;
/**
* 发送的消息
*/
private MessageProtocol.MessageProto message;
/**
* 服务器返回值
*/
private MessageProtocol.MessageProto result;
public void channelActive(ChannelHandlerContext ctx) throws Exception {
this.context = ctx;
}
protected synchronized void channelRead0(ChannelHandlerContext ctx, MessageProtocol.MessageProto msg) throws Exception {
log.info("收到服务器的消息:{}", msg.getContent());
this.result = msg;
notify();
}
public synchronized Object call() throws Exception {
if (Objects.nonNull(message)) {
final Channel channel = context.pipeline().channel();
channel.writeAndFlush(message);
}
// 等待服务器发送的消息
wait();
return result;
}
/**
* 设置发送的消息
*
*/
public void setMessage(MessageProtocol.MessageProto message) {
this.message = message;
}
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.channel().close();
}
}
项目启动方式
- message-server: 以SpringBoot项目启动方式启动
- message-client: 为封装的与message-server交互的API,其中具有连通性验证,登录服务器,创建并加入客户端组,离开客户端组,给指定客户端发送消息,给指定客户端组发送消息等功能.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
package com.diannei.ai.messageclient.client;
import com.diannei.ai.messageclient.utils.SendMessageUtil;
import com.diannei.ai.messageclient.utils.SnowflakeUtil;
import org.junit.jupiter.api.Test;
import java.net.URISyntaxException;
class WebSocketClientTest {
    // Each test connects to the server address below and prints the result.

    String uri = "ws://localhost:10089/ws";

    /** Builds a message sender backed by a fresh client for {@link #uri}. */
    private SendMessageUtil newSender() throws URISyntaxException {
        return new SendMessageUtil(new WebSocketClient(uri));
    }

    /** Connectivity check. */
    @Test
    void ping() throws URISyntaxException {
        final Boolean ping = newSender().ping();
        System.out.println(ping);
    }

    /** Logs in and prints the value returned by the login call. */
    @Test
    void login() throws URISyntaxException {
        System.out.println(newSender().login());
    }

    /** Logs in, then creates and joins a group with a generated id. */
    @Test
    void createGroup() throws URISyntaxException {
        final SendMessageUtil sendMessageUtil = newSender();
        final String clientId = sendMessageUtil.login();
        System.out.println(sendMessageUtil.createGroup(clientId, String.valueOf(SnowflakeUtil.genId())));
    }

    /** Logs in, then leaves a group with a generated id. */
    @Test
    void leaveGroup() throws URISyntaxException {
        final SendMessageUtil sendMessageUtil = newSender();
        final String clientId = sendMessageUtil.login();
        System.out.println(sendMessageUtil.leaveGroup(clientId, String.valueOf(SnowflakeUtil.genId())));
    }

    /** Logs in, then sends a one-to-one message to a fixed client id. */
    @Test
    void sendSingleMessage() throws URISyntaxException {
        final SendMessageUtil sendMessageUtil = newSender();
        final String clientId = sendMessageUtil.login();
        System.out.println(sendMessageUtil.sendSingleMessage(clientId, "1063424804638380032","测试消息"));
    }

    /** Logs in, then sends a message to the group "TEXT". */
    @Test
    void sendGroupMessage() throws URISyntaxException {
        final SendMessageUtil sendMessageUtil = newSender();
        final String clientId = sendMessageUtil.login();
        System.out.println(sendMessageUtil.sendGroupMessage(clientId, "TEXT","测试消息"));
    }
}
项目调试
-
调试可以使用两种方式:
-
Java-Client-API方式,即上方的单元测试类
-
前端JS调试
- 调试文件位于
message-server/src/main/resources/front
目录下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
<html lang="en">
<head>
<meta charset="UTF-8">
<title>WebSocket客户端</title>
</head>
<body>
<script src="protobuf.min.js"></script>
<script type="text/javascript">
// Message-type and chat-type codes used by this page.
const SEND_MESSAGE = 6;
const LOGIN_RESPONSE = 20;
const CREATE_AND_JOIN_GROUP_RESPONSE = 30;
const LEAVE_GROUP_RESPONSE = 50;
const SEND_MESSAGE_RESPONSE = 60;
const CHAT_SINGLE = 1;
const CHAT_GROUP = 2;
// The WebSocket connection; stays undefined when the browser has no WebSocket support.
let socket;
// Client id stored by the login response handler; used as fromUid on later requests.
let currentClientId;

if (window.WebSocket) {
    // The argument is the address of the server to connect to.
    socket = new WebSocket("ws://localhost:10089/ws");

    // Response handlers selected by message type alone. A Map keeps the lookup strict on
    // the number value, matching a === comparison.
    const responseHandlers = new Map([
        [LOGIN_RESPONSE, handleLoginRes],
        [CREATE_AND_JOIN_GROUP_RESPONSE, handleCreateAndJoinGroupRes],
        [LEAVE_GROUP_RESPONSE, handleLeaveGroupRes],
        [SEND_MESSAGE_RESPONSE, handleSendMessageRes]
    ]);

    // Runs whenever the server sends a message: decode it, then dispatch on its type.
    socket.onmessage = function (event) {
        const ta = document.getElementById("responseText");
        messageDecoder({
            data: event.data,
            success: function (responseMessage) {
                console.log("返回值为", responseMessage)
                const type = responseMessage.messageType;
                const handler = responseHandlers.get(type);
                if (handler) {
                    handler(responseMessage, ta);
                } else if (type === SEND_MESSAGE) {
                    // Chat messages pushed by the server: pick the handler by chat type.
                    if (responseMessage.chatType === CHAT_SINGLE) {
                        handleSingleRes(responseMessage, ta);
                    } else if (responseMessage.chatType === CHAT_GROUP) {
                        handleGroupRes(responseMessage, ta);
                    }
                }
            },
            fail: function (err) {
                console.log(err);
            },
            complete: function () {
                console.log("解码全部完成")
            }
        })
    }

    // Connection established.
    socket.onopen = function (event) {
        document.getElementById("responseText").value = "连接开启";
    }

    // Connection closed.
    socket.onclose = function (event) {
        const ta = document.getElementById("responseText");
        ta.value = ta.value + "\n" + "连接关闭";
    }
} else {
    alert("浏览器不支持WebSocket!");
}
/**
*============================
* 发送数据
*============================
*/
/**
 * Encodes a message object as protobuf and sends it over the socket.
 * Does nothing without WebSocket support; alerts and returns when the socket is not open.
 *
 * @param message plain object with the message fields
 */
function send(message) {
    if (!window.WebSocket) {
        return;
    }
    // socket.binaryType = "arraybuffer";
    const isOpen = socket.readyState === WebSocket.OPEN;
    if (!isOpen) {
        alert("连接没有开启");
        return;
    }
    // Round-trip through JSON to work on a detached copy of the caller's object.
    const payload = JSON.parse(JSON.stringify(message));
    console.log(payload);
    messageEncoder({
        data: payload,
        success: (buffer) => {
            console.log("编码成功");
            socket.send(buffer);
        },
        fail: (err) => {
            console.log(err);
        },
        complete: () => {
            console.log("编码全部完成")
        }
    });
}
/**
 * Sends a login request (system chat type 0, message type 2).
 * Delegates to send(), which performs the WebSocket-support and open-connection checks
 * and the protobuf encoding, like the other request functions on this page.
 */
function login() {
    send({
        "fromUid": "",
        "sendUid": "",
        "chatType": 0,
        "messageType": 2,
        "messageFormat": 0,
        "content": "LOGIN"
    });
}
/**
 * Requests creating and joining a client group (system chat type 0, message type 3).
 * The connection checks are left to send(), which runs them before anything is sent.
 *
 * @param groupId id of the group, carried in the sendUid field
 */
function createGroup(groupId) {
    send({
        "fromUid": currentClientId,
        "sendUid": groupId,
        "chatType": 0,
        "messageType": 3,
        "messageFormat": 0,
        "content": "CREATE_GROUP"
    });
}
/**
 * Requests leaving a client group (system chat type 0, message type 5).
 * The connection checks are left to send(), which runs them before anything is sent.
 *
 * @param groupId id of the group, carried in the sendUid field
 */
function leaveGroup(groupId) {
    send({
        "fromUid": currentClientId,
        "sendUid": groupId,
        "chatType": 0,
        "messageType": 5,
        "messageFormat": 0,
        "content": "LEAVE_GROUP"
    });
}
/**
 * Sends a one-to-one chat message (chat type 1, message type 6).
 * The connection checks are left to send(), which runs them before anything is sent.
 *
 * @param sendToClientId id of the receiving client
 * @param content        message text
 */
function sendSingleMessage(sendToClientId, content) {
    send({
        "fromUid": currentClientId,
        "sendUid": sendToClientId,
        "chatType": 1,
        "messageType": 6,
        "messageFormat": 0,
        "content": content
    });
}
/**
 * Sends a chat message to a client group (chat type 2, message type 6).
 * The connection checks are left to send(), which runs them before anything is sent.
 *
 * @param groupId id of the receiving group, carried in the sendUid field
 * @param content message text
 */
function sendGroupMessage(groupId, content) {
    send({
        "fromUid": currentClientId,
        "sendUid": groupId,
        "chatType": 2,
        "messageType": 6,
        "messageFormat": 0,
        "content": content
    });
}
/**
*============================
* 返回值处理
*============================
*/
/**
 * Appends a received group chat message to the output area.
 *
 * @param resp decoded message (fromUid, content)
 * @param ta   element whose value collects the output lines
 */
function handleGroupRes(resp, ta) {
    const { fromUid, content } = resp;
    ta.value += `\n收到来自:${fromUid}的群组消息:${content}`;
}
/**
 * Appends a received one-to-one chat message to the output area.
 *
 * @param resp decoded message (fromUid, content)
 * @param ta   element whose value collects the output lines
 */
function handleSingleRes(resp, ta) {
    const { content, fromUid } = resp;
    ta.value += `\n收到来自:${fromUid}的单聊消息:${content}`;
}
/**
 * Stores the client id carried in the login response content and shows it.
 *
 * @param resp decoded login response; content holds the client id
 * @param ta   element whose value collects the output lines
 */
function handleLoginRes(resp, ta) {
    const clientId = resp.content;
    currentClientId = clientId;
    console.log("currentClientId:" + currentClientId)
    ta.value += `\n当前登录的ClientId为:${clientId}`;
}
/**
 * Shows whether creating/joining a group succeeded (response content "TRUE").
 *
 * @param resp decoded response
 * @param ta   element whose value collects the output lines
 */
function handleCreateAndJoinGroupRes(resp, ta) {
    const succeeded = "TRUE" === resp.content;
    ta.value += succeeded ? "\n创建客户端组成功" : "\n创建客户端组失败";
}
/**
 * Shows whether leaving a group succeeded (response content "TRUE").
 *
 * @param resp decoded response
 * @param ta   element whose value collects the output lines
 */
function handleLeaveGroupRes(resp, ta) {
    const succeeded = "TRUE" === resp.content;
    ta.value += succeeded ? "\n离开客户端组成功" : "\n离开客户端组失败";
}
/**
 * Shows whether the server accepted a sent chat message (response content "TRUE").
 *
 * @param resp decoded response
 * @param ta   element whose value collects the output lines
 */
function handleSendMessageRes(resp, ta) {
    const succeeded = "TRUE" === resp.content;
    ta.value += succeeded ? "\n发送消息成功" : "\n发送消息失败";
}
/**
*============================
* protobuf 编解码处理
*============================
*/
/**
 * Encodes a plain object as a protobuf MessageProto.
 *
 * @param obj.data     payload object to encode
 * @param obj.success  called with the encoded bytes on success
 * @param obj.fail     called with the error (load error, verify message, or thrown exception)
 * @param obj.complete called once after success or fail
 */
function messageEncoder(obj) {
    const { data, success, fail, complete } = obj;
    const done = function () {
        if (typeof complete === "function") {
            complete()
        }
    };
    const reportFailure = function (reason) {
        if (typeof fail === "function") {
            fail(reason)
        }
        done();
    };
    protobuf.load("./MessageProtocol.proto", function (err, root) {
        if (err) {
            reportFailure(err);
            return;
        }
        let buffer;
        try {
            // Obtain the message type.
            const MessageProto = root.lookupType("MessageProto");
            console.log(MessageProto);
            // Verify the payload (it may be incomplete or invalid).
            const errMsg = MessageProto.verify(data);
            if (errMsg) {
                reportFailure(errMsg);
                return;
            }
            // Create the message and encode it to a Uint8Array (browser) or Buffer (node).
            buffer = MessageProto.encode(MessageProto.create(data)).finish();
        } catch (e) {
            // lookupType/create/encode can throw; route that to the fail callback as well.
            reportFailure(e);
            return;
        }
        // Called outside the try so an error thrown by the callback is not reported as an encode failure.
        if (typeof success === "function") {
            success(buffer)
        }
        done();
    });
}
/**
 * Decodes a binary message received from the server into a MessageProto.
 *
 * @param obj.data     received payload: a Blob, or an ArrayBuffer
 * @param obj.success  called with the decoded message on success
 * @param obj.fail     called with the error (load, read, or decode failure)
 * @param obj.complete called once after success or fail
 */
function messageDecoder(obj) {
    const { data, success, fail, complete } = obj;
    const done = function () {
        if (typeof complete === "function") {
            complete()
        }
    };
    const reportFailure = function (reason) {
        if (typeof fail === "function") {
            fail(reason)
        }
        done();
    };
    protobuf.load("./MessageProtocol.proto", function (err, root) {
        if (err) {
            reportFailure(err);
            return;
        }
        let MessageProto;
        try {
            // Obtain the message type.
            MessageProto = root.lookupType("MessageProto");
        } catch (e) {
            reportFailure(e);
            return;
        }
        console.log(MessageProto);

        const decodeBytes = function (arrayBuffer) {
            let messageProto;
            try {
                messageProto = MessageProto.decode(new Uint8Array(arrayBuffer));
            } catch (e) {
                // Malformed payload: report it instead of throwing out of the callback.
                reportFailure(e);
                return;
            }
            console.log(messageProto);
            if (typeof success === "function") {
                success(messageProto)
            }
            done();
        };

        // An ArrayBuffer payload needs no FileReader.
        if (data instanceof ArrayBuffer) {
            decodeBytes(data);
            return;
        }
        const reader = new FileReader();
        // Register the callbacks before starting the read.
        reader.onload = function () {
            decodeBytes(reader.result);
        };
        reader.onerror = function () {
            reportFailure(reader.error);
        };
        reader.readAsArrayBuffer(data);
    });
}
</script>
<h1>欢迎访问客服系统</h1>
<form onsubmit="return false">
<h4>用户登录:</h4>
<input type="button" value="登录" onclick="login()">
<br/>
<br/>
GroupId:<input type="text" name="groupId">
<input type="button" value="加入客户端组" onclick="createGroup(this.form.groupId.value)">
<br/>
<br/>
GroupId:<input type="text" name="groupId2">
<input type="button" value="离开客户端组" onclick="leaveGroup(this.form.groupId2.value)">
<br/>
<br/>
SendToClientId:<input type="text" name="clientId">
<textarea name="message1" style="width: 400px;height: 100px"></textarea>
<input type="button" value="发送数据"
onclick="sendSingleMessage(this.form.clientId.value,this.form.message1.value);">
<br/>
<br/>
GroupId:<input type="text" name="groupId3">
<textarea name="message2" style="width: 400px;height: 100px"></textarea>
<input type="button" value="发送数据"
onclick="sendGroupMessage(this.form.groupId3.value,this.form.message2.value);">
<h3>回复消息:</h3>
<textarea id="responseText" style="width: 400px;height: 300px;"></textarea>
<input type="button" onclick="javascript:document.getElementById('responseText').value=''" value="清空数据">
</form>
</body>
</html>
- 调试文件位于
-
项目源码地址
本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 HoleLin's Blog!