参考文献

项目结构

  • 该项目主要分为两个部分: message-server 和 message-client
    • message-client: 基于Netty实现的WebSocket客户端,具有与message-server交互的功能.
    • message-server: 基于SpringBoot结合Netty实现的WebSocket服务端.

核心代码

消息格式定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// proto3 schema for the message exchanged over the WebSocket connection.
syntax = "proto3";

package protocol;
// SPEED asks protoc to generate code optimized for fast (de)serialization.
option optimize_for = SPEED;
// NOTE(review): the accompanying Java sources import MessageProtocol from
// com.holelin.messageserver.protocol / com.holelin.messageclient.protocol;
// confirm this java_package matches the module the file is compiled into.
option java_package = "cn.holelin.messagepushnetty.protocol";
option java_outer_classname = "MessageProtocol";

// Message envelope.
message MessageProto{

// Unique id of the message; a UUID can be used.
string messageId = 1;
// Sender of the message.
string fromUid = 2;
// Receiver of the message.
string sendUid = 3;
// Chat mode: group chat or single (one-to-one) chat.
int32 chatType = 4;
// Message type, e.g. login, chat, ack, ping or pong message.
int32 messageType = 5;
// Message format, e.g. text or image.
int32 messageFormat = 6;
// Message body.
string content = 7;
// Timestamp at which the message was sent (unit not specified here — TODO confirm).
int64 timestamp = 8;

}

服务端

  • WebSocketServerInitializer: 服务端handler初始化类

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    package com.holelin.messageserver.initializer;

    import com.google.protobuf.MessageLite;
    import com.google.protobuf.MessageLiteOrBuilder;
    import com.holelin.messageserver.handler.WebSocketServerHandler;
    import com.holelin.messageserver.protocol.MessageProtocol;
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.handler.codec.MessageToMessageDecoder;
    import io.netty.handler.codec.MessageToMessageEncoder;
    import io.netty.handler.codec.http.HttpObjectAggregator;
    import io.netty.handler.codec.http.HttpServerCodec;
    import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
    import io.netty.handler.codec.http.websocketx.WebSocketFrame;
    import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
    import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler;
    import io.netty.handler.codec.protobuf.ProtobufDecoder;
    import io.netty.handler.stream.ChunkedWriteHandler;
    import io.netty.handler.timeout.IdleStateHandler;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Component;

    import java.util.List;
    import java.util.concurrent.TimeUnit;

    import static io.netty.buffer.Unpooled.wrappedBuffer;

    /**
     * Assembles the Netty pipeline for every accepted server-side connection.
     *
     * <p>Inbound traffic flows through the HTTP codec and aggregator, the WebSocket
     * upgrade handler, a frame-unwrapping decoder, the protobuf decoder and finally the
     * shared business handler. Outbound protobuf messages are wrapped into binary
     * WebSocket frames by the encoder registered at the tail.
     *
     * @author HoleLin
     * @version 1.0 (created 2022/12/16)
     */
    @Component
    public class WebSocketServerInitializer extends ChannelInitializer<SocketChannel> {

        @Autowired
        private WebSocketServerHandler webSocketServerHandler;

        @Value("${message.websocket.work-path:/ws}")
        private String workPath;

        /**
         * Registers all handlers on the pipeline of a newly accepted channel.
         *
         * @param ch the channel whose pipeline is being initialised
         * @throws Exception declared by the overridden method
         */
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            // Inbound: hand the payload of each WebSocket frame to the protobuf decoder.
            // The frame itself is released by MessageToMessageDecoder once decode() returns,
            // so the payload is retained to keep it alive for the next handler.
            final MessageToMessageDecoder<WebSocketFrame> frameUnwrapper = new MessageToMessageDecoder<WebSocketFrame>() {
                @Override
                protected void decode(ChannelHandlerContext ctx, WebSocketFrame frame, List<Object> out) throws Exception {
                    out.add(frame.content().retain());
                }
            };

            // Outbound: serialise the protobuf message (same approach as Netty's TCP
            // ProtobufEncoder) and wrap the bytes in a binary WebSocket frame, because a
            // WebSocket peer cannot consume the raw protobuf byte stream directly.
            final MessageToMessageEncoder<MessageLiteOrBuilder> frameWrapper = new MessageToMessageEncoder<MessageLiteOrBuilder>() {
                @Override
                protected void encode(ChannelHandlerContext ctx, MessageLiteOrBuilder msg, List<Object> out) throws Exception {
                    ByteBuf body = null;
                    if (msg instanceof MessageLite.Builder) {
                        body = wrappedBuffer(((MessageLite.Builder) msg).build().toByteArray());
                    } else if (msg instanceof MessageLite) {
                        body = wrappedBuffer(((MessageLite) msg).toByteArray());
                    }
                    assert body != null;
                    out.add(new BinaryWebSocketFrame(body));
                }
            };

            ch.pipeline()
                    // IdleStateHandler fires an IdleStateEvent when the channel has seen no
                    // read, no write, or neither for the configured time (6 hours each here).
                    // The event reaches userEventTriggered of the following handlers; it is
                    // needed because a dropped connection is not always noticed otherwise.
                    .addLast(new IdleStateHandler(6, 6, 6, TimeUnit.HOURS))
                    .addLast("http-server-codec", new HttpServerCodec())
                    .addLast("chunk-write", new ChunkedWriteHandler())
                    .addLast("http-aggregator", new HttpObjectAggregator(64 * 1024))
                    // WebSocket payload compression
                    .addLast(new WebSocketServerCompressionHandler())
                    // WebSocket upgrade on the configured path, no sub-protocols, extensions allowed
                    .addLast(new WebSocketServerProtocolHandler(workPath, null, true))
                    // frame payload -> protobuf message -> business logic
                    .addLast(frameUnwrapper)
                    .addLast("protobuf-decoder", new ProtobufDecoder(MessageProtocol.MessageProto.getDefaultInstance()))
                    .addLast(webSocketServerHandler)
                    // protobuf message -> binary WebSocket frame
                    .addLast("custom-encoder", frameWrapper);
        }
    }
  • 业务处理handler: WebSocketServerHandler

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    147
    148
    149
    150
    151
    152
    153
    154
    155
    156
    157
    158
    159
    160
    161
    162
    163
    164
    165
    166
    167
    168
    169
    170
    171
    172
    173
    174
    175
    176
    177
    178
    179
    180
    181
    182
    183
    184
    185
    186
    187
    188
    189
    190
    191
    192
    193
    194
    195
    196
    197
    198
    199
    200
    201
    202
    203
    204
    205
    206
    207
    208
    209
    210
    211
    212
    213
    214
    215
    216
    217
    218
    219
    220
    221
    222
    223
    224
    225
    226
    227
    package com.holelin.messageserver.handler;

    import com.holelin.messageserver.consts.StringConstants;
    import com.holelin.messageserver.enums.ChatTypeEnum;
    import com.holelin.messageserver.enums.MessageTypeEnum;
    import com.holelin.messageserver.protocol.MessageProtocol;
    import com.holelin.messageserver.utils.ServerChannelCache;
    import com.holelin.messageserver.utils.SnowflakeUtil;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelHandler;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.channel.group.ChannelGroup;
    import io.netty.handler.timeout.IdleStateEvent;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;

    import java.util.Objects;


    /**
     * Shared business handler of the WebSocket server.
     *
     * <p>Dispatches decoded {@code MessageProto} messages by message type and chat type:
     * ping, login, create/join group, leave group, single chat and group chat. Responses
     * are written through {@link Channel#writeAndFlush(Object)}.
     *
     * @author HoleLin
     * @version 1.0 (created 2022/12/16)
     */
    @Component
    @ChannelHandler.Sharable
    public class WebSocketServerHandler extends SimpleChannelInboundHandler<MessageProtocol.MessageProto> {
        private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketServerHandler.class);

        /** Sender id used for messages that originate from the server itself. */
        public static final String SYSTEM_USE_ID = "SYSTEM";

        @Autowired
        private ServerChannelCache channelCache;

        /**
         * Closes the channel when it has been idle for both reads and writes; every other
         * user event is forwarded down the pipeline instead of being silently dropped.
         *
         * @param ctx context of the channel that raised the event
         * @param evt the user event
         * @throws Exception if forwarding the event fails
         */
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            if (evt instanceof IdleStateEvent) {
                switch (((IdleStateEvent) evt).state()) {
                    case ALL_IDLE:
                        // Cache clean-up happens in channelInactive once the close completes.
                        ctx.channel().close();
                        return;
                    default:
                        break;
                }
            }
            super.userEventTriggered(ctx, evt);
        }

        /**
         * Removes the channel from the cache whenever the connection goes away, regardless
         * of whether it was closed by the idle check, by an error or by the remote peer.
         *
         * <p>NOTE(review): assumes {@code ServerChannelCache.loginOut} tolerates channels
         * that never logged in; the idle path previously called it unconditionally as well.
         *
         * @param ctx context of the channel that became inactive
         * @throws Exception if forwarding the event fails
         */
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            channelCache.loginOut(ctx.channel());
            super.channelInactive(ctx);
        }

        /**
         * Routes an incoming message to the matching handler method.
         *
         * @param ctx     context of the channel the message arrived on
         * @param message decoded protobuf message
         */
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol.MessageProto message) {
            final Channel channel = ctx.channel();
            final int messageType = message.getMessageType();
            final int chatType = message.getChatType();
            final boolean systemChat = ChatTypeEnum.SYSTEM.code == chatType;
            final boolean sendMessage = MessageTypeEnum.SEND_MESSAGE.code == messageType;

            if (systemChat && MessageTypeEnum.PING.code == messageType) {
                // basic system function: heartbeat
                handlePing(message, channel);
            } else if (systemChat && MessageTypeEnum.LOGIN.code == messageType) {
                handleLogin(channel);
            } else if (systemChat && MessageTypeEnum.CREATE_AND_JOIN_GROUP.code == messageType) {
                handleCreateGroup(message, channel);
            } else if (systemChat && MessageTypeEnum.LEAVE_GROUP.code == messageType) {
                handleLeaveGroup(message, channel);
            } else if (sendMessage && ChatTypeEnum.SINGLE.code == chatType) {
                handleSingleChat(message, channel);
            } else if (sendMessage && ChatTypeEnum.GROUP.code == chatType) {
                handleGroupChat(message, channel);
            } else {
                LOGGER.warn("Ignoring unsupported message: messageType={}, chatType={}", messageType, chatType);
            }
        }

        /**
         * Handles a leave-group request. The group id is carried in {@code sendUid}; the
         * result is reported to the channel registered for {@code fromUid}, if any.
         *
         * @param message the leave-group request
         * @param channel the channel the request arrived on; this channel leaves the group
         */
        private void handleLeaveGroup(MessageProtocol.MessageProto message, Channel channel) {
            final String fromUid = message.getFromUid();
            final String groupId = message.getSendUid();
            final Channel fromChannel = channelCache.getChannelByClientId(fromUid);
            final MessageProtocol.MessageProto.Builder builder = MessageProtocol.MessageProto.newBuilder()
                    .setMessageType(MessageTypeEnum.LEAVE_GROUP_RESPONSE.code)
                    .setFromUid(SYSTEM_USE_ID)
                    .setSendUid(fromUid);
            if (Objects.isNull(fromChannel)) {
                // No registered channel for the sender: nothing to do and nobody to answer.
                return;
            }
            if (channelCache.existsGroupId(groupId)) {
                channelCache.leaveGroup(groupId, channel);
                fromChannel.writeAndFlush(builder.setContent(StringConstants.TRUE).build());
                LOGGER.info("客户端ID:{}离开客户端组:{}", fromUid, groupId);
            } else {
                fromChannel.writeAndFlush(builder.setContent(StringConstants.FALSE).build());
                LOGGER.warn("客户端ID:{}离开客户端组:{}失败,租户端组不存在", fromUid, groupId);
            }
        }

        /**
         * Broadcasts a chat message to a group. The group id is carried in {@code sendUid}.
         * The message is delivered only when the group exists and the sending channel is a
         * member; the sender receives a TRUE/FALSE response either way.
         *
         * @param message        the chat message to broadcast
         * @param currentChannel the channel the message arrived on
         */
        private void handleGroupChat(MessageProtocol.MessageProto message, Channel currentChannel) {
            final String groupId = message.getSendUid();
            final String fromUid = message.getFromUid();
            final ChannelGroup channelGroup = channelCache.getChannelGroup(groupId);
            final MessageProtocol.MessageProto.Builder builder = MessageProtocol.MessageProto.newBuilder()
                    .setMessageType(MessageTypeEnum.SEND_MESSAGE_RESPONSE.code)
                    .setChatType(ChatTypeEnum.GROUP.code)
                    .setFromUid(SYSTEM_USE_ID)
                    .setSendUid(fromUid);
            if (Objects.nonNull(channelGroup) && Objects.nonNull(channelGroup.find(currentChannel.id()))) {
                channelGroup.writeAndFlush(message);
                LOGGER.info("发送客户端组消息成功,消息为:{}", message);
                // acknowledge to the sender
                currentChannel.writeAndFlush(builder.setContent(StringConstants.TRUE).build());
            } else {
                currentChannel.writeAndFlush(builder.setContent(StringConstants.FALSE).build());
                LOGGER.warn("{}组不存在", groupId);
            }
        }

        /**
         * Handles a create-and-join-group request. The group id is carried in
         * {@code sendUid}; the requesting channel joins the group and a TRUE response is
         * sent to the channel registered for {@code fromUid}, if any.
         *
         * @param message the create/join request
         * @param channel the channel the request arrived on
         */
        private void handleCreateGroup(MessageProtocol.MessageProto message, Channel channel) {
            final String groupId = message.getSendUid();
            final String fromUid = message.getFromUid();
            final Channel fromChannel = channelCache.getChannelByClientId(fromUid);
            final MessageProtocol.MessageProto.Builder builder = MessageProtocol.MessageProto.newBuilder()
                    .setMessageType(MessageTypeEnum.CREATE_AND_JOIN_GROUP_RESPONSE.code)
                    .setFromUid(SYSTEM_USE_ID)
                    .setSendUid(fromUid);
            channelCache.createAndJoinGroup(groupId, channel);
            if (Objects.nonNull(fromChannel)) {
                fromChannel.writeAndFlush(builder.setContent(StringConstants.TRUE).build());
                LOGGER.info("用户:{}加入用户组:{}成功", fromUid, groupId);
            }
        }

        /**
         * Forwards a one-to-one chat message to the channel registered for {@code sendUid}
         * and reports TRUE/FALSE back to the sender depending on whether the receiver has
         * a registered channel.
         *
         * @param message        the chat message
         * @param currentChannel the channel the message arrived on
         */
        private void handleSingleChat(MessageProtocol.MessageProto message, Channel currentChannel) {
            final String sendUid = message.getSendUid();
            final String fromUid = message.getFromUid();
            final Channel channel = channelCache.getChannelByClientId(sendUid);
            final MessageProtocol.MessageProto.Builder builder = MessageProtocol.MessageProto.newBuilder()
                    .setMessageType(MessageTypeEnum.SEND_MESSAGE_RESPONSE.code)
                    .setChatType(ChatTypeEnum.SINGLE.code)
                    .setFromUid(SYSTEM_USE_ID)
                    .setSendUid(fromUid);
            if (Objects.nonNull(channel)) {
                channel.writeAndFlush(message);
                currentChannel.writeAndFlush(builder.setContent(StringConstants.TRUE).build());
                LOGGER.info("发送客户端消息成功,消息为:{}", message);
            } else {
                currentChannel.writeAndFlush(builder.setContent(StringConstants.FALSE).build());
                LOGGER.warn("用户:{},不在线", sendUid);
            }
        }

        /**
         * Handles a login request: generates a fresh client id, registers the channel under
         * it and returns the id to the client in the response content.
         *
         * @param channel the channel that is logging in
         */
        private void handleLogin(Channel channel) {
            final String clientId = String.valueOf(SnowflakeUtil.genId());
            channelCache.login(clientId, channel);
            final MessageProtocol.MessageProto responseMessage = MessageProtocol.MessageProto.newBuilder()
                    .setMessageType(MessageTypeEnum.LOGIN_RESPONSE.code)
                    .setFromUid(clientId)
                    .setSendUid(SYSTEM_USE_ID)
                    .setContent(clientId)
                    .build();
            channel.writeAndFlush(responseMessage);
            LOGGER.info("用户:{}上线了...", clientId);
        }

        /**
         * Answers a PING with a PONG addressed to the sender.
         *
         * @param message the ping message
         * @param channel the channel the ping arrived on
         */
        private void handlePing(MessageProtocol.MessageProto message, Channel channel) {
            final String fromUid = message.getFromUid();
            final MessageProtocol.MessageProto pongMessage = MessageProtocol.MessageProto.newBuilder()
                    .setMessageType(MessageTypeEnum.PONG.code)
                    .setFromUid(SYSTEM_USE_ID)
                    .setSendUid(fromUid)
                    .setContent(MessageTypeEnum.PONG.toString())
                    .build();

            channel.writeAndFlush(pongMessage);
        }

        /**
         * Logs the failure through the class logger and closes the connection.
         *
         * @param ctx   context of the failing channel
         * @param cause the exception raised in the pipeline
         * @throws Exception declared by the overridden method
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            LOGGER.error("Unexpected error on channel {}, closing connection", ctx.channel(), cause);
            ctx.close();
        }
    }

客户端

  • WebSocketClient

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    147
    148
    149
    150
    151
    152
    153
    154
    155
    156
    157
    158
    159
    160
    161
    162
    163
    164
    165
    166
    167
    168
    169
    170
    171
    172
    173
    174
    175
    176
    177
    178
    179
    180
    181
    182
    183
    package com.holelin.messageclient.client;

    import com.google.protobuf.MessageLite;
    import com.google.protobuf.MessageLiteOrBuilder;
    import com.holelin.messageclient.handler.WebSocketClientHandler;
    import com.holelin.messageclient.handler.WebSocketMessageClientHandler;
    import com.holelin.messageclient.protocol.MessageProtocol;
    import io.netty.bootstrap.Bootstrap;
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.MessageToMessageDecoder;
    import io.netty.handler.codec.MessageToMessageEncoder;
    import io.netty.handler.codec.http.DefaultHttpHeaders;
    import io.netty.handler.codec.http.HttpClientCodec;
    import io.netty.handler.codec.http.HttpObjectAggregator;
    import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
    import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
    import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
    import io.netty.handler.codec.http.websocketx.WebSocketFrame;
    import io.netty.handler.codec.http.websocketx.WebSocketVersion;
    import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketClientCompressionHandler;
    import io.netty.handler.codec.protobuf.ProtobufDecoder;
    import io.netty.util.concurrent.DefaultThreadFactory;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import java.lang.reflect.Proxy;
    import java.net.URI;
    import java.net.URISyntaxException;
    import java.util.List;
    import java.util.Objects;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    import static io.netty.buffer.Unpooled.wrappedBuffer;


    /**
     * Netty based WebSocket client that talks protobuf to message-server.
     *
     * <p>The connection is opened lazily by {@link #getBean(Class)} or explicitly through
     * {@link #initClient()}, and is released together with its event loop threads by
     * {@link #close()}. The request/response exchange goes through a single shared
     * handler, so an instance must not be used by several threads at the same time.
     *
     * @author HoleLin
     * @version 1.0 (created 2023/1/3)
     */
    public class WebSocketClient implements Client {

        private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketClient.class);

        private static final ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        public static final String CLIENT_HANDLER_NAME = "client-handler";


        /**
         * Server address, e.g. ws://localhost:8080/websocketPath
         */
        private final URI uri;

        private Channel channel;
        /** Event loop group owned by this client; shut down in {@link #close()}. */
        private NioEventLoopGroup eventLoopGroup;
        private WebSocketMessageClientHandler messageClientHandler;

        /**
         * @param uri server address, e.g. {@code ws://localhost:8080/websocketPath}
         * @throws URISyntaxException if {@code uri} is not a valid URI
         */
        public WebSocketClient(String uri) throws URISyntaxException {
            this.uri = new URI(uri);
        }

        /**
         * Connects to the server given at construction time and performs the WebSocket
         * handshake, blocking until both have completed.
         *
         * <p>The client state (channel, handler, event loop group) is only published once
         * everything succeeded; on any failure the event loop group is shut down again so
         * no threads are leaked and a later call can retry.
         *
         * @throws RuntimeException if the calling thread is interrupted while waiting
         */
        public void initClient() {
            final NioEventLoopGroup group = new NioEventLoopGroup(new DefaultThreadFactory("WebSocketClient"));
            final WebSocketMessageClientHandler handler = new WebSocketMessageClientHandler();
            boolean started = false;
            try {
                final Bootstrap bootstrap = new Bootstrap();
                bootstrap.group(group)
                        .channel(NioSocketChannel.class)
                        .option(ChannelOption.TCP_NODELAY, true)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                final ChannelPipeline pipeline = ch.pipeline();

                                pipeline.addLast("http-codec", new HttpClientCodec());
                                pipeline.addLast("http-aggregator", new HttpObjectAggregator(65536));
                                pipeline.addLast(WebSocketClientCompressionHandler.INSTANCE);
                                // Inbound handlers, executed in registration order.
                                // Unwrap the frame payload; it is retained because the frame
                                // itself is released once decode() returns.
                                pipeline.addLast("websocket-decoder", new MessageToMessageDecoder<WebSocketFrame>() {
                                    @Override
                                    protected void decode(ChannelHandlerContext ctx, WebSocketFrame frame, List<Object> objs) throws Exception {
                                        ByteBuf buf = frame.content();
                                        objs.add(buf);
                                        buf.retain();
                                    }
                                });
                                pipeline.addLast("protobuf-decoder", new ProtobufDecoder(MessageProtocol.MessageProto.getDefaultInstance()));
                                pipeline.addLast("message-handler", handler);
                                pipeline.addLast(CLIENT_HANDLER_NAME, new WebSocketClientHandler());

                                // Outbound handlers, executed in reverse registration order.
                                // Serialise the protobuf message (as Netty's TCP ProtobufEncoder
                                // does) and wrap it in a binary WebSocket frame.
                                pipeline.addLast(new MessageToMessageEncoder<MessageLiteOrBuilder>() {
                                    @Override
                                    protected void encode(ChannelHandlerContext ctx, MessageLiteOrBuilder msg, List<Object> out) throws Exception {
                                        ByteBuf result = null;
                                        if (msg instanceof MessageLite) {
                                            result = wrappedBuffer(((MessageLite) msg).toByteArray());
                                        }
                                        if (msg instanceof MessageLite.Builder) {
                                            result = wrappedBuffer(((MessageLite.Builder) msg).build().toByteArray());
                                        }
                                        assert result != null;
                                        WebSocketFrame frame = new BinaryWebSocketFrame(result);
                                        out.add(frame);
                                    }
                                });
                            }
                        });

                final String host = uri.getHost();
                final int port = resolvePort(uri);
                // sync() rethrows the failure cause, so reaching the next line means success.
                final Channel connected = bootstrap.connect(host, port).sync().channel();
                LOGGER.debug("客户端连接host:{},port:{}成功", host, port);

                // The handshaker sends the upgrade request; once the handshake completes the
                // WebSocket frame encoder/decoder are added to the pipeline dynamically.
                WebSocketClientHandshaker handShaker = WebSocketClientHandshakerFactory.newHandshaker(uri, WebSocketVersion.V13, null, true, new DefaultHttpHeaders());
                final WebSocketClientHandler clientHandler = (WebSocketClientHandler) connected.pipeline().get(CLIENT_HANDLER_NAME);
                clientHandler.setHandShaker(handShaker);
                handShaker.handshake(connected);
                // Block until the handshake has finished (sync() throws if it failed).
                clientHandler.handshakeFuture().sync();
                LOGGER.debug("WebSocket 握手成功");

                this.eventLoopGroup = group;
                this.channel = connected;
                this.messageClientHandler = handler;
                started = true;
            } catch (InterruptedException e) {
                // Preserve the interrupt for callers further up the stack.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            } finally {
                if (!started) {
                    LOGGER.error("WebSocket client failed to start for {}", uri);
                    group.shutdownGracefully();
                }
            }
        }

        /**
         * Returns the explicit port of {@code target}, or the scheme default (443 for
         * {@code wss}, 80 otherwise) when the URI does not specify one.
         */
        private static int resolvePort(URI target) {
            final int explicitPort = target.getPort();
            if (explicitPort != -1) {
                return explicitPort;
            }
            return "wss".equalsIgnoreCase(target.getScheme()) ? 443 : 80;
        }

        /**
         * Creates a dynamic proxy for {@code serviceClass}. Every call on the proxy sends
         * its first argument, which must be a {@code MessageProto}, to the server and
         * blocks until the handler returns a message from the server. The connection is
         * opened on first use.
         *
         * @param serviceClass interface to proxy
         * @return a proxy implementing {@code serviceClass}
         */
        public Object getBean(final Class<?> serviceClass) {
            return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
                    new Class[]{serviceClass},
                    ((proxy, method, args) -> {
                        if (args == null || args.length == 0 || !(args[0] instanceof MessageProtocol.MessageProto)) {
                            throw new IllegalArgumentException(
                                    "Method " + method.getName() + " must be called with a MessageProto as first argument");
                        }
                        if (Objects.isNull(messageClientHandler)) {
                            initClient();
                        }
                        messageClientHandler.setMessage((MessageProtocol.MessageProto) args[0]);
                        return executor.submit(messageClientHandler).get();
                    }));
        }


        /**
         * Closes the connection and releases the event loop threads. Safe to call on a
         * client that was never initialised.
         */
        @Override
        public void close() {
            if (channel != null) {
                channel.close();
            }
            if (eventLoopGroup != null) {
                eventLoopGroup.shutdownGracefully();
            }
        }
    }

  • 业务处理类:WebSocketMessageClientHandler

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    package com.holelin.messageclient.handler;

    import com.holelin.messageclient.protocol.MessageProtocol;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import lombok.extern.slf4j.Slf4j;

    import java.util.Objects;
    import java.util.concurrent.Callable;

    /**
    * @Description: WebSocket客户端消息处理类
    * @Author: HoleLin
    * @CreateDate: 2023/1/5 13:40
    * @UpdateUser: HoleLin
    * @UpdateDate: 2023/1/5 13:40
    * @UpdateRemark: 修改内容
    * @Version: 1.0
    */
    @Slf4j
    public class WebSocketMessageClientHandler extends SimpleChannelInboundHandler<MessageProtocol.MessageProto> implements Callable {
    /**
    * 上下文
    */
    private ChannelHandlerContext context;

    /**
    * 发送的消息
    */
    private MessageProtocol.MessageProto message;
    /**
    * 服务器返回值
    */
    private MessageProtocol.MessageProto result;

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
    this.context = ctx;
    }

    @Override
    protected synchronized void channelRead0(ChannelHandlerContext ctx, MessageProtocol.MessageProto msg) throws Exception {
    log.info("收到服务器的消息:{}", msg.getContent());
    this.result = msg;
    notify();
    }


    @Override
    public synchronized Object call() throws Exception {

    if (Objects.nonNull(message)) {
    final Channel channel = context.pipeline().channel();
    channel.writeAndFlush(message);
    }
    // 等待服务器发送的消息
    wait();
    return result;
    }

    /**
    * 设置发送的消息
    *
    */
    public void setMessage(MessageProtocol.MessageProto message) {
    this.message = message;
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    ctx.channel().close();
    }
    }

项目启动方式

  • message-server: 以SpringBoot项目启动方式启动

  • message-client: 为封装的与message-server交互的API,其中具有连通性验证,登录服务器,创建并加入客户端组,离开客户端组,给指定客户端发送消息,给指定客户端组发送消息等功能.

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    package com.diannei.ai.messageclient.client;


    import com.diannei.ai.messageclient.utils.SendMessageUtil;
    import com.diannei.ai.messageclient.utils.SnowflakeUtil;
    import org.junit.jupiter.api.Test;

    import java.net.URISyntaxException;

    /**
     * Manual smoke tests for the client API; each test prints the server's answer.
     * They expect a message-server listening on the address in {@code uri}.
     */
    class WebSocketClientTest {

        String uri = "ws://localhost:10089/ws";

        /** Creates a fresh client for {@code uri} wrapped in the message helper. */
        private SendMessageUtil newSender() throws URISyntaxException {
            return new SendMessageUtil(new WebSocketClient(uri));
        }

        /** Connectivity check. */
        @Test
        void ping() throws URISyntaxException {
            final Boolean alive = newSender().ping();
            System.out.println(alive);
        }

        /** Logs in and prints the value returned by the helper. */
        @Test
        void login() throws URISyntaxException {
            final SendMessageUtil sender = newSender();
            System.out.println(sender.login());
        }

        /** Logs in, then creates and joins a group with a generated id. */
        @Test
        void createGroup() throws URISyntaxException {
            final SendMessageUtil sender = newSender();
            final String selfId = sender.login();
            final String groupId = String.valueOf(SnowflakeUtil.genId());
            System.out.println(sender.createGroup(selfId, groupId));
        }

        /** Logs in, then leaves a group with a generated id. */
        @Test
        void leaveGroup() throws URISyntaxException {
            final SendMessageUtil sender = newSender();
            final String selfId = sender.login();
            final String groupId = String.valueOf(SnowflakeUtil.genId());
            System.out.println(sender.leaveGroup(selfId, groupId));
        }

        /** Logs in, then sends a message to a fixed client id. */
        @Test
        void sendSingleMessage() throws URISyntaxException {
            final SendMessageUtil sender = newSender();
            final String selfId = sender.login();
            System.out.println(sender.sendSingleMessage(selfId, "1063424804638380032", "测试消息"));
        }

        /** Logs in, then sends a message to the group "TEXT". */
        @Test
        void sendGroupMessage() throws URISyntaxException {
            final SendMessageUtil sender = newSender();
            final String selfId = sender.login();
            System.out.println(sender.sendGroupMessage(selfId, "TEXT", "测试消息"));
        }
    }

项目调试

  • 调试可以使用两种方式:

    • Java-Client-API方式,即上方的单元测试类

    • 前端JS调试

      • 调试文件位于message-server/src/main/resources/front目录下
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      58
      59
      60
      61
      62
      63
      64
      65
      66
      67
      68
      69
      70
      71
      72
      73
      74
      75
      76
      77
      78
      79
      80
      81
      82
      83
      84
      85
      86
      87
      88
      89
      90
      91
      92
      93
      94
      95
      96
      97
      98
      99
      100
      101
      102
      103
      104
      105
      106
      107
      108
      109
      110
      111
      112
      113
      114
      115
      116
      117
      118
      119
      120
      121
      122
      123
      124
      125
      126
      127
      128
      129
      130
      131
      132
      133
      134
      135
      136
      137
      138
      139
      140
      141
      142
      143
      144
      145
      146
      147
      148
      149
      150
      151
      152
      153
      154
      155
      156
      157
      158
      159
      160
      161
      162
      163
      164
      165
      166
      167
      168
      169
      170
      171
      172
      173
      174
      175
      176
      177
      178
      179
      180
      181
      182
      183
      184
      185
      186
      187
      188
      189
      190
      191
      192
      193
      194
      195
      196
      197
      198
      199
      200
      201
      202
      203
      204
      205
      206
      207
      208
      209
      210
      211
      212
      213
      214
      215
      216
      217
      218
      219
      220
      221
      222
      223
      224
      225
      226
      227
      228
      229
      230
      231
      232
      233
      234
      235
      236
      237
      238
      239
      240
      241
      242
      243
      244
      245
      246
      247
      248
      249
      250
      251
      252
      253
      254
      255
      256
      257
      258
      259
      260
      261
      262
      263
      264
      265
      266
      267
      268
      269
      270
      271
      272
      273
      274
      275
      276
      277
      278
      279
      280
      281
      282
      283
      284
      285
      286
      287
      288
      289
      290
      291
      292
      293
      294
      295
      296
      297
      298
      299
      300
      301
      302
      303
      304
      305
      306
      307
      308
      309
      310
      311
      312
      313
      314
      315
      316
      317
      318
      319
      320
      321
      322
      323
      324
      325
      326
      327
      328
      329
      330
      331
      332
      333
      334
      335
      336
      337
      338
      339
      340
      341
      342
      343
      344
      345
      346
      347
      348
      349
      350
      351
      352
      353
      354
      355
      356
      357
      358
      359
      360
      361
      362
      363
      364
      365
      366
      367
      368
      369
      370
      371
      372
      373
      374
      375
      376
      377
      378
      379
      380
      381
      382
      383
      384
      385
      386
      387
      388
      389
      390
      391
      392
      393
      394
      395
      396
      397
      398
      <!DOCTYPE html>
      <html lang="en">
      <head>
      <meta charset="UTF-8">
      <title>WebSocket客户端</title>
      </head>
      <body>

      <script src="protobuf.min.js"></script>

      <script type="text/javascript">
      // WebSocket connection shared by every request function in this page.
      let socket;
      // messageType codes that the onmessage handler below compares against
      // the messageType field of each decoded server message.
      const SEND_MESSAGE = 6;
      const LOGIN_RESPONSE = 20;
      const CREATE_AND_JOIN_GROUP_RESPONSE = 30;
      const LEAVE_GROUP_RESPONSE = 50;
      const SEND_MESSAGE_RESPONSE = 60;

      // chatType codes used to tell single-chat from group-chat messages.
      const CHAT_SINGLE = 1;
      const CHAT_GROUP = 2;
      // Client id taken from the login response; undefined until one arrives.
      let currentClientId;

      // Only wire up the connection when the browser supports WebSocket.
      if (window.WebSocket) {
          // The argument is the address of the server endpoint.
          socket = new WebSocket("ws://localhost:10089/ws");

          // Runs for every message pushed by the server: decode it, then route
          // it to the handler that matches its messageType (and chatType).
          socket.onmessage = function (event) {
              const ta = document.getElementById("responseText");
              messageDecoder({
                  data: event.data,
                  success: function (responseMessage) {
                      console.log("返回值为", responseMessage)
                      // The codes are mutually exclusive, so a single switch
                      // replaces the former chain of independent comparisons.
                      switch (responseMessage.messageType) {
                          case LOGIN_RESPONSE:
                              handleLoginRes(responseMessage, ta);
                              break;
                          case CREATE_AND_JOIN_GROUP_RESPONSE:
                              handleCreateAndJoinGroupRes(responseMessage, ta);
                              break;
                          case LEAVE_GROUP_RESPONSE:
                              handleLeaveGroupRes(responseMessage, ta);
                              break;
                          case SEND_MESSAGE_RESPONSE:
                              handleSendMessageRes(responseMessage, ta);
                              break;
                          case SEND_MESSAGE:
                              if (CHAT_SINGLE === responseMessage.chatType) {
                                  handleSingleRes(responseMessage, ta);
                              } else if (CHAT_GROUP === responseMessage.chatType) {
                                  handleGroupRes(responseMessage, ta);
                              }
                              break;
                          default:
                              // Unknown message types are ignored, as before.
                              break;
                      }
                  },
                  fail: function (err) {
                      console.log(err);
                  },
                  complete: function () {
                      console.log("解码全部完成")
                  }
              })
          }

          // Runs once the connection has been established.
          socket.onopen = function (event) {
              const ta = document.getElementById("responseText");
              ta.value = "连接开启";
          }

          // Runs when the connection is closed.
          socket.onclose = function (event) {
              const ta = document.getElementById("responseText");
              ta.value = ta.value + "\n" + "连接关闭";
          }

          // Surface connection-level errors in the console instead of dropping them.
          socket.onerror = function (event) {
              console.log(event);
          }
      } else {
          alert("浏览器不支持WebSocket!");
      }

      /**
      *============================
      * 发送数据
      *============================
      */

      /**
       * Encode a message object as protobuf and transmit it over the shared socket.
       *
       * Returns silently when the browser has no WebSocket support, and shows an
       * alert instead of sending when the connection is not in the OPEN state.
       *
       * @param {Object} message plain object that is handed to messageEncoder
       */
      function send(message) {
          if (!window.WebSocket) {
              return;
          }
          const isOpen = socket.readyState === WebSocket.OPEN;
          if (!isOpen) {
              alert("连接没有开启");
              return;
          }

          // The JSON round trip produces a detached copy of the caller's object.
          const payload = JSON.parse(JSON.stringify(message));
          console.log(payload);

          const onEncoded = function (bytes) {
              console.log("编码成功");
              socket.send(bytes);
          };
          const onEncodeError = function (error) {
              console.log(error);
          };
          const onEncodeDone = function () {
              console.log("编码全部完成")
          };

          messageEncoder({
              data: payload,
              success: onEncoded,
              fail: onEncodeError,
              complete: onEncodeDone
          });
      }

      /**
       * Send the login request (messageType 2, content "LOGIN") to the server.
       *
       * Delegates to send(), which performs the WebSocket-support and
       * connection-open checks, logs the payload, encodes it and writes it to
       * the socket. This replaces a private copy of that same logic, so login
       * now follows the same path as the other request functions.
       */
      function login() {
          send({
              "fromUid": "",
              "sendUid": "",
              "chatType": 0,
              "messageType": 2,
              "messageFormat": 0,
              "content": "LOGIN"
          });
      }

      /**
       * Send a request with messageType 3 and content "CREATE_GROUP" for the
       * given group id, using currentClientId as the sender.
       *
       * Returns silently without WebSocket support; alerts when the connection
       * is not open.
       *
       * @param {string} groupId group id placed in the sendUid field
       */
      function createGroup(groupId) {
          if (!window.WebSocket) {
              return;
          }
          if (socket.readyState === WebSocket.OPEN) {
              send({
                  fromUid: currentClientId,
                  sendUid: groupId,
                  chatType: 0,
                  messageType: 3,
                  messageFormat: 0,
                  content: "CREATE_GROUP"
              });
          } else {
              alert("连接没有开启");
          }
      }

      /**
       * Send a request with messageType 5 and content "LEAVE_GROUP" for the
       * given group id, using currentClientId as the sender.
       *
       * Returns silently without WebSocket support; alerts when the connection
       * is not open.
       *
       * @param {string} groupId group id placed in the sendUid field
       */
      function leaveGroup(groupId) {
          const supported = Boolean(window.WebSocket);
          if (!supported) {
              return;
          }
          if (socket.readyState !== WebSocket.OPEN) {
              alert("连接没有开启");
              return;
          }
          const request = {};
          request.fromUid = currentClientId;
          request.sendUid = groupId;
          request.chatType = 0;
          request.messageType = 5;
          request.messageFormat = 0;
          request.content = "LEAVE_GROUP";
          send(request);
      }

      /**
       * Send a single-chat message (chatType 1, messageType 6) to one client.
       *
       * Returns silently without WebSocket support; alerts when the connection
       * is not open.
       *
       * @param {string} sendToClientId recipient id placed in the sendUid field
       * @param {string} content message text
       */
      function sendSingleMessage(sendToClientId, content) {
          if (!window.WebSocket) {
              return;
          }
          if (socket.readyState === WebSocket.OPEN) {
              send({
                  fromUid: currentClientId,
                  sendUid: sendToClientId,
                  chatType: 1,
                  messageType: 6,
                  messageFormat: 0,
                  content: content
              });
              return;
          }
          alert("连接没有开启");
      }

      /**
       * Send a group-chat message (chatType 2, messageType 6) to a group.
       *
       * Returns silently without WebSocket support; alerts when the connection
       * is not open.
       *
       * @param {string} groupId group id placed in the sendUid field
       * @param {string} content message text
       */
      function sendGroupMessage(groupId, content) {
          if (!window.WebSocket) {
              return;
          }
          const connectionOpen = socket.readyState === WebSocket.OPEN;
          if (!connectionOpen) {
              alert("连接没有开启");
              return;
          }
          const groupMessage = {
              fromUid: currentClientId,
              sendUid: groupId,
              chatType: 2,
              messageType: 6,
              messageFormat: 0,
              content: content
          };
          send(groupMessage);
      }

      /**
      *============================
      * 返回值处理
      *============================
      */
      /**
       * Append a received group-chat message to the output area.
       *
       * @param {Object} resp decoded message; fromUid and content are read
       * @param {Object} ta element whose value receives the appended line
       */
      function handleGroupRes(resp, ta) {
          const line = "\n收到来自:" + resp.fromUid + "的群组消息:" + resp.content;
          ta.value = ta.value + line;
      }

      /**
       * Append a received single-chat message to the output area.
       *
       * @param {Object} resp decoded message; fromUid and content are read
       * @param {Object} ta element whose value receives the appended line
       */
      function handleSingleRes(resp, ta) {
          const sender = resp.fromUid;
          const text = resp.content;
          const pieces = [ta.value, "\n收到来自:", sender, "的单聊消息:", text];
          ta.value = pieces[0] + pieces[1] + pieces[2] + pieces[3] + pieces[4];
      }

      /**
       * Handle the login response: store its content as currentClientId, log
       * it, and append a line showing it to the output area.
       *
       * @param {Object} resp decoded message; content is read
       * @param {Object} ta element whose value receives the appended line
       */
      function handleLoginRes(resp, ta) {
          const assignedId = resp.content;
          currentClientId = assignedId;
          console.log("currentClientId:" + currentClientId)
          const line = "\n当前登录的ClientId为:" + assignedId;
          ta.value = ta.value + line;
      }

      /**
       * Append the outcome of a create-and-join-group request to the output
       * area. Content equal to "TRUE" is success; anything else is failure.
       *
       * @param {Object} resp decoded message; content is read
       * @param {Object} ta element whose value receives the appended line
       */
      function handleCreateAndJoinGroupRes(resp, ta) {
          const succeeded = resp.content === "TRUE";
          const notice = succeeded ? "\n创建客户端组成功" : "\n创建客户端组失败";
          ta.value = ta.value + notice;
      }

      /**
       * Append the outcome of a leave-group request to the output area.
       * Content equal to "TRUE" is success; anything else is failure.
       *
       * @param {Object} resp decoded message; content is read
       * @param {Object} ta element whose value receives the appended line
       */
      function handleLeaveGroupRes(resp, ta) {
          let notice = "\n离开客户端组失败";
          if (resp.content === "TRUE") {
              notice = "\n离开客户端组成功";
          }
          ta.value = ta.value + notice;
      }

      /**
       * Append the outcome of a send-message request to the output area.
       * Content equal to "TRUE" is success; anything else is failure.
       *
       * @param {Object} resp decoded message; content is read
       * @param {Object} ta element whose value receives the appended line
       */
      function handleSendMessageRes(resp, ta) {
          const outcome = ("TRUE" === resp.content) ? "成功" : "失败";
          ta.value = ta.value + ("\n发送消息" + outcome);
      }

      /**
      *============================
      * protobuf 编解码处理
      *============================
      */
      /**
       * Encode an outgoing message object into protobuf bytes.
       *
       * Loads ./MessageProtocol.proto, looks up the MessageProto type, verifies
       * obj.data against it, and encodes it. Any failure along the way (load
       * error, verification message, or an exception thrown while looking up,
       * creating or encoding) is routed to obj.fail, so obj.complete runs
       * exactly once per call instead of being skipped on an exception.
       *
       * @param {Object} obj
       * @param {Object} obj.data plain object to encode
       * @param {Function} [obj.success] called with the encoded bytes
       * @param {Function} [obj.fail] called with the error or verification message
       * @param {Function} [obj.complete] called after success or fail
       */
      function messageEncoder(obj) {
          let data = obj.data;
          let success = obj.success; // success callback
          let fail = obj.fail; // failure callback
          let complete = obj.complete; // runs after either outcome

          // Reports one failure and finishes the call.
          function reportFailure(reason) {
              if (typeof fail === "function") {
                  fail(reason)
              }
              if (typeof complete === "function") {
                  complete()
              }
          }

          protobuf.load("./MessageProtocol.proto", function (err, root) {
              if (err) {
                  reportFailure(err);
                  return;
              }
              let buffer;
              try {
                  // Obtain the message type.
                  let MessageProto = root.lookupType("MessageProto");
                  // Verify the payload (it may be incomplete or invalid).
                  let errMsg = MessageProto.verify(data);
                  if (errMsg) {
                      reportFailure(errMsg);
                      return;
                  }
                  // Create the message and encode it to bytes.
                  let message = MessageProto.create(data);
                  buffer = MessageProto.encode(message).finish();
              } catch (encodeErr) {
                  // Without this catch the exception would escape the async
                  // callback and neither fail nor complete would ever run.
                  reportFailure(encodeErr);
                  return;
              }
              // Callbacks run outside the try so their own exceptions are not
              // misreported as encoding failures.
              if (typeof success === "function") {
                  success(buffer)
              }
              if (typeof complete === "function") {
                  complete()
              }
          });
      }

      /**
       * Decode a binary message received from the server into a MessageProto.
       *
       * Loads ./MessageProtocol.proto, looks up the MessageProto type and
       * decodes obj.data. An ArrayBuffer is decoded directly; any other value
       * is read through FileReader as before. Load errors, read errors and
       * exceptions thrown while looking up or decoding are routed to obj.fail,
       * so obj.complete always runs once the attempt has finished.
       *
       * @param {Object} obj
       * @param {Blob|ArrayBuffer} obj.data raw payload of the socket message
       * @param {Function} [obj.success] called with the decoded message
       * @param {Function} [obj.fail] called with the error
       * @param {Function} [obj.complete] called after success or fail
       */
      function messageDecoder(obj) {
          let data = obj.data;
          let success = obj.success; // success callback
          let fail = obj.fail; // failure callback
          let complete = obj.complete; // runs after either outcome

          // Reports one failure and finishes the call.
          function reportFailure(reason) {
              if (typeof fail === "function") {
                  fail(reason)
              }
              if (typeof complete === "function") {
                  complete()
              }
          }

          // Decodes the bytes and reports the outcome through the callbacks.
          function decodeBytes(MessageProto, bytes) {
              let messageProto;
              try {
                  messageProto = MessageProto.decode(bytes);
              } catch (decodeErr) {
                  // Malformed bytes used to throw out of the reader callback,
                  // leaving fail and complete uncalled.
                  reportFailure(decodeErr);
                  return;
              }
              console.log(messageProto);
              if (typeof success === "function") {
                  success(messageProto)
              }
              if (typeof complete === "function") {
                  complete()
              }
          }

          protobuf.load("./MessageProtocol.proto", function (err, root) {
              if (err) {
                  reportFailure(err);
                  return;
              }
              // Obtain the message type.
              let MessageProto;
              try {
                  MessageProto = root.lookupType("MessageProto");
              } catch (lookupErr) {
                  reportFailure(lookupErr);
                  return;
              }

              // With socket.binaryType = "arraybuffer" the payload is already an
              // ArrayBuffer, which FileReader cannot read.
              if (data instanceof ArrayBuffer) {
                  decodeBytes(MessageProto, new Uint8Array(data));
                  return;
              }

              let reader = new FileReader();
              // Attach handlers before starting the read.
              reader.onload = function (e) {
                  decodeBytes(MessageProto, new Uint8Array(reader.result));
              };
              reader.onerror = function (e) {
                  reportFailure(reader.error);
              };
              try {
                  reader.readAsArrayBuffer(data);
              } catch (readErr) {
                  // Thrown synchronously when data is not a Blob.
                  reportFailure(readErr);
              }
          });
      }
      </script>

      <h1>欢迎访问客服系统</h1>
      <!-- Control panel. The form never submits (onsubmit returns false); every
           button calls one of the request functions defined in the script above. -->
      <form onsubmit="return false">
          <!-- Login: calls login(). -->
          <h4>用户登录:</h4>
          <input type="button" value="登录" onclick="login()">
          <br/>
          <br/>
          <!-- Join a group: passes the groupId field to createGroup(). -->
          GroupId:<input type="text" name="groupId">
          <input type="button" value="加入客户端组" onclick="createGroup(this.form.groupId.value)">
          <br/>
          <br/>
          <!-- Leave a group: passes the groupId2 field to leaveGroup(). -->
          GroupId:<input type="text" name="groupId2">
          <input type="button" value="离开客户端组" onclick="leaveGroup(this.form.groupId2.value)">
          <br/>
          <br/>
          <!-- Single chat: recipient id plus message text go to sendSingleMessage(). -->
          SendToClientId:<input type="text" name="clientId">
          <textarea name="message1" style="width: 400px;height: 100px"></textarea>
          <input type="button" value="发送数据"
                 onclick="sendSingleMessage(this.form.clientId.value,this.form.message1.value);">
          <br/>
          <br/>
          <!-- Group chat: group id plus message text go to sendGroupMessage(). -->
          GroupId:<input type="text" name="groupId3">
          <textarea name="message2" style="width: 400px;height: 100px"></textarea>
          <input type="button" value="发送数据"
                 onclick="sendGroupMessage(this.form.groupId3.value,this.form.message2.value);">
          <!-- Output area written to by the socket callbacks and response handlers. -->
          <h3>回复消息:</h3>
          <textarea id="responseText" style="width: 400px;height: 300px;"></textarea>
          <!-- In an event-handler attribute "javascript:" is only a statement
               label, so the handler is written as plain script. -->
          <input type="button" onclick="document.getElementById('responseText').value=''" value="清空数据">
      </form>
      </body>
      </html>

项目源码地址