netty无疑在Java网络编程生态处于一个统治级的地位,他的生态位与spring之于Java服务端编程是一样的。

FD (FileDescriptor)

FD,File Descriptor,文件描述符,是对数据资源的抽象。它是一个唯一的数字标识,用来标识一个打开的文件,它用来描述一个数据资源,同时描述该资源的访问方式。在Windows操作系统下对应着FileHandle,文件句柄。一个打开的文件至少有一个文件描述符。
值得注意的的是,文件描述符的唯一标识不是指针地址,而是在操作系统Global File Table上的唯一标识。为什么呢?因为指针地址无法存储操作权限,访问方式等等额外信息。在类Unix系统下,FileDescriptor几乎可以描述一切资源,不管是磁盘文件还是Socket还是进程。
Java的FileDescriptor沿用了类Unix系统的文件描述符的概念。,FileDescriptor文件描述符类的实例作为一个不透明的句柄,指向代表一个open file、一个open socket或其他byte的source或sink(术语source表示一个数据输入源,术语sink表示一个数据输出槽)的底层机器特定结构
总结起来就是,文件描述符是一个唯一数字(非负整型)表标识,指向操作系统的一个数据结构,该数据结构存有实际地址,文件访问方式等等信息。关联有一个Too many open files的错误。

解析
FileDescriptor

public final class FileDescriptor {
    // 文件描述符的唯一标识,在windows下面是-1,windows使用文件句柄操作文件而不是fd
    private int fd;
    // 文件句柄,windows下的普通文件有该属性,其他情况为-1
    private long handle;
    // 
    private Closeable parent;
    private List<Closeable> otherParents;
    // 是否已关闭
    private boolean closed;
    /**
     * true, if file is opened for appending.
     * 如果以追加模式打开则为true
     */
    private boolean append;
    // 虚引用在会在对象被垃圾回收之后触发回调通知,用来做清理,在这里是做关闭资源操作
    private PhantomCleanable<FileDescriptor> cleanup;
}

注:虚引用做清理工作在DirectByteBuffer类里面也有用到,用来回收直接内存。

我们的FileInputStream、FileOutputStream、SocketInputStream(继承自FileInputStream)、SocketOutputStream(继承自FileOutputStream),ServerSocketChannel,SocketChannel都绑定有一个FD文件描述符,读写操作都是基于文件描述符的。

文件描述符:0是标准输入、1是标准输出、2是标准异常,这是操作系统启动初始化的时候创建的。这三个FD可以直接取用,不需要用户创建。

IO

我在面试理论那篇文章有讲到,IO包括网络IO与磁盘IO。这里主要讲Java网络IO。

BIO、NIO、AIO

总的来说 Java IO有三类,BIO(Blocking I/O,阻塞IO)、NIO(Non-Blocking I/O,非阻塞IO)、AIO(Asynchronous I/O,异步非阻塞IO,也叫做NIO2)。

BIO

Java在最初只支持BIO,最初的Tomcat也是基于BIO实现的,所以早古时代的Tomcat的吞吐其实并不高。下面实现通过BIO实现一个Echo程序。

  • server
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class BioServer {

    static class BioThreadFactory implements ThreadFactory {
        final AtomicInteger NUM = new AtomicInteger(0);

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r,"socket-executor-" + NUM.getAndIncrement());
        }
    }

    private static final ExecutorService executor = new ThreadPoolExecutor(8, 8, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000), new BioThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());

    public static void main(String[] args) {
        start(9527);
    }

    private static void start(int port){
        try {
            ServerSocket serverSocket = new ServerSocket(port);
            while (true){
                Socket socket = serverSocket.accept();
                executor.submit(()->{
                    try(
                            InputStreamReader inputStreamReader = new InputStreamReader(socket.getInputStream());
                            BufferedReader bufferedReader = new BufferedReader(inputStreamReader);
                            PrintWriter printWriter = new PrintWriter(socket.getOutputStream(),true);
                    ){

                        String content = bufferedReader.readLine();
                        System.out.println("context="+content);
                        printWriter.println(content);
                    }catch (Exception e){
                        e.printStackTrace();
                    }finally {
                        if(socket != null){
                            try{
                                socket.close();
                            }catch (Exception e){}
                        }
                    }
                });
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

代码很简单,ServerSocket绑定一个端口,然后阻塞等待接受Socket连接,在接受连接之后交给线程池处理连接。

private native int poll0(long pollAddress, int numfds, int[] readFds, int[] writeFds, int[] exceptFds, long timeout, long fdsBuffer) // select调用的本地方法
  • client
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;

public class BioClient {

    public static void main(String[] args) {
        sendSome("127.0.0.1",9527,"hello world!!");
    }

    private static void sendSome(String address,int port,String content){
        try {
            Socket socket = new Socket(address, port);
            InputStreamReader inputStreamReader = new InputStreamReader(socket.getInputStream());
            BufferedReader bufferedReader = new BufferedReader(inputStreamReader);
            PrintWriter printWriter = new PrintWriter(socket.getOutputStream(),true);
            printWriter.println(content);
            String echo = bufferedReader.readLine();
            System.out.println("echo="+echo);
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

客户端打开一个socket连接,写入内容,读取响应。

阻塞在哪? 一方面是每一个socket都需要和一个线程相绑定,如果不使用线程池则在大量连接到来的时候容易资源耗尽而打死服务,另一方面如果使用线程池,由于读写都是阻塞的,在前一个Socket读写未完成之前,后续的Socket只能在队列中等待。并且由于TCP窗口的大小变化会影响到写端速度,在读或者写慢的时候都会影响到整个链路的速度,如果这样的连接多了就导致线程池线程长期被阻塞,会导致整个服务端变慢,最终导致连接积压在队列中,如果队列满了达到线程池最大线程数了,最终会拒绝连接。

文件的FD是在调用open0()(new FileInputStream()) native方法设置的,而socket的FD是通过socket0()(new Socket(...))本地方法创建的。对于文件来说创建了fd就表示已经打开了一个文件,通过fd可以对文件进行读写,但是对于socket来说,创建fd之后还需要bind0()或者connect0()才能读写,这两个都是本地方法。

    /**
     * Reads up to <code>b.length</code> bytes of data from this input
     * stream into an array of bytes. This method blocks until some input
     * is available.该方法会阻塞,直到有输入为止。
     *
     * @param      b   the buffer into which the data is read.
     * @return     the total number of bytes read into the buffer, or
     *             <code>-1</code> if there is no more data because the end of
     *             the file has been reached.
     * @exception  IOException  if an I/O error occurs.
     */
    public int read(byte b[]) throws IOException {
        return readBytes(b, 0, b.length);
    }

NIO

BIO最大可连接数量又线程池最大连接数量与达到队列容量决定,实际上达不到这个数量就会资源耗尽。同时由于读写是阻塞的,在tcp窗口过小,读写本身太慢的情况下会一直阻塞浪费资源。为了解决这两个问题Java在jdk1.4推出了NIO相关的API。下面是一个NIO版本的Echo程序:

  • server

import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class NioServer {

    static class BioThreadFactory implements ThreadFactory {
        final AtomicInteger NUM = new AtomicInteger(0);

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r,"socket-executor-" + NUM.getAndIncrement());
        }
    }

    private static final ExecutorService executor = new ThreadPoolExecutor(8, 8, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000), new BioThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());

    public static void main(String[] args) {
        start(9527);
    }

    private static void start(int port){
        try {
            Handler handler = new Handler();
            Selector selector = Selector.open();
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

            serverSocketChannel.bind(new InetSocketAddress(port),1024);
            serverSocketChannel.configureBlocking(false);

            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT,handler);
            while (true){
                selector.select();
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                if(!CollectionUtils.isEmpty(selectionKeys)){
                    Iterator<SelectionKey> iterator = selectionKeys.iterator();
                    while (iterator.hasNext()){
                        SelectionKey next = iterator.next();
                        Handler attachment = (Handler) next.attachment();
                        attachment.handle(next);
                        iterator.remove();
                    }
                }
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }

    private static class Handler {

        public void handle(SelectionKey selectionKey){
            if (selectionKey.isAcceptable()){
                handleAccept(selectionKey);
                return;
            }
            if (selectionKey.isReadable()){
                handleRead(selectionKey);
                return;
            }
        }

        private void handleAccept(SelectionKey selectionKey){
            try{
                ServerSocketChannel channel =(ServerSocketChannel) selectionKey.channel();
                Selector selector = selectionKey.selector();
                // 获取连接channel,设置为非阻塞,注册read
                SocketChannel accept = channel.accept();
                accept.configureBlocking(false);
                accept.register(selector,SelectionKey.OP_READ,selectionKey.attachment());
            }catch (Exception e){
                e.printStackTrace();
            }

        }

        private void handleRead(SelectionKey selectionKey){
            try {
                SocketChannel channel =(SocketChannel) selectionKey.channel();
                ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1024);
                // 读取数据
                int read = channel.read(byteBuffer);
                if ( read > 0 ){
                    byteBuffer.flip();
                    byte[] bytes = new byte[byteBuffer.remaining()];
                    byteBuffer.get(bytes);
                    String content = new String(bytes, StandardCharsets.UTF_8);
                    System.out.println("content="+content);
                    if(!StringUtils.isEmpty(content)){
                        byte[] respBytes = content.getBytes(StandardCharsets.UTF_8);
                        byteBuffer = ByteBuffer.allocateDirect(respBytes.length);
                        byteBuffer.put(respBytes);
                        byteBuffer.flip();
                        channel.write(byteBuffer);
                    }
                }
            }catch (Exception e){
                e.printStackTrace();
            }
        }

    }



}

打开Channel和Selecter,设置为非阻塞且注册感兴趣的操作。对不同的操作进行不同的处理。

  • client
import org.springframework.util.CollectionUtils;

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Set;

public class NioClient {

    public static void main(String[] args) {
        sendSome("127.0.0.1",9527,"hello world!!");
    }

    private static void sendSome(String address,int port,String content){
        try {
            Selector selector = Selector.open();
            SocketChannel socketChannel = SocketChannel.open();
            socketChannel.configureBlocking(false);
            Handler handler = new Handler();
            handler.setContent(content);

            if (socketChannel.connect(new InetSocketAddress(address, port))) {
                socketChannel.register(selector, SelectionKey.OP_READ,handler);
                handler.doWrite(socketChannel);
            }else {
                socketChannel.register(selector, SelectionKey.OP_CONNECT,handler);
            }

            while (true){
                selector.select();
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                if(!CollectionUtils.isEmpty(selectionKeys)){
                    Iterator<SelectionKey> iterator = selectionKeys.iterator();
                    while (iterator.hasNext()) {
                        SelectionKey next = iterator.next();
                        Handler attachment = (Handler) next.attachment();
                        attachment.handle(next);
                        iterator.remove();
                    }
                }
            }

        }catch (Exception e){
            e.printStackTrace();
        }
    }

    private static class Handler {

        private String content;

        public void setContent(String content) {
            this.content = content;
        }

        public void handle(SelectionKey selectionKey){
            if (selectionKey.isConnectable()){
                handleConnect(selectionKey);
                return;
            }
            if (selectionKey.isReadable()){
                handleRead(selectionKey);
                return;
            }
        }

        private void handleConnect(SelectionKey selectionKey){
            try{
                SocketChannel channel =(SocketChannel) selectionKey.channel();
                if (channel.finishConnect()) {
                    Selector selector = selectionKey.selector();
                    channel.configureBlocking(false);
                    channel.register(selector,SelectionKey.OP_READ,selectionKey.attachment());
                    doWrite(channel);
                }

            }catch (Exception e){
                e.printStackTrace();
            }

        }

        private void handleRead(SelectionKey selectionKey){
            try {
                SocketChannel channel =(SocketChannel) selectionKey.channel();
                ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1024);
                // 读取数据
                int read = channel.read(byteBuffer);
                if ( read > 0 ){
                    byteBuffer.flip();
                    byte[] bytes = new byte[byteBuffer.remaining()];
                    byteBuffer.get(bytes);
                    String content = new String(bytes, StandardCharsets.UTF_8);
                    System.out.println("echo="+content);
                }
                channel.close();
                System.exit(0);
            }catch (Exception e){
                e.printStackTrace();
            }
        }

        public void doWrite(SocketChannel socketChannel){
            try{
                byte[] bytes = content.getBytes(StandardCharsets.UTF_8);
                ByteBuffer byteBuffer = ByteBuffer.allocateDirect(bytes.length);
                byteBuffer.put(bytes);
                byteBuffer.flip();
                socketChannel.write(byteBuffer);
            }catch (Exception e){
                e.printStackTrace();
            }
        }

    }
}

client和server的代码差不多,需要注意的是在发送之前需要通过channel.finishConnect()或者channel.isConnected()判断连接是否完成。finishConnect()在阻塞模式下,未完成连接的Channel会先尝试连接知道成功或者异常。

代码中可以看到,这里并没有使用多线程,与accept()不同的是,每次select()之后返回的是一个Set,也就是说如果有多个Channel的多个操作已经就绪,则可以批量处理这些操作,而不是和accept一样每次处理一个连接,后续的读写需要接着处理。也就是说有大量连接的时候我们并不需要自己维护一个队列去存储连接,且NIO可以保证返回的Set里面的元素的对应操作是就绪的(除非一些意外情况,连接断开之类的),省去了读写等待的时间。

而做到这一点的关键就是IO多路复用技术,在BIO的情形下,我们需要自己轮询(或者阻塞)每个Socket是否连接,是否有数据,虽然Java封装了,但是操作系统实现的阻塞IO并不会因为Java封装而变得性能优越。在JDK1.4之前java只提供了BIO的封装,Java在网络编程性能一直比C与C++有巨大的差距。操作系统层实际上已经提供了IO复用技术用来解决BIO的弊端。Java只需要对操作系统进行屏蔽封装成JDK开放给Java开发者就OK了。NIO就是基于epoll的IO多路复用的封装。

Selector,IO多路复用器。Selector类是Java开放的多路复用器的抽象封装。每个SelectableChannel可以调用register方法将当前Channel感兴趣的操作(OP_READ、OP_WRITE、OP_CONNECT、OP_ACCEPT)注册到Selector,这样,selector会侦测所有已注册的SelectableChannel是否处于就绪状态,这样也就实现了IO多路复用。原本需要在每个SelectableChannel都阻塞的操作收归到了Selector.select方法,并且select方法提供了超时机制,长时间没有就绪可以针对空闲连接进行处理。

和前面文件流对象创建和Socket/ServerSocket对象创建一样,Selector.open()、ServerSocketChannel.open()和SocketChannel.open()也会在初始化的时候将Selector对象、SelectableSocket对象与对应的FD相绑定。jdk只给出了平台相关的实现,也就是说在windows下面只能找到WindowsSelectorImpl而找不到Linux对应的EpollSelectorImpl。

IO多路复用的方案:

select

就Linux来说,最早提供的IO多路复用技术就是select,可以看出来由于这个历史原因我们的Java Selector类的命名也是受此影响,作为多路复用技术,select提供了将多个fd注册到selector的fd的能力,当我们想要监控某些注册在selector上的fd的状态的时候只需要监听selector就行了。而select是通过顺序循环遍历每个注册在selector上的fd的状态做到的,并且selector所能注册的最大的fd的数量是1024,这对于大型服务服务器,什么游戏服务器来说是不够用的,且由于是遍历fd,对cpu的消耗也大。还牵扯到注册的fd数据在用户态和内核态的来回复制与网络数据从内核态复制到用户态。目前windows系统下面的nio实现是基于select方案的。

poll

上面说到,select有三个问题:可复用IO太少,需要循环遍历耗CPU,需要用户态与内核态数据复制。Linux上poll解决了注册在select上的fd的最大数量太少的问题,select的存储fd使用set结构,他的最大数量写死在FD_SETSIZE上了,如果需要扩容,需要从源码重新编译内核。而poll使用pollfd结构体它没有最大数量的限制,可以注册大量的fd到多路复用器上面。当时它没有解决遍历问题也没有解决数据复制问题,其IO效率和fd数量是线性相关的。
目前mac系统下面的nio默认使用kqueue方案,可以使用poll方案。

epoll

为了解决上述问题,Linux实现了epoll方案,首先epoll使用红黑树存储fd,其次epoll监测fs状态改遍历为事件触发,这是使得效率大大提高了,同时也不会空转浪费cpu。当然,红黑树是比前面的方案更加耗内存的。同时epoll使用mmap内存映射技术省去了大量的内核态与用户态数据复制的成本。
目前linux系统下面的nio是默认使用epoll方案,可以使用select和poll方案。

红黑树在linux进程管理,linux fs和这里的epoll都使用到了,在Java HashMap也使用了这个数据结构。这种数据结构在软件上十分重要。

于是BIO的两个问题,吞吐与阻塞都得到了较好的解决。当然它并不是完美的,或者说epoll本身还是有一些问题,其中最大的问题是事件触发之后需要用户主动获取数据。

AIO

在大多是时候,等待数据是一个浪费时间的过程。所以我们在磁盘数据读入内存的时候需要使用DMA以释放CPU。AIO的思想类似,在完成之后将回调CompletionHandler,做业务操作,比如服务端接收接收到数据完成之后会回调对于的handler做业务操作,我们不需要自己将channel里的数据读到buffer里面。它更加像是Promise(也就是Java里面的Future) then这样的形式,更加符合异步编程的实践。下面的AIO版本的Echo程序:

  • server

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.StandardCharsets;

public class AioServer {

    public static void main(String[] args) {
        start(9527);
        System.out.println("---service exit--");
    }

    private static void start(int port){
        try {
            AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel
                    .open()
                    .bind(new InetSocketAddress(port),1024);
            AcceptHandler handler = new AcceptHandler();
            serverSocketChannel.accept(serverSocketChannel,handler);
        }catch (Exception e){
            e.printStackTrace();
        }
    }

   private static class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel,AsynchronousServerSocketChannel> {
       @Override
       public void completed(AsynchronousSocketChannel result, AsynchronousServerSocketChannel attachment) {
           var byteBuffer = ByteBuffer.allocateDirect(1024);
           attachment.accept(attachment,this);
           result.read(byteBuffer,byteBuffer,new ReadHandler(result));
       }

       @Override
       public void failed(Throwable exc, AsynchronousServerSocketChannel attachment) {
           exc.printStackTrace();
       }
   }

   private static class ReadHandler implements CompletionHandler<Integer,ByteBuffer> {
        private final AsynchronousSocketChannel asynchronousSocketChannel;

        public ReadHandler(AsynchronousSocketChannel asynchronousSocketChannel){
            this.asynchronousSocketChannel = asynchronousSocketChannel;
        }


        @Override
        public void completed(Integer result, ByteBuffer attachment) {
            attachment.flip();

            byte[] bytes = new byte[attachment.remaining()];
            attachment.get(bytes);
            String content = new String(bytes, StandardCharsets.UTF_8);
            System.out.println("content="+content);
            ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1024);
            byteBuffer.put(content.getBytes(StandardCharsets.UTF_8));
            byteBuffer.flip();
            asynchronousSocketChannel.write(byteBuffer,byteBuffer,new WriteHandler(asynchronousSocketChannel));

        }

        @Override
        public void failed(Throwable exc, ByteBuffer attachment) {
            exc.printStackTrace();
            try{
                asynchronousSocketChannel.close();
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }

    private static class WriteHandler implements CompletionHandler<Integer,ByteBuffer> {
        private final AsynchronousSocketChannel asynchronousSocketChannel;

        public WriteHandler(AsynchronousSocketChannel asynchronousSocketChannel){
            this.asynchronousSocketChannel = asynchronousSocketChannel;
        }
        @Override
        public void completed(Integer result, ByteBuffer attachment) {
            if (attachment.hasRemaining()){
                asynchronousSocketChannel.write(attachment,attachment,this);
            }
        }

        @Override
        public void failed(Throwable exc, ByteBuffer attachment) {
            exc.printStackTrace();
            try{
                asynchronousSocketChannel.close();
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }

}

跑这个服务,需要在System.out.println("---service exit--");加一个Thread断点,不然程序会自行退出。基本模式是绑定端口后accept完成后的回调,在accept回调处理中设置read完成后的回调,在read回调处理中设置write的回调。当然顺序可以变更,不过一般情况下都是接收连接后读取请求数据,根据读取数据处理写回什么。

  • client

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;

public class AioClient {

    public static void main(String[] args) {
        for (int i = 0; i < 100; i++) {
            sendSome("127.0.0.1",9527,"hello world!!");
        }
        System.out.println("client exit");
    }

    private static void sendSome(String address,int port,String content){
        try{
            ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1024);
            byteBuffer.put(content.getBytes(StandardCharsets.UTF_8));
            byteBuffer.flip();
            AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel
                    .open();
            socketChannel
                    .connect(new InetSocketAddress(address,port),byteBuffer,new ConnectHandler(socketChannel) );
        }catch (Exception e){

        }
    }

    private static class ConnectHandler implements CompletionHandler<Void, ByteBuffer> {
        private AsynchronousSocketChannel asynchronousSocketChannel;

        public ConnectHandler(AsynchronousSocketChannel asynchronousSocketChannel){
            this.asynchronousSocketChannel = asynchronousSocketChannel;
        }

        @Override
        public void completed(Void result, ByteBuffer attachment) {
            asynchronousSocketChannel.write(attachment,attachment,new WriteHandler(asynchronousSocketChannel));
        }

        @Override
        public void failed(Throwable exc, ByteBuffer attachment) {
            exc.printStackTrace();
            try{
                asynchronousSocketChannel.close();
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }


    private static class WriteHandler implements CompletionHandler<Integer,ByteBuffer> {
        private final AsynchronousSocketChannel asynchronousSocketChannel;

        public WriteHandler(AsynchronousSocketChannel asynchronousSocketChannel){
            this.asynchronousSocketChannel = asynchronousSocketChannel;
        }
        @Override
        public void completed(Integer result, ByteBuffer attachment) {
            if (attachment.hasRemaining()){
                asynchronousSocketChannel.write(attachment,attachment,this);
            }else {
                ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1024);
                asynchronousSocketChannel.read(byteBuffer,byteBuffer,new ReadHandler(asynchronousSocketChannel));
            }
        }

        @Override
        public void failed(Throwable exc, ByteBuffer attachment) {
            exc.printStackTrace();
            try{
                asynchronousSocketChannel.close();
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }


    private static class ReadHandler implements CompletionHandler<Integer,ByteBuffer> {
        private final AsynchronousSocketChannel asynchronousSocketChannel;

        public ReadHandler(AsynchronousSocketChannel asynchronousSocketChannel){
            this.asynchronousSocketChannel = asynchronousSocketChannel;
        }


        @Override
        public void completed(Integer result, ByteBuffer attachment) {
            attachment.flip();
            byte[] bytes = new byte[attachment.remaining()];
            attachment.get(bytes);
            String content = new String(bytes, StandardCharsets.UTF_8);
            System.out.println("echo="+content);
            try{
                asynchronousSocketChannel.close();
            }catch (Exception e){
                e.printStackTrace();
            }
        }

        @Override
        public void failed(Throwable exc, ByteBuffer attachment) {
            exc.printStackTrace();
            try{
                asynchronousSocketChannel.close();
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }

}

客户端与服务端类似,连接完成后写入请求数据,之后等待服务响应数据完成处理响应数据。

AIO是我们自己写Java原生网络编程最优先考虑的,相较于nio,AIO的编程模型更加简洁,不易出错。当然,世界上没有银弹,AIO需要预先分配内存。同样的Java Buffer容易出错的flip操作等等都无法避免,粘包半包问题也是需要解决。

离不开底层支撑

到这里,我们碰到了Java的边界,尽管Java已经为我们屏蔽了很多细节,但是高级的实现离不开操作系统的支持,它不像Unsafe的cas接口一样具有平台通用性,只是对底层细节的包装,在NIO这里,使用IO多路复用的方案直接依赖操作系统提供的已实现的方案。windows就是不能使用epoll,没有的东西变不出来。于是我们只能使用更高一层的封装,直接使用Selector.open(),屏蔽操作系统实现细节。

Reactor模式(线程模型)

基于IO多路复用的编程模式普遍存在于各种服务端软件,比如redis,比如nginx。epoll基于事件为事件分派与事件处理提供了前置条件。Reactor模式将事件的分派与事件的处理分离开来以提高吞吐,对于服务端应用来说,及时响应请求是重中之重。在这个模式中分派(Dispatch)事件的角色叫做Reactor(也有叫Dispatcher的),处理事件的角色叫做Acceptor与Handler,Acceptor处理连接事件,Handler处理读写事件。这样,在处理连接事件的时候,Reactor会将其分发给Acceptor处理,读写事件Reactor会分发给Handler处理,这样做的好处是Reactor不需要处理业务逻辑,只需要接收事件分派事件,提高Reactor的吞吐与处理效率,使得我们可以同时接收更多的连接,Reactor、Acceptor和Handler可以在不同的线程与线程池中运行,大大提高了我们服务端软件的并行效率。
这样的模式与普遍存在与我们的框架中,不论是springmvc的DispatcherServlet还是springwebflux的DispatcherHandler都是基于这样的分发模式。

Proactor模式

Linux的AIO是通过Epoll模拟的。AIO的思想Epoll NIO的不同在于,AIO是在一个事件完成之后回调业务,而Epoll NIO是在一个事件触发之后通知业务。比如一个读事件触发之后,业务需要自己做读数据操作,读完成后对读出的数据进行业务操作,而AIO触发读完成事件,读操作有操作系统完成,回调的业务只需要关系如何处理数据做业务就行了。Proactor是更优解吗?这个是十分依赖平台的,在各操作系统完善的情况下,是的,Proactor简化了很多东西,在应用上可以省去很多事情,但是在linux环境下由于都是基于epoll的实现,AIO并没有更优秀。

Netty的选择

netty是一个基于reactor模式的NIO框架。由于AIO在linux的性能不高且与reactor模式不一致,目前的netty不支持aio。在默认的情况下netty使用java提供的默认方案(windows使用select,linux使用epoll,mac使用kqueue)。netty通过JNI实现一个边缘触发的Epoll多路复用方案,提供的是水平触发的。

  • 边缘触发(edge-trggered)
    当文件描述符关联的读内核缓冲区由空转化为非空的时候,则发出可读信号进行通知,
    当文件描述符关联的内核写缓冲区由满转化为不满的时候,则发出可写信号进行通知
  • 水平触发(level-trggered)
    只要文件描述符关联的读内核缓冲区非空,有数据可以读取,就一直发出可读信号进行通知,当文件描述符关联的内核写缓冲区不满,有空间可以写入,就一直发出可写信号进行通知

两者的区别在哪里呢?水平触发是只要读缓冲区有数据,就会一直触发可读信号,而边缘触发仅仅在空变为非空的时候通知一次。


使用Netty是实现一个简单的网络协议

定义协议标准

定义协议格式

长度(length)版本(version)内容(data)
4 bytes1 bytelength bytes

说明:
4字节(一个int的大小)Frame长度用来标识数据长度,用来解决粘包与半包问题,1字节版本(0到127),用来标识协议版本号,剩下的变长数据是json消息体。

定义数据体(消息体)格式:

{"name":"name","value":"value"}

粘包与半包问题
由于系统socket缓冲区大小的有限的,所以每次flush发送给其他主机的数据并不一定是本次请求的全部数据也不一定全是本次请求需要的数据。常见的解决方案有分隔符标识,基于行标识,定长消息,长度字段。

  • 分隔符标识与基于行标识基于同一个理念处理:协议约定一个或者多个分隔符标识作为一个完整消息的分割标识,在读到分割标识的时候表示该消息已经被完整接受,截断然后丢弃多余数据即可。如果未达到消息最大长度且未读取分隔符,则表示需要继续读取数据。行标识就是用换行符作为分隔标识的特异情况。
  • 定长消息:定义一个消息的的固定长度,在数据长度不足时补齐0,长度不能过长,或者添加数据去标识数据还未都到末尾。
  • 长度字段:这个是最常用的处理方式,我也是使用这个方式,思路是在数据开头定义一个定长的数据去存储当前消息的长度,因此接受端知道数据读到什么地步为止。这种解决方式最大的优势是消息是灵活可变长的。

实现标准

实现服务器

  • 消息

      @Data
      public class MsgContent{
          private String name;
          private String value;
      }
  • 帧解码器

      public class FrameDecoder extends LengthFieldBasedFrameDecoder {
          public FrameDecoder(){
              // 最大长度 跳过0长度 4长度字段 纠错0 裁剪4字节(4位长度)快速失败
              super(Integer.MAX_VALUE,0,4,0,4,true);
          }
      }
  • 消息解码(带版本校验)

      public class MessageDecoder extends MessageToMessageDecoder<ByteBuf> {
    
          @Override
          protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
              byte b = msg.readByte();
              // 校验版本
              if(!supportVersion(b)){
                  throw new RuntimeException("msg version not support");
              }
              // 做成对象继续传播处理
              byte[] msgAll = new byte[msg.readableBytes()];
              msg.readBytes(msgAll);
              MsgContent content =JSON.parseObject(msgAll,MsgContent.class);
              out.add(content);
          }
    
          protected boolean supportVersion(byte version){
              return version <= 100;
          }
      }
  • 业务消息处理

      public class MsgHandler extends SimpleChannelInboundHandler<MsgContent>{
    
          @Override
          protected void channelRead0(ChannelHandlerContext ctx, MsgContent msg)  {
              System.out.println(msg.getName()+":"+msg.getValue());
              msg.setName("response");
              msg.setValue("hello netty");
              // 响应数据
              ctx.writeAndFlush(msg);
          }
      }
  • 消息编码(带版本填充)

      public class MessageEncoder extends  MessageToMessageEncoder<MsgContent>{
          @Override
          protected void encode(ChannelHandlerContext ctx, MsgContent msg, List<Object> out) throws Exception {
              String s = JSON.toJSONString(msg);
              byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
              ByteBuf buffer = ctx.alloc().buffer();
              // 填充版本号
              buffer.writeByte(100);
              buffer.writeBytes(bytes,0,bytes.length);
              out.add(buffer);
          }
      }
  • 帧编码器/封帧

      public class FrameEncoder extends LengthFieldPrepender {
          public FrameEncoder() {
              // 前4个字节为长度
              super(4);
          }
      }
  • 编排

      public static void main(String[] args) throws InterruptedException {
          EventLoopGroup boss = new NioEventLoopGroup();
          EventLoopGroup worker = new NioEventLoopGroup();
          ServerBootstrap bootstrap = new ServerBootstrap();
          ChannelFuture channelFuture = bootstrap
                  .group(boss, worker)
                  .channel(NioServerSocketChannel.class)
                  .option(CONNECT_TIMEOUT_MILLIS, 60000)
                  // 日志
                  .handler(new LoggingHandler(LogLevel.DEBUG))
                  .childOption(SO_KEEPALIVE, true)
                  .childHandler(new ChannelInitializer<Channel>() {
                      @Override
                      protected void initChannel(Channel ch) throws Exception {
                          ch.pipeline()
                                  // V型传播处理 帧解码 ——》消息解码 -》业务处理 -》
                                  // 消息编码 -》 封帧 
                                  .addLast(new FrameDecoder())
                                  .addLast(new FrameEncoder())
                                  .addLast(new MessageDecoder())
                                  .addLast(new MessageEncoder())
                                  .addLast(new MsgHandler());
    
    
    
                      }
                  })
                  .bind(8080);
          channelFuture.sync();
      }

实现客户端
客户由于协议消息的一致的,所以编解码实现是一致的。通常情况下,我们的请求和响应是一问一答的模式,在客户端接收到响应之后不在写入消息。

  • 编排

      public static void main(String[] args) throws InterruptedException {
          EventLoopGroup worker = new NioEventLoopGroup();
          Bootstrap bootstrap = new Bootstrap();
          ChannelFuture channelFuture = bootstrap
                  .group(worker)
                  .channel(NioSocketChannel.class)
                  .option(CONNECT_TIMEOUT_MILLIS, 60000)
                  .handler(new LoggingHandler(LogLevel.DEBUG))
                  .handler(new ChannelInitializer<>() {
                      @Override
                      protected void initChannel(Channel ch)  {
                          ch.pipeline().addLast(
                                  new LoggingHandler(LogLevel.DEBUG),
                                  new FrameDecoder(),
                                  new FrameEncoder(),
                                  new MessageDecoder(),
                                  new MessageEncoder(),
                                  new MsgHandler()
                          );
                      }
                  })
                  .connect("127.0.0.1",8080);
          MsgContent msgContent = new MsgContent();
          msgContent.setName("request");
          msgContent.setValue("hello netty");
          channelFuture.sync().channel().writeAndFlush(msgContent);
      }

注意:

  1. 你可能会希望客户端请求的时候阻塞等待获取到对应的响应。这需要每个消息添加一个消息id字段,在发送之前将id和存响应的Future(在其他库可能叫Promise)做绑定,在处理完响应之后将响应存入Future。

添加高级特性

心跳检查与活跃性检查

Netty自带IdleStateHandler空闲状态处理器,可以设置在空闲多长时间之后触发空闲事件。我们通过这个事件添加心跳检查与活跃性检查。

客户端心跳检测

  • 定义响应事件

    public class PongEvent {}
  • 处理用户事件

    import com.example.MsgContent;
    import io.netty.channel.ChannelDuplexHandler;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.timeout.IdleStateEvent;
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.concurrent.atomic.AtomicInteger;
    
    @Slf4j
    public class HeartbeatEventHandler extends ChannelDuplexHandler {
      private AtomicInteger IDLE_TIMES = new AtomicInteger(0);
    
      @Override
      public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
          if(evt instanceof IdleStateEvent){
              int i = IDLE_TIMES.getAndIncrement();
              if(i > 10){
                  ctx.close();
                  log.info("channel error ,channel close");
              }else {
                  MsgContent msgContent = new MsgContent();
                  msgContent.setName("ping");
                  msgContent.setValue("need pong");
                  ctx.writeAndFlush(msgContent);
                  log.info("heartbeat ,channel send ping");
              }
              return;
          }
    
          if (evt instanceof PongEvent){
              IDLE_TIMES.set(0);
              log.info("handle pong event");
              return;
          }
    
          super.userEventTriggered(ctx, evt);
      }
    }
    
  • 添加处理器

    ch.pipeline().addLast(
          new FrameDecoder(),
          new FrameEncoder(),
          new MessageDecoder(),
          new MessageEncoder(),
          new MsgHandler(),
          // 5秒没有写数据则触发空闲事件
          new IdleStateHandler(0,5,0),
          new HeartbeatEventHandler()
    
    );
  • MsgHandler添加对于ping,pong消息的特殊处理

      public class MsgHandler extends SimpleChannelInboundHandler<MsgContent> {
    
          @Override
          protected void channelRead0(ChannelHandlerContext ctx, MsgContent msg) throws Exception {
              if("pong".equals(msg.getName())){
                  ctx.fireUserEventTriggered(new PongEvent());
                  return;
              }
              if("ping".equals(msg.getName())){
                  msg.setName("pong");
                  ctx.writeAndFlush(msg);
                  return;
              }
              System.out.println(msg.getName()+":"+msg.getValue());
          }
      }

客户端主要做心跳检测,需要上报心跳到服务端保持活跃状态。业务流程大概如下:

  1. 客户端5s未写数据则触发空闲事件
  2. 客户端空闲事件处理有两种情况,第一种是未达到最大空闲次数则当前空闲次数增加,写入ping消息发送到服务端;第二种是已经达到最大次数则需要关闭连接。这里总共需要10 * 5s也就是50秒的空闲才会触发关闭。
  3. 在收到服务端pong消息之后,触发PongEvent用户事件。
  4. 客户端处理pong事件只有一种方式:重置当前空闲次数。
  5. 另外服务端会发送ping消息对客户端进行活跃新检查,客户端在收到ping消息后也需要回复pong消息。

服务端活跃性检查
服务端的基本流程是差不多这里不在赘述。

  • 添加编解码器与事件处理器

    ch.pipeline()
          // 解码
          .addLast(new FrameDecoder())
          .addLast(new FrameEncoder())
          .addLast(new MessageDecoder())
          .addLast(new MessageEncoder())
          .addLast(new MsgHandler())
          // 添加空闲处理
          .addLast(new IdleStateHandler(10,0,0))
          .addLast(new HeartbeatEventHandler());

服务端主要做活跃性检测,业务流程大概如下:

  1. 十秒没有收到读数据则触发空闲事件
  2. 服务端处理事件的方式有两种,一是达到最大次数,关闭连接(10 * 10s = 100s),二是没有达到最大次数做正常的心跳检测。
    后面的处理与客户端相同。

日志

  • 添加LoggingHandler

    ch.pipeline()
          // 解码
          .addLast(new FrameDecoder())
          .addLast(new FrameEncoder())
          .addLast(new MessageDecoder())
          .addLast(new MessageEncoder())
          // 添加日志
          .addLast(new LoggingHandler(LogLevel.DEBUG))
          .addLast(new MsgHandler())
          .addLast(new IdleStateHandler(10,0,0))
          .addLast(new HeartbeatEventHandler());

LoggingHandler输出的内容与格式与编排的位置有关,这里示例的位置会调用MsgContent的toString方法。在我们需要输出出入参数的时候这个处理器可以派上用场。

SSL/TLS

在绝大多数情况下,Netty自身的ssl是派不上用场的。在默认情况下内网被认为是安全的,自然不需要加密。在其他需要哦暴漏的服务上,基本上都是需要通过nginx做反向代理的,所以的nginx上我们我们已经做了加解密了。
jdk在安装后自带有一个keytool,可以用来生成我们需要的密钥。

  • 生成密钥
keytool -genkey -alias serverKey -keysize 2048 -validity 365 -keyalg RSA -dname "CN=bigbrotherlee" -keypass serverPass -storepass serverPass -keystore serverKey.jks

解释:keytool -genkey -alias 别名 -keysize 长度 -validity 过期天数 -keyalg 加密算法 -dname 证书所有者信息 -keypass 密钥密码 -storepass 密钥库密码 -keystore 存储密钥库文件

执行完成后会得到一个警告:

JKS 密钥库使用专用格式。建议使用 "keytool -importkeystore -srckeystore serverKey.jks -destkeystore serverKey.jks -deststoretype pkcs12" 迁移到行业标准格式 PKCS12。
  • 迁移行业标准格式
keytool -importkeystore -srckeystore serverKey.jks -destkeystore serverKey.jks -deststoretype pkcs12

输出:

已将 "serverKey.jks" 迁移到 Non JKS/JCEKS。将 JKS 密钥库作为 "serverKey.jks.old" 进行了备份。
  • 生成证书
keytool -export -alias serverKey -keystore serverKey.jks -storepass serverPass -file server.cer
  • 生成客户端密钥
keytool -genkey -alias clienKey -keysize 2048 -validity 365 -keyalg RSA -dname "CN=bigbrotherlee" -keypass clientPass -storepass clientPass -keystore clientKey.jks
  • 迁移客户端密钥至行业标准格式
keytool -importkeystore -srckeystore clientKey.jks -destkeystore clientKey.jks -deststoretype pkcs12
  • 导入正式至客户端密钥
keytool -import -trustcacerts -alias serverCer -file server.cer -storepass clientPass -keystore clientKey.jks

输出:

是否信任此证书? [否]:  y
证书已添加到密钥库中
  • 服务端

    SslContext sslContext = null;
    try{
      // 我们使用的是pkcs12格式,刚刚已经转换过了
      KeyStore pkcs12 = KeyStore.getInstance("pkcs12");
      // 密码是我们创建密钥的时候的密码
      pkcs12.load(new FileInputStream("D:/key/serverKey.jks"),"serverPass".toCharArray());
      KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance("SunX509");
      keyManagerFactory.init(pkcs12,"serverPass".toCharArray());
      // 构建context
      sslContext = SslContextBuilder.forServer(keyManagerFactory).build();
    }catch (KeyStoreException | IOException | NoSuchAlgorithmException | CertificateException | UnrecoverableKeyException exception){
      throw  new RuntimeException(exception);
    }
    SSLEngine sslEngine = sslContext.newEngine(ch.alloc());
    // 服务模式
    sslEngine.setUseClientMode(false);
    ch.pipeline()
          // 添加ssl
          .addLast(new SslHandler(sslEngine))
          .addLast(new FrameDecoder())
          .addLast(new FrameEncoder())
          .addLast(new MessageDecoder())
          .addLast(new MessageEncoder())
          // 添加日志
          .addLast(new LoggingHandler(LogLevel.DEBUG))
          .addLast(new MsgHandler())
          .addLast(new IdleStateHandler(10,0,0))
          .addLast(new HeartbeatEventHandler());
  • 客户端

    SslContext sslContext = null;
    try{
      KeyStore pkcs12 = KeyStore.getInstance("pkcs12");
      pkcs12.load(new FileInputStream("D:/key/clientKey.jks"),"clientPass".toCharArray());
      TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance("SunX509");
      trustManagerFactory.init(pkcs12);
      // 添加受信任的证书
      sslContext = SslContextBuilder.forClient().trustManager(trustManagerFactory).build();
    }catch (KeyStoreException | IOException | NoSuchAlgorithmException | CertificateException  exception){
      throw  new RuntimeException(exception);
    }
    SSLEngine sslEngine = sslContext.newEngine(ch.alloc());
    // 客户端模式
    sslEngine.setUseClientMode(true);
    
    ch.pipeline().addLast(
          // 添加ssl
          new SslHandler(sslEngine),
          new LoggingHandler(LogLevel.DEBUG),
          new FrameDecoder(),
          new FrameEncoder(),
          new MessageDecoder(),
          new MessageEncoder(),
          new MsgHandler(),
          new IdleStateHandler(0,5,0),
          new HeartbeatEventHandler()
    
    );

注意:在默认情况下keytool生成的是jsk格式的密钥,由于我们已经转换成了pkcs12格式的密钥,所以我们导入的时候,也需要设置格式为pkcs12,否则KeyStore.load会解析失败报错。

断线重连

断线重连是一个客户端行为,属于一种自愈措施,一方面断线重连可以在服务器节点故障的时候重新连接备用节点,另一方面在网络切换之后可以自动重连继续任务。

@Slf4j
public class HeartbeatEventHandler extends ChannelDuplexHandler {
    private AtomicInteger IDLE_TIMES = new AtomicInteger(0);

    private final boolean isClient;
    private final Bootstrap bootstrap;
    public HeartbeatEventHandler(boolean isClient,Bootstrap bootstrap){
        this.isClient = isClient;
        this.bootstrap = bootstrap;
    }
    // 重连执行器
    private ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
    // 断线回调
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        // 重置心跳
        IDLE_TIMES.set(0);
        if (!isClient) {
            super.channelInactive(ctx);
        }else {
            // 10秒后重新连接
            scheduledExecutorService.schedule(()-> reconnect(bootstrap),10, TimeUnit.SECONDS);
        }

    }

    private void reconnect(Bootstrap bootstrap){
         // 重新连接
        bootstrap.connect("127.0.0.1",8080).addListener(future -> {
            if (!future.isSuccess()) {
                // 10秒后重新连接,如果失败10s后再次尝试。
                scheduledExecutorService.schedule(()-> reconnect(bootstrap),10, TimeUnit.SECONDS);
            }
        });
    }
// 。。。。。。。。
}

// new HeartbeatEventHandler(true,bootstrap)

这里只是简单的演示。通常情况下,在无法重新连接的时候需要逐渐加长重连时间间隔,这里只是固定十秒间隔重连,是不合理的,同时重线连接得得有最大次数,最终尝试后不能重连需要抛出异常。我们需要区分是否需要重连,在用户主动断开连接的情况下不需要重新连接,在这里只考虑的断开连接的情况,在异常的情况下也是需要重新连接的。

重连是一个客户端行为,我们需要构建自己的重连策略满足需求。在前面的代码种可以看到Netty不只是性能高,可扩展性也非常强下面会详细解析其高性能与高扩展性的是如何做到的。


Netty是如何工作的

与绝大多数框架一样,Netty也包括这两个过程:配置过程与将配置在运行时应用的过程。下面详解这两个过程。

创建与配置Bootstrap/ServerBootstrap

Bootstrap与ServerBootstrap都是配置的帮助类,可以方便的通过方法链配置我们的服务端与客户端。他们都是AbstractBootstrap的子类。Bootstrap用于创建客户端,AbstractBootstrap提供了bind方法,再UDP协议的情况下客户端也是需要调用bind方法绑定一个地址的,在TCP协议下,客户端调用connect方法连接服务器。

创建一个Bootstrap或者ServerBootstrap对象很简单,只需要调用有参构造方法,传入相应的Bootstrap或者ServerBootstrap对象,传入对象的属性也就是配置;或者也可以像上面示例代码一样使用无参构造,在后面使用方法链传入你的配置就行。

配置包括以下几个部分:

EventLoopGroup

使用Bootstrap/ServerBootstrap.group方法传入,在Bootstrap情况下只有一个EventLoopGroup,用来用来处理所有的Channel事件。在ServerBootstrap情况下由于Reactor模式使得Acceptor与Handler分别在不同的线程池上执行,所以在一般情况下ServerBootstrap会像上面示例一样使用两个线程池boss和worker分别负责SocketChannel的接收和数据的处理。在构造EventLoopGroup对象的时候,前文NIO提到的Selecter对象也会在此时创建,在构造在NioEventLoopGroup时提供SelectorProvider,在newChild构建NioEventLoop时打开一个Selector

    public NioEventLoopGroup(int nThreads, Executor executor) {
        // SelectorProvider.provider()前文Selector.open()是一致的(SelectorProvider.provider().openSelector())
        this(nThreads, executor, SelectorProvider.provider());
    }

    NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
                 EventLoopTaskQueueFactory queueFactory) {
        super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
                rejectedExecutionHandler);
        // 上面设置的SelectorProvider.provider()
        this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
        this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
        // 调用provider.openSelector
        final SelectorTuple selectorTuple = openSelector();
        this.selector = selectorTuple.selector;
        this.unwrappedSelector = selectorTuple.unwrappedSelector;
    }

此时每个NioEventLoop对象都绑定了自己的Selector对象。

本质上一个EventLoopGroup一个ScheduledExecutorService也就是一个ExecutorService线程池。在默认未设置io.netty.eventLoopThreads的情况下,线程数为处理器数量*2,这个很好理解,IO密集型的场景下线程池大小一般都这么设置,在CPU密集型的场景下一般设置n+1的线程数量,在此处很显然是IO密集型场景。旧的shutdown与shutdownNow方法被废弃了,使用shutdownGracefully(long, long, TimeUnit)或者shutdownGracefully()替代,因为关闭线程池需要关闭回收Channel资源与Buffer资源,旧的shutdown方法无法满足需求。

ChannelFactory

设置ChannelFactory有两种方式,一是和上面示例一样传入NioSocketChannel.class这样的Channel类对象,二是传入ChannelFactory对象。在上面的示例中,传入NioSocketChannel.class和NioServerSocketChannel.class都被包装成的ChannelFactory。

    public B channel(Class<? extends C> channelClass) {
        return channelFactory(new ReflectiveChannelFactory<C>(
                ObjectUtil.checkNotNull(channelClass, "channelClass")
        ));
    }

ReflectiveChannelFactory反射获取无参构造器,在newChannel方法返回构造的Channel实例。

public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {

    private final Constructor<? extends T> constructor;

    public ReflectiveChannelFactory(Class<? extends T> clazz) {
        ObjectUtil.checkNotNull(clazz, "clazz");
        try {
            this.constructor = clazz.getConstructor();
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("Class " + StringUtil.simpleClassName(clazz) +
                    " does not have a public non-arg constructor", e);
        }
    }

    @Override
    public T newChannel() {
        try {
            return constructor.newInstance();
        } catch (Throwable t) {
            throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
        }
    }

    @Override
    public String toString() {
        return StringUtil.simpleClassName(ReflectiveChannelFactory.class) +
                '(' + StringUtil.simpleClassName(constructor.getDeclaringClass()) + ".class)";
    }
}

Options

ChannelOption可以使用类型安全的方式配置ChannelConfig,也就是说配置ChannelOption的目的就是为了设置ChannelConfig,ChannelConfig与一个Channel相绑定。对于Bootstrap来说,它只有自己的一个SocketChannel,所以他也只有一个从AbstractBootstrap继承过来的option()方法来设置ChannelOption;对于ServerBootstrap来说,它有一个用来接受连接的ServerSocketChannel,在接受连接之后需要处理对应连接的SocketChannel,所以它对这两个Channel有单独的设置,对于ServerSocketChannel会直接应用option方法设置的ChannelOption,对于SocketChannel他会使用childOption方法设置的ChannelOption,childOption与option方法配置上并无差别。option()与childOption()方法可以调用多次以设置多个不同的ChannelOption,使用null值value来移除一个Options。

    public <T> ServerBootstrap childOption(ChannelOption<T> childOption, T value) {
        ObjectUtil.checkNotNull(childOption, "childOption");
        synchronized (childOptions) {
            // 如果value为null,移除一个childOption
            if (value == null) {
                childOptions.remove(childOption);
            } else {
                childOptions.put(childOption, value);
            }
        }
        return this;
    }

ChannelOption封装了NIO的连接配置,一些操作系统TCP/IP的配置与Netty自身的配置。最终使用是使用的ChannelConfig,使用的时候一般需要使用子类型,比如ServerChannelConfig以获得对于特定Channel的配置。

attr

前文在介绍NIO的时候可以看到在注册事件的时候可以添加一个attachment附件,它实际是绑定的selectionKey上的一个任意对象,在常规使用的时候会用来保存事件Handler。在Netty里面有一个类似的功能attr,他的配置与Options是及其相似的,不过在使用上ChannelOption是为了方便设置ChannelConfig,目的是为了方便设置Channel。而attr一般是在处理事件(本质是执行管道处理数据)的时候取出使用,一个attr是与一个Channel绑定的,在管道处理过程中可以存取该对象,这有点类似于Servlet标准的Request、Session、Application三个作用域带有的Attribute一样,是为了方便在整个生命周期共享数据使用的。同样的对于客户端的Bootstrap来说,它只有一个SocketChannel,也只有一个attr方法用来在创建的时候为这个Channel设置attribute,而对于ServerBootstrap来说,有一个attr方法用来给ServerSocketChannel设置属性,有一个childAttr用来在接受到连接之后为SocketChannel设置属性。


    /**
     * 为新创建的Channel设置Attribute
     */
    public <T> B attr(AttributeKey<T> key, T value) {
        ObjectUtil.checkNotNull(key, "key");
        if (value == null) {
            attrs.remove(key);
        } else {
            attrs.put(key, value);
        }
        return self();
    }

需要注意的是,在通常情况下ChannelOption只能设置预先定义好的选项,比如上面例子的SO_KEEPALIVE,我们自定义的选项在绝大多数时候是不会被处理的;而Attributebe不同,它完全由我们自定义AttributeKey,然后设置值和取值。ChannelOption和AttributeKey都是类型安全的,可以实现类型安全的存取数据。
配置Bootstrap/ServerBoostrap阶段可以设置attr,处理阶段我们也可以从当前Channel上面读写Attribute,下面会讲到handler。

ChannelHandler

ChannelHandler是处理业务的核心。ChannelHandler负责处理Channel的各种事件,在上文提到Epoll通过事件触发用户程序的处理逻辑。当然ChannelHandler的功能是经过增强的,它不止处理读写注册等等事件。与上文attr和option不同的是,ChannelHandler虽然也可以通过handler或者childHandler方法设置handler,但是不能重复调用以添加多个handler。在初始化过程中使用的是一个特殊的ChannelHandler ———— ChannelInitializer的派生子类,他是一个特殊的ChannelInboundHandler,它内部有一个抽象方法initChannel,在Channel初始化完成注册事件完成后回被调用。而ChannelPipeline也是随着Channel一起初始化且绑定在一起的,所以通常情况下ChannelInitializer是专门用来bootstrap何serverbootstrap的handler与childHandler的方法使用的,毕竟除了初始化完成这个时机回会调用到initChannel方法其他时候都不会触发这个ChannelInitializer专门新增的抽象方法,而Channel的初始化只会在调用bind和connect方法都触发一次(特别的是,服务端接收的每一个SocketChannel都会在初始化完成后触发,这意味着ChannelInitializer实例应当是一个可共享的对象,需要做好同步处理)。通常情况下ChannelInitializer都是用来为Channel绑定的pipeline设置ChannelHandler的。

@Override
protected void initChannel(Channel channel) {
// 该方法会在每次接收SocketChannel初始化完成后后调用一次
   ch.pipeline().addLast(
      new SslHandler(sslEngine),
      new LoggingHandler(LogLevel.DEBUG),
      new FrameDecoder(),
      new FrameEncoder(),
      new MessageDecoder(),
      new MessageEncoder(),
      new MsgHandler(),
      new IdleStateHandler(0,5,0),
      new HeartbeatEventHandler(true,bootstrap)
   );
}

绑定与连接

对于一个ServerBootstrap它的构建配置完成的最后一步便是调用bind方法绑定一个地址以准备接受连接;当然这并不代表该方法是ServerBootstrap特有的,在UDP协议,客户端也是需要绑定一个地址用以收发广播。相反Bootstrap的最后一步connect()去连接一个地址这个connect方法确是Bootstrap特有的,客户端才有connect一说。
调用bind与connect方法都会返回一个ChannelPromise对象,可以检查绑定或者连接是否完成,ChannelPromise关联有一个Channel对象,具体的类型与前文ChannelFactory的设置有关,该Channel对象可以获取前文提到的配置,比如attr。同时调用bind与connect方法初始化Channel也会初始化与Channel相绑定的ChannelPipeline与ChannelHandlerContext。

bind()

调用bind会创建一个Channel对象且绑定到对应地址与端口。在只提供的端口的情况下会绑定到本地任意一个IP上(通过InetAddress.anyLocalAddress()获取),同时校验端口是否在合法范围内(0 到 65535)。
在获取到正确的端口与地址(InetSocketAddress对象)后。校验EventLoopGroup与ChannelFactory是否设置,前文提到过前者是执行处理器的线程池后者用来创建Channel对象。
io.netty.bootstrap.AbstractBootstrap#validate

    public B validate() {
        if (group == null) {
            throw new IllegalStateException("group not set");
        }
        if (channelFactory == null) {
            throw new IllegalStateException("channel or channelFactory not set");
        }
        return self();
    }

而后正式调用doBind方法正式执行绑定流程在doBind方法内部首先调用io.netty.bootstrap.AbstractBootstrap#initAndRegister方法初始化Channel:

  1. 通过channelFactory初始化一个Channel

默认使用SelectorProvider.provider().openServerSocketChannel()打开一个ServerSocketChannel,在上文的ServerSocketChannel.open();内部也是使用SelectorProvider.provider().openServerSocketChannel()打开一个ServerSocketChannel。
在获取到ServerSocketChannel(jdk原生)需要对其就行包装成NioServerSocketChannel(netty)对象。

    public NioServerSocketChannel() {
        // DEFAULT_SELECTOR_PROVIDER就是SelectorProvider.provider();
        // newSocket() 会调用provider.openServerSocketChannel()
        this(newSocket(DEFAULT_SELECTOR_PROVIDER));
    }
    public NioServerSocketChannel(ServerSocketChannel channel) {
        super(null, channel, SelectionKey.OP_ACCEPT);
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    }

调用父类初始化方法:

// 首先调用AbstractChannel
    protected AbstractChannel(Channel parent) {
        // 此处是null
        this.parent = parent;
        // DefaultChannelId.newInstance();获取ChannelId对象
        id = newId(); 
        // new NioMessageUnsafe();用来操作底层对象
        unsafe = newUnsafe(); 
        // new DefaultChannelPipeline(this);初始化pipeline
        pipeline = newChannelPipeline(); 
    }

// 调用AbstractNioChannel
    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent);
        // SelectorProvider.provider().openServerSocketChannel()获得的java底层ServerSocketChannel对象
        this.ch = ch;
        // 此处是 SelectionKey.OP_ACCEPT 前文提到,NIO事件接收到客户端连接后触发
        this.readInterestOp = readInterestOp;
        try {
            // 设置Channel为非阻塞
            ch.configureBlocking(false);
        } catch (IOException e) {
            try {
                ch.close();
            } catch (IOException e2) {
                logger.warn(
                            "Failed to close a partially initialized socket.", e2);
            }

            throw new ChannelException("Failed to enter non-blocking mode.", e);
        }
    }

初始化配套的ChannelConfig对象,此处是NioServerSocketChannelConfig,管理初始的Buffer分配器,是否自动读,自动关闭,连接超时时间等等配置。

        // 默认自动读 private volatile int autoRead = 1;
        private NioServerSocketChannelConfig(NioServerSocketChannel channel, ServerSocket javaSocket) {
            super(channel, javaSocket);
        }

注意: 设置自动关闭后会调用clearReadPending => clearReadPending0清除readInterestOp,现在该值是SelectionKey.OP_ACCEPT

    // NioServerSocketChannelConfig.autoReadCleared
    protected void autoReadCleared() {
        clearReadPending();
    }
  1. 上文Channel对象已经构建了,接下来需要为这个Channel初始化
    @Override
    void init(Channel channel) {
        // 将上文option()设置的ChannelOptions设置进channel,和上文提到的一样,其实是设置ChannelConfig
        setChannelOptions(channel, newOptionsArray(), logger);

        // 将上文attr设置的ChannelAttribute设置进Channel,此处会通过channel.attr(attribute)与channel绑定
        setAttributes(channel, newAttributesArray());
        // 前文初始化NioServerSocketChannel对象是创建的绑定在Channel对象上的pipeline
        ChannelPipeline p = channel.pipeline();
        
        //通过group设置的
        final EventLoopGroup currentChildGroup = childGroup;
        // 通过childHandler设置的
        final ChannelHandler currentChildHandler = childHandler;
        // 通过childOption设置的
        final Entry<ChannelOption<?>, Object>[] currentChildOptions = newOptionsArray(childOptions);
        // 通过childAttr设置的
        final Entry<AttributeKey<?>, Object>[] currentChildAttrs = newAttributesArray(childAttrs);
        // 添加初始化Handler到NioServerSocketChannel的pipeline
        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) {
                // 此处ch是还是上文创建的NioServerSocketChannel对象
                final ChannelPipeline pipeline = ch.pipeline();
                // 通过上文提到的handler()方法设置的ChannelHandler
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }

                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        // 添加一个ServerBootstrapAcceptor(也是一个ChannelHandler对象)用来处理收到的客户端连接
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }

到此我们已经创建完成了NioServerSocketChannel对象斌对他完成了初始化,设置Option,Attribute、Handler,并设置了接收到客户端连接需要执行的处理器ServerBootstrapAcceptor。

  1. 我们已经准备好接收连接了,但是还未将Selector对象与Channel对对象相关联,接下来我们需要像上文NIO一样将ServerSocketChannel的感兴趣的事件注册到Selector。
    final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            // 前文1、2两步就是执行这两步
            channel = channelFactory.newChannel();
            init(channel);
        } catch (Throwable t) {
           // 创建Channel失败或者初始化失败的处理逻辑
           return new DefaultChannelPromise(...).setFailure(t);
        }
        // 注册Channel;这里的group就是上文group方法设置的NioEventLoopGroup
        ChannelFuture regFuture = config().group().register(channel);
        ...
        return regFuture;
    }

该register方法会调用到io.netty.channel.SingleThreadEventLoop#register(io.netty.channel.ChannelPromise)方法

    @Override
    public ChannelFuture register(final ChannelPromise promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        // 前文提到的Channel初始化的时候创建的NioMessageUnsafe对象,用来操作底层对象。
        promise.channel().unsafe().register(this, promise);
        return promise;
    }

在Unsafe对象内部最终会调用register0方法(eventLoop.inEventLoop()时同步调用,否则异步调用)

private void register0(ChannelPromise promise) {
            try {
                // ensureOpen返回ServerSocketChannel.isOpen()的结果
                if (!promise.setUncancellable() || !ensureOpen(promise)) {
                    return;
                }
                boolean firstRegistration = neverRegistered;
                // 内部调用javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                // 将Channel和Selector相关联起来了,但是由于事件0 并非一个合法事件所以并不会处理事件
                doRegister();
                neverRegistered = false;
                registered = true;

                // 此时已经还未正式设置感兴趣的事件
                // 在此会回调在2也就是初始化init阶段设置的initChannel回调
                // handler方法设置的ChannelHandler与ServerBootstrapAcceptor对象会在此时设置到pipeline
                pipeline.invokeHandlerAddedIfNeeded();
                // promise设置为成功,注册在该promise上的监Listener将会被触发,比如绑定地址的Listener
                safeSetSuccess(promise);

                // 触发ChannelRegistered事件,回调ChannelInboundHandler.channelRegistered方法
                pipeline.fireChannelRegistered();
                // isOpen() && javaChannel().socket().isBound()。也就是ServerSocketChannel打开且Socket已经绑定了地址。
                if (isActive()) {
                    if (firstRegistration) {
                        // 触发ChannelActive事件,回调ChannelInboundHandler.channelActive方法
                        pipeline.fireChannelActive();
                    } else if (config().isAutoRead()) {
                        // 重新注册且自动读取的情况下需要在此时为doRegister获取的SelectionKey添加readInterestOp(也就是SelectionKey.OP_ACCEPT)事件正式开始处理请求
                        beginRead();
                    }
                }
            } catch (Throwable t) {
                // 移除处理,关闭Channel之类的
                ...
            }
        }
  1. 到这里initAndRegister已经处理完成了,在前文已经初始化好了Channel,Selector并将他们关联了,但是Channel还未绑定地址,并且SelectionKey并未设置好感兴趣的事件,目前还是没有意义的0.

接下来需要绑定地址,在前文NIO部分程序可以看到,NIO得register与bind其实是分离得,他们并没有什么强制的先后顺序。不过一个已经绑定地址的Channel实际要求它的handler与Config是需要提前设置好的,因为bind之后实际是可以接收连接了。

    private ChannelFuture doBind(final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }
        // 注册完成:指的是javaChannel().register(eventLoop().unwrappedSelector(), 0, this)调用成功
        if (regFuture.isDone()) {
            // 已经注册完成可以直接绑定地址
            ChannelPromise promise = channel.newPromise();
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else {
            // 还未注册完成则需要在注册完成之后绑定地址
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    Throwable cause = future.cause();
                    if (cause != null) {
                        // 如果注册异常则抛错
                        promise.setFailure(cause);
                    } else {
                        promise.registered();
                        // 绑定地址
                        doBind0(regFuture, channel, localAddress, promise);
                    }
                }
            });
            return promise;
        }
    }

可以看到,在initAndRegister方法之后都是调用doBind0方法。这是一个异步操作,将逻辑交给eventLoop执行,最终会调用channel.bind()。

    private static void doBind0(
            final ChannelFuture regFuture, final Channel channel,
            final SocketAddress localAddress, final ChannelPromise promise) {

        // 执行绑定逻辑
        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                if (regFuture.isSuccess()) {
                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }
            }
        });
    }

在这里Channel是NioServerSocketChannel对象,其bind会委派给内部绑定的pipeline执行,最终pipeline中默认的Head,也就是HeadContext对象会执行具体的绑定逻辑,而HeadContext会委派给unsafe对象(NioMessageUnsafe)以调用底层逻辑。最终unsafe会执行下面的绑定逻辑:

        public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
            assertEventLoop();
            // 已取消或者Channel已关闭则直接返回
            if (!promise.setUncancellable() || !ensureOpen(promise)) {
                return;
            }

            // 非root用户无法接收通配符广播
            if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
                localAddress instanceof InetSocketAddress &&
                !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
                !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
                logger.warn(
                        "A non-root user can't receive a broadcast packet if the socket " +
                        "is not bound to a wildcard address; binding to a non-wildcard " +
                        "address (" + localAddress + ") anyway as requested.");
            }

            boolean wasActive = isActive();
            try {
                // 绑定地址内部调用javaChannel().bind(localAddress, config.getBacklog());或者javaChannel().socket().bind(localAddress, config.getBacklog());
                doBind(localAddress);
            } catch (Throwable t) {
                safeSetFailure(promise, t);
                closeIfClosed();
                return;
            }
            // 如果是doBind后执行则此时已经就绪了(channel已经打开且已经绑定了地址。)则触发ChannelActive事件
            if (!wasActive && isActive()) {
                invokeLater(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.fireChannelActive();
                    }
                });
            }

            safeSetSuccess(promise);
        }

值得注意的是,在3中 safeSetSuccess(promise)提前设置的,这也是为什么触发channelActive的逻辑在3、4都有的原因。netty大量使用异步处理导致无法按照一种固定代码顺序处理业务。

  1. 前文我们已经初始化了NioServerSocketChannel(绑定有jdk的ServerSocketChannel),初始化了EventLoop(绑定有Selector),将Channel注册到了EventLoop(也就是channel注册到selector,创建SelectionKey对象),Channel也绑定了地址。还差最后一步,selectionkey再在创建的时候设置的事件是0,也就是还没有注册任何事件,现在需要注册OP_ACCEPT事件到selectionkey

设置感兴趣的事件(OP_ACCEPT)是在Channel状态正常流程是第一次变为active状态时(在3是isActive() && firstRegistration 而在4中是!wasActive && isActive())在处理ChanneActive事件时注册的。这个事件自然也是交给pipeline的特殊节点HeadContext处理的:

        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) {
            ctx.fireChannelReadComplete();
            // 自动读
            readIfIsAutoRead();
        }

        private void readIfIsAutoRead() {
            // private volatile int autoRead = 1;默认就是自动读
            if (channel.config().isAutoRead()) {
                // 开始读
                channel.read();
            }
        }

io.netty.channel.DefaultChannelConfig#autoRead属性为1开启自动读。和其他操作一样channel.read()也是会委派给pipeline执行,而pipeline自然还是交给节点执行,最终会流转会HeadContext节点的的read方法,而在HeadContext里面依然是交给底层netty的unsafe对象操作:

        public void read(ChannelHandlerContext ctx) {
            unsafe.beginRead();
        }

在内部最终会调用到io.netty.channel.nio.AbstractNioChannel#doBeginRead,在此会将readInterestOp(也就是在初始化Channel的时候设置的OP_ACCEPT)设置到关联时设置为0的SelectionKey对象:

    @Override
    protected void doBeginRead() throws Exception {
        final SelectionKey selectionKey = this.selectionKey;
        if (!selectionKey.isValid()) {
            return;
        }

        readPending = true;

        // 添加事件
        final int interestOps = selectionKey.interestOps();
        if ((interestOps & readInterestOp) == 0) {
            selectionKey.interestOps(interestOps | readInterestOp);
        }
    }

注意:你可以看到,在3,在非第一次调用且isActive为true你并且开启自动读的情况下beginRead()也被调用了,这说明该通道之前已经注册过并且已经设置了 autoRead()。这意味着我们需要重新开始读取,以便处理传入的数据。最终都是殊途同归调用beginRead()

自此,我们已经完成bind的所有工作准备好接收连接了。连接将会交由io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor处理。后续章节会详解。

connect()

在构造EventLoopGroup的时候创建Selector对象,在initAndRegister的时候Channel对象的时候创建jdk自带的Channel对象,初始化并将Selector和Channel关联到一起,在在Active之后将感兴趣的事件注册。这个大致流程是一样的。在细节上可能有一些变化。

  1. 创建Channel

相较于服务端校验,Bootstrap的handler也是一个必填属性,和ServerBootstrap的ChildHandler一样,毕竟核心框架核心就在ChannelHandler上面。

    @Override
    public Bootstrap validate() {
        super.validate();
        if (config.handler() == null) {
            throw new IllegalStateException("handler not set");
        }
        return this;
    }

校验完成户需要创建NioSocketChannel对象,和上面的NioServerSocketChannel一样,都是使用ChannelFactory通过反射创建的实例。同时他也是一样,通过SelectorProvider.provider().openSocketChannel()打开一个新的SocketChannel对象,他和前文NIO使用SocketChannel.open()的底层调用时一致的

    // 此处的provider是SelectorProvider.provider()返回的对象
    public NioSocketChannel(SelectorProvider provider) {
        this(newSocket(provider));
    }

值得注意的是,在其调用父类构造器的时候设置的默认的readInterestOp是SelectionKey.OP_READ,这意味着在Active时注册的事件将是SelectionKey.OP_READ,而在NioServerSocketChannel设置的是SelectionKey.OP_ACCEPT。

    protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
        super(parent, ch, SelectionKey.OP_READ);
    }

和前文NioServerSocketChannel一样,创建NioSocketChannel对象会关联pipeline与unsafe对象,也会关联对应的NioSocketChannelConfig对象。值得注意的是这里的unsafe对象是NioSocketChannelUnsafe的实例。

    public NioSocketChannel(Channel parent, SocketChannel socket) {
        super(parent, socket);
        config = new NioSocketChannelConfig(this, socket.socket());
    }

    private NioSocketChannelConfig(NioSocketChannel channel, Socket javaSocket) {
        super(channel, javaSocket);
        calculateMaxBytesPerGatheringWrite();
    }
  1. 同样的,在Channel创建完成后需要对Channel对象进行初始化,也是就是将上面Bootstrap设置的attr、option、handler设置到Channel。这里不在赘述。
    @Override
    void init(Channel channel) {
        ChannelPipeline p = channel.pipeline();
        p.addLast(config.handler());

        setChannelOptions(channel, newOptionsArray(), logger);
        setAttributes(channel, newAttributesArray());
    }
  1. 还是同样的,在创建并初始化完成NioSocketChannel对象之后需要将EventLoop内部的Selector对象于NioSocketChannel内部的SocketChannel关联起来(将Channel注册到Selector)。这个过程与上面一致,不在赘述。
ChannelFuture regFuture = config().group().register(channel);

需要注意的是在这里register(channel)会register任务提交到EventLoop,因为EvenLoop关联了Selector,而在此时也就是eventloop.executet提交任务的时候会调用io.netty.util.concurrent.SingleThreadEventExecutor#doStartThread方法,在该方法会向executor提交一个task,该task会执行io.netty.channel.nio.NioEventLoop#run方法

   private void execute(Runnable task, boolean immediate) {
        boolean inEventLoop = inEventLoop();
        addTask(task);
        if (!inEventLoop) {
            // 如果不再eventloop则提交任务,内部会调用SingleThreadEventExecutor#doStartThread方法
            startThread();
            if (isShutdown()) {
                boolean reject = false;
                try {
                    if (removeTask(task)) {
                        reject = true;
                    }
                } catch (UnsupportedOperationException e) {
                    //
                }
                if (reject) {
                    reject();
                }
            }
        }

        if (!addTaskWakesUp && immediate) {
            wakeup(inEventLoop);
        }
    }

而在io.netty.channel.nio.NioEventLoop#run会死循环使用selectNowSupplier调用selector.selectNow()阻塞获取事件,在获取到之后处理selectionKey,processSelectedKeys()。

@Override
    protected void run() {
        int selectCnt = 0;
        for (;;) {
            try {
                ....
                int strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                boolean ranTasks;
                if (ioRatio == 100) {
                } else if (strategy > 0) {
                    final long ioStartTime = System.nanoTime();
                    try {
                        processSelectedKeys();
                    } finally {}
                } 
                .....
    }
  1. 接下来,我们需要连接地址操作。

与bind操作不同的是,绑定本地ip与端口不需要做更多操作,而在connect操作需要对地址先进行解析才能连接。所以多了一个域名解析的过程。

private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();

        if (regFuture.isDone()) {
            if (!regFuture.isSuccess()) {
                return regFuture;
            }
            // 解析域名,连接地址
            return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
        } else {
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    Throwable cause = future.cause();
                    if (cause != null) {
                        promise.setFailure(cause);
                    } else {
                        // 解析域名,连接地址
                        promise.registered();
                        doResolveAndConnect0(channel, remoteAddress, localAddress, promise);
                    }
                }
            });
            return promise;
        }
    }

解析域名默认使用jdk内置的InetAddress.getByName(hostname)获取域名对应的IP。在获取到对应的地址后再进行连接操作。

    private ChannelFuture doResolveAndConnect0(final Channel channel, SocketAddress remoteAddress,
                                               final SocketAddress localAddress, final ChannelPromise promise) {
        try {
            final EventLoop eventLoop = channel.eventLoop();
            AddressResolver<SocketAddress> resolver;
            try {
                // 这里的this.resolver就是DefaultAddressResolverGroup.INSTANCE
                // 这里的getResolver会调用new DefaultNameResolver(executor).asAddressResolver();
                // 而DefaultNameResolver使用的就是JDK内建的域名服务InetAddress.getByName
                resolver = this.resolver.getResolver(eventLoop);
            } catch (Throwable cause) {
                channel.close();
                return promise.setFailure(cause);
            }

            if (!resolver.isSupported(remoteAddress) || resolver.isResolved(remoteAddress)) {
                // 不支持解析或者已经是ip则直接连接
                doConnect(remoteAddress, localAddress, promise);
                return promise;
            }

            final Future<SocketAddress> resolveFuture = resolver.resolve(remoteAddress);

            if (resolveFuture.isDone()) {
                final Throwable resolveFailureCause = resolveFuture.cause();

                if (resolveFailureCause != null) {
                    channel.close();
                    promise.setFailure(resolveFailureCause);
                } else {
                    // 连接
                    doConnect(resolveFuture.getNow(), localAddress, promise);
                }
                return promise;
            }

            // 等待域名解析完成后连接
            resolveFuture.addListener(new FutureListener<SocketAddress>() {
                @Override
                public void operationComplete(Future<SocketAddress> future) throws Exception {
                    if (future.cause() != null) {
                        channel.close();
                        promise.setFailure(future.cause());
                    } else {
                        doConnect(future.getNow(), localAddress, promise);
                    }
                }
            });
        } catch (Throwable cause) {
            promise.tryFailure(cause);
        }
        return promise;
    }

连接操作依然是一样的套路:调用NioSocketChannel对象connect方法,Channel对象委派给pipeline,pipeline默认使用tail节点connect方法连接,而默认没有重写connect方法的情况下,会向前查找支持connect方法的节点,最终都会回到之前说的特殊的HeadContext节点,而这个特殊的HeadContext节点的connect操作自然还是调用unsafe的connect方法来连接。

unsafe.connect(remoteAddress, localAddress, promise);

其流程:连接然后设置对应的promise对象为success,如果是active状态则触发channeActive事件。流程不再赘述这里只看连接操作

    @Override
    protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
        if (localAddress != null) {
            // 和前文一样都是channel调用bind方法,我们的案例此处localAddress是null
            doBind0(localAddress);
        }

        boolean success = false;
        try {
            // 内部socketChannel.connect(remoteAddress) 连接地址
            boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
            if (!connected) {
                // 如果连接没有连接成功,添加OP_CONNECT事件等待后续触发OP_CONNECT
                selectionKey().interestOps(SelectionKey.OP_CONNECT);
            }
            success = true;
            return connected;
        } finally {
            if (!success) {
                doClose();
            }
        }
    }

前面提到在active事件触发会将SelectionKey.OP_READ注册到SelectionKey,而在此处,如果第一时间没有获取到连接成功结果则将OP_CONNECT注册进去了,而如果已经连接成功则只需要关注OP_READ事件就行了。


                boolean wasActive = isActive();
                // 上面的连接逻辑
                if (doConnect(remoteAddress, localAddress)) {
                    // 如果第一时间连接成功了则promise设置为success,触发channelActive事件
                    fulfillConnectPromise(promise, wasActive);
                } else {
                    connectPromise = promise;
                    requestedRemoteAddress = remoteAddress;

                    // 如果第一时间没有连接成功则需要检查是否连接超时
                    int connectTimeoutMillis = config().getConnectTimeoutMillis();
                    if (connectTimeoutMillis > 0) {
                        connectTimeoutFuture = eventLoop().schedule(new Runnable() {
                            @Override
                            public void run() {
                                ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
                                if (connectPromise != null && !connectPromise.isDone()
                                        && connectPromise.tryFailure(new ConnectTimeoutException(
                                                "connection timed out: " + remoteAddress))) {
                                    // 如果超时则设置promise为失败且关闭Channel
                                    close(voidPromise());
                                }
                            }
                        }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
                    }

                    promise.addListener(new ChannelFutureListener() {
                        @Override
                        public void operationComplete(ChannelFuture future) throws Exception {
                            // 如果取消也需要关闭channel
                            if (future.isCancelled()) {
                                if (connectTimeoutFuture != null) {
                                    connectTimeoutFuture.cancel(false);
                                }
                                connectPromise = null;
                                close(voidPromise());
                            }
                        }
                    });
                }

我们可以看到在第一时间连接成功的情况下会通过fulfillConnectPromise触发channelActive事件从而将OP_READ设置到SelectionKey,而在需要第一时间无法连接的情况下是从哪里调用的fulfillConnectPromise呢?为了探索这个问题我们需要回到3,在3中提到在register阶段提交了一个task死循环selectNow获取事件,然后处理SelectionKey。

        try {
            int readyOps = k.readyOps();
            // 处理OP_CONNECT,
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                // 首先去除OP_CONNECT这个事件不然会一直返回OP_CONNECT
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);
                // 
                unsafe.finishConnect();
            }

            // 处理OP_WRITE
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                ch.unsafe().forceFlush();
            }

            // 处理OP_READ与OP_ACCEPT
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }

在unsafe.finishConnect()会回调的fulfillConnectPromise

        @Override
        public final void finishConnect() {
            assert eventLoop().inEventLoop();

            try {
                boolean wasActive = isActive();
                doFinishConnect();
                // 在内部触发了channelactive事件
                fulfillConnectPromise(connectPromise, wasActive);
            } catch (Throwable t) {
                // 在内部触发了channelactive事件
                fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
            } finally {}
        }
  1. 接下来就是处理channelactive事件将OP_READ注册到SelectionKey对象。这里不再赘述。

请求与响应处理流程

上文我们已经解析完了connect与bind,我们已经准备好接收请求或者已经连接上了服务。接下来就是处理连接处理数据的过程。

触发事件

前文解析connect的第3部分提到,在register的时候会添加一个task死循环selectNow获取OP,获取到以后processSelectedKeys()。在前文connect第4部分可以看到:

  • OP_CONNECT会回调unsafe.finishConnect()处理
  • OP_WRITE会回调ch.unsafe().forceFlush()处理
  • OP_READ与OP_ACCEPT会回调unsafe.read()处理
unsafe.finishConnect()前文已经讲过了,其最重要的逻辑就是触发了ChannelActive事件(fireChannelActive)使得OP_READ可以加入到SelectionKey当中这里不在赘述。
unsafe.read()在前文有讲到,对于NioServerSocketChannel对象来说其关联的是NioMessageUnsafe对象,而NioSocketChannel对象说,其关联的是一个NioSocketChannelUnsafe对象,他们对read的处理是不一样的。

NioMessageUnsafe是这样处理的:首先需要读取数据到readBuf,然后触发读取事件交给pipeline处理数据readBuf里面的数据,在所有的数据处理完成后触发读取完成事件,最后如果在处理过程中发生了异常这需要触发异常捕获事件。

        @Override
        public void read() {
            ...
            try {
                try {
                    do {
                        // 读取出数据放到readBuf (List<Object>)里面,在服务端会收到SocketChannel
                        int localRead = doReadMessages(readBuf);
                        if (localRead == 0) {
                            break;
                        }
                        if (localRead < 0) {
                            closed = true;
                            break;
                        }

                        allocHandle.incMessagesRead(localRead);
                    } while (continueReading(allocHandle));
                } catch (Throwable t) {}
                ...
                int size = readBuf.size();
                for (int i = 0; i < size; i ++) {
                    readPending = false;
                    // 触发读取事件
                    pipeline.fireChannelRead(readBuf.get(i));
                }
                readBuf.clear();
                allocHandle.readComplete();
                // 触发读取完成事件
                pipeline.fireChannelReadComplete();

                if (exception != null) {
                    closed = closeOnReadError(exception);
                    // 触发异常事件
                    pipeline.fireExceptionCaught(exception);
                }
                ...
            } finally {...}
        }
    }

不同的Channel的doReadMessages实现是不一样的没,对于NioServerSocketChannel来说,首先它处理的肯定是OP_ACCEPT,收到该事件之后需要接受到对应的SocketChannel(jdk的),然后将其包装成NioSocketChannel对象,并将他们添加到readBuf里面。最终通过触发的read事件将NioSocketChannel对象交由ServerBootstrapAcceptor处理,处理事件部分将在后展开说明。在此处我们需要注意的是在前文提到过的NioSocketChannel对象在默认需要注册的感兴趣的事件是OP_READ。很显然在服务端是不需要通过OP_CONNECT触发ChannelActive从而触发将OP_READ注册到SelectionKey的逻辑的。

    @Override
    protected int doReadMessages(List<Object> buf) throws Exception {
        SocketChannel ch = SocketUtils.accept(javaChannel());

        try {
            if (ch != null) {
                buf.add(new NioSocketChannel(this, ch));
                return 1;
            }
        } catch (Throwable t) {...}

        return 0;
    }

NioSocketChannelUnsafe是这样处理的:首先对于NioSocketChannel来说,它处理的肯定是OP_READ,获取到Buffer分配器之后分配Buffer然后从Channel中读取数据,每次读取触发ChannelRead事件,知道全部读取完成,全部读取完成后触发读取完成事件。

@Override
        public final void read() {
            ...
            final ChannelPipeline pipeline = pipeline();
            // ByteBuf分配器,此处是PooledByteBufAllocator
            final ByteBufAllocator allocator = config.getAllocator();
            // 接收数据处理,用来判断是否需要继续读,读了多少次多少数据等等
            final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
            allocHandle.reset(config);

            ByteBuf byteBuf = null;
            boolean close = false;
            try {
                do {
                    // 分配ByteBuf在此是PooledUnsafeDirectByteBuf内部使用DirectByteBuffer存储数据
                    byteBuf = allocHandle.allocate(allocator);
                    // 从channel中读取数据,最终会和NIO一样调用Channel.read(ByteBuffer)方法
                    allocHandle.lastBytesRead(doReadBytes(byteBuf));
                    // 什么也没有读到
                    if (allocHandle.lastBytesRead() <= 0) {
                        byteBuf.release();
                        byteBuf = null;
                        close = allocHandle.lastBytesRead() < 0;
                        if (close) {
                            // 已经没有可读的内容,因为我们收到了 EOF(文件结束符).
                            readPending = false;
                        }
                        break;
                    }

                    allocHandle.incMessagesRead(1);
                    readPending = false;
                    // 触发读事件
                    pipeline.fireChannelRead(byteBuf);
                    byteBuf = null;

                // 是否继续读:如果上次读取到的数据恰好等于Buf长度(也就是上次读满了)
                // 则表示还要继续读
                } while (allocHandle.continueReading());

                allocHandle.readComplete();
                // 触发ChannelReadComplete事件
                pipeline.fireChannelReadComplete();

                if (close) {
                    closeOnRead(pipeline);
                }
            } catch (Throwable t) {...} finally {...}
        }

至此,我们已经触发了事件,这些事件都是交给前文配置的ChannelHandler处理的。

处理事件

pipeline.fireXXX方法用来触发XXX事件,与bind和connect这样的非事件处理不同,事件处理是从head节点向下传播的,而bind,connect和close这样的底层操作是从tail节点向上传播的。前文提到ChannelHandler的编排顺序会影响编解码,像bind,connect和write这样的是从tail尾节点向上一步步处理的,是一个逆序过程,而对于read这样的是从head头节点一步步向下处理的,这是一个正序过程。

ChannelHandler

ChannelHandler派生了ChannelInboundHandler和ChannelOutboundHandler两个方向的处理器用来处理管道两端的操作。
我们处理数据经常需要使用到的的ByteToMessageDecoder、MessageToMessageDecoder和SimpleChannelInboundHandler就属于ChannelInboundHandler。MessageToMessageEncoder属于ChannelOutboundHandler。还有一个ChannelDuplexHandler属于双工的Handler,在我们的Handler既可以处理读又可以处理写的情况下可以派生它。

ChannelPipeline与ChannelHandlerContext

ChannelPipeline是在Channel构造的时候创建并于Channel相绑定的,而ChannelHandlerContext是在向pipeline添加ChannelHandler时创建并添加到pipeline链表的。上文提到的这个pipeline.addLast就是添加过程

                        ch.pipeline()
                                // 解码
                                .addLast(new SslHandler(sslEngine))
                                .addLast(new FrameDecoder())
                                .addLast(new FrameEncoder())
                                .addLast(new MessageDecoder())
                                .addLast(new MessageEncoder())
                                // 添加日志
                                .addLast(new LoggingHandler(LogLevel.DEBUG))
                                .addLast(new MsgHandler())
                                .addLast(new IdleStateHandler(10,0,0))
                                .addLast(new HeartbeatEventHandler(false,null));

在addLast内部,会将ChannelHandler对象包装成ChannelHandlerContext,然后将这个context对象添加到pipeline链表中,操作完成后会回调这个ChannelHandler对象的handlerAdded方法。

    @Override
    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            checkMultiplicity(handler);
            // 构建ChannelHandlerContext
            // 这里是new DefaultChannelHandlerContext(this, childExecutor(group), name, handler)
            newCtx = newContext(group, filterName(name, handler), handler);
            // 添加到链表
            addLast0(newCtx);

            EventExecutor executor = newCtx.executor();
            if (!executor.inEventLoop()) {
                // 执行添加成功的回调handler().handlerAdded(this);
                callHandlerAddedInEventLoop(newCtx, executor);
                return this;
            }
        }
        // 执行添加成功的回调handler().handlerAdded(this);
        callHandlerAdded0(newCtx);
        return this;
    }

前文中有提到ChannelPipline在默认情况下AbstractChannel我们绑定的是DefaultChannelPipeline,他是随着Channel构造而创建并绑定到Channel的。DefaultChannelPipeline维护着一个双向链表,内部持有头尾节点,其默认的头节点HeadContext是个特殊的ChannelHandlerContext,负责回调底层方法,在HeadContext内部持有Netty的Unsafe对象(和pipeline一样,Unsafe对象也是在创建构造Channel的时候创建并绑定到Channel的)。

    protected DefaultChannelPipeline(Channel channel) {
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        succeededFuture = new SucceededChannelFuture(channel, null);
        voidPromise =  new VoidChannelPromise(channel, true);

        tail = new TailContext(this);
        // 自动开启读写
        head = new HeadContext(this);

        head.next = tail;
        tail.prev = head;
    }

在channel.bind或者channel.connect等等方法被调用后,都会委派给内部绑定的pipeline的bind与connect方法,然后再链表处理过程中都会调用到HeadContext的对应方法,上面的channelActive事件在HeadContext的处理就是会回调channel.read()最终又会执行HeadContext的read方法,最终调用unsafe.beginRead()在beginRead()里面将readInterestOp绑定到SelectionKey。

        @Override
        public void channelActive(ChannelHandlerContext ctx) {
            ctx.fireChannelActive();

            readIfIsAutoRead();
        }

        @Override
        public void bind(
                ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
            unsafe.bind(localAddress, promise);
        }

        @Override
        public void connect(
                ChannelHandlerContext ctx,
                SocketAddress remoteAddress, SocketAddress localAddress,
                ChannelPromise promise) {
            unsafe.connect(remoteAddress, localAddress, promise);
        }

        @Override
        public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) {
            unsafe.disconnect(promise);
        }

        @Override
        public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
            unsafe.close(promise);
        }

        @Override
        public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) {
            unsafe.deregister(promise);
        }

        @Override
        public void read(ChannelHandlerContext ctx) {
            unsafe.beginRead();
        }

        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
            unsafe.write(msg, promise);
        }

        @Override
        public void flush(ChannelHandlerContext ctx) {
            unsafe.flush();
        }


        private void readIfIsAutoRead() {
            if (channel.config().isAutoRead()) {
                channel.read();
            }
        }

ServerBootstrapAcceptor也是一个特殊的ChannelHandler,他是在初始化的时候添加到NioServerSocketChannel的pipeline的,用来处理accept到的NioSocketChannel。前文提到,在接受到OP_ACCEPT之后最终会触发ChannelRead处理这个事件接受到的Channel连接(fireChannelRead),最终会调用ServerBootstrapAcceptor.channelRead方法。

ServerBootstrapAcceptor处理该事件首先讲配置ServerBootstrap阶段设置的childHandler、childOption和childAttr设置到接受到的NioSocketChannel对象(childHandler设置到其绑定的pipeline)。然后向childGroup注册该channel(也是就是将jdk的SocketChannel对象注册到EventLoop绑定的Selector对象,),注册完成生成SelectionKey之后调用pipeline.fireChannelActive(),最终HeadContext将NioSocketChannel初始化的设置的readInterestOp(OP_READ)设置到selectionkey。

    private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {
        ...
        @Override
        @SuppressWarnings("unchecked")
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            final Channel child = (Channel) msg;
            // childHandler添加到pipeline
            child.pipeline().addLast(childHandler);
            // 设置options
            setChannelOptions(child, childOptions, logger);
            // 设置attr
            setAttributes(child, childAttrs);

            try {
                // 注册channel,触发ChannelActive
                childGroup.register(child).addListener(...);
            } catch (Throwable t) {
                forceClose(child, t);
            }
        }
        ...
    }

常规ChannelHandler由于需要向下传播,我们需要使用到ByteToMessageDecoder,MessageToMessageDecoder和MessageToMessageEncoder的编解码后的数据向下游ChannelHandler的传播特性,所以我们会通常选择继承他们实现他们的encode与decode方法以实现我们的业务。以read为例,回调channelRead后,会先获取CodecOutputList对象(该对象用来承接decode解码后的数据),然后判断该msg的数据类型是否和当前handler所支持的类型参数一致,如果不一致则将msg放入CodecOutputList,如果一致则调用decode方法解码,解码完成释放msg,如果解码后的CodecOutputList有数据则表示需要下游处理,则会调用下游channelRead方法,如此便能一步一步处理数据与业务。

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 用来存储需要向下游传播的数据
        CodecOutputList out = CodecOutputList.newInstance();
        try {
            // 判断当前handler是否支持该msg
            if (acceptInboundMessage(msg)) {
                @SuppressWarnings("unchecked")
                I cast = (I) msg;
                try {
                    decode(ctx, cast, out);
                } finally {
                    // 释放对象
                    ReferenceCountUtil.release(cast);
                }
            } else {
                // 不支持则向下传播
                out.add(msg);
            }
        } catch {...} finally {
            try {
                int size = out.size();
                for (int i = 0; i < size; i++) {
                    // 向下游传播,触发下游读事件
                    ctx.fireChannelRead(out.getUnsafe(i));
                }
            } finally {
                // 回收
                out.recycle();
            }
        }
    }

读写都是一个套路,前文提到在收到OP_READ之后会读出数据放到ByteBuf触发ChannelRead处理,这是一个将ByteBuf处理成POJO对象然后对这个对象进行业务处理的过程。而写数据是一个相反的过程,他是一个将业务对象处理成ByteBuf然后写入到jdk的SocketChannel的过程,写入channel这样的底层操作自然又是由HeadContext使用Unsafe操作的。unsafe.flush会触发真实的写入channel的操作(NioSocketChannel.doWrite)。

至此我们已经清楚了Netty的处理流程。


Netty高性能原理

前文我们知道了Netty高扩展性来自于其优秀的设计,定义ChannelOption、ChannelAttribute与ChannelHandler使得Netty有着良好的标准化接口,有完善的生命周期,使得我们在使用Netty的时候不需要知道更多NIO细节就可以编写出功能良好的网络服务。我们常说Netty是一个高性能的框架,那它的高性能又来自于哪里呢。

NIO

首先就是它使用NIO作为其底层实现。虽然Netty内部有 OIO / NIO / EPOLL / KQUEUE 四类实现,但是OIO由于是BIO实现在现实中不可能使用,EPOLL由于不能跨平台在实际生产中也很少使用,而KQUEUE还不稳定,所以综合来说使用NIO几乎是必然选择。NIO的性能优势在前文已有表述,总的来说就是IO多路复用与非阻塞,这里不在赘述。

内存

这个包括两个方面,一是直接内存的使用,这个在前文NIO例子中ByteBuffer.allocateDirect就已经使用到了直接内存,Netty对其做了封装,暴露出来的类就是ByteBuf。第二个方面是缓冲技术,池化技术,零拷贝技术的使用。

首先就是缓冲技术的使用,在Unsafe类里面有一个属性ChannelOutboundBuffer outboundBuffer,这是一个链表,每次调用unsafe.write方法都是向后添加一个节点,而不是真正写入channel,调用unsafe.flush的时候才会调用flush0最终调用Channel的write将缓冲里面的数据写入到Channel。这使得不用每次写入数据都真实写入到SocketChannel,大大提高了写入性能。

private volatile ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this);
        public final void write(Object msg, ChannelPromise promise) {
            ...
            // 添加一个节点
            outboundBuffer.addMessage(msg, size, promise);
        }
        protected void flush0() {
            ...
            // 前文提到的正式写入channel
            try {
                doWrite(outboundBuffer);
            } catch (Throwable t) {
                handleWriteError(t);
            } finally {
                inFlush0 = false;
            }
        }

只有在调用了flush的情况下才会真正写入到SocketChannel。当ChannelOutboundBuffer的未flush数据大于我们配置的高水位(ChannelConfig里面的WriteBufferHighWaterMark)的时候会触发ChannelWritabilityChanged事件,一般情况下我们需要处理这个事件,判断是否可写(ctx.channel().isWritable())然后调用ctx.flush()。在我们的例子中没有接受这个事件,因为我们直接调用的writeAndFlush。

    public void addMessage(Object msg, int size, ChannelPromise promise) {
        Entry entry = Entry.newInstance(msg, size, total(msg), promise);
        if (tailEntry == null) {
            flushedEntry = null;
        } else {
            Entry tail = tailEntry;
            tail.next = entry;
        }
        tailEntry = entry;
        if (unflushedEntry == null) {
            unflushedEntry = entry;
        }

        // 增加未flush的字节数
        incrementPendingOutboundBytes(entry.pendingSize, false);
    }
    private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
        if (size == 0) {
            return;
        }

        long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
        // 如果未flush字节数达到高水位则设置为不可写状态
        if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
            setUnwritable(invokeLater);
        }
    }
    private void setUnwritable(boolean invokeLater) {
        for (;;) {
            final int oldValue = unwritable;
            final int newValue = oldValue | 1;
            if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
                if (oldValue == 0) {
                    // 触发写状态变更
                    fireChannelWritabilityChanged(invokeLater);
                }
                break;
            }
        }
    }

池化技术在Netty上大量被使用。首先是默认的PooledByteBufAllocator的内存分配器,它实现了jemalloc。在首先是内存分配,首先获取当前线程的内存池,这里是分配直接内存,所以使用directArena直接内存区域(如果是堆内存则使用heapArena),如果该内存池不存在则分配非池化内存。

    @Override
    protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
        // 获取当前线程的内存缓存
        PoolThreadCache cache = threadCache.get();
        // 获取直接内存区
        PoolArena<ByteBuffer> directArena = cache.directArena;

        final ByteBuf buf;
        if (directArena != null) {
            //从内存池分配内存
            buf = directArena.allocate(cache, initialCapacity, maxCapacity);
        } else {
            // 直接分配内存
            buf = PlatformDependent.hasUnsafe() ?
                    UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
                    new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
        }
        return toLeakAwareBuffer(buf);
    }

directArena将内存分为三种规格,small、normal和huge(在早期版本还有tiny,现在已经合并到small了),在分配内存时遵循以下策略小内存0到8K使用small内存分配,8k到16m使用normal分配,大于16m则由huge分配。

    private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
        // 计算大小
        final int sizeIdx = size2SizeIdx(reqCapacity);

        if (sizeIdx <= smallMaxSizeIdx) {
            tcacheAllocateSmall(cache, buf, reqCapacity, sizeIdx);
        } else if (sizeIdx < nSizes) {
            tcacheAllocateNormal(cache, buf, reqCapacity, sizeIdx);
        } else {
            int normCapacity = directMemoryCacheAlignment > 0
                    ? normalizeSize(reqCapacity) : reqCapacity;
            // 大内存不走缓存
            allocateHuge(buf, normCapacity);
        }
    }

directArena在分配的时候为了提高内存使用率与效率,定义了q075,q000,q025,q050,qInit,q100六个级别的PoolChunkList,分别表示[75%,100%)的使用率,[1,50)的使用率,[25,75)的使用率[50,100)的使用率和[0,25)的使用率和100%的使用率,它无法再分配内存只能等待回收。他们的迁移顺序是qInit、q000,q025,q050,q075,除了qInit,在小于使用率最小值时会向左迁移,除了q100,其他PoolChunk在大于最大值时会向右迁移。他们的分配顺序如下:首先q050尝试分配然后q025,q000,qInit,q075依次尝试分配,如果全部没空闲内存则需要新建一个Chunk申请内存后放入到chunkList.

    private void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int sizeIdx, PoolThreadCache threadCache) {
        // 尝试从内存池分配
        if (q050.allocate(buf, reqCapacity, sizeIdx, threadCache) ||
            q025.allocate(buf, reqCapacity, sizeIdx, threadCache) ||
            q000.allocate(buf, reqCapacity, sizeIdx, threadCache) ||
            qInit.allocate(buf, reqCapacity, sizeIdx, threadCache) ||
            q075.allocate(buf, reqCapacity, sizeIdx, threadCache)) {
            return;
        }

        // 增加新的chunk
        PoolChunk<T> c = newChunk(pageSize, nPSizes, pageShifts, chunkSize);
        boolean success = c.allocate(buf, reqCapacity, sizeIdx, threadCache);
        assert success;
        // 放入chunklist,此处可能发生迁移
        qInit.add(c);
    }

Netty将一大块内存抽象为PoolChunk对象,PoolChunk对象内部使用PoolSubpage[]管理该内存实际使用,内部使用Deque缓存内存避免GC。使用为每个IO线程绑定一个PoolThreadCache减少资源竞争,使用内存池避免重复申请和回收内存,内存分类提高内存使用率。最终我们使用完ByteBuf后都会显式调用ReferenceCountUtil.release(msg)(ByteBuf实现了ReferenceCounted)将该对象释放,内存会被归还到内存池。netty最复杂的部分就是这个内存管理,这里只是浅谈,详细请阅读jemalloc论文。

零拷贝,一方面是使用直接内存不需要将数据内存拷贝到堆区或者从堆复制到内存少了一次拷贝,另一方面是CompositeByteBuf这样的类可以组合多个ByteBuf从而在多个ByteBuf合并的时候不需要进行复制直接组合就行了,再一方面就是ByteBuffer的duplicate(复制),slice(分割)等方法无需申请新的内存而是使用原有内存产生新引用,这样在拷贝数据的时候就少了一次数据复制,再再一方面就是FileChannel.transfer()调用在操作系统层面可以将文件缓冲区的数据直接发送到到Channel(从Channel到Channel的数据传输),不使用byte[]作为中间临时数据中转,大大提高了效率。

    // For duplicates and slices
    DirectByteBuffer(DirectBuffer db,         // package-private
                               int mark, int pos, int lim, int cap,
                               int off){
        super(mark, pos, lim, cap);
        // 复用地址
        address = db.address() + off;
        cleaner = null;
        att = db;
    }

Reactor

在前文我们有提到,Netty使用reactor模型,在处理服务端连接的时候需要为其设置两个EventLoopGroup线程池(默认大小是 2*处理器数量),boss线程池用来处理NioServerSocketChannel的事件,主要是OP_ACCEPT,worker线程池用来处理接受到的NioSocketChannel事件,主要是OP_READ。这使得我们分离了事件分发与事件处理,提高了吞吐。


其他

以下是一些可跳过的补充

序列化

我们不可能使用java自带的序列化。序列化与反序化应当是协议的一部分,这里只是稍微提一嘴,所谓序列化就是将对象变成byte[]的过程,所谓反序列化就是将byte[]变成对象的过程,我们通常有以下考量:性能、可读、内存占用。在所谓性能就是序列化与反序列化的速度、所谓内存占用就是序列化后字节数组的大小、可读在排错的时候会有大用处。常规情况下我们使用JSON格式就ok了,虽然性能与内存占用不咋地,但是在前后端都能识别,且一旦出错容易排查与手动构建消息。protobuf也是常用的序列化方式,他的性能很优秀且内存占用也很低,在内部系统交换信息的时候是很值得考虑这个方式的。再有就是说明kryo、Thrift之类的就不再介绍了,工作这么久还没有遇到使用json和protobuf以外的公司。

TCP窗口与背压

TCP窗口解决了每次发送需要等待上次发送结果的问题,在一次TCP每次确认消息会携带有win字段用来表示接受端的TCP可用窗口大小,则在发送端下个包最大只能发送这么大的数据,当有大量的数据发送时,如果服务端处理数据处理不过来则会返回0窗口大小的消息,此时发送端就会停止发送数据,数据无法发送则在客户端线程积压,这样就形成了背压Backpressure,服务端的状态被客户端感知到了(消费端状态被生产端感知到了),在客户端接受到这个信号后就应该对降低生产速率给服务端时间消费掉积压的数据。

总结

本文从FD开始讲起,比较了BIO、NIO与AIO的差异与优缺点,了解了IO多路复用的原理与它与FD的关系,知晓了NIO时Java对于IO多路复用的封装。而后详解了Netty的使用和原理。
总的来说:netty的过程是这样的,在创建EventLoopGroup的时候会创建绑定在EventLoop上的Selector对象,在调用bind或者connect的时候会创建netty的Channel对象并对Channel对象初始化,创建对象的过程创建并绑定Config对象与Pipeline对象和底层的jdk Channel对象,在初始化channel过程会将options、attribute和handler设置进去,然后将channel注册(register)到EventLoop,注册过程中会将jdk channel注册到selector,在注册后,内部会向eventloop提交一个死循环selectNow获取事件的任务分发给各个channel方法处理;然后正式bind或者connect连接一个绑定一个本地地址或者连接远端地址;注册和连接完成后会触发ChannelActive事件,如果是客户端会在ChannelActive事件将OP_READ添加到注册时生成的selectionkey,如果是服务端会将OP_ACCEPT添加到selectionkey。由于在注册后有一个死循环任务会一直selectNow获取事件,然后分发到channel对应的处理方法,在channelactive一切就绪之后就可以处理。
这样我们已经将Netty与NIO连起来了,而ServerSocketChannel,SocketChannel通过封装系统调用实现NIO,这样就和操作系统连起来了。
在你需要使用Java做服务端网络编程的时候,netty几乎是最优解,性能强大上手简单,使用人多资料齐全。

注意:转载请表明作者与出处。

参考:
[1] Netty权威指南(第二版)
[2] Netty源码剖析与应用
[3] openjdk源码
[4] netty源码

标签: Java, netty

已有 2 条评论

  1. 怎么收藏这篇文章?

  2. 博主真是太厉害了!!!

评论已关闭