netty创建tcp服务和websocket服务

一、引入netty依赖

 <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.25.Final</version>
        </dependency>

二、创建Tcp服务

使用线程创建服务类ReadCodeServer.java

package com.kaicom.mes.netty.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.stream.ChunkedWriteHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.annotation.Order;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;


/**
 * @Author: BillYu
 * @Description:websocket服务端
 * @Date: Created in 下午2:04 2018/6/1.
 */
public class ReadCodeServer implements Runnable{

    private final Logger log = LoggerFactory.getLogger(ReadCodeServer.class);




    @Override
    public void run() {
        // 服务端启动辅助类,用于设置TCP相关参数
        ServerBootstrap bootstrap = new ServerBootstrap();
        // 获取Reactor线程池
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workGroup = new NioEventLoopGroup();
        // 设置为主从线程模型
        bootstrap.group(bossGroup, workGroup)
                // 设置服务端NIO通信类型
                .channel(NioServerSocketChannel.class)
                // 设置ChannelPipeline,也就是业务职责链,由处理的Handler串联而成,由从线程池处理
                .childHandler(new ChannelInitializer<Channel>() {
                    // 添加处理的Handler,通常包括消息编解码、业务处理,也可以是日志、权限、过滤等
                    @Override
                    protected void initChannel(Channel ch) throws Exception {
                        // 获取职责链
                        ChannelPipeline pipeline = ch.pipeline();
                        pipeline.addLast("handler", new ReadCodeTestHandler());
                    }
                })

                // bootstrap 还可以设置TCP参数,根据需要可以分别设置主线程池和从线程池参数,来优化服务端性能。
                // 其中主线程池使用option方法来设置,从线程池使用childOption方法设置。
                // backlog表示主线程池中在套接口排队的最大数量,队列由未连接队列(三次握手未完成的)和已连接队列
                .option(ChannelOption.SO_BACKLOG, 1024)
                // 表示连接保活,相当于心跳机制,默认为7200s
                .childOption(ChannelOption.SO_KEEPALIVE, true);

        try {
            // 绑定端口,启动select线程,轮询监听channel事件,监听到事件之后就会交给从线程池处理
            Channel channel = bootstrap.bind(8081).sync().channel();

            // 等待服务端口关闭
            channel.closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // 优雅退出,释放线程池资源
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
        System.out.println("over read");
        return;
    }
}

消息处理类:ReadCodeTestHandler.java

    package com.kaicom.mes.netty.netty;
    
    import com.kaicom.mes.netty.kafka.KafkaUtil;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.*;
    import io.netty.handler.codec.http.*;
    import io.netty.handler.codec.http.websocketx.*;
    import io.netty.util.CharsetUtil;
    import io.netty.util.ReferenceCountUtil;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.stereotype.Component;
    
    import java.net.InetSocketAddress;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;
    
    import static io.netty.buffer.Unpooled.copiedBuffer;
    
    
    /**
     * @Author: BillYu
     * @Description: 读码头tcp服务处理
     * @Date: Created in 下午2:05 2018/6/1.
     */
    public class ReadCodeTestHandler extends ChannelInboundHandlerAdapter {
        private final Logger log = LoggerFactory.getLogger(ReadCodeTestHandler.class);
    
    
        /**
         * channelAction channel 通道 action 活跃的
         * 当客户端主动链接服务端的链接后,这个通道就是活跃的了。也就是客户端与服务端建立了通信通道并且可以传输数据
         */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
    //        ctx.channel().read();
            System.out.println(ctx.channel().localAddress().toString() + " 通道已激活!");
        }
    
        /**
         * channelInactive channel 通道 Inactive 不活跃的
         * 当客户端主动断开服务端的链接后,这个通道就是不活跃的。也就是说客户端与服务端的关闭了通信通道并且不可以传输数据
         */
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            System.out.println(ctx.channel().localAddress().toString() + " 通道不活跃!并且关闭。");
            // 关闭流
            ctx.close();
        }
    
        /**
         * 功能:读取服务器发送过来的信息
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            try {
                ByteBuf buf = (ByteBuf) msg;
                byte[] buffer = new byte[buf.readableBytes()];
                buf.readBytes(buffer, 0, buffer.length);
                String rev = new String(buffer);
                System.out.println("客户端收到服务器数据:" + rev);
                String address = ctx.channel().remoteAddress().toString();
                System.out.println(address);
                // 匹配规则
                String reg = "/(.*?):";
                Pattern pattern = Pattern.compile(reg);
                // 内容 与 匹配规则 的测试
                Matcher matcher = pattern.matcher(address);
                String ip = "0.0.0.0";
                if (matcher.find()) {
                    // 不包含前后的两个字符
                    ip = matcher.group(1);
                } else {
                    log.error("无法解析IP地址" + address + "  码:" + rev);
                }
                log.info("ip:" + ip + "   code:" + rev);
                rev = ip + "|" + System.currentTimeMillis() + "|" + rev;
                log.info(rev);
                KafkaUtil.send("code_info", rev);
                log.info("发送到kafka");
                ctx.writeAndFlush(rev);
            } finally {
                ReferenceCountUtil.release(msg);
            }
        }
    
        /**
         * 功能:读取完毕客户端发送过来的数据之后的操作
         */
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            System.out.println("服务端接收数据完毕..");
            // 第一种方法:写一个空的buf,并刷新写出区域。完成后关闭sock channel连接。
            // ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
            // ctx.flush();
            // ctx.flush(); //
            // 第二种方法:在client端关闭channel连接,这样的话,会触发两次channelReadComplete方法。
            // ctx.flush().close().sync(); // 第三种:改成这种写法也可以,但是这中写法,没有第一种方法的好。
        }
    
        /**
         * 功能:服务端发生异常的操作
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
            System.out.println("异常信息:\r\n");
            cause.printStackTrace();
    
        }
    
    
    }

三、创建websocket服务

使用线程创建websocket服务:WebsocketServer.java

package com.kaicom.mes.netty.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.stream.ChunkedWriteHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.annotation.Order;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;


/**
 * @Author: BillYu
 * @Description:websocket服务端
 * @Date: Created in 下午2:04 2018/6/1.
 */
public class WebsocketServer implements Runnable{


    private final Logger log = LoggerFactory.getLogger(WebsocketServer.class);

    @Override
    public void run() {
        // 服务端启动辅助类,用于设置TCP相关参数
        ServerBootstrap bootstrap = new ServerBootstrap();
        // 获取Reactor线程池
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workGroup = new NioEventLoopGroup();
        // 设置为主从线程模型
        bootstrap.group(bossGroup, workGroup)
                // 设置服务端NIO通信类型
                .channel(NioServerSocketChannel.class)
                // 设置ChannelPipeline,也就是业务职责链,由处理的Handler串联而成,由从线程池处理
                .childHandler(new ChannelInitializer<Channel>() {
                    // 添加处理的Handler,通常包括消息编解码、业务处理,也可以是日志、权限、过滤等
                    @Override
                    protected void initChannel(Channel ch) throws Exception {
                        // 获取职责链
                        ChannelPipeline pipeline = ch.pipeline();
                        //
                        pipeline.addLast("http-codec", new HttpServerCodec());
                        pipeline.addLast("aggregator", new HttpObjectAggregator(65535));
                        pipeline.addLast("http-chunked", new ChunkedWriteHandler());
                        pipeline.addLast("handler", new WebsocketHandler());
                    }
                })


                // bootstrap 还可以设置TCP参数,根据需要可以分别设置主线程池和从线程池参数,来优化服务端性能。
                // 其中主线程池使用option方法来设置,从线程池使用childOption方法设置。
                // backlog表示主线程池中在套接口排队的最大数量,队列由未连接队列(三次握手未完成的)和已连接队列
                .option(ChannelOption.SO_BACKLOG, 1024)
                // 表示连接保活,相当于心跳机制,默认为7200s
                .childOption(ChannelOption.SO_KEEPALIVE, true);

        try {
            // 绑定端口,启动select线程,轮询监听channel事件,监听到事件之后就会交给从线程池处理
            Channel channel = bootstrap.bind(8080).sync().channel();

            // 等待服务端口关闭
            channel.closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // 优雅退出,释放线程池资源
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }

}

消息处理类:WebsocketHandler.java

package com.kaicom.mes.netty.netty;

import com.kaicom.mes.netty.kafka.KafkaUtil;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.*;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import static io.netty.buffer.Unpooled.copiedBuffer;


/**
 * @Author: BillYu
 * @Description:
 * @Date: Created in 下午2:05 2018/6/1.
 */
@Component
public class WebsocketHandler extends ChannelInboundHandlerAdapter {
    private final Logger log = LoggerFactory.getLogger(WebsocketHandler.class);


    /**
     * 用于websocket握手的处理类
     */
    private WebSocketServerHandshaker handshaker;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        // ReferenceCountUtil.release(msg);在读数据时候,在结束时候,需要释放,将byteBuf回收到直接内存池中。
        try {
            if (msg instanceof FullHttpRequest) {
                System.out.println("===> full http request");
                // websocket连接请求
                handleHttpRequest(ctx, (FullHttpRequest) msg);
            } else if (msg instanceof WebSocketFrame) {
//            System.out.println("===> web socket frame");
                // websocket业务处理
                handleWebSocketRequest(ctx, (WebSocketFrame) msg);
            }
        } finally {

            ReferenceCountUtil.release(msg);
        }


//        System.out.println("==>over");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        ctx.close();
        System.out.println("===>exception:");
        System.out.println(cause);
    }

    private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) {
        // Http解码失败,向服务器指定传输的协议为Upgrade:websocket
        if (!req.decoderResult().isSuccess() || (!"websocket".equals(req.headers().get("Upgrade")))) {
            sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
            return;
        }
        // 握手相应处理,创建websocket握手的工厂类,
        WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory("ws://localhost:8081/ws", null, false);
        // 根据工厂类和HTTP请求创建握手类
        handshaker = wsFactory.newHandshaker(req);
        if (handshaker == null) {
            // 不支持websocket
            WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
        } else {
            // 通过它构造握手响应消息返回给客户端
            handshaker.handshake(ctx.channel(), req);
        }
    }

    private void handleWebSocketRequest(ChannelHandlerContext ctx, WebSocketFrame req) {
        if (req instanceof CloseWebSocketFrame) {
            // 关闭websocket连接
            handshaker.close(ctx.channel(), (CloseWebSocketFrame) req.retain());
            return;
        }

        if (req instanceof PingWebSocketFrame) {
            System.out.println("====> ping web socket frame");
            ctx.channel().write(new PongWebSocketFrame(req.content().retain()));
            return;
        }
        if (!(req instanceof TextWebSocketFrame)) {
            throw new UnsupportedOperationException("当前只支持文本消息,不支持二进制消息");
        }
        if (ctx == null || this.handshaker == null || ctx.isRemoved()) {
            System.out.println("null");
//            throw new Exception("尚未握手成功,无法向客户端发送WebSocket消息");
        }

        String request = ((TextWebSocketFrame) req).text();

        log.info("收到socket msg=" + request);

        KafkaUtil.send("equipment_info", request);

        log.info("发送到kafka");

        //消息回复
        ctx.channel().writeAndFlush(new TextWebSocketFrame(((TextWebSocketFrame) req).text()));

    }


    private void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, FullHttpResponse res) {
        // BAD_REQUEST(400) 客户端请求错误返回的应答消息
        if (res.status().code() != 200) {
            // 将返回的状态码放入缓存中,Unpooled没有使用缓存池
            ByteBuf buf = copiedBuffer(res.status().toString(), CharsetUtil.UTF_8);
            res.content().writeBytes(buf);
            buf.release();
            HttpUtil.setContentLength(res, res.content().readableBytes());
        }
        // 发送应答消息
        ChannelFuture cf = ctx.channel().writeAndFlush(res);
        // 非法连接直接关闭连接
        if (!HttpUtil.isKeepAlive(req) || res.status().code() != 200) {
            cf.addListener(ChannelFutureListener.CLOSE);
        }
    }

}

四、启动服务线程

@Component
public class StartApplicationRunner implements ApplicationRunner {
    private Logger logger = LoggerFactory.getLogger(StartApplicationRunner.class);
    @Override
    public void run(ApplicationArguments args) throws Exception {
        logger.info("start ReadCodeServer");
        new Thread(new ReadCodeServer()).start();
        logger.info("start WebsocketServer");
        new Thread(new WebsocketServer()).start();


    }
}

项目目录结构:
DC3A0BE8-22E9-41AA-8723-F8740541C7D1.png

五、备注

group :设置SeverBootstrap要用到的EventLoopGroup,也就是定义netty服务的线程模型,处理Acceptor链接的主"线程池"以及用于I/O工作的从"线程池";

channel:设置将要被实例化的SeverChannel类;

option :指定要应用到新创建SeverChannel的ChannelConfig的ChannelOption.其实也就是服务本身的一些配置;

chidOption:子channel的ChannelConfig的ChannelOption。也就是与客户端建立的连接的一些配置;

childHandler:设置将被添加到已被接收的子Channel的ChannelPipeline中的ChannelHandler,其实就是让你在里面定义处理连接收发数据,需要哪些ChannelHandler按什么顺序去处理;

ServerChannelInitializer这个类继承实现自netty的ChannelInitializer抽象类,这个类的作用就是对channel(连接)的ChannelPipeline进行初始化工作,说白了就是你要把处理数据的方法添加到这个任务链中去,netty才知道每一步拿着socket连接和数据去做什么。

之前遇到handle里面channelRead()方法不执行,发现原因是在任务链中加了其他的handle类

发表新评论