Netty概述
发布日期:2021-05-16 10:23:20 浏览次数:11 分类:技术文章

本文共 7226 字,大约阅读时间需要 24 分钟。

前言

如果说http对应着《》,那为tcp就对应着netty.Netty对于程序员的重要性可想而知

Netty架构

netty主要有以下三部份构成:

  1. ByteBuf内存
  2. Channel管道
  3. Pipeline事件模式
    在这里插入图片描述

ByteBuf内存篇

《》与《》中详细介绍了buff操作。分别对应Netty中的

  • UnpooledHeapByteBuf(堆内)
  • UnpooledDirectByteBuf (堆外)
  • UnpooledUnsafeHeapByteBuf(unsafe堆内)
  • UnpooledUnsafeDirectByteBuf(unsafe堆外)
  1. Netty对ByteBuf增加了基本控制,诸如 readerIndex, writerIndex, markedReaderIndex, markedWriterIndex, maxCapacity。
  2. 使用虚拟机提但的UNSAFE类可以直接跳过虚拟机的检测,可提交内存分配数据
  3. 使用ResourceLeakDetector封装RefQueue来检测堆外内存的回收

通过阅读AbstractNioByteChannel中的代码,read()方法发生在nio的op_read事件时触发

@Override        public final void read() {
final ChannelConfig config = config(); if (shouldBreakReadReady(config)) {
clearReadPending(); return; } //每个channel对应一个PPLine final ChannelPipeline pipeline = pipeline(); //ByteBuf分配器 final ByteBufAllocator allocator = config.getAllocator(); //容量计算器 final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle(); //重置,把之前计数的值全部清空 allocHandle.reset(config); ByteBuf byteBuf = null; boolean close = false; try {
do {
//分配内存,关键在于计算分配内存的大小(小了不够,大了浪费) byteBuf = allocHandle.allocate(allocator); //doReadBytes,从socket读取字节到byteBuf,返回真实读取数量 //更新容量计算器 allocHandle.lastBytesRead(doReadBytes(byteBuf)); //如果小于0 则socket关闭,如果等于0则没读取到数据 if (allocHandle.lastBytesRead() <= 0) {
//释放资源 byteBuf.release(); byteBuf = null; //如果小于0则意味着socket关闭 close = allocHandle.lastBytesRead() < 0; if (close) {
readPending = false; } break; } //增加循环计数器 allocHandle.incMessagesRead(1); readPending = false; //把读取到的数据,交给管道去处理 pipeline.fireChannelRead(byteBuf); byteBuf = null; //判断是否继续从socket读取数据 } while (allocHandle.continueReading()); //读取完成后调用readComplete,重新估算内存分配容量 allocHandle.readComplete(); //事件激发 pipeline.fireChannelReadComplete(); //如果需要关闭,则处理关闭 if (close) {
closeOnRead(pipeline); } } catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close, allocHandle); } finally {
//根据情况移除OP_READ事件 if (!readPending && !config.isAutoRead()) {
removeReadOp(); } } }
  1. ByteBufAllocator 是内存分配者,用于申请内存
  2. RecvByteBufAllocator.Handle 内存读取策略,不可能一次性读完所有的数据
  3. allocHandle.lastBytesRead方法标记已读取的内容大下
  4. doReadBytes是真正的从管道读取数据
  5. 从上述代码可知,如果读取数据过大,会多次触发pipeline.fireChannelRead
ByteToMessageDecoder解决多次触发问题

从上述源码分析得,如果NIO一次性传取的数据过大会多次触发fireChannelRead。可在代码编写的时候感觉不出来,Netty在ByteToMessageDecoder做处理. ByteToMessageDecoder往往是头部pipeline.

@Override   public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//只处理字节缓冲区类型的 if (msg instanceof ByteBuf) {
CodecOutputList out = CodecOutputList.newInstance(); try {
first = cumulation == null; // 如果是第一次读取则创建一个,不是则累加 cumulation = cumulator.cumulate(ctx.alloc(), first ? Unpooled.EMPTY_BUFFER : cumulation, (ByteBuf) msg); // 具体解析数据,到数据符合规则解析到out对象中。 // 如果不合规定,不操作,等待后面的数据累加 callDecode(ctx, cumulation, out); } catch (DecoderException e) {
throw e; } catch (Exception e) {
throw new DecoderException(e); } finally {
//当数据不为空也不可读,要释放。证明数据有丢失不处理 if (cumulation != null && !cumulation.isReadable()) {
numReads = 0; cumulation.release(); cumulation = null; } else if (++ numReads >= discardAfterReads) {
//读取数据的次数大于阈值,则尝试丢弃已读的,避免占着内存 numReads = 0; discardSomeReadBytes(); } int size = out.size(); //有被添加或者设置,表是有读过了 firedChannelRead |= out.insertSinceRecycled(); // 如果size > 0 尝试向下面Handler传递数据 fireChannelRead(ctx, out, size); out.recycle(); } } else {
ctx.fireChannelRead(msg);//其他类型继续传递 } }

Channel通道

《》中epoll框架.Netty框架运行例子如下

EventLoopGroup bossGroup = new NioEventLoopGroup(1);        EventLoopGroup workerGroup = new NioEventLoopGroup();        try {
ServerBootstrap b = new ServerBootstrap(); b.option(ChannelOption.SO_BACKLOG, 1024); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer(){
@Override protected void initChannel(Channel ch) throws Exception {
ChannelPipeline p = ch.pipeline(); // http加解码器 p.addLast(new HttpServerCodec()); // 自定义handler p.addLast(new HttpHelloWorldServerHandler()); } }); Channel ch = b.bind(PORT).sync().channel(); System.err.println("Open your web browser and navigate to http://127.0.0.1:" + PORT + '/'); ch.closeFuture().sync(); } finally {
bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); }

其中bind调用过程如下所示

在这里插入图片描述

  1. initAndRegister创建的管道并调用init做初始化
  2. init对channel进行了pipeline的ServerBootstrapAcceptor设置及bossGroup的使用
  3. register0将channel事件注册并获取 selectionKey
  4. run 循环遍历监听nio事件,并分配pipeline处理
  5. 通过pipeline事件,ServerBootstrapAcceptor获取连接进来的childChannel,对其注册并配置workerGroup
  6. childChannel注册流程和channel相似

其中bossGroup,workerGroup的设计为传统Reactor反应堆设计模式。

在这里插入图片描述

Pipeline事件模型

在Netty 中每个Channel 都有且仅有一个ChannelPipeline 与之对应。

在这里插入图片描述
Pipeline的元素包括2部份:

  • Context 标记Handler的位置,状态及提供上下文的依赖,事件传播
  • Handler 专门处理事件
Pipeline 的事件传播机制

Netty 中的传播事件可以分为两种:Inbound 事件和Outbound 事件。

在这里插入图片描述
从上图可以看出,inbound 事件和outbound 事件的流向是不一样的,inbound 事件的流行是从下至上,而outbound刚好相反,是从上到下。

通过 AbstractChannelHandlerContext 中 executionMask属性来标识MASK_ALL_INBOUND,MASK_ALL_OUTBOUND 所支持的事务。

// 向下链路查找符合当前事件传播规则的Context// mask 标识当前要传递的事件private AbstractChannelHandlerContext findContextInbound(int mask) {
AbstractChannelHandlerContext ctx = this; do {
ctx = ctx.next; } while ((ctx.executionMask & mask) == 0); return ctx; }

链表中head 是一个ChannelOutboundHandler,而tail则是一个ChannelInboundHandler.表示当Inbound事件没人处理tail空处理,反之Outbound没人处理head空处理

总结

通过netty掌握,我手撕《》项目。

当然netty官方也给我们实现了各种协议的例子(netty.handler.codec包):

  • http
  • http2
  • reids

主要参考

《》

《》
《》
《》

转载地址:https://blog.csdn.net/y3over/article/details/114108504 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:http2概述
下一篇:MyBatis Generator简介

发表评论

最新留言

路过按个爪印,很不错,赞一个!
[***.219.124.196]2024年04月11日 10时45分53秒