基于 Netty 实现 websocket 消息发送笔记

2024-05-06 214点热度 0人点赞 0条评论

websocket

基于 netty 实现的 websocket,支持校验的demo。实测有效。

Java demo

pom.xml

 <!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.109.Final</version>
</dependency>

WebsocketTest


import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;

import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;

public class WebsocketTest {

    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup parentG = Epoll.isAvailable() ? new EpollEventLoopGroup() : new NioEventLoopGroup();
        EventLoopGroup childrenG = Epoll.isAvailable() ? new EpollEventLoopGroup() : new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap()
                    //group
                    .group(parentG, childrenG)
                    //channel
                    .channel(Epoll.isAvailable() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                    //address
                    .localAddress(new InetSocketAddress(8081))
                    //定义可连接的客户端队列大小
                    .option(ChannelOption.SO_BACKLOG, 512)
                    //有数据立即发送;积攒一定数据量再发送用TCP_CORK
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    //保持连接,2小时内无数据通信TCP发送探测报文
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    //允许重复使用本地地址和端口
                    .childOption(ChannelOption.SO_REUSEADDR, true)
                    //一个连接的远端关闭时本地端是否关闭,默认值为False。值为False时,
                    // 连接自动关闭;为True时,触发ChannelInboundHandler的userEventTriggered()方法,
                    // 事件为ChannelInputShutdownEvent
                    .childOption(ChannelOption.ALLOW_HALF_CLOSURE, true)

                    //
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline()
                                    //添加心跳机制 一分钟内没有回应则触发 userEventTriggered 事件的 IdleStateEvent 类型数据
                                    .addLast(new IdleStateHandler(6 * 10, 0, 0, TimeUnit.SECONDS))
                                    //http 编解码
                                    .addLast(new HttpServerCodec())
                                    //大对象出站
                                    .addLast(new ChunkedWriteHandler())
                                    //将HTTP消息的多个部分合成一条完整的HTTP消息 8192 / 65535 /64 * 1024
                                    .addLast(new HttpObjectAggregator(8192))

                                    // 初始http&鉴权
                                    .addLast(new HttpRequestHandler())
                                    //netty 代理 握手,处理 close ping pong
                                    .addLast(new WebSocketServerProtocolHandler("/ws"))
                                    //处理逻辑
                                    .addLast(new WebsocketTransferHandler());
                        }
                    });

            ChannelFuture f = bootstrap.bind().sync();
            f.channel().closeFuture().sync();
        } finally {
            parentG.shutdownGracefully();
            childrenG.shutdownGracefully();
        }
    }
}

HttpRequestHandler


import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.util.CharsetUtil;

public class HttpRequestHandler extends SimpleChannelInboundHandler<FullHttpRequest> {

    @Override
    public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
        String url = request.uri();
        System.out.println("url is " + url);
        //ws 格式形如 ws://127.0.0.1:8081?uid=123
        if (url.startsWith("/ws")) {
            String[] split = url.split("\\?");
            if (split.length >= 2) {
                String[] params = split[1].split("=");
                if (params.length == 2) {
                    //设置uid 属性
                    String uid = params[1];
                    ctx.channel().attr(ToolHelper.uid).set(uid);
                    // 传递到下一个handler:升级握手
                    request.setUri("/ws");
                    ctx.fireChannelRead(request.retain());
                    return;
                }
            }
        }

        //回复信息给浏览器 [http协议]
        ByteBuf content = Unpooled.copiedBuffer("ok", CharsetUtil.UTF_8);
        //构造一个http的相应,即 httpResponse
        FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content);
        response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");
        response.headers().set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes());
        //将构建好 response返回
        ctx.writeAndFlush(response);
        //close
        ctx.close();
    }
}

ToolHelper


import io.netty.channel.Channel;
import io.netty.util.Attribute;
import io.netty.util.AttributeKey;

import java.util.Objects;

public class ToolHelper {

    public final static AttributeKey<String> uid = AttributeKey.valueOf("uid");

    public final static String UID = "uid";

    public static String getUid(Channel channel) {
        Attribute<String> attr = channel.attr(uid);
        if (Objects.nonNull(attr)) {
            return attr.get();
        }
        return null;
    }
}

WebsocketTransferHandler


import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;

public class WebsocketTransferHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        // 删除通道. 移除本地管理的chain
        super.handlerRemoved(ctx);
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame frame) throws Exception {
        //ping 消息直接返回 pong
        if ("ping".equals(frame.text())) {
            ctx.channel().writeAndFlush(new TextWebSocketFrame("pong"));
            return;
        }

        //todo
        String uid = ToolHelper.getUid(ctx.channel());
        System.out.println("receive msg : " + frame.text() + " from uid_ " + uid);

        //原样返回 todo
        ctx.channel().writeAndFlush(new TextWebSocketFrame(frame.text()));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        //todo 移除本地管理的chain
        ctx.close();
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            //如果读空闲 则 关闭
            if (event.state() == IdleState.READER_IDLE) {
                try {
                    //todo 移除本地管理的chain
                    ctx.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }

        } else if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) {
            Channel ch = ctx.channel();
            //todo init client info for manager chain
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }

}

测试

使用在线websocket页面 https://www.wetools.com/websocket

  • 输入 : ws://127.0.0.1:8081/ws?uid=1
  • 点击链接
  • 发送任意消息观察

nginx 配置

想要使用 域名访问以及使用 wss 访问都可以借助于 nginx实现。
wss 需要需要https。免费的https证书 见
免费启用HTTPS证书的简明教程

以下使用本地nginx测试

http {
    map $http_upgrade $connection_upgrade {
      default upgrade;
      '' close;
    }
...

server {
    listen 80;
    # 本地 hosts 文件 添加 127.0.0.1 ws.test.com
    server_name ws.test.com;

    location / {
        proxy_pass http://127.0.0.1:8081;
        proxy_set_header  Host $host;
        proxy_connect_timeout 5s;
        proxy_read_timeout 600s;
        proxy_send_timeout 12s;

        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection $connection_upgrade;
    }
}

}

index.html

https://www.wetools.com/websocket 不支持本地host域名 。 使用下述页面自行测试

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>websocket-msg</title>
    <script src="https://ajax.googleapis.com/ajax/libs/jquery/1.10.2/jquery.min.js"></script>

    <script type="text/javascript">

        var urlRoad = 'ws://ws.test.com/ws?uid=' ;

        var ws = null;

        function connect () {

            if(ws != null) return;

            ws = new WebSocket(urlRoad + $("#uid").val());

            ws.onopen = WSonOpen;
            ws.onmessage = WSonMessage;
            ws.onclose = WSonClose;
            ws.onerror = WSonError;

        }

        function WSonOpen() {
           alert("登陆成功,连接已经建立。");
       };

       function WSonMessage(event) {
           if(event.data == 'pong') {
               return;
           }
           $("#board").val($("#board").val()+ "[server] :"+event.data+"\n");
       };

       function WSonClose() {
            ws = null;
           alert("连接关闭。");

       };

       function WSonError() {
           alert("WebSocket错误。");
       };

       function send() {
            if(ws == null) {
                alert("未登录");
                return;
            }

            var text = $("#tosend").val();
            if(text.trim() == '') return;

            $("#board").val($("#board").val()+ "[client] :"+text+"\n");
            $("#tosend").val('');
            ws.send(text)
       }

    function sendPingMsg() {
         if(ws == null) {
                return;
         }
         ws.send("ping");
    }

    let intervalTimer = null;
    function  interval() {
        clearInterval(intervalTimer);
        intervalTimer = setInterval(async () => {
            clearInterval(intervalTimer);

            await sendPingMsg();

            interval();
        }, 10000);
    }
<!--   每10秒发送一次心跳 -->
     interval();
    </script>
</head>
<body>
用户名: <input type="text" id="uid"><input type="submit" onclick="connect()" value="登陆">
<br><textarea onchange="this.scrollTop=this.scrollHeight" style="overflow-x:auto" id="board" rows=20 cols=80></textarea>
<br><input id="tosend" type="text" style="width:400px"></input><input type="submit" onclick="send()" value="发送">

</body>
</html>

mylomen

本人从事 JAVA 开发10多年,将之前整理的笔记分享出来,希望能够帮助到努力的你。

文章评论