当前位置 博文首页 > 油多坏不了菜:Netty入门一:服务端应用搭建 & 启动过程源码

    油多坏不了菜:Netty入门一:服务端应用搭建 & 启动过程源码

    作者:油多坏不了菜 时间:2021-01-16 22:25

    最近周末也没啥事就学学Netty,同时打算写一些博客记录一下(写的过程理解更加深刻了)

    本文主要从三个方法来呈现:Netty核心组件简介、Netty服务端创建、Netty启动过程源码分析

    如果你对Netty有一定的了解, 那阅读起来应该会比较愉快

    Netty核心组件简介

    ByteBuf

    缓冲区ByteBuf是对JDK NIO类库中ByteBuffer的增强

    缓冲区直接连接通道两端( 通过通道发送数据时需要先转换为ByteBuf对象, 从通道直接获取的也是ByteBuf对象)

    Channel和Unsafe

    Channel聚合一组网络I/O操作 --- 读、写、客户端发起连接、关闭连接、链路关闭等

    UnSafe接口辅助Channel实现I/O操作(不应有用户代码直接调用)

    ChannelPipeline和ChannelHandler

    ChannelHandler:负责处理I/O事件,每个ChannelHanlder对需要关注的I/O事件实现自己的处理逻辑,一般职责较单一,如解码Handler只做解码操作。

    ChannelPipeline:一个ChannelPipeline由多个按一定顺序排列的ChannelHandler组成, I/O事件在pipeline中流动(入站事件从头到尾、出站事件从尾到头),每个handler会对事件进行处理。

    NioEventLoop和NioEventLoopGroup

    NioEventLoop: 事件循环(Reactor线程),负责监听多个通道的就绪状态,当通道就绪时产生相应的入站事件

    NioEventLoopGroup:事件循环池(Reactor线程池),当新的通道被创建时,NioEventLoopGroup会为其分配一个事件循环,后续该通道的所有I/O操作都在该事件循环进行。

    Future和Promise

    这两个类是Netty对异步的支持,Promise用于设置异步操作结果(写),Future用于获取异步操作结果(读)。

    Netty服务端创建

    我们从搭建一个简单的服务端程序开始

    下面是一个获取当前日期和时间的服务端程序:当客户端输入行为"today"时返回当天日期 "2020-12-11",输入行为"time"时返回当前时间 "03:11:11"。

       public static void main(String[] args) {
    				//1.线程池配置
            ServerBootstrap bootstrap = new ServerBootstrap();
            NioEventLoopGroup parentGroup = new NioEventLoopGroup(1);
            NioEventLoopGroup childGroup = new NioEventLoopGroup(4);
            bootstrap.group(parentGroup, childGroup);
            //2.服务端Channel配置
            bootstrap.channel(NioServerSocketChannel.class);
            bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
           
            //3.子Channel配置
            bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
    
                @Override
                protected void initChannel(SocketChannel channel) throws Exception {
                    //解码器
                    channel.pipeline().addLast(new LineBasedFrameDecoder(1024));
                    channel.pipeline().addLast(new StringDecoder(StandardCharsets.UTF_8));
                    //业务handler
                    channel.pipeline().addLast(new BusinessHandler());
                    //编码器
                    channel.pipeline().addLast(new StringEncoder(StandardCharsets.UTF_8));
                }
            });
    
             try {
                ChannelFuture future = bootstrap.bind(7001).syncUninterruptibly();
                future.channel().closeFuture().sync();
            } catch (Exception e) {
               	//todo
            }finally {
                parentGroup.shutdownGracefully();
                childGroup.shutdownGracefully();
            }
        }
    
    
        static class BusinessHandler extends ChannelInboundHandlerAdapter {
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                String s = (String) msg;
                String ret = "";
                if ("today".equals(s)) {
                    ret = new SimpleDateFormat("yyyy-MM-dd").format(new Date());
                } else if ("time".equals(s)) {
                    ret = new SimpleDateFormat("hh:mm:ss").format(new Date());
                }
                ret += "\r\n";
                ctx.channel().writeAndFlush(ret);
            }
        }
    

    整个应用搭建的过程很简单,归纳起来有四步

    1.Reactor线程池配置

    2.服务端Channel配置

    3.子Channel配置(通过服务端通道创建的子通道)

    4.绑定本地端口并启动服务

    Reactor线程池配置

    我们新建两个Reactor线程池parentGroup和childGroup

    parentGroup是服务端通道使用,用于接受新的客户端连接(accept)

    childGroup用于处理所有服务端通道创建子通道的网络I/O请求

    ServerBootstrap bootstrap = new ServerBootstrap(); 
    NioEventLoopGroup parentGroup = new NioEventLoopGroup(1);
    NioEventLoopGroup childGroup = new NioEventLoopGroup(4);
    bootstrap.group(parentGroup, childGroup);
    

    服务端Channel配置

    服务端Channel配置主要涉及:Channel类型、ChanelOption、AttributeKey(handler一般不用配置)

    1. Channel类型配置

      Channel的类型我们选用 NioServerSocketChannel -- 底层使用的是JDK NIO的 ServerSocketChannel.

    bootstrap.channel(NioServerSocketChannel.class);
    
    1. 设置ChannelOption 和 AttributeKey

    ChildOption:TCP选项, 如接受缓冲区大小(SO_RCVBUF)、发送缓冲区大小(SO_SNDBUF)、内核TCP连接队列大小(SO_BACKLOG)等

    AttributeKey:附在Channel上的对象, 可以在多个ChannelHandler之间进行数据共享

    bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
    bootstrap.attr(AttributeKey.newInstance("TEST"), new Object());
    

    备注:ChannelHandler 用于处理I/O事件,是通道所必须的。因为Netty提供了初始化客户端连接的handler(ServerBootstrapAcceptor),所以对于服务端Channel我们可以不用设置

    Channel配置

    Channel配置主要涉及:ChanelOption和AttributeKey、ChannelHandler

    1. 设置ChannelOption 和 AttributeKey

      针对每个Channel可以配置ChannelOption和AttributeKey,同服务端通道配置一样。

    2. ChannelHandler配置

      对于服务端Channel,Netty框架提供了用于接受连接的Handler,我们可以不用设置;但是对于服务端Channel创建的每个子Channel我们需要为其配置Handler,以处理I/O事件。

      首先:解码器是必须的。我们业务逻辑中流转的一般是对象,通过配置解码器将字节转换成Java对象(解码器同时需要处理TCP拆包、粘包)

      然后:自定义业务处理器用于处理具体的业务逻辑,如上面的BusinessHandler。

      最后:需要对结果进行返回时需要配置编码器,用于将输出对象编码成可用于通道传输的ByteBuf对象

      对于这个例子:

      LineBasedFrameDecoder和StringDecoder是解码器:将一行数据解码成Java中的String对象

      BusinessHandler是业务处理器:处理具体的业务逻辑(获取当前日期或者时间)

      StringEncoder是编码器:将String对象编码成ByteBuf对象,用于通道传输。

    绑定本地端口并启动服务

    配置就绪后直接绑定本地端口启动服务

    ChannelFuture future = bootstrap.bind(7001).syncUninterruptibly();
    

    到这里通过Netty创建一个服务端应用程序就完成了,下面我们从源码成面看看Netty的启动过程

    Netty服务端启动过程源码分析

    源码基于4.1分支:做了部分简化,只保留了核心逻辑

    从bind方法开始

     public ChannelFuture bind(int inetPort) {
            return bind(new InetSocketAddress(inetPort));
     }
    
     public ChannelFuture bind(SocketAddress localAddress) {
            this.validate();
            return this.doBind(localAddress);
     }
     
     private ChannelFuture doBind(final SocketAddress localAddress) {
       			//1. 初始化NioServerSocketChannel,并且注册到EventLoopGroup
            final ChannelFuture regFuture = initAndRegister();
            final Channel channel = regFuture.channel();
            //2.1. 注册失败,直接返回
            if (regFuture.cause() != null) {
                return regFuture;
            }
            if (regFuture.isDone()) {
                // At this point we know that the registration was complete and successful.
                ChannelPromise promise = channel.newPromise();
              	//2.2. 注册成功,直接bind本地端口
                doBind0(regFuture, channel, localAddress, promise);
                return promise;
            } else {
                // Registration future is almost always fulfilled already, but just in case it's not.
                final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
              //3.如果注册还未知(因为是异步操作),添加listener到regFuture对象上用于注册完成后进行回调处理
              regFuture.addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        Throwable cause = future.cause();
                        if (cause != null) {
                            promise.setFailure(cause);
                        } else {
                            promise.registered();
                            doBind0(regFuture, channel, localAddress, promise);
                        }
                    }
                });
                return promise;
            }
        }
    

    整个bind方法比较简单, 核心逻辑都在doBind方法里面,doBind里的逻辑主要有三步

    1. initAndRegister:实例化ServerSocketChannel(这里是NioServerSocketChannel)并注册到事件循环(EventLoopGroup)

    2. 如果第一步失败,直接返回;如果注册成功,调用doBind方法绑定本地端口启动服务器

    3. 如果注册结果还未知(reg是异步操作),添加ChannelFutureListener到regFuture对象上用于注册完成后的回调处理

    第二和第三个步都比较简单,我们主要需要看下第一步--initAndRegister

    initAndRegister(初始化NioServerSocketChannel并注册到EventLoopGroup)

    initAndRegister其实是个模版方法,也可以分成三步来分析

    1. 实例化,这里其实是通过基于反射的工厂方法实例化
    2. 初始化(由子类实现)
    3. 注册到EventLoopGroup
    final ChannelFuture initAndRegister() {
            Channel channel = null;
            try {
                //1. 实例化,基于反射的工厂方法
                channel = channelFactory.newChannel();
                init(channel);
            } catch (Throwable t) {
                //
            }
            ChannelFuture regFuture = config().group().register(channel);
            if (regFuture.cause() != null) {
                //
            }
            return regFuture;
        }
    

    第一步和第三步这里我们不做展开,主要看下第二步init做了什么事

    init(初始化通道)

    下面是ServerBootstrap中的init方法的源码

    void init(Channel channel) {
            setChannelOptions(channel, newOptionsArray(), logger);
            setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));
    
            ChannelPipeline p = channel.pipeline();
    
            final EventLoopGroup currentChildGroup = childGroup;
            final ChannelHandler currentChildHandler = childHandler;
            final Entry<ChannelOption<?>, Object>[] currentChildOptions;
            synchronized (childOptions) {
                currentChildOptions = childOptions.entrySet().toArray(EMPTY_OPTION_ARRAY);
            }
            final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY);
    
            p.addLast(new ChannelInitializer<Channel>() {
                @Override
                public void initChannel(final Channel ch) {
                    final ChannelPipeline pipeline = ch.pipeline();
                    ChannelHandler handler = config.handler();
                    if (handler != null) {
                        pipeline.addLast(handler);
                    }
                    ch.eventLoop().execute(new Runnable() {
                        @Override
                        public void run() {
                            pipeline.addLast(new ServerBootstrapAcceptor(
                                    ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                        }
                    });
                }
            });
        }
    

    归纳起来其实是把我们通过ServerBootstarp引导类配置的一些参填充到NioServerSocketChannel实例中去了,没有问题。

    需要注意这里在socketChannel的pipeline中添加了一个ServerBootstrapAcceptor类型的handler(ServerBootstrapAcceptor用于初始化服务端接受的子通道,感兴趣的可以自己展开)

    总结

    通过对bind、doBind、initAndRegister、init的几个方法的分析,我们可以Netty的整个启动过程有个大致的认识

    1.实例化并初始化NioServerSocketChannel

    2.把初始化后的nioServerSocketChannel注册到EventLoopGroup(parentEventLoopGroup)

    3.注册成功之后调用绑定本地端口完成整个启动过程

    当然,只有对pipeline、handler、eventLoop等有一定的了解才能理解Netty的工作机制

    写在最后

    TO ME: 2021年第一篇博客,加油! 自己一个字一个字码出来的感觉很好!!

    TO YOU: 如果觉得有帮助记得点赞或者推荐哦!

    下一篇:没有了