当前位置 博文首页 > my java博客园:Reactor模型

    my java博客园:Reactor模型

    作者:[db:作者] 时间:2021-08-22 15:14

    Reactor是一种设计模式。基于事件驱动,然后通过事件分发器,将事件分发给对应的处理器进行处理。

    ??? Reactor:监听网络端口,分发网络连接事件给Acceptor,具体的感兴趣读写事件handler
    ??? Acceptor:接受新的连接,连接的读写事件操作交给相应的Handler
    ??? Handler:注册为callback对象,并且注册自己感兴趣的读事件或者写事件等等,然后再相应的方法内进行业务操作内容

    1.单线程版

    参考代码:

    package com.ddcx.utils;
    
    /**
     * @author: xc
     * @ClassName: Test
     * @Date: 2021-03-03 12:47
     * @Description:
     */
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;
    import java.util.Set;
    
    class Reactor implements Runnable {
        final Selector selector;
        final ServerSocketChannel serverSocket;
    
        Reactor(int port) throws IOException { //Reactor初始化
            selector = Selector.open();
            serverSocket = ServerSocketChannel.open();
            //要监听的网络端口号
            serverSocket.socket().bind(new InetSocketAddress(port));
            //非阻塞
            serverSocket.configureBlocking(false);
            //分步处理,第一步,接收accept事件
            SelectionKey sk =
                    serverSocket.register(selector, SelectionKey.OP_ACCEPT);
            //attach callback object, Acceptor
            sk.attach(new Acceptor());
        }
    
        @Override
        public void run() {
            try {
                while (!Thread.interrupted()) {
                    //阻塞到至少有一个通道在你注册的事件上就绪了。 
                    selector.select();
                    Set selected = selector.selectedKeys();
                    Iterator it = selected.iterator();
                    while (it.hasNext()) {
                        //Reactor负责dispatch收到的事件
                        dispatch((SelectionKey) (it.next()));
                    }
                    selected.clear();
                }
            } catch (IOException ex) { /* ... */ }
        }
    
        void dispatch(SelectionKey k) {
            Runnable r = (Runnable) (k.attachment());
            //调用之前注册的callback对象
            if (r != null) {
                //这里是Acceptor的run方法
                r.run();
            }
        }
    
        // inner class
        class Acceptor implements Runnable {
    
            @Override
            public void run() {
                try {
                    //阻塞到获取网络连接通道
                    SocketChannel channel = serverSocket.accept();
                    if (channel != null) {
                        //连接已经就绪,将相应的感兴趣的读写事件注册到回调中
                        new ReadHander(selector, channel);
                    }
                } catch (IOException ex) { /* ... */ }
            }
        }
    
    
        public static void main(String[] args) throws IOException {
            Reactor reactor = new Reactor(9000);
            reactor.run();
        }
    }
    
    
    package com.ddcx.utils;
    
    /**
     * @author: xc
     * @ClassName: ReadHander
     * @Date: 2021-03-03 12:48
     * @Description:
     */
    
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.SocketChannel;
    
    class ReadHander implements Runnable {
        final SocketChannel channel;
        final SelectionKey sk;
        ByteBuffer input = ByteBuffer.allocate(90);
        ByteBuffer output = ByteBuffer.allocate(400);
        static final int READING = 0, SENDING = 1;
        int state = READING;
    
        ReadHander(Selector selector, SocketChannel c) throws IOException {
            channel = c;
            c.configureBlocking(false);
            // Optionally try first read now
            sk = channel.register(selector, 0);
    
            //将Handler作为callback对象
            sk.attach(this);
    
            //第二步,注册Read就绪事件
            sk.interestOps(SelectionKey.OP_READ);
            selector.wakeup();
        }
    
        boolean inputIsComplete() {
            /* ... */
            return false;
        }
    
        boolean outputIsComplete() {
    
            /* ... */
            return false;
        }
    
        void process() {
            /* ... */
            return;
        }
    
        @Override
        public void run() {
            try {
                if (state == READING) {
                    read();
                } else if (state == SENDING) {
                    send();
                }
            } catch (IOException ex) { /* ... */ }
        }
    
        void read() throws IOException {
            channel.read(input);
            if (inputIsComplete()) {
                process();
                state = SENDING;
                // Normally also do first write now
                //第三步,接收write就绪事件
                sk.interestOps(SelectionKey.OP_WRITE);
            }
        }
    
        void send() throws IOException {
            channel.write(output);
    
            //write完就结束了, 关闭select key
            if (outputIsComplete()) {
                sk.cancel();
            }
        }
    }
    

    ①.通过Selector的select()方法可以选择已经准备就绪的通道

    ②.通过ServerSocketChannel.accept()方法监听新进来的连接。当accept()方法返回的时候,它返回一个包含新进来的连接的SocketChannel。因此, accept()方法会一直阻塞到有新连接到达。通常不会仅仅只监听一个连接

    单线程版 Reactor模型,其实就是做了一件事情,就是把要监听的socket端口注册到selector中去,并且轮询线程内可以获取到多个已经准备就绪的socket连接通道,同时进行处理这些事件

    ?

    2. 多线程Reactor模型

    多线程主要体现在handler处理的时候,因为处理的事件可能耗时相对于久一些,这样做可以更快的处理感兴趣的事件

    selectionKey.attach(new HandlerThreadPool(socketChannel));

    3.主从模式多线程

    1. mainReactor负责监听socket连接,用来处理新连接的建立和就绪,将建立的socketChannel指定注册给subReactor。网络连接的建立一般很快,所以这里一个主线程就够了

    2.subReactor 一般是cpu的核心数,将连接加入到连接队列进行监听,并创建handler进行各种事件处理;当有新事件发生时, subreactor 就会调用对应的handler处理,而对具体的读写事件业务处理的功能交给handler线程池来完成。

    ?

    cs
    下一篇:没有了