博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
netty ByteBuf源码分析
阅读量:4150 次
发布时间:2019-05-25

本文共 10203 字,大约阅读时间需要 34 分钟。

    在介绍netty常用的数据缓冲类之前,建议先熟悉下java NIO当中提供的类的分析,netty提供自己的缓冲类就是为了解决原生java当中ByteBuffer使用不方便的问题。这篇文章主要是分析netty当中的对ByteBuf的分析。

    这篇文章从以下几个方面来介绍ByteBuf

  • ByteBuf的使用,及常用接口
  • ByteBuf 源码分析
  • ByteBuf 相关实现类
  • ByteBuf 辅助类

  ByteBuf 使用及常用接口

        ByteBuf 的设计和实现和java NIO当中的ByteBuffer很大程度上是很相似的,他们都是底层维护一个字节数组或者数据缓冲区,以及相关的索引操作。

ByteBuf当中使用了readerIndex和writerIndex来区分可读和可写的索引,不同于ByteBuffer当中,使用position来标识,每次读写操作时,还需要进行filp() ,显然使用起来并不方便。

readerIndex:当前对象中,可以读取的数据的最大索引

writerIndex: 当前对象中,可以写入的最大索引

对于上图中的区间我们分别进行介绍:

discardable bytes : 对于已经读取过得内容,任务这些数据时可以被丢弃的,比如当wriable bytes空间比较小的情况下,可以丢弃一部分已经读的内容,从而增加writable bytes 的区间

执行之前,索引的位置关系:

执行完discardable bytes 之后

readable bytes: 已经写入的数据,但是还没有进行读取过

writable bytes : 从writerIndex到capactiy之间为可写缓冲区

在ByteBuf 当中接口以read 开口的,每执行一次,就会增加readerIndex的值,以write开头的每执行一次就会增加writeIndex的值。并且这两类操作都是进行顺序的读写操作。

discardable bytes 操作:java 中对于缓冲区的分配和释放是个耗时操作,我们需要尽量重复使用,ByteBuf 缓冲区在写入时,如果写入的数据量超过了capactiy,那么会进行动态的扩容,这个扩容的操作会进行字节数组的复制,是一个费时的操作,因此为了提升性能,应该最大限度的重复使用,discardable bytes 就是讲读取过得内容丢弃掉,来增加可写入的缓冲区大小,当然在丢弃已经读取过得内容时,其本质也是进行数组的复制,所以,频繁调用也是会造成新能的浪费。因此,我们在实际的开发过程中,如果确定以性能换取更大的内存空间,那么就可以使用discardable bytes 

clear(): 不会擦除底层数据,只是将readerIndex和writerIndex置为0

mark 和reset : 标记当前readerIndex,或者标记writerIndex ,以及清除readerIndex和writerIndex

duplicate(): 复制一份ByteBuf,拥有独立的readerIndex和writerIndex, 但是底层共用一份缓存数据

copy() :复制一份ByteBuf, 拥有独立的readerIndex 和writerIndex,并且是独立的缓存数据。

slice():  获取当前可读缓存区的内容作为新的byteBuf对象,拥有独立的索引,底层数据是同一份

nioBuffer() : 将当前byteBuf对象转化为NIO中byteBuffer对象

源码分析

    对于源码分析,ByteBuf的主要实现类AbstractByteBuf实现了大部分的内容,对于字节数组这部分,是在具体的实现类当中,不影响的分析

AbstracByteBuf:

读操作:从指定索引开始,读取指定长度的内容,到对应的byte数组当中

readBytes(byte[], int, int)

① 检查可读取得长度,如果是负数,直接抛出异常,如果readableBytes的长度,小于length的长度,也还会抛出异常,所以第一步就是校验操作

②从指定索引处,读取指定长度内容到数组当中,具体的实现,在各个子类当中

③读索引增加

写操作:从传入的字节数组中指定的索引处开始读取,读取指定长度的字节到当前ByteBuf对象中

writeBytes(byte[], int ,int) 

源码实现:

①第一步也是校验,不过和读操作不同的是,写操作,除了校验传入的参数,还需要判断当前可写空间是否足够,如果不够的话,需要进行扩容

对于校验操作可以分为以下几种情况:

<1>写入的长度,是否小于writableBytes 长度,如果小于,直接返回,说明是可以写入的

<2>如果写入的长度,要比最大容量(Integer.MAX_VALUE- writerIndex)还要大直接抛出异常

<3>以上两种情况都不满足,进行扩容操作

下面我们看下扩容的代码:

 
@Overridepublic int calculateNewCapacity(int minNewCapacity, int maxCapacity) {    if (minNewCapacity < 0) {        throw new IllegalArgumentException("minNewCapacity: " + minNewCapacity + " (expected: 0+)");    }    if (minNewCapacity > maxCapacity) {        throw new IllegalArgumentException(String.format(                "minNewCapacity: %d (expected: not greater than maxCapacity(%d)",                minNewCapacity, maxCapacity));    }    final int threshold = CALCULATE_THRESHOLD; // 4 MiB page    if (minNewCapacity == threshold) {  // NO. 1        return threshold;    }    // If over threshold, do not double but just increase by threshold.    if (minNewCapacity > threshold) { NO.2        int newCapacity = minNewCapacity / threshold * threshold;        if (newCapacity > maxCapacity - threshold) {            newCapacity = maxCapacity;        } else {            newCapacity += threshold;        }        return newCapacity;    }    // Not over threshold. Double up to 4 MiB, starting from 64.    int newCapacity = 64;    while (newCapacity < minNewCapacity) { NO.3        newCapacity <<= 1;    }    return Math.min(newCapacity, maxCapacity);}
NO.1 如果新写入的大小,刚好等于这个阙值(4M),直接返回
NO.2 如果当前新增的写入数据,大于4M,新增数据长度,以4M 递增,如果增加后,大于最大内存,直接返回,否则就是阙值加上新增的值,作为新的capacity

NO.3 如果新写入的数据量小于4M,采用直接 64 -》128翻倍的形式

总结:当写入的缓存的大小刚好等于4M,直接返回4M作为新的capacity。如果小于4M, 同时小于 64,那么直接返回64 作为capacity。如果小于4M,大于64 ,在64基础上翻倍到128,直到大于需要写入的缓存数据为止,如果大于4M,会以4M倍数增加,如果增量超过Integer.MAX_VALUE,直接返回maxCapacity, 否则就只增加4M。 这里体现netty在内存空间占用上的考量,当数据量不是很大的时候,就翻倍递增,当超过一定程度之后,考虑到内存使用浪费,就只是单步递增。这样防止空间的浪费。

重用缓冲区(discardReadBytes)

对于已经读取过得数据,将丢弃掉,将未读取的数据移动到数组起始位置。以下是实现代码

①如果readerIndex == 0 说明没有discardBytes 直接返回

② 将readableBytes移动到数组开启索引出

③见makerReaderIndex进行一定的偏移修改,如果本身小于readerIndex,直接置为0,writerIndex 丢弃的数组长度,直接置为0,如果大于的话移动丢弃数组的长度的偏移量

④readeIndex置为0

ReferenceCount(引用计数)

    netty中ByteBuf接口实现了ReferenctCount 接口,这个接口主要是用来跟踪对象的分配和销毁,做自动内存回收。
    其内部维护了一个引用计数,初始化的时候是1,调用retain() 会使计数器加 1,release() 会减 1,如果计数器为 0 ,将会显示的取消内存分配。

AbstractReferenceCountedByteBuf 中关于引用计数的一个现实

在这个类当中维护了一个可以执行原子操作的对象

private static final AtomicIntegerFieldUpdater
refCntUpdater = AtomicIntegerFieldUpdater.newUpdater(AbstractReferenceCountedByteBuf.class, "refCnt");
这个类主要是用来对一个Object对象当中,属于int类型的修改,保证其原子操作,是java 1.5当中提供的一个操作,类似于AutomicInteger类,主要是为了保证在多线程情况下的原子操作,

它提供的创建方式,就是制定需要原子更新的属性名称,及对象的类名,底层是通过反射的方式,来获取到字段的值

AtomicIntegerFieldUpdaterImpl 是这个抽象类的内部实现类,
 

以下是构造方法代码

private static class AtomicIntegerFieldUpdaterImpl
extends AtomicIntegerFieldUpdater
{ private static final Unsafe unsafe = Unsafe.getUnsafe(); private final long offset; private final Class
tclass; private final Class
cclass; AtomicIntegerFieldUpdaterImpl(final Class
tclass, final String fieldName, final Class
caller) { final Field field; final int modifiers; try { field = AccessController.doPrivileged( new PrivilegedExceptionAction
() { public Field run() throws NoSuchFieldException { return tclass.getDeclaredField(fieldName); } }); modifiers = field.getModifiers(); sun.reflect.misc.ReflectUtil.ensureMemberAccess( caller, tclass, null, modifiers); // 判断属性的访问修饰符,是否可以访问 ClassLoader cl = tclass.getClassLoader(); ClassLoader ccl = caller.getClassLoader(); if ((ccl != null) && (ccl != cl) && ((cl == null) || !isAncestor(cl, ccl))) { sun.reflect.misc.ReflectUtil.checkPackageAccess(tclass); } } catch (PrivilegedActionException pae) { throw new RuntimeException(pae.getException()); } catch (Exception ex) { throw new RuntimeException(ex); } Class
fieldt = field.getType(); if (fieldt != int.class) throw new IllegalArgumentException("Must be integer type"); if (!Modifier.isVolatile(modifiers)) throw new IllegalArgumentException("Must be volatile type"); this.cclass = (Modifier.isProtected(modifiers) && caller != tclass) ? caller : null; this.tclass = tclass; offset = unsafe.objectFieldOffset(field); }
 
从构造方法来看,如果要使用AutomicIntegerFieldUpdater ,那么这个对象必须满足以下几个条件

1、执行原子操作的属性必须是int类型的,虽然这个类名很容易误解成是Integer类型的

2、其次这个属性必须是volatile 修饰的

3、属性必须是可以访问到的,否则就会抛出异常,

4、属性必须是实例变量,不能是类变量,即不可以被static修饰的变量,unsafe.objectFieldOffset 不支持静态变量(cas操作的本质是通过对象实例的偏移量来进行赋值的,JVM的实现可以自用的进行java对象的布局,也就是在内存当中各个java各个对象放在哪里,包括对象的实例字段和元数据之类的,sun.misc.Unsafe是把对象布局抽象出来,而这个objectFieldOffset就是获取某个字段相对java对象的偏移量,staticFieldOffset是获取静态对象的偏移量

补充:在原生java代码中,如果想原子性修改对象当中的包装类,比如Integer,可以使用AutomicReferenceFieldUpdater 类

接着再回到AbstracReferenceCountedByteBuf当中,我们来看其对于retain() 和relase方法的实现

private ByteBuf retain0(int increment) {    for (;;) {        int refCnt = this.refCnt;        final int nextCnt = refCnt + increment;        // Ensure we not resurrect (which means the refCnt was 0) and also that we encountered an overflow.        if (nextCnt <= increment) {            throw new IllegalReferenceCountException(refCnt, increment);        }        if (refCntUpdater.compareAndSet(this, refCnt, nextCnt)) {            break;        }    }    return this;}
这里采用自旋锁的方式,来进行refCnt字段的修改,一直处于死循环当中,直到修改成功

如果当前refCnt 等于 0 说明对象当前已经被释放掉了,抛出异常,compareAndSet 会拿旧的值和要修改的值作比对,如果旧的值被其他线程已经修改了,那么comareAndSet就执行失败,会重新获取到 refCnt的值,尝试进行修改。

同样,一下是释放的操作

private boolean release0(int decrement) {    for (;;) {        int refCnt = this.refCnt;        if (refCnt < decrement) {            throw new IllegalReferenceCountException(refCnt, -decrement);        }        if (refCntUpdater.compareAndSet(this, refCnt, refCnt - decrement)) {            if (refCnt == decrement) {                deallocate();                return true;            }            return false;        }    }}
释放操作比新增操作多了一个条件,如果refCnt ==decrement 意味着,执行完compareAndSet 成功之后,refCnt 就比变为 0 了,需要显示的去释放掉资源。
UnpooledHeapByteBuf 

基于堆内存进行内存分配的字节缓冲区,每次进行I/O操作都会创建一个UnpooledHeapByteBuf ,频繁进行大块内存的分配和回收会对性能造成一定的影响。相比于堆外内存的申请和释放,它的管理成本还是会低一点的

这个类提供了非池化的方式去创建一个ByteBuf对象,并且以引用计数的方式进行对象内存的释放

这个类主要维护的是:

private final ByteBufAllocator alloc; //用于内存分配byte[] array; //保存的字节数组private ByteBuffer tmpNioBuf;
这个类相比较与jdk 当中提供的 ByteBuffer,最大的不同就时可以进行动态扩容,我们来看下关于扩容这部分的代码

@Overridepublic ByteBuf capacity(int newCapacity) {    checkNewCapacity(newCapacity);    int oldCapacity = array.length;    byte[] oldArray = array;    if (newCapacity > oldCapacity) {        byte[] newArray = allocateArray(newCapacity);        System.arraycopy(oldArray, 0, newArray, 0, oldArray.length);        setArray(newArray);        freeArray(oldArray);    } else if (newCapacity < oldCapacity) {        byte[] newArray = allocateArray(newCapacity);        int readerIndex = readerIndex();        if (readerIndex < newCapacity) {            int writerIndex = writerIndex();            if (writerIndex > newCapacity) {                writerIndex(writerIndex = newCapacity);            }            System.arraycopy(oldArray, readerIndex, newArray, readerIndex, writerIndex - readerIndex);        } else {            setIndex(newCapacity, newCapacity);        }        setArray(newArray);        freeArray(oldArray);    }    return this;}
①如果新的容量,大于老的容量,直接创建一个新的byte数组,将之前的byte数组的元素拷贝到新的数组当中,将UnpooledHeapByteBuf中的array指向新创建的数组

②如果新的容量,小于旧的容量大小,截取部分数据作为新的缓冲区数组,先判断,读索引是否小于新的容量值,如果是,判断写索引是否大于新的容量值,如果是,将新的容量值设置成写索引,将当前可读的数组复制到新创建的子缓冲区中去

③如果新的容量,小于旧的容量,而且,读索引大于新的容量,说明没有可读的数组,直接将读写索引设置成新的容量

UnpooledDirectByteBuf 是基于jdk当中 DirectByteBuffer 实现的,不展开进行分析。

ByteBuf最佳实践:在I/O通信线程的读写缓冲区使用DirectByteBuf,因为涉及到网络数据的传递,减少了数据拷贝,后端的业务消息的编解码模块使用HeapByteBuf,

netty中ByteBuf和JDK中ByteBuffer对比

1、byteBuf 中采用读写索引分离,byteBuffer 中使用position 来控制读写

2、byteBuf 当中底层数据时可以扩容的,最大容量Integer.MAX_VALUE,而ByteBuffer一旦在创建的时候确定好容量,就不可以再更改,(ByteBuffer 底层维护的是 final类型的数组)

CompositeByteBuf

    可以将多个ByteBuf 存放在一起,可以包含directByteBuf,也可以包含HeapByteBuf,  主要适用于某个协议包含两个部分消息头和消息体。

以上就是关于Netty中ByteBuf 的相关内容,如有问题,欢迎指正~

参考:netty 权威指南

你可能感兴趣的文章
android组件化架构!我了解到的面试的一些小内幕!成功入职腾讯
查看>>
android自学!连续四年百度Android岗必问面试题!看这一篇就够了!
查看>>
android获取当前日期!骚年你的屏幕适配方式该升级了,知乎上已获万赞
查看>>
android菜鸟教程!Android面试题集2021版,真香!
查看>>
android菜鸟!阿里P8架构师的Android大厂面试题总结,灵魂拷问
查看>>
android视频播放器!2021年你与字节跳动只差这份笔记,完整版开放下载
查看>>
android计步器!30岁以后搞Android已经没有前途?再不刷题就晚了!
查看>>
android软件开发!淘汰了80%的Android面试者,高级面试题+解析
查看>>
android指纹识别!给后辈的一点建议,实战解析
查看>>
android控制中心!全世界都在问Android开发凉了吗?在线面试指南
查看>>
android推送不及时!Android面试题集2021版,成功入职阿里
查看>>
android操作系统!金九银十怎么从中小企业挤进一线大厂?威力加强版
查看>>
android教程!Android面试真题解析火爆全网,薪资翻倍
查看>>
android数据库!轻松获得一线大厂面试offer,重难点整理
查看>>
android服务!不断提升自己创造溢价的能力,值得收藏!
查看>>
android权限大全!15分钟的字节跳动视频面试,源码+原理+手写框架
查看>>
android电子市场!熬夜肝完这份Framework笔记,不吃透都对不起自己
查看>>
Android面试总结,BAT这种大厂履历意味着什么?终获offer
查看>>
android音乐播放器!从零开始系统化学Android,含泪整理面经
查看>>
android项目实例!从零开始系统化学Android,大牛最佳总结
查看>>