当前位置 博文首页 > huansky:Okio源码分析

    huansky:Okio源码分析

    作者:huansky 时间:2021-01-31 00:21

    概述

    Okio 作为 Okhttp 底层 io 库,它补充了 java.io 和 java.nio 的不足,使访问、存储和处理数据更加容易。Okio 的特点如下:
    • okio 是一个由 square 公司开发的开源库,它弥补了 Java.io 和 java.nio 的不足,能够更方便快速的读取、存储和处理数据。

    • okio 有自己的流类型 Source 和 Sink,对应于 java.io 的 InputStream 和 OutputStream。

    • okio 内部引入了 ByteString 和 Buffer,提升了效率和性能。

    • okio 引入了超时机制。

    • okio 规模不大,代码精巧,是源码学习的好素材 
    强烈建议大家阅读 okio 的文档说明:https://square.github.io/okio/ 。本文代码介绍基于版本 1.17.4。

    流(Stream)

    是指在计算机的输入输出操作中各部件之间的数据流动。按照数据的传输方向,流可分为输入流与输出流。Java语言里的流序列中的数据既可以是未经加工的原始二进制数据,也可以是经过一定编码处理后符合某种特定格式的数据。

    1.输入输出流

    在Java中,把不同类型的输入输出源抽象为流,其中输入和输出的数据称为数据流(Data Stream)。数据流是Java程序发送和接收数据的一个通道,数据流中包括输入流(Input Stream)和输出流(Output Stream)。通常应用程序中使用输入流读出数据,输出流写入数据。 流式输入、输出的特点是数据的获取和发送均沿数据序列顺序进行。相对于程序来说,输出流是往存储介质或数据通道写入数据,而输入流是从存储介质或数据通道中读取数据,一般来说关于流的特性有下面几点:

    • 先进先出,最先写入输出流的数据最先被输入流读取到。

    • 顺序存取,可以一个接一个地往流中写入一串字节,读出时也将按写入顺序读取一串字节,不能随机访问中间的数据。

    • 只读或只写,每个流只能是输入流或输出流的一种,不能同时具备两个功能,在一个数据传输通道中,如果既要写入数据,又要读取数据,则要分别提供两个流。

    2.缓冲流

    为了提高数据的传输效率,引入了缓冲流(Buffered Stream)的概念,即为一个流配备一个缓冲区(Buffer),一个缓冲区就是专门用于传送数据的一块内存。

    当向一个缓冲流写入数据时,系统将数据发送到缓冲区,而不是直接发送到外部设备。缓冲区自动记录数据,当缓冲区满时,系统将数据全部发送到相应的外部设备。当从一个缓冲流中读取数据时,系统实际是从缓冲区中读取数据,当缓冲区为空时,系统就会从相关外部设备自动读取数据,并读取尽可能多的数据填满缓冲区。 使用数据流来处理输入输出的目的是使程序的输入输出操作独立于相关设备,由于程序不需关注具体设备实现的细节(具体细节由系统处理),所以对于各种输入输出设备,只要针对流做处理即可,不需修改源程序,从而增强了程序的可移植性。

    Okio 关键类介绍

    ByteStrings and Buffers

    Okio 是围绕这两种类型构建的,它们将大量功能打包到一个简单的 API 中:

    • ByteString 是不可变的字节序列。对于字符数据,最基本的就是 String。而 ByteString 就像是 String 的兄弟一般,它使得将二进制数据作为一个变量值变得容易。这个类很聪明:它知道如何将自己编码和解码为十六进制、base64 和 utf-8。

    • Buffer 是一个可变的字节序列。像 Arraylist 一样,你不需要预先设置缓冲区的大小。你可以将缓冲区读写为一个队列:将数据写到队尾,然后从队头读取。

    在内部,ByteStringBuffer做了一些聪明的事情来节省CPU和内存。如果您将UTF-8字符串编码为ByteString,它会缓存对该字符串的引用,这样,如果您稍后对其进行解码,就不需要做任何工作。

    Buffer 是作为片段的链表实现的。当您将数据从一个缓冲区移动到另一个缓冲区时,它会重新分配片段的持有关系,而不是跨片段复制数据。这对多线程特别有用:与网络交互的子线程可以与工作线程交换数据,而无需任何复制或多余的操作。

    Sources and Sinks

    java.io 设计的一个优雅部分是如何对流进行分层来处理加密和压缩等转换。Okio 有自己的 stream 类型: Source 和 Sink,分别类似于 java 的 Inputstream Outputstream,但是有一些关键区别:

    • 超时(Timeouts)。流提供了对底层 I/O 超时机制的访问。与java.io 的 socket 字流不同,read() 和 write() 方法都给予超时机制。

    • 易于实施。source 只声明了三个方法:read()close() 和 timeout()。没有像available()或单字节读取这样会导致正确性和性能意外的危险操作。

    • 使用方便。虽然 source 和 sink 的实现只有三种方法可写,但是调用方可以实现 Bufferedsource 和 Bufferedsink 接口, 这两个接口提供了丰富API能够满足你所需的一切。

    • 字节流和字符流之间没有人为的区别。都是数据。你可以以字节、UTF-8 字符串、big-endian 的32位整数、little-endian 的短整数等任何你想要的形式进行读写;不再有InputStreamReader

    • 易于测试。Buffer 类同时实现了 BufferedSource 和 BufferedSink 接口,即是 source 也是 sink,因此测试代码简单明了。

    Sources 和 Sinks 分别与 InputStream 和 OutputStream 交互操作。你可以将任何 Source 看做 InputStream ,也可以将任何 InputStream 当做 Source。对于 Sink 和 Outputstream 也是如此。

    Segment

    Segment在 Okio 中作为数据缓冲的载体,一个 Segment 的数据缓冲大小为 8192,即 8k。每一个 Segment 都有前驱和后继结点,也就是说 Sement 是一个双向链表链表,准确的来说是一个双向循环链表。读取数据从 Segment 头结点读取,写数据从 Segment 尾结点写。

    Okio 中引入池的概念也就是源码中SegmentPool的实现。SegmentPool 负责 Segment 创建和销毁,SegmentPool 最大可以缓存 8 个 Segment。

    SegmentPool 是一个静态方法,因此也就是全局缓存只有 64 kb;

    整体设计

    前面说了介绍了很多关键的类,下面看下 Okio 的整体设计:

     图片摘自 Okio源码分析

     

     

    通过类图来看,整体设计是很简单明了的,可以结合前面介绍的关键类,这样你会更加理解这个设计图。

    Okio 读写流程

    在介绍 Okio 的读写流程的时候,还是得提一下一个关键的类:Okio。

    Okio 类是工具类,内部提供了很多静态方法,方便大家调用,减少大家写了很多重复的代码,使得整个调用变得更加简单。

    读文本文件

     public void readLines(File file) throws IOException {
          Source fileSource = Okio.source(file);
          BufferedSource bufferedSource = Okio.buffer(fileSource);
          for (String line; (line = bufferedSource.readUtf8Line()) != null; ) {
              System.out.println(line);
          }
          bufferedSource.close();
      }

    这个示例代码是用来读取文本文件的,Okio 通过 Okio.source(File) 的方式来读取文件流,它返回的是一个 Source 对象,但是 Source 对象的方法是比较少的(只有3个),因此 Okio 提供了一个装饰者对象接口 BufferedSource,通过 Okio.buffer(fileSource) 来生成,这个方法内部实际会生成一个 RealBufferedSource 类对象,RealBufferedSource 内部持有Buffer缓冲对象可使 IO 速度更快,该类实现了BufferedSource接口,而 BufferedSource 接口提供了大量丰富的接口方法:

        

    可以看到,几乎你想从输入流中读取任何的数据类型都可以,而不需要你自己去转换,可以说是非常强大而且人性化了,除了 read 方法以外,还有一些别的方法,可以说几乎可以满足很多需求。

    在上面的示例代码中,打开输入流对象的方法需要负责关闭对象资源,调用 close 方法,Okio 官方推荐使用 java 的 try-with-source 语法,上面示例代码可以写成下面这样:

    public void readLines(File file) throws IOException {
          try (BufferedSource bufferedSource = Okio.buffer(Okio.source(file))) {
               for (String line; (line = bufferedSource.readUtf8Line()) != null; ) {
                  System.out.println(line);
               }
          }
      }

    try-with-source 是 jdk1.4 开始提供的语法糖,在 try 语句 () 里面的资源对象,jdk 最终会自动调用它的 close 方法去关闭它, 即便 try 里有多个资源对象也是可以的,这样就不用你手动去关闭资源了。但是在 android 里面使用的话,会提示你要求 API level 最低为 19 才可以。 

    写文本文件 

    public void writeEnv(File file) throws IOException {
      try (BufferedSink sink = Okio.buffer(Okio.sink(file))) {
           sink.writeUtf8("啊啊啊")
                 .writeUtf8("=")
              .writeUtf8("aaa")
              .writeUtf8("\n");
      }
    }

    其中 Okio.buffer(fileSink) 内部返回的实现对象是一个 RealBufferedSink 类的对象, 跟 RealBufferedSource一 样它也是一个装饰者对象,具备 Buffer 缓冲功能。

    类似于读文件使用 Source 和 BufferedSource, 写文件的话,则是使用的 Sink 和 BufferedSink,同样的在 BufferedSink 接口中也提供了丰富的接口方法,这里就不展开了,具体可以查看代码。

    此处再次强烈建议去阅读官方文档:https://square.github.io/okio/ 。

    源码分析

    通过上面的介绍,大家对 Okio 的读取有了一个基本的了解。下面开始进入源码分析,深入去研究其实现,再介绍源码的时候,会先对一些接口做一些简单的介绍。

    Source & Sink 

    public interface Sink extends Closeable, Flushable {
      /** Removes {@code byteCount} bytes from {@code source} and appends them to this. */  从 source 中获取到的数据添加到 sink 自身
      void write(Buffer source, long byteCount) throws IOException;
    
      /** Pushes all buffered bytes to their final destination. */
      @Override void flush() throws IOException;
    
      /** Returns the timeout for this sink. */
      Timeout timeout();
    
      @Override void close() throws IOException;
    }
    
    public interface Source extends Closeable {
      /**
       * Removes at least 1, and up to {@code byteCount} bytes from this and appends
       * them to {@code sink}. Returns the number of bytes read, or -1 if this
       * source is exhausted.  将自身数据给 sink 
       */
      long read(Buffer sink, long byteCount) throws IOException;
    
      /** Returns the timeout for this source. */
      Timeout timeout();
    
       */
      @Override void close() throws IOException;
    }

     

    这两个是 Okio 中最基本的两个接口,分别对应 java 的 InputStream 和 OutputStream 即输入流和输出流,Source 是输入流,Sink 是输出流。接口提供的方法也是非常简单,大家一看就知道这几个方法的目的。

    BufferedSink & BufferedSource

    上面 Source和 Sink 提供了极简的接口,接着作者对这两个接口进行丰富的扩展。具体接口方法上文已介绍,这里也不在展开。

    这里简单提一点,这种设计风格是值得我们去学习的,设计接口的时候要简单,专一。然后可以再新建一个接口,去丰富扩展其功能。这样使用者可以选择自己想要的接口来进行实现。

    RealBufferedSource & RealBufferedSink

    在我们通过 Okio.source() 和 Okio.sink() 获取了 Souce 和 Sink 对象后,一般不会直接使用,而是会再调用一次 Okio.buffer() 生成一个实现 BufferedSource 和 BufferedSink 接口的对象:

     

      /**
       * Returns a new source that buffers reads from {@code source}. The returned
       * source will perform bulk reads into its in-memory buffer. Use this wherever
       * you read a source to get an ergonomic and efficient access to data.
       */
      public static BufferedSource buffer(Source source) {
        return new RealBufferedSource(source);
      }
    
      /**
       * Returns a new sink that buffers writes to {@code sink}. The returned sink
       * will batch writes to {@code sink}. Use this wherever you write to a sink to
       * get an ergonomic and efficient access to data.
       */
      public static BufferedSink buffer(Sink sink) {
        return new RealBufferedSink(sink);
      }

     

     

     

    内部分别返回的是 RealBufferedSource 和 RealBufferedSink 对象,他们分别实现了 BufferedSource BufferedSink接口,而这两个接口则是分别继承了 Source 和 Sink 接口的并基础上进行了方法扩展,提供了丰富的读写接口方法,几乎可以对各种基础数据类型进行读写。

    Segment 及 SegmentPool 

    Segment 是 Okio 中非常重要的一环,它可以说是 Buffer 中数据的载体。容量是 8kb,头结点为 head。

    final class Segment {
      //Segment的容量,最大为8kb
      static final int SIZE = 8192;
    
      //如果Segment中字节数 > SHARE_MINIMUM时(大Segment),就可以共享,不能添加到SegmentPool
      static final int SHARE_MINIMUM = 1024;
      //存储的数据
      final byte[] data;
    
      //下一次读取的开始位置
      int pos;
    
     //写入的开始位置
      int limit;
    
      //当前Segment是否可以共享
      boolean shared;
    
      //data是否仅当前Segment独有,不share
      boolean owner;
    
      //后继节点
      Segment next;
    
      //前驱节点
      Segment prev;
    
      ...
    
      //移除当前Segment
      public final @Nullable Segment pop() {
        Segment result = next != this ? next : null;
        prev.next = next;
        next.prev = prev;
        next = null;
        prev = null;
        return result;
      }
    
      //在当前节点后添加一个新的节点
      public final Segment push(Segment segment) {
        segment.prev = this;
        segment.next = next;
        next.prev = segment;
        next = segment;
        return segment;
      }
    
      //将当前Segment分裂成2个Segment结点。前面结点pos~limit数据范围是[pos..pos+byteCount),后面结点pos~limit数据范围是[pos+byteCount..limit)
      public final Segment split(int byteCount) {
        if (byteCount <= 0 || byteCount > limit - pos) throw new IllegalArgumentException();
        Segment prefix;
    
        //如果字节数大于SHARE_MINIMUM则拆分成共享节点
        if (byteCount >= SHARE_MINIMUM) {
          prefix = sharedCopy();
        } else {
          prefix = SegmentPool.take();
          System.arraycopy(data, pos, prefix.data, 0, byteCount);
        }
    
        prefix.limit = prefix.pos + byteCount;
        pos += byteCount;
        prev.push(prefix);
        return prefix;
      }
    
      //当前Segment结点和prev前驱结点合并成一个Segment,统一合并到prev,然后当前Segment结点从双向链表移除并添加到SegmentPool复用。当然合并的前提是:2个Segment的字节总和不超过8K。合并后可能会移动pos、limit
      public final void compact() {
        if (prev == this) throw new IllegalStateException();
        if (!prev.owner) return; // Cannot compact: prev isn't writable.
        int byteCount = limit - pos;
        int availableByteCount = SIZE - prev.limit + (prev.shared ? 0 : prev.pos);
        if (byteCount > availableByteCount) return; // Cannot compact: not enough writable space.
        writeTo(prev, byteCount);
        pop();
        SegmentPool.recycle(this);
      }
    
      //从当前节点移动byteCount个字节到sink中
      public final void writeTo(Segment sink, int byteCount) {
        if (!sink.owner) throw new IllegalArgumentException();
        if (sink.limit + byteCount > SIZE) {
          // We can't fit byteCount bytes at the sink's current position. Shift sink first.
          if (sink.shared) throw new IllegalArgumentException();
          if (sink.limit + byteCount - sink.pos > SIZE) throw new IllegalArgumentException();
          System.arraycopy(sink.data, sink.pos, sink.data, 0, sink.limit - sink.pos);
          sink.limit -= sink.pos;
          sink.pos = 0;
        }
    
        System.arraycopy(data, pos, sink.data, sink.limit, byteCount);
        sink.limit += byteCount;
        pos += byteCount;
      }
    }

    SegmentPool 是一个 Segment 池,内部维护了一个 Segment 单向链表,容量为64kb(8 个 Segment),回收不用的 Segment 对象。

    final class SegmentPool {
        //SegmentPool的最大容量
        static final