本文共 7226 字,大约阅读时间需要 24 分钟。
前言
如果说http对应着《》,那为tcp就对应着netty.Netty对于程序员的重要性可想而知
Netty架构
netty主要有以下三部份构成:
- ByteBuf内存
- Channel管道
- Pipeline事件模式
ByteBuf内存篇
《》与《》中详细介绍了buff操作。分别对应Netty中的
- UnpooledHeapByteBuf(堆内)
- UnpooledDirectByteBuf (堆外)
- UnpooledUnsafeHeapByteBuf(unsafe堆内)
- UnpooledUnsafeDirectByteBuf(unsafe堆外)
- Netty对ByteBuf增加了基本控制,诸如 readerIndex, writerIndex, markedReaderIndex, markedWriterIndex, maxCapacity。
- 使用虚拟机提但的UNSAFE类可以直接跳过虚拟机的检测,可提交内存分配数据
- 使用ResourceLeakDetector封装RefQueue来检测堆外内存的回收
通过阅读AbstractNioByteChannel中的代码,read()方法发生在nio的op_read事件时触发
@Override public final void read() { final ChannelConfig config = config(); if (shouldBreakReadReady(config)) { clearReadPending(); return; } //每个channel对应一个PPLine final ChannelPipeline pipeline = pipeline(); //ByteBuf分配器 final ByteBufAllocator allocator = config.getAllocator(); //容量计算器 final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle(); //重置,把之前计数的值全部清空 allocHandle.reset(config); ByteBuf byteBuf = null; boolean close = false; try { do { //分配内存,关键在于计算分配内存的大小(小了不够,大了浪费) byteBuf = allocHandle.allocate(allocator); //doReadBytes,从socket读取字节到byteBuf,返回真实读取数量 //更新容量计算器 allocHandle.lastBytesRead(doReadBytes(byteBuf)); //如果小于0 则socket关闭,如果等于0则没读取到数据 if (allocHandle.lastBytesRead() <= 0) { //释放资源 byteBuf.release(); byteBuf = null; //如果小于0则意味着socket关闭 close = allocHandle.lastBytesRead() < 0; if (close) { readPending = false; } break; } //增加循环计数器 allocHandle.incMessagesRead(1); readPending = false; //把读取到的数据,交给管道去处理 pipeline.fireChannelRead(byteBuf); byteBuf = null; //判断是否继续从socket读取数据 } while (allocHandle.continueReading()); //读取完成后调用readComplete,重新估算内存分配容量 allocHandle.readComplete(); //事件激发 pipeline.fireChannelReadComplete(); //如果需要关闭,则处理关闭 if (close) { closeOnRead(pipeline); } } catch (Throwable t) { handleReadException(pipeline, byteBuf, t, close, allocHandle); } finally { //根据情况移除OP_READ事件 if (!readPending && !config.isAutoRead()) { removeReadOp(); } } }
- ByteBufAllocator 是内存分配者,用于申请内存
- RecvByteBufAllocator.Handle 内存读取策略,不可能一次性读完所有的数据
- allocHandle.lastBytesRead方法标记已读取的内容大下
- doReadBytes是真正的从管道读取数据
- 从上述代码可知,如果读取数据过大,会多次触发pipeline.fireChannelRead
ByteToMessageDecoder解决多次触发问题
从上述源码分析得,如果NIO一次性传取的数据过大会多次触发fireChannelRead。可在代码编写的时候感觉不出来,Netty在ByteToMessageDecoder做处理. ByteToMessageDecoder往往是头部pipeline.
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //只处理字节缓冲区类型的 if (msg instanceof ByteBuf) { CodecOutputList out = CodecOutputList.newInstance(); try { first = cumulation == null; // 如果是第一次读取则创建一个,不是则累加 cumulation = cumulator.cumulate(ctx.alloc(), first ? Unpooled.EMPTY_BUFFER : cumulation, (ByteBuf) msg); // 具体解析数据,到数据符合规则解析到out对象中。 // 如果不合规定,不操作,等待后面的数据累加 callDecode(ctx, cumulation, out); } catch (DecoderException e) { throw e; } catch (Exception e) { throw new DecoderException(e); } finally { //当数据不为空也不可读,要释放。证明数据有丢失不处理 if (cumulation != null && !cumulation.isReadable()) { numReads = 0; cumulation.release(); cumulation = null; } else if (++ numReads >= discardAfterReads) { //读取数据的次数大于阈值,则尝试丢弃已读的,避免占着内存 numReads = 0; discardSomeReadBytes(); } int size = out.size(); //有被添加或者设置,表是有读过了 firedChannelRead |= out.insertSinceRecycled(); // 如果size > 0 尝试向下面Handler传递数据 fireChannelRead(ctx, out, size); out.recycle(); } } else { ctx.fireChannelRead(msg);//其他类型继续传递 } }
Channel通道
《》中epoll框架.Netty框架运行例子如下
EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.option(ChannelOption.SO_BACKLOG, 1024); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer(){ @Override protected void initChannel(Channel ch) throws Exception { ChannelPipeline p = ch.pipeline(); // http加解码器 p.addLast(new HttpServerCodec()); // 自定义handler p.addLast(new HttpHelloWorldServerHandler()); } }); Channel ch = b.bind(PORT).sync().channel(); System.err.println("Open your web browser and navigate to http://127.0.0.1:" + PORT + '/'); ch.closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); }
其中bind调用过程如下所示
- initAndRegister创建的管道并调用init做初始化
- init对channel进行了pipeline的ServerBootstrapAcceptor设置及bossGroup的使用
- register0将channel事件注册并获取 selectionKey
- run 循环遍历监听nio事件,并分配pipeline处理
- 通过pipeline事件,ServerBootstrapAcceptor获取连接进来的childChannel,对其注册并配置workerGroup
- childChannel注册流程和channel相似
其中bossGroup,workerGroup的设计为传统Reactor反应堆设计模式。
Pipeline事件模型
在Netty 中每个Channel 都有且仅有一个ChannelPipeline 与之对应。
Pipeline的元素包括2部份:- Context 标记Handler的位置,状态及提供上下文的依赖,事件传播
- Handler 专门处理事件
Pipeline 的事件传播机制
Netty 中的传播事件可以分为两种:Inbound 事件和Outbound 事件。
从上图可以看出,inbound 事件和outbound 事件的流向是不一样的,inbound 事件的流行是从下至上,而outbound刚好相反,是从上到下。通过 AbstractChannelHandlerContext 中 executionMask属性来标识MASK_ALL_INBOUND,MASK_ALL_OUTBOUND 所支持的事务。
// 向下链路查找符合当前事件传播规则的Context// mask 标识当前要传递的事件private AbstractChannelHandlerContext findContextInbound(int mask) { AbstractChannelHandlerContext ctx = this; do { ctx = ctx.next; } while ((ctx.executionMask & mask) == 0); return ctx; }
链表中head 是一个ChannelOutboundHandler,而tail则是一个ChannelInboundHandler.表示当Inbound事件没人处理tail空处理,反之Outbound没人处理head空处理
总结
通过netty掌握,我手撕《》项目。
当然netty官方也给我们实现了各种协议的例子(netty.handler.codec包):- http
- http2
- reids
- …
主要参考
《》
《》 《》 《》转载地址:https://blog.csdn.net/y3over/article/details/114108504 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!