0%

浅析Java之BIO、NIO、AIO

本文将介绍Java中几种常见的网络编程模型

abstract.jpeg

网络IO模型

对于一个网络IO而言,其操作流程大体可分为两个阶段。这里以read操作为例进行说明,write操作同理

  1. 数据准备阶段:等待数据从网络中到达,并将数据拷贝到内核的Socket接收缓冲区
  2. 数据拷贝阶段:将数据从内核空间拷贝到用户空间

对于阻塞IO、非阻塞IO而言,其描述的是在第一个阶段——数据准备阶段,发起IO请求的进程/线程是否会被阻塞。具体地,对于阻塞IO而言,如果内核接收缓冲区没有数据,则进程/线程会一直等待,直到内核接收缓冲区有数据为止;而对于非阻塞IO而言,如果内核接收缓冲区没有数据,则进程/线程会立即返回,而不是一直进行等待。故对于非阻塞IO而言,可通过轮询的方式检查当前是否有数据到达

而对于同步、异步IO而言,其描述的是在第二个阶段——数据拷贝阶段,是否需要由用户线程来执行。具体地,同步IO是由用户线程的内核态来执行第二阶段;而异步IO则是由内核自己完成第二个阶段。在内核完成数据拷贝后通知用户线程,同时将数据以回调的形式传递给用户线程。显然对于同步IO而言,第二个阶段的数据拷贝是由用户线程参与完成的,故在第二个阶段会发生阻塞;而对于异步IO而言,数据拷贝由于是内核自己完成的,故在第二个阶段不会发生阻塞

网络IO模型大致可分为两类五种,其中同步IO有四种,异步IO有一种:

  1. 对于Bloking IO阻塞IO模型而言,其在Java中就是经典的BIO。其特征为阻塞同步IO
  2. 对于Non-Blocking IO非阻塞IO模型而言,由于需要用户线程不断发起系统调用,频繁地在内核空间与用户空间之间进行切换,性能较低,故很少得到应用
  3. 相比较Non-Blocking IO非阻塞IO模型,取而代之的是Multiplexing IO多路复用IO技术。其将前者频繁地轮询操作交由操作系统内核来完成,是目前高并发网络应用的主流技术手段。其在Java中对应的就是Java 1.4中引入的NIO,其特征为非阻塞同步IO
  4. 对于Signal-Driven IO信号驱动IO模型而言,由于其不适用于TCP协议,故也很少被使用
  5. 对于Asynchronous IO异步IO模型来说,其在Java中对应的就是Java 1.7中引入的AIO,其特征为非阻塞异步IO

figure 1.jpeg

BIO

在Java 1.4之前BIO是Java网络编程唯一的选择,其特点是阻塞同步。由于accept、read方法均是阻塞操作。如果没有连接请求,accept方法阻塞;如果无数据可读取,read方法阻塞。故服务端侧需要为每一个客户端连接都提供一个线程。显然在BIO模型下,如果存在大量客户端连接,势必增大服务端的压力。甚至在极端情况下服务端会由于开启的线程过多而最终宕机,故为了保险起见。最佳的实践方式是通过线程池来提供、维护与客户端通信所需的线程

figure 2.jpeg

这里给出BIO模型下的服务端编程示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
/**
* BIO下的服务端
* @author Aaron Zhu
* @date 2022-03-15
*/
public class Server {
public static void main(String[] args) {
try{
ServerSocket ss = new ServerSocket(9999);
while (true) {
// 阻塞等待客户端连接
Socket socket = ss.accept();
new MsgHandler( socket ).start();
}
}catch (IOException e) {
System.out.println("[Server]: Happen Exception: "+e.getMessage());
}
}
}

class MsgHandler extends Thread {
private Socket socket;

public MsgHandler(Socket socket) {
this.socket = socket;
}

@Override
public void run() {
try {
InputStream inputStream = socket.getInputStream();
BufferedReader bufferedReader = new BufferedReader( new InputStreamReader(inputStream));
System.out.println("[Server]: 客户端上线");
String msg;
// 阻塞等待客户端的输入
while ( (msg=bufferedReader.readLine()) != null ) {
if( "Bye".equals(msg) ) {
break;
}
System.out.println("[Server]: " + msg);
}

bufferedReader.close();
socket.close();
} catch (Exception e) {
System.out.println("[Server]: Happen Exception: "+e.getMessage());
}

System.out.println("[Server]: 客户端下线");
}
}

这里给出BIO模型下的客户端编程示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
* BIO下的客户端
* @author Aaron Zhu
* @date 2022-03-15
*/
public class Client {
public static void main(String[] args) {
try (
Socket socket = new Socket("127.0.0.1", 9999);
PrintStream printStream = new PrintStream(socket.getOutputStream());
Scanner scanner = new Scanner(System.in) ) {

while (true) {
System.out.print("[Client]: 请说:");
String msg = scanner.nextLine();
if( "Bye".equals(msg) ) {
break;
}

printStream.println(msg);
printStream.flush();
}

socket.shutdownOutput();
} catch (Exception e) {
System.out.println("[Client]: Happen Exception: "+e.getMessage());
}
}
}

NIO

Java从1.4开始引入了非阻塞同步的NIO。不同于BIO,其支持面向Buffer缓冲区的、基于Channel通道的IO操作。在NIO模型中,有三个核心部分:Buffer缓冲区、Channel通道、Selector选择器

1. Buffer缓冲区

本质上是一块可以读取、写入数据的内存空间,其被包装为NIO Buffer对象。同时为了进一步方便操作,还提供了一组相应的API进行操作、管理。为了方便理解,这里通过实践的方式加强对缓冲区的认识

在Buffer中有两个最重要的游标:position、limit。前者表示下一个将要被写入或读取的元素索引,当调用get/put方法时会自动更新。其中,初始化值为0;后者表示读取、写入元素时的界限。当向Buffer写完数据、准备读取时,必须调用flip方法将其切换为可读模式。具体地,其会将buffer的limit设置为pos的值、将pos设置0,以实现从头读取。与此同时,可以通过clear方法将Buffer恢复至初始状态。具体地,将pos的值归零、limit设置为Buffer的容量值。这样即可再次写入数据以覆盖历史数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
@Test
public void test1() {
System.out.println("---------------Test 1 : 实例化---------------");
ByteBuffer buffer = ByteBuffer.allocate(15);
System.out.println("capacity: " + buffer.capacity());
System.out.println("position: " + buffer.position());
System.out.println("limit: " + buffer.limit());

System.out.println("---------------Test 2 : 写数据---------------");
String msg = "BobAaronTina";
buffer.put( msg.getBytes() );
System.out.println("capacity: " + buffer.capacity());
System.out.println("position: " + buffer.position());
System.out.println("limit: " + buffer.limit());

System.out.println("---------------Test 3 : 切换为可读模式---------------");
// 将buffer的limit设置为pos的值, 将pos设置0
buffer.flip();
System.out.println("capacity: " + buffer.capacity());
System.out.println("position: " + buffer.position());
System.out.println("limit: " + buffer.limit());

System.out.println("---------------Test 4 : 读取数据---------------");
byte[] bytes = new byte[3];
buffer.get( bytes );
String res = new String( bytes );
System.out.println("res : "+res);
System.out.println("capacity: " + buffer.capacity());
System.out.println("position: " + buffer.position());
System.out.println("limit: " + buffer.limit());

System.out.println("---------------Test 5 : 清空缓冲区---------------");
// 将pos的值归零、limit设置为Buffer的容量值
buffer.clear();
System.out.println("capacity: " + buffer.capacity());
System.out.println("position: " + buffer.position());
System.out.println("limit: " + buffer.limit());
}

测试结果,如下所示

figure 3.jpeg

在Buffer中,可通过mark、reset方法分别实现将当前pos值保存到mark变量中、将pos设置为mark变量的值。与此同时,还可以通过hasRemaining、remaining方法分别实现判断buffer中是否还有剩余元素、返回剩余元素的数量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
@Test
public void test2() {
ByteBuffer buffer = ByteBuffer.allocate(15);
// 1. 写数据
String msg = "USHelloChina";
buffer.put( msg.getBytes() );
// 2. 切换为读模式
buffer.flip();

System.out.println("---------- 第一次读取 ----------");
for (int i=0;i<2;i++) {
char ch = (char) buffer.get();
System.out.println("char : " + ch);
}
System.out.println("position: " + buffer.position());
System.out.println("limit: " + buffer.limit());

System.out.println("---------- 标记位置 ----------");
// 将当前pos值保存到mark变量中
buffer.mark();

System.out.println("---------- 第二次读取 ----------");
byte[] bytes = new byte[5];
buffer.get( bytes );
String res = new String( bytes );
System.out.println("res : "+res);
System.out.println("Buffer Info: " + buffer.toString());

System.out.println("---------- 回到标记位置 ----------");
// 将pos设置为mark变量的值
buffer.reset();
System.out.println("position: " + buffer.position());
System.out.println("limit: " + buffer.limit());

System.out.println("---------- 第三次读取 ----------");
// 判断buffer中是否还有剩余元素
if ( buffer.hasRemaining() ) {
// 返回剩余元素的数量
int size = buffer.remaining();
bytes = new byte[size];
buffer.get( bytes );
res = new String( bytes );
System.out.println("size : "+size);
System.out.println("res : "+res);
System.out.println("Buffer Info: " + buffer.toString());
}

}

测试结果,如下所示

figure 4.jpeg

事实上,Buffer实例所使用的内存分为两种:堆内内存(即非直接内存)、堆外内存(即直接内存)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Test
public void test3() {
// 创建堆内内存的Buffer实例, 实现类为 HeapByteBuffer
Buffer buffer1 = ByteBuffer.allocate(15);
// 该Buffer是否为直接内存
boolean b1 = buffer1.isDirect();
System.out.println("是否为直接内存: " + b1);

// 创建堆外内存的Buffer实例, 实现类为DirectByteBuffer
ByteBuffer buffer2 = ByteBuffer.allocateDirect(15);
// 该Buffer是否为直接内存
boolean b2 = buffer2.isDirect();
System.out.println("是否为直接内存: " + b2);
}

测试结果,如下所示

figure 5.jpeg

2. Channel通道

相比较传统的单向流(输入流、输出流)而言,通道是双向的。既可以读数据,也可以写数据

3. Selector选择器

Selector选择器,也被称作为多路复用器,是Java NIO中最重要的部分。其可以同时管理多个通道,并确定其中的哪些通道已经准备好进行读取、写入操作。换言之,利用选择器可以实现通过一个线程管理多个通道,即管理多个客户端连接。NIO模型的示意图如下所示

figure 6.jpeg

这里给出NIO模型下的服务端编程示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
/**
* NIO下的服务端
* @author Aaron Zhu
* @date 2022-03-23
*/
public class Server {
public static void main(String[] args) throws IOException {
// 获取通道
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// 切换为非阻塞模式
serverSocketChannel.configureBlocking(false);
// 绑定监听端口
serverSocketChannel.bind( new InetSocketAddress(8585) );

// 获取选择器Selector
Selector selector = Selector.open();
// 将通道注册到选择器中, 并开始监听客户端连接类型的事件
serverSocketChannel.register( selector, SelectionKey.OP_ACCEPT);

// 使用Selector选择器轮询已经就绪的事件
while ( selector.select()>0 ) {
// 获取迭代器以进行事件的遍历、处理
Iterator<SelectionKey> it = selector.selectedKeys().iterator();

while ( it.hasNext() ) {
// 通过迭代器获取当前事件
SelectionKey selectionKey = it.next();

// 事件类型 为 接受连接事件
if( selectionKey.isAcceptable() ) {
System.out.println("[Server]: 客户端上线");
// 获取当前连接进来的客户端通道
SocketChannel socketChannel = serverSocketChannel.accept();
// 切换为非阻塞模式
socketChannel.configureBlocking(false);
// 将该客户端通道注册到选择器中, 并开始监听读就绪类型的事件
socketChannel.register(selector, SelectionKey.OP_READ);
} else if( selectionKey.isReadable() ) { // 事件类型 为 读就绪事件
StringBuilder sb = new StringBuilder();
// 通过事件获取相应的客户端通道
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
// 创建一个Buffer实例, 用于存放从通道中读取的数据
ByteBuffer buffer = ByteBuffer.allocate(5);
int len = 0;
// 从客户端通道不停地读取数据
while ( (len=socketChannel.read(buffer)) > 0 ) {
// 切换为读模式
buffer.flip();
// 处理Buffer中的数据
String temp = new String(buffer.array(), 0, len);
sb.append( temp );
// 清除Buffer
buffer.clear();
}

// 客户端通道断开连接
if( len==-1 ) {
// 取消注册当前事件的客户端通道
selectionKey.cancel();
// 关闭客户端通道
socketChannel.close();
System.out.println("[Server]: 客户端下线");
}

System.out.println("[Server]: " + sb.toString());
}

// 事件处理完毕后,移除当前事件
it.remove();
}
}
}
}

这里给出NIO模型下的客户端编程示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
/**
* NIO下的客户端
* @author Aaron Zhu
* @date 2022-03-24
*/
public class Client {
public static void main(String[] args) throws IOException {
// 获取客户端通道
SocketChannel socketChannel = SocketChannel.open( new InetSocketAddress("127.0.0.1", 8585));
// 切换为非阻塞模式
socketChannel.configureBlocking(false);
// 创建Buffer实例
int bufferSize = 5;
ByteBuffer buffer = ByteBuffer.allocate(bufferSize);

Scanner scanner = new Scanner(System.in);
while (true) {
System.out.print("[Client]: 请说:");
String msg = scanner.nextLine();
if( "Bye".equals(msg) ) {
System.out.println("[Client]: 客户端下线");
break;
}

byte[] bytes = msg.getBytes();
int start = 0;
// 将客户端的消息分批写入Buffer、客户端通道
while ( start < bytes.length ) {
int length = start + bufferSize <= bytes.length ? bufferSize : bytes.length-start;
// 将客户端消息部分写入Buffer
buffer.put(bytes, start, length);
// 切换为读模式
buffer.flip();
// 将客户端消息部分写入通道
socketChannel.write( buffer );
// 清除Buffer
buffer.clear();
// 更新下一次数据写入的起点
start = start + length;
}
}

// 关闭客户端通道
socketChannel.close();
}
}

AIO

Java从1.7开始引入了非阻塞异步的AIO,作为对NIO的改进、增强,故其也被称为NIO 2.0。其分别引入了服务端异步Socket通道AsynchronousServerSocketChannel、客户端异步Socket通道AsynchronousSocketChannel,前者负责服务端Socket的创建、监听;后者负责客户端消息的读写操作。同时提供了一个CompletionHandler,作为消息处理回调接口

这里给出AIO模型下的服务端编程示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* AIO下的服务端
* @author Aaron Zhu
* @date 2022-03-27
*/
public class Server {
public static void main(String[] args) throws IOException {
// 获取通道
AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open();
// 绑定监听端口
serverSocketChannel.bind( new InetSocketAddress(9696) );
// 准备接受客户端连接请求
serverSocketChannel.accept( serverSocketChannel, new ServerHandler() );

while (true) {
}
}
}

当接受到客户端的连接请求后,我们需要提供一个相应的CompletionHandler实现类。进行业务逻辑处理。具体地,通过实现completed方法用于接受客户端请求、建立连接后的业务处理逻辑,通过实现failed方法用于进行服务端发生异常的处理逻辑。具体实现如下所示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
/**
* @author Aaron Zhu
* @date 2022-03-27
*/
public class ServerHandler implements CompletionHandler<AsynchronousSocketChannel, AsynchronousServerSocketChannel> {
/**
* 接受客户端请求、建立连接后, 业务处理逻辑
* @param socketChannel 客户端通道
* @param serverSocketChannel 服务端通道
*/
@Override
public void completed(AsynchronousSocketChannel socketChannel, AsynchronousServerSocketChannel serverSocketChannel) {
// 准备接受下一个客户端的连接请求
serverSocketChannel.accept(serverSocketChannel, this);

// 创建Buffer实例
ByteBuffer byteBuffer = ByteBuffer.allocate(2048);
socketChannel.read( byteBuffer, byteBuffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
// 客户端通道断开连接
if(result == -1) {
try {
// 关闭客户端通道
socketChannel.close();
} catch (IOException e) {
System.out.println("[Server]: happen exception, "+e.getMessage());
}
return;
}

// 切换为读模式
attachment.flip();
// 处理Buffer中的数据
String msg = new String(attachment.array(), 0, result);
// 清除Buffer
attachment.clear();
System.out.println("[Server]: " + msg);
// 准备下一次读
socketChannel.read(attachment, attachment, this);
}

@Override
public void failed(Throwable exc, ByteBuffer attachment) {
System.out.println("[Server]: happen exception, "+exc.getMessage());
}
} );
}

/**
* 服务端发生异常的处理逻辑
* @param exc
* @param serverSocketChannel
*/
@Override
public void failed(Throwable exc, AsynchronousServerSocketChannel serverSocketChannel) {
System.out.println("[Server]: happen exception, "+exc.getMessage());
}
}

这里给出AIO模型下的客户端编程示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/**
* AIO下的客户端
* @author Aaron Zhu
* @date 2022-03-27
*/
public class Client {
public static void main(String[] args) throws IOException {
// 获取通道
AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open();
// 请求连接服务端
socketChannel.connect( new InetSocketAddress("127.0.0.1", 9696));
// 创建Buffer实例
ByteBuffer buffer = ByteBuffer.allocate(1024);

Scanner scanner = new Scanner(System.in);
while (true) {
System.out.print("[Client]: 请说:");
String msg = scanner.nextLine();
if( "Bye".equals(msg) ) {
System.out.println("[Client]: 客户端下线");
break;
}

byte[] bytes = msg.getBytes();
// 将客户端消息写入Buffer
buffer.put( msg.getBytes() );
// 切换为读模式
buffer.flip();
// 将客户端消息部分写入通道
socketChannel.write( buffer );
// 清除Buffer
buffer.clear();
}

// 关闭客户端通道
socketChannel.close();
}
}

参考文献

  1. 凤凰架构 周志明著
请我喝杯咖啡捏~

欢迎关注我的微信公众号:青灯抽丝