Netty学习

一、Buffer(缓冲区)

IO 面向流(Stream oriented),NIO 面向缓冲区(Buffer oriented)。

Buffer 是一个对象,它包含一些要写入或者要读出的数据。在面向流的 I/O 中可以将数据直接写入或者将数据直接读到 Stream 对象中。虽然 Stream 中也有 Buffer 开头的扩展类,但只是流的包装类,还是从流读到缓冲区,而 NIO 却是直接读到 Buffer 中进行操作。

在 NIO 厍中,所有数据都是用缓冲区处理的。在读取数据时,它是直接读到缓冲区中的;在写入数据时,写入到缓冲区中。任何时候访问 NIO 中的数据,都是通过缓冲区进行操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public abstract class Buffer {
// 关系: mark <= position <= limit <= capacity
// 标记字节内容中的某个元素,配合reset()方法可以从这个标记的位置反复读取内容
private int mark = -1;
// 下一个要被读写的byte元素的下标索引
private int position = 0;
// 缓冲区中第一个不能读写的元素的数组下标索引,也可以认为是缓冲区中实际元素的数量
private int limit;
// 缓冲区能够容纳元素的最大数量,这个值在缓冲区创建时被设定,而且不能够改变
private int capacity;
long address; // Used only by direct buffers,直接内存的地址

// ……
}

1.1 ByteBuffer

最常用的缓冲区是 ByteBuffer,一个 ByteBuffer 提供了一组功能用于操作 byte 数组。

1
2
3
4
5
6
public abstract class ByteBuffer extends Buffer implements Comparable<ByteBuffer> {
// 仅限堆内内存使用
final byte[] hb;
final int offset;
boolean isReadOnly;
}

1.2 ByteBuffer.API

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// 申请堆外内存(直接内存,少一次拷贝,读写效率高,分配效率低,不受 GC 影响)
public static ByteBuffer allocateDirect(int capacity)
// 申请堆内内存(读写效率低,分配效率高,受到 GC 的影响)
public static ByteBuffer allocate(int capacity)
// 原始字节包装成ByteBuffer
public static ByteBuffer wrap(byte[] array, int offset, int length)
// 原始字节包装成ByteBuffer
public static ByteBuffer wrap(byte[] array)
// 创建共享此缓冲区内容的新字节缓冲区
public abstract ByteBuffer duplicate()
// 分片,创建一个新的字节缓冲区
// 新ByteBuffer的开始位置是此缓冲区的当前位置position
public abstract ByteBuffer slice()
// 获取字节内容
public abstract byte get()
// 从ByteBuffer偏移offset的位置,获取length长的字节数组,然后返回当前ByteBuffer对象
public ByteBuffer get(byte[] dst, int offset, int length)
// 设置byte内存
public abstract ByteBuffer put(byte b);
// 以offset为起始位置设置length长src的内容,并返回当前ByteBuffer对象
public ByteBuffer put(byte[] src, int offset, int length);
// 将没有读完的数据移到到缓冲区的初始位置,position设置为最后一没读字节数据的下个索引,limit重置为capacity
// 读->写模式,相当于flip的反向操作
public abstract ByteBuffer compact();
// 是否是直接内存
public abstract boolean isDirect();
// 转换缓冲区为写模式
public ByteBuffer clear();
  1. 创建大小为 10 的 ByteBuffer 对象
1
ByteBuffer bf = ByteBuffer.allocate(10);

  1. 写入数据
1
2
ByteBuffer buf = ByteBuffer.allocate(10);
buf.put("csc".getBytes());

  1. 调用 flip 转换缓冲区为读模式
1
buf.flip();

  1. 读取缓冲区中到内容,buf.get()
1
System.out.println((char) buf.get());

  1. 调用 clear() 或 compact() 转换缓冲区为写模式,循环至步骤 1

除了 ByteBuffer,还有其他的一些缓冲区,事实上,每一种 Java 基本类型(除了 Boolean 类型)都对应有一种缓冲区。

1.3 Scattering Reads(分散读取)

需要在已知所需文本信息长度的前提下,将文本分割后,分散填充至不同的 buffer 中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 读取一个文本文件 HelloWord.txt,然后将数据填充至多个 buffer
try (RandomAccessFile file = new RandomAccessFile("HelloWorld.txt", "rw")) {
FileChannel channel = file.getChannel();
ByteBuffer bba = ByteBuffer.allocate(3);
ByteBuffer bbb = ByteBuffer.allocate(3);
ByteBuffer bbc = ByteBuffer.allocate(5);
channel.read(new ByteBuffer[]{bba, bbb, bbc});
bba.flip();
bbb.flip();
bbc.flip();
debugAll(bba);
debugAll(bbb);
debugAll(bbc);
} catch (IOException e) {
throw new RuntimeException(e);
}

1.4 Gatering Writes(集中写入)

1
2
3
4
5
6
7
8
9
ByteBuffer bba = StandardCharsets.UTF_8.encode("hello");
ByteBuffer bbb = StandardCharsets.UTF_8.encode("world");
ByteBuffer bbc = StandardCharsets.UTF_8.encode("你好");

try (FileChannel file = new RandomAccessFile("HelloWorld.txt", "rw").getChannel()) {
file.write(new ByteBuffer[]{bba, bbb, bbc});
} catch (IOException e) {
throw new RuntimeException(e);
}

1.5 综合案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public static void main(String[] args) throws UnsupportedEncodingException {
ByteBuffer buffer = ByteBuffer.allocate(10);
buffer.put(new byte[]{97, 98, 99, 100});

/*
网络上有多条数据发送给服务器,数据之间使用 \n 进行分离
但由于某种原因这些数据在接收时,被进行了重新组合,例如原始数据有3条为
Hello,world\n
I'm zhangsan\n
How are you?\n
变成了下面的两个 byteBuffer(黏包,半包)
Hello,world\nI'm zhangsan\nHo
w are you?\n
现在要求变写程序,将错乱的数据恢复成原始的按 \n 分隔的数据
*/
ByteBuffer source = ByteBuffer.allocate(64);
source.put("Hello,world\nI'm zhangsan\nHo".getBytes());
split(source);
source.put("w are you?\n".getBytes());
split(source);
}

private static void split(ByteBuffer source) {
source.flip();
for (int i = 0; i < source.limit(); i++) {
// 找到一条完整的消息
if (new String(new byte[]{source.get(i)}).equals("\n")) {
int length = i + 1 - source.position();
ByteBuffer target = ByteBuffer.allocate(length);
for (int j = 0; j < length; j++) {
target.put(source.get());
}
debugAll(target);
}
}
source.compact();
}

二、Channel(通道)

NIO 通过 Channel(通道)进行读写。

通道是双向的,可读也可写,而流的读写是单向的。

无论读写,通道只能和 Buffer 交互。因为 Buffer,通道可以异步地读写。

2.1 FileChannel

FileChannel 只能工作在阻塞模式下

获取

不能直接打开 FileChannel,必须通过 FileInputStream、FileOutputStream 或者 RandomAccessFile 来获取 FileChannel,它们都有 getChannel 方法。

  • 通过 FileInputStream 获取的 channel 只能读
  • 通过 FileOutputStream 获取的 channel 只能写
  • 通过 RandomAccessFile 获取的 channel 是否能读写,根据构造时的传入读写模式参数决定

读取

会从 channel 读取数据填充 ByteBuffer,返回值表示读到了多少字节,-1 表示到达了文件的末尾

1
int readBytes = channel.read(buffer);

写入

SocketChannel

1
2
3
4
5
6
7
8
ByteBuffer buffer = ...;
buffer.put(...); // 存入数据
buffer.flip(); // 切换读模式

// 检测 buffer 中还有没有剩余数据
while(buffer.hasRemaining) {
channel.write(buffer);
}

在 while 中调用 channel.write 是因为 write 方法并不能保证一次将 buffer 中的内容全部写入 channel

关闭

channel 必须关闭,不过调用了 FileInputStream、FileOutputStream 或者 RandomAccessFile 的 close 方法会间接地调用 channel 的 close 方法。

位置

1
2
3
4
5
6
// 获取当前位置
long pos = channel.position();

// 设置当前位置
long newPos = ...;
channel.position(newPos);

设置当前位置时,如果设置为文件的末尾

  • 这时读取会返回 -1
  • 这时写入,会追加内容,但要注意如果 position 超过了文件末尾,再写入时在新内容和原末尾之间会有空洞(00)

三、Selector(选择器)

NIO 有选择器,而 IO 没有。

选择器满足了单个线程处理多个通道的效果,因此大大减少了线程的数量。

因为线程之间的切换对于操作系统来说是昂贵的(每次切换线程都要重新读取线程之前的状态、值、上下文等信息),因此选择器提高了系统的效率。

三、一些关键类

ServerBootstrap

Netty 建立服务端的辅助类。

Channel

频道、通道的意思。

是传入(入站)或者传出(出站)数据的载体。

可以被打开或者被关闭,连接或者断开连接。

在内部会为每个 Channel 分配一个 EventLoop,用以处理所有事件。

EventLoop

EventLoopGroup

是一组 EventLoop

Future

ChannelHandle

通道处理者,是所有处理入站和出站数据的应用程序逻辑的容器。

ChannelHandlerAdapter

ChannelInboundHandlerAdapter

Netty 接收数据 Handler 处理器。





  • 版权声明: 本博客所有文章除特别声明外,著作权归作者所有。转载请注明出处!
  • Copyrights © 2022-2024 Liangxj
  • 访问人数: | 浏览次数: