本文共 5609 字,大约阅读时间需要 18 分钟。
声明:本文是《Netty 权威指南》的样章,感谢博文视点授权发布样章,禁止以任何形式转载此文。
我们将在TimeServer例程中给出完整的NIO创建的时间服务器源码:
01 | public class TimeServer { |
02 |
03 | /** |
04 | * @param args |
05 | * @throws IOException |
06 | */ |
07 | public static void main(String[] args) throws IOException { |
08 | int port = 8080 ; |
09 | if (args != null && args.length > 0 ) { |
10 | try { |
11 | port = Integer.valueOf(args[ 0 ]); |
12 | } catch (NumberFormatException e) { |
13 | // 采用默认值 |
14 | } |
15 | } |
16 | MultiplexerTimeServer timeServer = new MultiplexerTimeServer(port); |
17 | New Thread(timeServer, "NIO-MultiplexerTimeServer- 001 ").start(); |
18 | } |
19 | } |
我们对NIO创建的TimeServer进行下简单分析,8-15行跟之前的一样,设置监听端口。16-17行创建了一个被称为MultiplexerTimeServer的多路复用类,它是个一个独立的线程,负责轮询多路复用器Selctor,可以处理多个客户端的并发接入,现在我们继续看MultiplexerTimeServer的源码:
001 | public class MultiplexerTimeServer implements Runnable { |
002 |
003 | private Selector selector; |
004 |
005 | private ServerSocketChannel servChannel; |
006 |
007 | private volatile boolean stop; |
008 |
009 | /** |
010 | * 初始化多路复用器、绑定监听端口 |
011 | * |
012 | * @param port |
013 | */ |
014 | public MultiplexerTimeServer( int port) { |
015 | try { |
016 | selector = Selector.open(); |
017 | servChannel = ServerSocketChannel.open(); |
018 | servChannel.configureBlocking( false ); |
019 | servChannel.socket().bind( new InetSocketAddress(port), 1024 ); |
020 | servChannel.register(selector, SelectionKey.OP_ACCEPT); |
021 | System.out.println("The time server is start in port : " + port); |
022 | } catch (IOException e) { |
023 | e.printStackTrace(); |
024 | System.exit( 1 ); |
025 | } |
026 | } |
027 |
028 | public void stop() { |
029 | this .stop = true ; |
030 | } |
031 |
032 | /* |
033 | * (non-Javadoc) |
034 | * |
035 | * @see java.lang.Runnable#run() |
036 | */ |
037 | @Override |
038 | public void run() { |
039 | while (!stop) { |
040 | try { |
041 | selector.select( 1000 ); |
042 | Set<SelectionKey> selectedKeys = selector.selectedKeys(); |
043 | Iterator<SelectionKey> it = selectedKeys.iterator(); |
044 | SelectionKey key = null ; |
045 | while (it.hasNext()) { |
046 | key = it.next(); |
047 | it.remove(); |
048 | try { |
049 | handleInput(key); |
050 | } catch (Exception e) { |
051 | if (key != null ) { |
052 | key.cancel(); |
053 | if (key.channel() != null ) |
054 | key.channel().close(); |
055 | } |
056 | } |
057 | } |
058 | } catch (Throwable t) { |
059 | t.printStackTrace(); |
060 | } |
061 | } |
062 |
063 | // 多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源 |
064 | if (selector != null ) |
065 | try { |
066 | selector.close(); |
067 | } catch (IOException e) { |
068 | e.printStackTrace(); |
069 | } |
070 | } |
071 |
072 | private void handleInput(SelectionKey key) throws IOException { |
073 |
074 | if (key.isValid()) { |
075 | // 处理新接入的请求消息 |
076 | if (key.isAcceptable()) { |
077 | // Accept the new connection |
078 | ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); |
079 | SocketChannel sc = ssc.accept(); |
080 | sc.configureBlocking( false ); |
081 | // Add the new connection to the selector |
082 | sc.register(selector, SelectionKey.OP_READ); |
083 | } |
084 | if (key.isReadable()) { |
085 | // Read the data |
086 | SocketChannel sc = (SocketChannel) key.channel(); |
087 | ByteBuffer readBuffer = ByteBuffer.allocate( 1024 ); |
088 | int readBytes = sc.read(readBuffer); |
089 | if (readBytes > 0 ) { |
090 | readBuffer.flip(); |
091 | byte [] bytes = new byte [readBuffer.remaining()]; |
092 | readBuffer.get(bytes); |
093 | String body = new String(bytes, "UTF- 8 "); |
094 | System.out.println("The time server receive order : " |
095 | + body); |
096 | String currentTime = "QUERY TIME ORDER" |
097 | .equalsIgnoreCase(body) ? new java.util.Date( |
098 | System.currentTimeMillis()).toString() |
099 | : "BAD ORDER"; |
100 | doWrite(sc, currentTime); |
101 | } else if (readBytes < 0 ) { |
102 | // 对端链路关闭 |
103 | key.cancel(); |
104 | sc.close(); |
105 | } else |
106 | ; // 读到0字节,忽略 |
107 | } |
108 | } |
109 | } |
110 |
111 | private void doWrite(SocketChannel channel, String response) |
112 | throws IOException { |
113 | if (response != null && response.trim().length() > 0 ) { |
114 | byte [] bytes = response.getBytes(); |
115 | ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); |
116 | writeBuffer.put(bytes); |
117 | writeBuffer.flip(); |
118 | channel.write(writeBuffer); |
119 | } |
120 | } |
121 | } |
由于这个类相比于传统的Socket编程稍微复杂一些,在此我们进行详细分析,我们从如下几个关键步骤讲解多路复用处理类:
14-26行为构造方法,在构造方法中进行资源初始化,创建多路复用器Selector、ServerSocketChannel,对Channel和TCP参数进行配置,例如将ServerSocketChannel设置为异步非阻塞模式,它的backlog设置为1024。系统资源初始化成功后将ServerSocketChannel注册到Selector,监听SelectionKey.OP_ACCEPT操作位;如果资源初始化失败,例如端口被占用则退出
39-61行在线程的run方法的while循环体中循环遍历selector,它的休眠时间为1S,无论是否有读写等事件发生,selector每隔1S都被唤醒一次,selector也提供了一个无参的select方法。当有处于就绪状态的Channel时,selector将返回就绪状态的Channel的SelectionKey集合,我们通过对就绪状态的Channel集合进行迭代,就可以进行网络的异步读写操作
76-83行处理新接入的客户端请求消息,根据SelectionKey的操作位进行判断即可获知网络事件的类型,通过ServerSocketChannel的accept接收客户端的连接请求并创建SocketChannel实例,完成上述操作后,相当于完成了TCP的三次握手,TCP物理链路正式建立。注意,我们需要将新创建的SocketChannel设置为异步非阻塞,同时也可以对其TCP参数进行设置,例如TCP接收和发送缓冲区的大小等,作为入门的例子,例程没有进行额外的参数设置
84-109行用于读取客户端的请求消息,首先创建一个ByteBuffer,由于我们事先无法得知客户端发送的码流大小,作为例程,我们开辟一个1M的缓冲区。然后调用SocketChannel的read方法读取请求码流,注意,由于我们已经将SocketChannel设置为异步非阻塞模式,因此它的read是非阻塞的。使用返回值进行判断,看读取到的字节数,返回值有三种可能的结果:
1) 返回值大于0:读到了字节,对字节进行编解码;
2) 返回值等于0:没有读取到字节,属于正常场景,忽略;
3) 返回值为-1:链路已经关闭,需要关闭SocketChannel,释放资源。
当读取到码流以后,我们进行解码,首先对readBuffer进行flip操作,它的作用是将缓冲区当前的limit设置为position,position设置为0,用于后续对缓冲区的读取操作。然后根据缓冲区可读的字节个数创建字节数组,调用ByteBuffer的get操作将缓冲区可读的字节数组拷贝到新创建的字节数组中,最后调用字符串的构造函数创建请求消息体并打印。如果请求指令是”QUERY TIME ORDER”则把服务器的当前时间编码后返回给客户端,下面我们看看如果异步发送应答消息给客户端。
111-119行将应答消息异步发送给客户端,我们看下关键代码,首先将字符串编码成字节数组,根据字节数组的容量创建ByteBuffer,调用ByteBuffer的put操作将字节数组拷贝到缓冲区中,然后对缓冲区进行flip操作,最后调用SocketChannel的write方法将缓冲区中的字节数组发送出去。需要指出的是,由于SocketChannel是异步非阻塞的,它并不保证一次能够把需要发送的字节数组发送完,此时会出现“写半包”问题,我们需要注册写操作,不断轮询Selector将没有发送完的ByteBuffer发送完毕,可以通过ByteBuffer的hasRemain()方法判断消息是否发送完成。此处仅仅是个简单的入门级例程,没有演示如何处理“写半包”场景,后续的章节会有详细说明。
使用NIO创建TimeServer服务器完成之后,我们继续学习如何创建NIO客户端。首先还是通过时序图了解关键步骤和过程,然后结合代码进行详细分析。
转载地址:https://blog.csdn.net/weixin_34077371/article/details/90663167 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!