什么是 Netty

Netty 简介#

Netty 是由 JBoss 开发,基于 Java NIO 的一个高性能通信框架。之前几篇文章介绍了 Java NIO 的一些基本的概念和 API。但在实际的网络开发中,其实很少使用 Java NIO 原生的 API。主要有以下原因:

  • 原生 API 使用单线程模型,不能很好利用多核优势,如果自己去写多线程结合起来比较麻烦;
  • 原生 API 是直接使用的 IO 数据,没有做任何封装处理,对数据的编解码、TCP 的粘包和拆包、客户端断连、网络的可靠性和安全性方面没有做处理;

在《Netty 权威指南》这本书里提到一个真实的故事,两个项目团队都要做基于 NIO 非阻塞特性的一个高性能、异步和高可靠性的底层通信框架,但一个团队选择了基于 Java NIO API 从头开发,另一个团队选择了基于 Netty 开发。最终,从头开发的团队遇到了各种各样的问题,导致项目延迟,而基于 Netty 开发的团队则进展顺利。

其实网络开发是一个比较复杂的事情,因为网络的不稳定性,通常会遇到各种各样的问题,比如前面提到的客户端突然断连、TCP 的拆包和沾包等等。

幸运的是,网络上已经有了这么一个成熟的框架帮我们处理了这些事情,这个框架就是 Netty。Netty 主要应用于以下领域:

  • 高性能的 RPC 框架:常用于微服务之间的高性能远程调用(如 Dubbo)
  • 游戏行业:Netty 可以很轻松地定制和开发一个私有协议栈,
  • 即时通讯:Netty 基于 Java NIO,并且做了一些优化,支持高性能的即时通讯

Netty 可以做什么#

这里有一个 Netty 的功能特性的图:

功能特性

Netty Core 提供了基本功能,包括文件零拷贝、基本的 API、可扩展的基于事件的模型(下文详细介绍)。

Netty 支持非常多的协议,比如 HTTP、WebSocket 等。当然,Netty 也可以自定义协议。常见协议的示例代码可以参考 netty 源码里面的 example 包。

Netty 同时支持 Java 的 BIO 和 NIO 两种方式。且很容易与 Spring 等主流框架进行集成。

Reactor 线程模型#

首先介绍处理事件的两种方式:

  • 轮询方式:线程不断轮询访问相关事件发生源有没有发生事件,有发生事件就调用事件处理逻辑。Java 原生的 NIO 就是使用的轮询方式。
  • 事件驱动方式,发生事件,主线程把事件放入事件队列,在另外线程不断循环消费事件列表中的事件,调用事件对应的处理逻辑处理事件。事件驱动方式也被称为消息通知方式,其实是设计模式中观察者模式的思路。

Reactor 是反应堆的意思。Reactor 线程模型是指通过一个或多个输入,同时传递给服务处理器的服务请求的事件驱动处理模式。

Reactor 模式主要工作原理如下图:

reactor线程模型

Reactor 有一个专门负责监听和分发事件的线程,如图中的 Service Handler,所有请求进来后,被它分发到具体的处理线程,如图中的 Event Handler 去处理。

Reactor 可能有多个,而 Netty 正是使用了多 Reactor 的线程模型。

Netty 是怎么工作的#

netty工作原理架构图

Netty 里面有两个 Group,分别是 Boss Group 和 Worker Group。其中,Boss Group 用于处于连接,Worker Group 用于处理实际的读写 IO。

Boss Group 里面的 NioEventLoop 会轮询 accept 事件,遇到有新的连接,就生成 NioSocketChannel,并把这个 Channel 注册到 Worker Group 的 Selector 上。

Worker Group 轮询 read 和 write 事件,在可读或者可写条件满足时,就进行处理。

Worker Group 和 Boss Group 都是通过里面的 NioEventLoop 来操作的。NioEventLoop 中维护了一个线程和任务队列,支持异步提交执行任务,线程启动时会调用 NioEventLoop 的 run 方法。

最后都会执行一个 runAllTasks 方法,它用于处理任务队列中的任务。任务队列中的任务包括用户调用 eventloop.execute 或 schedule 执行的任务,或者其他线程提交到该 eventloop 的任务。

示例代码#

以下是使用 Netty 创建一个 Server 的示例代码。Client 也可以使用 Netty 来创建,也可以使用之前文章里面的 NIO 示例代码。这里就不展示基于 Netty 的 Client 端的代码,只展示 Server 端的代码了。

public class Server {

    private final static int PORT = 8080;

    public static void main(String[] args) {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler(new ChildChannelHandler());

            ChannelFuture f = b.bind(PORT).sync();
            System.out.println(Thread.currentThread().getName() +
                    ",服务器开始监听端口,等待客户端连接.........");

            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }


    private static class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
        @Override
        protected void initChannel(SocketChannel socketChannel) throws Exception {
            socketChannel.pipeline().addLast(new ServerHandler());
        }
    }

    static class ServerHandler extends ChannelInboundHandlerAdapter {

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ByteBuf buf = (ByteBuf) msg;
            byte[] reg = new byte[buf.readableBytes()];
            buf.readBytes(reg);
            String body = new String(reg, StandardCharsets.UTF_8);
            System.out.println(Thread.currentThread().getName() +
                    ",The server receive  order : " + body);

            String respMsg = "I am Server,消息接收 success!";
            ByteBuf respByteBuf = Unpooled.copiedBuffer(respMsg.getBytes());
            ctx.write(respByteBuf);
        }
    }
}
预览

作者:公众号_xy 的技术圈

链接:www.imooc.com/article/289251

来源:慕课网

本作品采用《CC 协议》,转载必须注明作者和本文链接