跳过导航

4.x 用户指南

您知道此页面是根据 Github Wiki 页面 自动生成的,对吗?您可以在 此处 自行改进它!

前言

问题

如今,我们使用通用应用程序或库进行相互通信。例如,我们经常使用 HTTP 客户端库从 Web 服务器检索信息,并通过 Web 服务调用远程过程调用。但是,通用协议或其实现有时无法很好地扩展。这就像我们不使用通用 HTTP 服务器交换大文件、电子邮件消息以及近实时消息(例如财务信息和多人游戏数据)一样。需要的是专门用于特殊用途的高度优化的协议实现。例如,您可能希望实现针对基于 AJAX 的聊天应用程序、媒体流或大文件传输进行优化的 HTTP 服务器。您甚至可以设计并实现完全根据您的需要定制的全新协议。另一个不可避免的情况是,当您必须处理旧版专有协议以确保与旧系统的互操作性时。在这种情况下,重要的是我们如何在不牺牲最终应用程序的稳定性和性能的情况下快速实现该协议。

解决方案

Netty 项目旨在提供异步事件驱动的网络应用程序框架和工具,用于快速开发可维护的高性能和高可扩展性协议服务器和客户端。

换句话说,Netty 是一个 NIO 客户端服务器框架,可以快速轻松地开发网络应用程序,例如协议服务器和客户端。它极大地简化和精简了网络编程,例如 TCP 和 UDP 套接字服务器开发。

“快速而轻松”并不意味着最终应用程序将遭受可维护性或性能问题。Netty 经过精心设计,吸取了从 FTP、SMTP、HTTP 以及各种基于二进制和文本的旧版协议的实现中学到的经验。因此,Netty 成功地找到了一种方法来实现易于开发、性能、稳定性和灵活性,而不会妥协。

一些用户可能已经找到了其他声称具有相同优势的网络应用程序框架,您可能想知道是什么让 Netty 与它们如此不同。答案是它所基于的理念。Netty 的设计从第一天起就为您提供最舒适的体验,无论是在 API 还是在实现方面。这不是什么有形的东西,但您会意识到,当您阅读本指南并使用 Netty 时,这一理念将使您的生活变得更加轻松。

入门

本章围绕 Netty 的核心结构进行介绍,并提供简单的示例,让您快速入门。当您完成本章时,您将能够在 Netty 之上编写客户端和服务器。

如果您更喜欢自上而下的学习方式,您可能希望从 第 2 章,架构概述 开始,然后返回此处。

开始之前

在本教程中运行示例的最低要求只有两个:最新版本的 Netty 和 JDK 1.6 或更高版本。最新版本的 Netty 可在 项目下载页面 中找到。要下载正确的 JDK 版本,请参阅您首选的 JDK 供应商网站。

在阅读过程中,您可能对本章中介绍的类有更多疑问。每当您想了解更多信息时,请参阅 API 参考。本文档中的所有类名都链接到在线 API 参考,以方便您使用。此外,请不要犹豫,联系 Netty 项目社区,让我们知道是否存在任何不正确的信息、语法错误或拼写错误,以及您是否有任何改进文档的建议。

编写丢弃服务器

世界上最简单的协议不是“Hello, World!”而是 DISCARD。这是一个丢弃任何接收到的数据而不进行任何响应的协议。

要实现 DISCARD 协议,您需要做的就是忽略所有接收到的数据。让我们直接从处理程序实现开始,该实现处理 Netty 生成的 I/O 事件。

package io.netty.example.discard;

import io.netty.buffer.ByteBuf;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * Handles a server-side channel.
 */
public class DiscardServerHandler extends ChannelInboundHandlerAdapter { // (1)

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) { // (2)
        // Discard the received data silently.
        ((ByteBuf) msg).release(); // (3)
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    }
}
  1. DiscardServerHandler 扩展了 ChannelInboundHandlerAdapter,这是 ChannelInboundHandler 的实现。ChannelInboundHandler 提供了您可以覆盖的各种事件处理程序方法。现在,仅扩展 ChannelInboundHandlerAdapter 就足够了,而无需自己实现处理程序接口。
  2. 我们在此覆盖 channelRead() 事件处理程序方法。每当从客户端接收到新数据时,都会使用接收到的消息调用此方法。在此示例中,接收到的消息的类型为 ByteBuf
  3. 要实现 DISCARD 协议,处理程序必须忽略接收到的消息。ByteBuf 是一个引用计数对象,必须通过 release() 方法显式释放。请记住,释放传递给处理程序的任何引用计数对象是处理程序的责任。通常,channelRead() 处理程序方法的实现如下
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    try {
        // Do something with msg
    } finally {
        ReferenceCountUtil.release(msg);
    }
}
  1. 当 Netty 因 I/O 错误或处理事件时抛出异常而导致异常时,exceptionCaught() 事件处理程序方法将使用 Throwable 调用。在大多数情况下,应记录捕获的异常并在此处关闭其关联的通道,但此方法的实现可能有所不同,具体取决于你要如何处理异常情况。例如,你可能希望在关闭连接之前发送带有错误代码的响应消息。

到目前为止都很好。我们已经实现了 DISCARD 服务器的前半部分。现在剩下的就是编写使用 DiscardServerHandler 启动服务器的 main() 方法。

package io.netty.example.discard;
    
import io.netty.bootstrap.ServerBootstrap;

import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
    
/**
 * Discards any incoming data.
 */
public class DiscardServer {
    
    private int port;
    
    public DiscardServer(int port) {
        this.port = port;
    }
    
    public void run() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap(); // (2)
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class) // (3)
             .childHandler(new ChannelInitializer<SocketChannel>() { // (4)
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ch.pipeline().addLast(new DiscardServerHandler());
                 }
             })
             .option(ChannelOption.SO_BACKLOG, 128)          // (5)
             .childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
    
            // Bind and start to accept incoming connections.
            ChannelFuture f = b.bind(port).sync(); // (7)
    
            // Wait until the server socket is closed.
            // In this example, this does not happen, but you can do that to gracefully
            // shut down your server.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
    
    public static void main(String[] args) throws Exception {
        int port = 8080;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        }

        new DiscardServer(port).run();
    }
}
  1. NioEventLoopGroup 是处理 I/O 操作的多线程事件循环。Netty 为不同类型的传输提供了各种 EventLoopGroup 实现。在此示例中,我们正在实现一个服务器端应用程序,因此将使用两个 NioEventLoopGroup。第一个通常称为“boss”,它接受传入的连接。第二个通常称为“worker”,它在 boss 接受连接并将接受的连接注册到 worker 后处理已接受连接的流量。使用多少个线程以及如何将它们映射到创建的 Channel 取决于 EventLoopGroup 实现,甚至可以通过构造函数进行配置。
  2. ServerBootstrap 是一个用于设置服务器的帮助程序类。你可以直接使用 Channel 设置服务器。但是,请注意,这是一个繁琐的过程,在大多数情况下你不需要这样做。
  3. 在此,我们指定使用 NioServerSocketChannel 类,该类用于实例化新的 Channel 以接受传入的连接。
  4. 此处指定的处理程序将始终由新接受的 Channel 评估。 ChannelInitializer 是一种特殊处理程序,其目的是帮助用户配置新的 Channel。您很可能希望通过添加一些处理程序(例如 DiscardServerHandler)来配置新 ChannelChannelPipeline,以实现您的网络应用程序。随着应用程序变得复杂,您很可能会向管道添加更多处理程序,并最终将此匿名类提取到顶级类中。
  5. 您还可以设置特定于 Channel 实现的参数。我们正在编写 TCP/IP 服务器,因此我们被允许设置套接字选项,例如 tcpNoDelaykeepAlive。请参阅 ChannelOption 的 apidocs 和特定的 ChannelConfig 实现,以获取有关受支持 ChannelOption 的概述。
  6. 您注意到 option()childOption() 了吗? option() 适用于接受传入连接的 NioServerSocketChannelchildOption() 适用于父 ServerChannel 接受的 Channel,在本例中为 NioSocketChannel
  7. 我们现在可以开始了。剩下的就是绑定到端口并启动服务器。在这里,我们绑定到机器中所有 NIC(网络接口卡)的端口 8080。您现在可以多次调用 bind() 方法(使用不同的绑定地址)。

恭喜!您刚刚在 Netty 之上完成了您的第一个服务器。

查看接收到的数据

既然我们已经编写了我们的第一个服务器,我们需要测试它是否真的有效。测试它的最简单方法是使用 telnet 命令。例如,您可以在命令行中输入 telnet localhost 8080 并键入一些内容。

然而,我们可以说服务器工作正常吗?我们无法真正知道,因为它是一个丢弃服务器。你根本不会得到任何响应。为了证明它确实在工作,让我们修改服务器以打印它接收到的内容。

我们已经知道,每当收到数据时,都会调用channelRead()方法。让我们将一些代码放入DiscardServerHandlerchannelRead()方法中

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    ByteBuf in = (ByteBuf) msg;
    try {
        while (in.isReadable()) { // (1)
            System.out.print((char) in.readByte());
            System.out.flush();
        }
    } finally {
        ReferenceCountUtil.release(msg); // (2)
    }
}
  1. 这个效率低下的循环实际上可以简化为:System.out.println(in.toString(io.netty.util.CharsetUtil.US_ASCII))
  2. 或者,你可以在此处执行in.release()

如果你再次运行telnet命令,你将看到服务器打印它接收到的内容。

丢弃服务器的完整源代码位于发行版的io.netty.example.discard包中。

编写回显服务器

到目前为止,我们一直在消费数据,而根本没有响应。然而,服务器通常应该对请求做出响应。让我们学习如何通过实现ECHO协议来向客户端编写响应消息,在该协议中,任何接收到的数据都会被发送回去。

与我们在前面部分中实现的丢弃服务器唯一的区别在于,它将接收到的数据发回,而不是将接收到的数据打印到控制台。因此,再次修改channelRead()方法就足够了

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ctx.write(msg); // (1)
        ctx.flush(); // (2)
    }
  1. 一个ChannelHandlerContext对象提供了各种操作,使你能够触发各种I/O事件和操作。在这里,我们调用write(Object)逐字写入接收到的消息。请注意,与我们在DISCARD示例中所做不同,我们没有释放接收到的消息。这是因为Netty在将消息写入线路时会为你释放它。
  2. ctx.write(Object)不会将消息写入线路。它在内部进行缓冲,然后由ctx.flush()将其刷新到线路。或者,你可以调用ctx.writeAndFlush(msg)以简化操作。

如果你再次运行telnet命令,你将看到服务器发回你发送给它的任何内容。

回显服务器的完整源代码位于发行版的io.netty.example.echo包中。

编写时间服务器

本节中要实现的协议是TIME协议。它与前面的示例不同,因为它发送一个包含32位整数的消息,而不接收任何请求,并在发送消息后关闭连接。在本示例中,你将学习如何构建和发送消息,以及如何在完成时关闭连接。

由于我们要忽略任何接收到的数据,但要在建立连接后立即发送消息,因此这次不能使用 channelRead() 方法。相反,我们应该覆盖 channelActive() 方法。以下是实现

package io.netty.example.time;

public class TimeServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(final ChannelHandlerContext ctx) { // (1)
        final ByteBuf time = ctx.alloc().buffer(4); // (2)
        time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));
        
        final ChannelFuture f = ctx.writeAndFlush(time); // (3)
        f.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) {
                assert f == future;
                ctx.close();
            }
        }); // (4)
    }
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
  1. 如前所述,channelActive() 方法将在建立连接并准备好生成流量时调用。让我们在此方法中编写一个表示当前时间的 32 位整数。

  2. 要发送新消息,我们需要分配一个包含消息的新缓冲区。我们将编写一个 32 位整数,因此我们需要一个容量至少为 4 字节的 ByteBuf。通过 ChannelHandlerContext.alloc() 获取当前的 ByteBufAllocator 并分配一个新缓冲区。

  3. 与往常一样,我们编写构造的消息。

    但是,等等,反转在哪里?我们以前不是在 NIO 中发送消息之前调用 java.nio.ByteBuffer.flip() 吗?ByteBuf 没有这样的方法,因为它有两个指针;一个用于读取操作,另一个用于写入操作。当您向 ByteBuf 写入内容时,写入器索引会增加,而读取器索引不会改变。读取器索引和写入器索引分别表示消息的开始和结束位置。

    相比之下,NIO 缓冲区没有提供一种干净的方法来找出消息内容的开始和结束位置,而不调用反转方法。当您忘记翻转缓冲区时,您将遇到麻烦,因为没有数据或不正确的数据将被发送。在 Netty 中不会发生这样的错误,因为我们针对不同的操作类型有不同的指针。您会发现随着您习惯它,它会让您的生活变得更加轻松——一种无需翻转的生活!

    需要注意的另一件事是 ChannelHandlerContext.write()(和 writeAndFlush())方法返回一个 ChannelFuture。一个 ChannelFuture 表示尚未发生的 I/O 操作。这意味着任何请求的操作可能尚未执行,因为 Netty 中的所有操作都是异步的。例如,以下代码甚至可能在发送消息之前关闭连接

    Channel ch = ...;
    ch.writeAndFlush(message);
    ch.close();

    因此,你需要在 ChannelFuture 完成后调用 close() 方法,该方法由 write() 方法返回,并且当写入操作完成后它会通知其侦听器。请注意,close() 也可能不会立即关闭连接,并且它会返回一个 ChannelFuture

  4. 那么我们如何得知写入请求何时完成?这就像向返回的 ChannelFuture 添加一个 ChannelFutureListener 一样简单。在这里,我们创建了一个新的匿名 ChannelFutureListener,它会在操作完成后关闭 Channel

    或者,你可以使用预定义的侦听器简化代码

    f.addListener(ChannelFutureListener.CLOSE);

要测试我们的时间服务器是否按预期工作,你可以使用 UNIX rdate 命令

$ rdate -o <port> -p <host>

其中 <port> 是你在 main() 方法中指定的端口号,<host> 通常是 localhost

编写时间客户端

DISCARDECHO 服务器不同,我们需要一个 TIME 协议客户端,因为人类无法将 32 位二进制数据转换为日历上的日期。在本节中,我们将讨论如何确保服务器正常工作,以及如何使用 Netty 编写客户端。

Netty 中服务器和客户端之间最大也是唯一的区别在于使用了不同的 BootstrapChannel 实现。请查看以下代码

package io.netty.example.time;

public class TimeClient {
    public static void main(String[] args) throws Exception {
        String host = args[0];
        int port = Integer.parseInt(args[1]);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        
        try {
            Bootstrap b = new Bootstrap(); // (1)
            b.group(workerGroup); // (2)
            b.channel(NioSocketChannel.class); // (3)
            b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new TimeClientHandler());
                }
            });
            
            // Start the client.
            ChannelFuture f = b.connect(host, port).sync(); // (5)

            // Wait until the connection is closed.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }
}
  1. Bootstrap 类似于 ServerBootstrap,只是它适用于非服务器通道,例如客户端或无连接通道。
  2. 如果您只指定一个 EventLoopGroup,它将同时用作 boss 组和 worker 组。不过,boss worker 不用于客户端。
  3. 使用 NioSocketChannel 而不是 NioServerSocketChannel 来创建客户端 Channel
  4. 请注意,我们在此处不使用 childOption(),这与使用 ServerBootstrap 时不同,因为客户端 SocketChannel 没有父级。
  5. 我们应该调用 connect() 方法而不是 bind() 方法。

如您所见,它与服务器端代码并没有什么不同。那么 ChannelHandler 实现呢?它应该从服务器接收一个 32 位整数,将其转换为人类可读的格式,打印转换后的时间,然后关闭连接

package io.netty.example.time;

import java.util.Date;

public class TimeClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf m = (ByteBuf) msg; // (1)
        try {
            long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
            System.out.println(new Date(currentTimeMillis));
            ctx.close();
        } finally {
            m.release();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
  1. 在 TCP/IP 中,Netty 将从对等方发送的数据读入 ByteBuf

它看起来非常简单,与服务器端示例没有任何不同。但是,此处理程序有时会拒绝工作,引发 IndexOutOfBoundsException。我们将在下一节讨论为什么会发生这种情况。

处理基于流的传输

套接字缓冲区的一个小警告

在基于流的传输(例如 TCP/IP)中,接收到的数据存储在套接字接收缓冲区中。不幸的是,基于流的传输的缓冲区不是数据包队列,而是字节队列。这意味着,即使您将两条消息作为两个独立的数据包发送,操作系统也不会将它们视为两条消息,而只会将它们视为一堆字节。因此,无法保证您读取的内容与远程对等方写入的内容完全相同。例如,让我们假设操作系统的 TCP/IP 协议栈收到了三个数据包

Three packets received as they were sent

由于基于流的协议的这种一般特性,您的应用程序中以以下片段形式读取它们的可能性很高

Three packets split and merged into four buffers

因此,接收方,无论它是服务器端还是客户端,都应该将接收到的数据碎片化成一个或多个有意义的帧,以便应用程序逻辑可以轻松理解。在上述示例中,接收到的数据应该像下面这样进行帧化

Four buffers defragged into three

第一个解决方案

现在让我们回到TIME客户端示例。我们在这里遇到了同样的问题。32 位整数的数据量非常小,而且不太可能经常被碎片化。然而,问题在于它可以被碎片化,并且随着流量的增加,碎片化的可能性也会增加。

简单的解决方案是创建一个内部累积缓冲区,并等到所有 4 个字节都接收进入内部缓冲区。以下是修正了该问题的修改后的 TimeClientHandler 实现

package io.netty.example.time;

import java.util.Date;

public class TimeClientHandler extends ChannelInboundHandlerAdapter {
    private ByteBuf buf;
    
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        buf = ctx.alloc().buffer(4); // (1)
    }
    
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) {
        buf.release(); // (1)
        buf = null;
    }
    
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf m = (ByteBuf) msg;
        buf.writeBytes(m); // (2)
        m.release();
        
        if (buf.readableBytes() >= 4) { // (3)
            long currentTimeMillis = (buf.readUnsignedInt() - 2208988800L) * 1000L;
            System.out.println(new Date(currentTimeMillis));
            ctx.close();
        }
    }
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
  1. 一个 ChannelHandler 有两个生命周期侦听器方法:handlerAdded()handlerRemoved()。只要它不会长时间阻塞,你就可以执行任意(解)初始化任务。
  2. 首先,所有接收到的数据都应该累积到 buf 中。
  3. 然后,处理程序必须检查 buf 是否有足够的数据,在本例中为 4 个字节,并继续执行实际业务逻辑。否则,当更多数据到达时,Netty 将再次调用 channelRead() 方法,最终所有 4 个字节都将累积起来。

第二个解决方案

虽然第一个解决方案解决了 TIME 客户端的问题,但修改后的处理程序看起来并不那么干净。想象一个更复杂的协议,它由多个字段组成,例如可变长度字段。你的 ChannelInboundHandler 实现将很快变得难以维护。

你可能已经注意到,你可以向一个 ChannelPipeline 添加多个 ChannelHandler,因此,你可以将一个整体的 ChannelHandler 拆分为多个模块化的处理程序,以降低应用程序的复杂性。例如,你可以将 TimeClientHandler 拆分为两个处理程序

  • 处理碎片化问题的 TimeDecoder,以及
  • 最初的简单版本的 TimeClientHandler

幸运的是,Netty 提供了一个可扩展的类,它可以帮助你编写第一个类

package io.netty.example.time;

public class TimeDecoder extends ByteToMessageDecoder { // (1)
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { // (2)
        if (in.readableBytes() < 4) {
            return; // (3)
        }
        
        out.add(in.readBytes(4)); // (4)
    }
}
  1. ByteToMessageDecoderChannelInboundHandler 的实现,它可以轻松处理碎片问题。
  2. ByteToMessageDecoder 在接收到新数据时,会使用内部维护的累积缓冲区调用 decode() 方法。
  3. 当累积缓冲区中没有足够数据时,decode() 可以决定不向 out 添加任何内容。当接收到更多数据时,ByteToMessageDecoder 将再次调用 decode()
  4. 如果 decode()out 添加了一个对象,则表示解码器已成功解码一条消息。 ByteToMessageDecoder 将丢弃累积缓冲区的已读部分。请记住,您无需解码多条消息。 ByteToMessageDecoder 将持续调用 decode() 方法,直到它不再向 out 添加任何内容。

现在我们有另一个要插入到 ChannelPipeline 中的处理程序,我们应该修改 TimeClient 中的 ChannelInitializer 实现

b.handler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler());
    }
});

如果您是一个喜欢冒险的人,您可能想尝试 ReplayingDecoder,它可以进一步简化解码器。不过,您需要查阅 API 参考了解更多信息。

public class TimeDecoder extends ReplayingDecoder<Void> {
    @Override
    protected void decode(
            ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        out.add(in.readBytes(4));
    }
}

此外,Netty 提供了开箱即用的解码器,使您可以非常轻松地实现大多数协议,并帮助您避免最终得到一个难以维护的单一处理程序实现。有关更多详细示例,请参阅以下包

使用 POJO 而不是 ByteBuf

到目前为止,我们回顾的所有示例都使用 ByteBuf 作为协议消息的主要数据结构。在本节中,我们将改进 TIME 协议客户端和服务器示例,以使用 POJO 而不是 ByteBuf

ChannelHandler 中使用 POJO 的优势显而易见;通过将从 ByteBuf 中提取信息的代码从处理程序中分离出来,处理程序变得更易于维护和重复使用。在 TIME 客户端和服务器示例中,我们只读取一个 32 位整数,直接使用 ByteBuf 并不是什么大问题。但是,在实现真实世界的协议时,你会发现有必要进行分离。

首先,让我们定义一个名为 UnixTime 的新类型。

package io.netty.example.time;

import java.util.Date;

public class UnixTime {

    private final long value;
    
    public UnixTime() {
        this(System.currentTimeMillis() / 1000L + 2208988800L);
    }
    
    public UnixTime(long value) {
        this.value = value;
    }
        
    public long value() {
        return value;
    }
        
    @Override
    public String toString() {
        return new Date((value() - 2208988800L) * 1000L).toString();
    }
}

现在,我们可以修改 TimeDecoder 以生成 UnixTime,而不是 ByteBuf

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
    if (in.readableBytes() < 4) {
        return;
    }

    out.add(new UnixTime(in.readUnsignedInt()));
}

使用更新的解码器后,TimeClientHandler 不再使用 ByteBuf

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    UnixTime m = (UnixTime) msg;
    System.out.println(m);
    ctx.close();
}

更简单、更优雅,对吧?相同的技术可以应用于服务器端。让我们这次先更新 TimeServerHandler

@Override
public void channelActive(ChannelHandlerContext ctx) {
    ChannelFuture f = ctx.writeAndFlush(new UnixTime());
    f.addListener(ChannelFutureListener.CLOSE);
}

现在,唯一缺少的部分是一个编码器,它是 ChannelOutboundHandler 的实现,它将 UnixTime 转换回 ByteBuf。它比编写解码器简单得多,因为在对消息进行编码时无需处理数据包分段和组装。

package io.netty.example.time;

public class TimeEncoder extends ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        UnixTime m = (UnixTime) msg;
        ByteBuf encoded = ctx.alloc().buffer(4);
        encoded.writeInt((int)m.value());
        ctx.write(encoded, promise); // (1)
    }
}
  1. 这单行代码中有一些重要内容。

    首先,我们原样传递原始 ChannelPromise,以便当编码数据实际写入线路时,Netty 将其标记为成功或失败。

    其次,我们没有调用 ctx.flush()。有一个单独的处理程序方法 void flush(ChannelHandlerContext ctx),其目的是覆盖 flush() 操作。

为了进一步简化,你可以使用 MessageToByteEncoder

public class TimeEncoder extends MessageToByteEncoder<UnixTime> {
    @Override
    protected void encode(ChannelHandlerContext ctx, UnixTime msg, ByteBuf out) {
        out.writeInt((int)msg.value());
    }
}

剩下的最后一项任务是在服务器端的 ChannelPipeline 中插入一个 TimeEncoder,位于 TimeServerHandler 之前,并且将其留作一个简单的练习。

关闭应用程序

关闭 Netty 应用程序通常与通过 shutdownGracefully() 关闭所有你创建的 EventLoopGroup 一样简单。它返回一个 Future,在 EventLoopGroup 完全终止并且属于该组的所有 Channel 都已关闭时通知你。

摘要

在本章中,我们对 Netty 进行了快速浏览,演示了如何在 Netty 之上编写一个完全可用的网络应用程序。

即将到来的章节中将提供有关 Netty 的更详细信息。我们还鼓励您查看 io.netty.example 包中的 Netty 示例。

另请注意,社区始终等待您的问题和想法,以帮助您并根据您的反馈不断改进 Netty 及其文档。

上次检索时间为 2024 年 7 月 19 日