websocket
基于 netty 实现的 websocket,支持校验的demo。实测有效。
Java demo
pom.xml
<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.109.Final</version>
</dependency>
WebsocketTest
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
/**
 * Entry point of the demo: starts a netty server on port 8081 that accepts
 * WebSocket connections on {@code /ws}.
 *
 * <p>The native epoll transport is used when it is available; otherwise the
 * server falls back to the NIO transport.
 */
public class WebsocketTest {

    /**
     * Configures and binds the server, then blocks until the server channel is closed.
     *
     * @param args ignored
     * @throws InterruptedException if the thread is interrupted while waiting for
     *                              the bind or for the server channel to close
     */
    public static void main(String[] args) throws InterruptedException {
        final boolean epoll = Epoll.isAvailable();
        EventLoopGroup acceptorGroup = newGroup(epoll);
        EventLoopGroup workerGroup = newGroup(epoll);
        try {
            ServerBootstrap server = new ServerBootstrap();
            server.group(acceptorGroup, workerGroup);
            if (epoll) {
                server.channel(EpollServerSocketChannel.class);
            } else {
                server.channel(NioServerSocketChannel.class);
            }
            server.localAddress(new InetSocketAddress(8081));
            // Size of the queue of client connections waiting to be accepted.
            server.option(ChannelOption.SO_BACKLOG, 512);
            // Send data immediately (TCP_CORK would be the option for batching writes).
            server.childOption(ChannelOption.TCP_NODELAY, true);
            // Enable TCP keep-alive probes on idle connections
            // (probe delay is an OS setting, commonly about two hours).
            server.childOption(ChannelOption.SO_KEEPALIVE, true);
            // Allow the local address and port to be reused.
            server.childOption(ChannelOption.SO_REUSEADDR, true);
            // Half-closure: with false (the default) the connection is closed automatically
            // when the remote side closes; with true a ChannelInputShutdownEvent is delivered
            // to ChannelInboundHandler.userEventTriggered() instead.
            server.childOption(ChannelOption.ALLOW_HALF_CLOSURE, true);
            server.childHandler(new PipelineInitializer());

            ChannelFuture bound = server.bind().sync();
            bound.channel().closeFuture().sync();
        } finally {
            acceptorGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    /** Creates an event loop group for the selected transport. */
    private static EventLoopGroup newGroup(boolean epoll) {
        if (epoll) {
            return new EpollEventLoopGroup();
        }
        return new NioEventLoopGroup();
    }

    /** Builds the handler pipeline of every accepted connection. */
    private static final class PipelineInitializer extends ChannelInitializer<SocketChannel> {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            // Heartbeat: after 60 seconds without inbound data an IdleStateEvent
            // is delivered through userEventTriggered.
            ch.pipeline().addLast(new IdleStateHandler(6 * 10, 0, 0, TimeUnit.SECONDS));
            // HTTP encoding / decoding.
            ch.pipeline().addLast(new HttpServerCodec());
            // Chunked writing of large outbound objects.
            ch.pipeline().addLast(new ChunkedWriteHandler());
            // Merge the parts of an HTTP message into one full message (limit 8192 bytes;
            // other common choices are 65535 or 64 * 1024).
            ch.pipeline().addLast(new HttpObjectAggregator(8192));
            // Initial HTTP request handling and authentication.
            ch.pipeline().addLast(new HttpRequestHandler());
            // Let netty perform the handshake and handle close / ping / pong frames.
            ch.pipeline().addLast(new WebSocketServerProtocolHandler("/ws"));
            // Application logic.
            ch.pipeline().addLast(new WebsocketTransferHandler());
        }
    }
}
HttpRequestHandler
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.QueryStringDecoder;
import io.netty.util.CharsetUtil;
import java.util.List;
/**
 * First HTTP-level handler of the pipeline: authenticates WebSocket upgrade
 * requests and answers every other request with a plain-text reply.
 *
 * <p>A request is treated as a WebSocket upgrade when its path is exactly
 * {@code /ws} and its query string carries a non-empty {@code uid} parameter,
 * e.g. {@code ws://127.0.0.1:8081/ws?uid=123}. The uid is stored on the channel
 * under {@link ToolHelper#uid} and the request is passed on, with its URI
 * reduced to {@code /ws}, so that the next handler can perform the handshake.
 */
public class HttpRequestHandler extends SimpleChannelInboundHandler<FullHttpRequest> {

    /** Path on which WebSocket upgrades are accepted. */
    private static final String WEBSOCKET_PATH = "/ws";

    /**
     * Routes one aggregated HTTP request.
     *
     * @param ctx     handler context of the current channel
     * @param request the full HTTP request; released by the base class after this
     *                method returns, hence the {@code retain()} when forwarding it
     */
    @Override
    public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
        String url = request.uri();
        System.out.println("url is " + url);

        String uid = extractUid(url);
        if (uid != null) {
            // Remember who owns this connection.
            ctx.channel().attr(ToolHelper.uid).set(uid);
            // Hand over to the next handler for the upgrade handshake; the query
            // string is dropped so the URI matches the WebSocket path exactly.
            request.setUri(WEBSOCKET_PATH);
            ctx.fireChannelRead(request.retain());
            return;
        }

        // Plain HTTP reply for everything that is not an accepted upgrade request.
        ByteBuf content = Unpooled.copiedBuffer("ok", CharsetUtil.UTF_8);
        FullHttpResponse response =
                new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content);
        response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");
        response.headers().set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes());
        // Close only once the response has actually been written.
        ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
    }

    /**
     * Extracts the uid of a WebSocket upgrade request.
     *
     * @param url request URI including the query string
     * @return the decoded value of the first {@code uid} parameter, or {@code null}
     *         when the path is not {@code /ws}, the parameter is missing or empty,
     *         or the URI cannot be decoded
     */
    private static String extractUid(String url) {
        try {
            QueryStringDecoder decoder = new QueryStringDecoder(url);
            if (!WEBSOCKET_PATH.equals(decoder.path())) {
                return null;
            }
            List<String> values = decoder.parameters().get(ToolHelper.UID);
            if (values == null || values.isEmpty()) {
                return null;
            }
            String uid = values.get(0);
            return uid.isEmpty() ? null : uid;
        } catch (IllegalArgumentException malformed) {
            // Invalid percent-encoding in the URI: treat it as an unauthenticated request.
            return null;
        }
    }
}
ToolHelper
import io.netty.channel.Channel;
import io.netty.util.Attribute;
import io.netty.util.AttributeKey;
import java.util.Objects;
/**
 * Shared constants and helpers for reading the uid attached to a channel.
 */
public class ToolHelper {

    /** Name of the uid parameter. */
    public final static String UID = "uid";

    /** Channel attribute key under which the uid is stored. */
    public final static AttributeKey<String> uid = AttributeKey.valueOf("uid");

    /**
     * Reads the uid stored on a channel.
     *
     * @param channel the channel to inspect
     * @return the stored uid, or {@code null} when none is available
     */
    public static String getUid(Channel channel) {
        Attribute<String> holder = channel.attr(uid);
        return holder == null ? null : holder.get();
    }
}
WebsocketTransferHandler
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
/**
 * Business handler for text WebSocket frames.
 *
 * <p>Replies {@code pong} to an application-level {@code ping} text message,
 * echoes every other text message back to the sender, and closes the
 * connection on errors or when the read-idle timeout fires.
 */
public class WebsocketTransferHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    /**
     * Called when this handler is removed from the pipeline.
     *
     * @param ctx handler context of the current channel
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        // TODO: drop the channel from the locally managed channel registry.
        super.handlerRemoved(ctx);
    }

    /**
     * Handles one inbound text frame.
     *
     * @param ctx   handler context of the current channel
     * @param frame the received text frame
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame frame) throws Exception {
        // Decode the payload once and reuse it below.
        String text = frame.text();

        // A "ping" text message is answered with "pong" right away.
        if ("ping".equals(text)) {
            ctx.channel().writeAndFlush(new TextWebSocketFrame("pong"));
            return;
        }

        // TODO: real message handling.
        String uid = ToolHelper.getUid(ctx.channel());
        System.out.println("receive msg : " + text + " from uid_ " + uid);

        // TODO: for now the message is echoed back unchanged.
        ctx.channel().writeAndFlush(new TextWebSocketFrame(text));
    }

    /**
     * Reports the failure and closes the connection.
     *
     * @param ctx   handler context of the current channel
     * @param cause the exception raised in the pipeline
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // Do not lose the reason the connection is being dropped.
        cause.printStackTrace();
        // TODO: drop the channel from the locally managed channel registry.
        ctx.close();
    }

    /**
     * Reacts to pipeline user events: closes the channel on read-idle timeout,
     * consumes the handshake-complete event, and forwards everything else.
     *
     * @param ctx handler context of the current channel
     * @param evt the user event
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            // Nothing was read within the idle timeout: close the connection.
            if (event.state() == IdleState.READER_IDLE) {
                // TODO: drop the channel from the locally managed channel registry.
                ctx.close();
            }
        } else if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) {
            // TODO: initialise the client info for the managed channel registry
            // (the channel is available as ctx.channel()).
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }
}
测试
使用在线websocket页面 https://www.wetools.com/websocket
- 输入 : ws://127.0.0.1:8081/ws?uid=1
- 点击连接
- 发送任意消息观察
nginx 配置
想要使用 域名访问以及使用 wss 访问都可以借助于 nginx实现。
wss 需要 https。免费的 https 证书申请方法见:
免费启用HTTPS证书的简明教程
以下使用本地nginx测试
# Reverse-proxy configuration that exposes the WebSocket server under a domain name.
http {
# Derive the Connection header sent upstream from the client's Upgrade header:
# "upgrade" when the client sent an Upgrade header, "close" when it is empty.
map $http_upgrade $connection_upgrade {
default upgrade;
'' close;
}
# (placeholder for the rest of the http-level settings)
...
server {
listen 80;
# Add "127.0.0.1 ws.test.com" to the local hosts file
server_name ws.test.com;
location / {
# Forward every request to the server listening on local port 8081.
proxy_pass http://127.0.0.1:8081;
proxy_set_header Host $host;
proxy_connect_timeout 5s;
# Read timeout towards the upstream; presumably chosen to be longer than the
# client's heartbeat period so idle-but-alive connections are not cut - confirm.
proxy_read_timeout 600s;
proxy_send_timeout 12s;
# Talk HTTP/1.1 to the upstream and pass the Upgrade / Connection headers through
# so the WebSocket handshake reaches the backend.
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection $connection_upgrade;
}
}
}
index.html
https://www.wetools.com/websocket 不支持本地host域名 。 使用下述页面自行测试
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>websocket-msg</title>
    <script src="https://ajax.googleapis.com/ajax/libs/jquery/1.10.2/jquery.min.js"></script>
    <script type="text/javascript">
        // Base WebSocket URL; the uid typed by the user is appended as the query value.
        var urlRoad = 'ws://ws.test.com/ws?uid=' ;
        // Current connection, or null while disconnected.
        var ws = null;

        // Appends one line to the message board and keeps the newest line visible.
        function appendToBoard(line) {
            var board = $("#board");
            board.val(board.val() + line + "\n");
            board.scrollTop(board[0].scrollHeight);
        }

        // True only when the socket exists and is ready to send.
        function isOpen() {
            return ws != null && ws.readyState === WebSocket.OPEN;
        }

        // Opens the connection for the uid in the input box (no-op when already connected).
        function connect () {
            if(ws != null) return;
            // Encode the uid so characters such as '&', '=' or '#' cannot break the query string.
            ws = new WebSocket(urlRoad + encodeURIComponent($("#uid").val()));
            ws.onopen = WSonOpen;
            ws.onmessage = WSonMessage;
            ws.onclose = WSonClose;
            ws.onerror = WSonError;
        }

        function WSonOpen() {
            alert("登陆成功,连接已经建立。");
        }

        // Shows server messages on the board; heartbeat replies are ignored.
        function WSonMessage(event) {
            if(event.data == 'pong') {
                return;
            }
            appendToBoard("[server] :" + event.data);
        }

        function WSonClose() {
            ws = null;
            alert("连接关闭。");
        }

        function WSonError() {
            alert("WebSocket错误。");
        }

        // Sends the text in the input box and echoes it on the board.
        function send() {
            // ws.send() throws while the socket is still connecting, so require OPEN.
            if(!isOpen()) {
                alert("未登录");
                return;
            }
            var text = $("#tosend").val();
            if(text.trim() == '') return;
            appendToBoard("[client] :" + text);
            $("#tosend").val('');
            ws.send(text);
        }

        // Sends one heartbeat message when the connection is open.
        function sendPingMsg() {
            if(!isOpen()) {
                return;
            }
            ws.send("ping");
        }

        let intervalTimer = null;

        // (Re)starts the heartbeat timer. A plain repeating timer keeps running
        // even if a single ping attempt fails.
        function interval() {
            clearInterval(intervalTimer);
            intervalTimer = setInterval(sendPingMsg, 10000);
        }

        // Send a heartbeat every 10 seconds.
        interval();
    </script>
</head>
<body>
用户名: <input type="text" id="uid"><input type="submit" onclick="connect()" value="登陆">
<br><textarea onchange="this.scrollTop=this.scrollHeight" style="overflow-x:auto" id="board" rows=20 cols=80></textarea>
<br><input id="tosend" type="text" style="width:400px"><input type="submit" onclick="send()" value="发送">
</body>
</html>
文章评论