netty-websocket

2021-11-11

java

历经万般红尘劫,犹如凉风轻拂面。——林清玄

今天用了这个netty-websocket-spring-boot-starter

那是相当的香啊

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
package com.ruben.xchat.controller;

import cn.hutool.core.exceptions.ExceptionUtil;
import com.alibaba.fastjson.JSON;
import com.ruben.xchat.pojo.to.ChatTransferObject;
import com.ruben.xchat.service.WebSocketService;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.timeout.IdleStateEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.MultiValueMap;
import org.yeauty.annotation.*;
import org.yeauty.pojo.Session;

import java.io.IOException;
import java.util.List;
import java.util.Map;

/**
* WebSocket控制层
* https://gitee.com/Yeauty/netty-websocket-spring-boot-starter
*
* @author <achao1441470436@gmail.com>
* @since 2021/11/11 9:14
*/
@Slf4j
@Component
@ServerEndpoint(path = "${ws.path}", port = "${ws.port}")
public class WebSocketController {

@Autowired
private WebSocketService webSocketService;

/**
* 建立会话之前
*
* @param session 会话
* @param headers 请求头
* @param req 请求参数
* @param reqMap 请求参数
* @param arg 路径上的参数
* @param pathMap 路径上的参数
* @author <achao1441470436@gmail.com>
* @since 2021/11/11 9:18
*/
@BeforeHandshake
public void handshake(Session session, HttpHeaders headers, @RequestParam String req, @RequestParam MultiValueMap<String, List<Object>> reqMap, @PathVariable String arg, @PathVariable Map<String, Object> pathMap) {
session.setSubprotocols("stomp");
// 进行认证
try {
webSocketService.verifyLogin(headers);
} catch (Exception e) {
session.close();
ExceptionUtil.wrapAndThrow(e);
}
}


/**
* 当有新的WebSocket连接完成时
*
* @param session 会话
* @param headers 请求头
* @param req 请求参数
* @param reqMap 请求参数
* @param arg 路径上的参数
* @param pathMap 路径上的参数
* @author <achao1441470436@gmail.com>
* @since 2021/11/11 9:21
*/
@OnOpen
public void onOpen(Session session, HttpHeaders headers, @RequestParam String req, @RequestParam MultiValueMap<String, List<Object>> reqMap, @PathVariable String arg, @PathVariable Map<String, Object> pathMap) {
log.info("new connection");
webSocketService.registerSession(headers, session);
}

/**
* 连接关闭
*
* @param session 会话
* @author <achao1441470436@gmail.com>
* @since 2021/11/11 9:22
*/
@OnClose
public void onClose(Session session) throws IOException {
log.info("one connection closed");
webSocketService.removeSession(session);
}

/**
* 连接发生异常
*
* @param session 会话
* @param throwable 异常
* @author <achao1441470436@gmail.com>
* @since 2021/11/11 9:22
*/
@OnError
public void onError(Session session, Throwable throwable) {
log.error("Connection error happened.", throwable);
webSocketService.removeSession(session);
}

/**
* 收到消息
*
* @param session 会话
* @author <achao1441470436@gmail.com>
* @since 2021/11/11 9:22
*/
@OnMessage
public void onMessage(Session session, String message) {
System.out.println(message);
ChatTransferObject chat = JSON.parseObject(message, ChatTransferObject.class);
switch (chat.getChatToType()) {
case CHAT_ONE:
log.info("发送到个人:{}", chat);
webSocketService.sendSomeone(session, chat);
break;
case CHAT_GROUP:
log.error("发送到群聊");
break;
default:
log.error("未识别的消息类型");
break;
}
}

/**
* 收到二进制消息
*
* @param session 会话
* @param bytes 消息
* @author <achao1441470436@gmail.com>
* @since 2021/11/11 9:22
*/
@OnBinary
public void onBinary(Session session, byte[] bytes) {
for (byte b : bytes) {
System.out.println(b);
}
session.sendBinary(bytes);
}

/**
* 收到Netty事件
*
* @param session 会话
* @param evt 事件
* @author <achao1441470436@gmail.com>
* @since 2021/11/11 9:22
*/
@OnEvent
public void onEvent(Session session, Object evt) {
if (evt instanceof IdleStateEvent) {
IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
switch (idleStateEvent.state()) {
case READER_IDLE:
System.out.println("read idle");
break;
case WRITER_IDLE:
System.out.println("write idle");
break;
case ALL_IDLE:
System.out.println("all idle");
break;
default:
break;
}
}
}
}

使用注解进行配置,netty的各种配置例如端口、主机、都可以在yml中配置,文档就是gitee中的md,用来做即时通讯简直不要太香

netty-websocket-spring-boot-starter

License

English Docs

简介

本项目帮助你在spring-boot中使用Netty来开发WebSocket服务器,并像spring-websocket的注解开发一样简单

要求

  • jdk版本为1.8或1.8+

快速开始

  • 添加依赖:
1
2
3
4
5
<dependency>
<groupId>org.yeauty</groupId>
<artifactId>netty-websocket-spring-boot-starter</artifactId>
<version>0.12.0</version>
</dependency>
  • 在端点类上加上@ServerEndpoint注解,并在相应的方法上加上@BeforeHandshake@OnOpen@OnClose@OnError@OnMessage@OnBinary@OnEvent注解,样例如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
@ServerEndpoint(path = "/ws/{arg}")
public class MyWebSocket {

@BeforeHandshake
public void handshake(Session session, HttpHeaders headers, @RequestParam String req, @RequestParam MultiValueMap reqMap, @PathVariable String arg, @PathVariable Map pathMap){
session.setSubprotocols("stomp");
if (!"ok".equals(req)){
System.out.println("Authentication failed!");
session.close();
}
}

@OnOpen
public void onOpen(Session session, HttpHeaders headers, @RequestParam String req, @RequestParam MultiValueMap reqMap, @PathVariable String arg, @PathVariable Map pathMap){
System.out.println("new connection");
System.out.println(req);
}

@OnClose
public void onClose(Session session) throws IOException {
System.out.println("one connection closed");
}

@OnError
public void onError(Session session, Throwable throwable) {
throwable.printStackTrace();
}

@OnMessage
public void onMessage(Session session, String message) {
System.out.println(message);
session.sendText("Hello Netty!");
}

@OnBinary
public void onBinary(Session session, byte[] bytes) {
for (byte b : bytes) {
System.out.println(b);
}
session.sendBinary(bytes);
}

@OnEvent
public void onEvent(Session session, Object evt) {
if (evt instanceof IdleStateEvent) {
IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
switch (idleStateEvent.state()) {
case READER_IDLE:
System.out.println("read idle");
break;
case WRITER_IDLE:
System.out.println("write idle");
break;
case ALL_IDLE:
System.out.println("all idle");
break;
default:
break;
}
}
}

}
  • 打开WebSocket客户端,连接到ws://127.0.0.1:80/ws/xxx

注解

@ServerEndpoint

当ServerEndpointExporter类通过Spring配置进行声明并被使用,它将会去扫描带有@ServerEndpoint注解的类 被注解的类将被注册成为一个WebSocket端点 所有的配置项都在这个注解的属性中 ( 如:@ServerEndpoint("/ws") )

@BeforeHandshake

当有新的连接进入时,对该方法进行回调 注入参数的类型:Session、HttpHeaders…

@OnOpen

当有新的WebSocket连接完成时,对该方法进行回调 注入参数的类型:Session、HttpHeaders…

@OnClose

当有WebSocket连接关闭时,对该方法进行回调 注入参数的类型:Session

@OnError

当有WebSocket抛出异常时,对该方法进行回调 注入参数的类型:Session、Throwable

@OnMessage

当接收到字符串消息时,对该方法进行回调 注入参数的类型:Session、String

@OnBinary

当接收到二进制消息时,对该方法进行回调 注入参数的类型:Session、byte[]

@OnEvent

当接收到Netty的事件时,对该方法进行回调 注入参数的类型:Session、Object

配置

所有的配置项都在这个注解的属性中

属性 默认值 说明
path “/” WebSocket的path,也可以用value来设置
host “0.0.0.0” WebSocket的host,"0.0.0.0"即是所有本地地址
port 80 WebSocket绑定端口号。如果为0,则使用随机端口(端口获取可见 多端点服务)
bossLoopGroupThreads 0 bossEventLoopGroup的线程数
workerLoopGroupThreads 0 workerEventLoopGroup的线程数
useCompressionHandler false 是否添加WebSocketServerCompressionHandler到pipeline
optionConnectTimeoutMillis 30000 与Netty的ChannelOption.CONNECT_TIMEOUT_MILLIS一致
optionSoBacklog 128 与Netty的ChannelOption.SO_BACKLOG一致
childOptionWriteSpinCount 16 与Netty的ChannelOption.WRITE_SPIN_COUNT一致
childOptionWriteBufferHighWaterMark 64*1024 与Netty的ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK一致,但实际上是使用ChannelOption.WRITE_BUFFER_WATER_MARK
childOptionWriteBufferLowWaterMark 32*1024 与Netty的ChannelOption.WRITE_BUFFER_LOW_WATER_MARK一致,但实际上是使用 ChannelOption.WRITE_BUFFER_WATER_MARK
childOptionSoRcvbuf -1(即未设置) 与Netty的ChannelOption.SO_RCVBUF一致
childOptionSoSndbuf -1(即未设置) 与Netty的ChannelOption.SO_SNDBUF一致
childOptionTcpNodelay true 与Netty的ChannelOption.TCP_NODELAY一致
childOptionSoKeepalive false 与Netty的ChannelOption.SO_KEEPALIVE一致
childOptionSoLinger -1 与Netty的ChannelOption.SO_LINGER一致
childOptionAllowHalfClosure false 与Netty的ChannelOption.ALLOW_HALF_CLOSURE一致
readerIdleTimeSeconds 0 IdleStateHandler中的readerIdleTimeSeconds一致,并且当它不为0时,将在pipeline中添加IdleStateHandler
writerIdleTimeSeconds 0 IdleStateHandler中的writerIdleTimeSeconds一致,并且当它不为0时,将在pipeline中添加IdleStateHandler
allIdleTimeSeconds 0 IdleStateHandler中的allIdleTimeSeconds一致,并且当它不为0时,将在pipeline中添加IdleStateHandler
maxFramePayloadLength 65536 最大允许帧载荷长度
useEventExecutorGroup true 是否使用另一个线程池来执行耗时的同步业务逻辑
eventExecutorGroupThreads 16 eventExecutorGroup的线程数
sslKeyPassword “”(即未设置) 与spring-boot的server.ssl.key-password一致
sslKeyStore “”(即未设置) 与spring-boot的server.ssl.key-store一致
sslKeyStorePassword “”(即未设置) 与spring-boot的server.ssl.key-store-password一致
sslKeyStoreType “”(即未设置) 与spring-boot的server.ssl.key-store-type一致
sslTrustStore “”(即未设置) 与spring-boot的server.ssl.trust-store一致
sslTrustStorePassword “”(即未设置) 与spring-boot的server.ssl.trust-store-password一致
sslTrustStoreType “”(即未设置) 与spring-boot的server.ssl.trust-store-type一致
corsOrigins {}(即未设置) 与spring-boot的@CrossOrigin#origins一致
corsAllowCredentials “”(即未设置) 与spring-boot的@CrossOrigin#allowCredentials一致

通过application.properties进行配置

所有参数皆可使用${...}占位符获取application.properties中的配置。如下:

  • 首先在@ServerEndpoint注解的属性中使用${...}占位符
1
2
3
4
@ServerEndpoint(host = "${ws.host}",port = "${ws.port}")
public class MyWebSocket {
...
}
  • 接下来即可在application.properties中配置
1
2
ws.host=0.0.0.0
ws.port=80

自定义Favicon

配置favicon的方式与spring-boot中完全一致。只需将favicon.ico文件放到classpath的根目录下即可。如下:

1
2
3
4
5
6
src/
+- main/
+- java/
| + <source code>
+- resources/
+- favicon.ico

自定义错误页面

配置自定义错误页面的方式与spring-boot中完全一致。你可以添加一个 /public/error 目录,错误页面将会是该目录下的静态页面,错误页面的文件名必须是准确的错误状态或者是一串掩码,如下:

1
2
3
4
5
6
7
8
9
10
src/
+- main/
+- java/
| + <source code>
+- resources/
+- public/
+- error/
| +- 404.html
| +- 5xx.html
+- <other public assets>

多端点服务

  • 快速启动的基础上,在多个需要成为端点的类上使用@ServerEndpoint@Component注解即可
  • 可通过ServerEndpointExporter.getInetSocketAddressSet()获取所有端点的地址
  • 当地址不同时(即host不同或port不同),使用不同的ServerBootstrap实例
  • 当地址相同,路径(path)不同时,使用同一个ServerBootstrap实例
  • 当多个端点服务的port为0时,将使用同一个随机的端口号
  • 当多个端点的port和path相同时,host不能设为"0.0.0.0",因为"0.0.0.0"意味着绑定所有的host