多路复用

什么是多路复用

理解完 NIO 模型,我们可能会注意到,在 Linux 中,万物皆文件,而文件的话就得要操作到磁盘,而操作磁盘则需要用户态和内核态的切换。在 NIO 模式下,我们不断的去判断连接是否有数据传过来,这里我们就得操作到磁盘。我们在轮询的过程中不断地切换内核态和用户态,这是非常浪费资源的。

于是,我们想,有没有一种方法可以不用到内核态和用户态的切换。

我们可以把轮询的过程交由内核去完成,当内核监听到有连接收到数据时,再告诉我们,从而我们通过线程去处理它。把轮询交由内核,用户态只需收到结果,这个过程就叫多路复用。

select

select 和 poll 跟我们原先的思路一样,只是让内核去执行轮询过程,当内核知道连接有数据时,再通知用户线程。

select 是基于 bitmap(即位表)来实现的,所以它的长度会受到一定的限制,最大为 1024,有三个 bitmap,分别记录读状态,写状态,异常状态。

poll

poll 用一个结构体对数据进行封装,不需要三个 bitmap 来分别记录状态,而是通过一个 status 属性来表示。然后通过可变数组的形式来管理这些结构体,解决了 select 长度不足的问题。但它们两个都是将文件描述符放到数组中,然后通过内核不断去轮询,判断其状态是否发生改变,若改变则通过回调的方式通知用户线程。

epoll

使用 select 和 poll 时,我们发现了以下三个问题:

  • 如何突破文件描述符数量的限制(poll 已经解决)
  • 如何避免用户态和内核态对文件描述符集合的拷贝
  • 如何避免线性遍历文件描述符集合

针对第一点,epoll 采用了红黑树的结构存储文件描述符,这样来在性能上与 poll 的链表相比有了很大的提升。对于第二点,epoll 只开辟了一个空间,即只用内核来存放文件描述符,用户态也是操作这一份空间。

对于第三点,epoll 新增了一个 socket 就绪链表和回调函数。当连接(socket) 有数据时,会自动触发回调函数,将就绪的 socket 放到链表中,同时告诉用户态过来处理。这样的话内核就不用一直去轮询符号表了,实现了 O(1) 的复杂度。

代码演示

服务端:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class EpollServer {
    public void static main(String[] args) {
        EventLoopGroup bossGroup = new NioEventLoopGroup();   // 监听连接的线程组
        EventLoopGroup workerGroup = new NioEventLoopGroup();  // 处理连接的线程组
        
        try {
            // 1. 启动器,负责组装 netty 组件
            new ServerBootstrap()
                // 2. 添加线程组
                .group(bossGroup, workerGroup)
                // 3. 选择基于 NIO 的服务端 channel 实现
                .channel(NioServerSocketChannel.class)
                // 4. 处理流程
                .childHandler(
                    // 5. channel 代表和客户端进行数据读写的通道 Initializer 初始化,负责添加别的 handler
                    new ChannelInitializer<NioSocketChannel>() {
                        @Override
                        protected void initChannel(NioSocketChannel ch) throws Exception {
                            // 6. 添加具体 handler
                            ch.pipeline().addLast(new StringDecoder());  // 将 bytebuf 转为字符串
                            ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {  // 自定义handler
                                @Override
                                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                // 打印上一步转化好的字符串
                                System.out.println(msg);
                                super.channelRead(ctx, msg);
                            }
                        });
                    }
                })
                .bind(8080)   // 7. 绑定监听端口
                .channel().closeFuture().sync();   // 等待服务端关闭
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

客户端:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class EpollClient {
    public static void main(String[] args) throws InterruptedException {
        // 1. 启动类
        new Bootstrap()
                // 2. 添加 eventLoop
                .group(new NioEventLoopGroup())
                // 3. 选择客户端 channel 实现
                .channel(NioSocketChannel.class)
                // 4. 添加处理器
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override  // 在连接建立后被调用
                    protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                        nioSocketChannel.pipeline().addLast(new StringEncoder());
                    }
                })
                // 5. 连接到服务器
                .connect(new InetSocketAddress("localhost", 8080))
                .sync()   // 阻塞方法,直到连接建立
                .channel()   // 代表连接对象
                // 6. 向服务器发送数据
                .writeAndFlush("hello, world");   // 发送数据
    }
}
updatedupdated2022-06-232022-06-23