跳过导航

5.x 用户指南

您知道此页面是通过 Github Wiki 页面 自动生成的。您可以在 此处 自行改进!

第三方翻译

前言

问题

如今,我们使用通用应用程序或库相互通信。例如,我们经常使用 HTTP 客户端库从 Web 服务器检索信息,并通过 Web 服务调用远程过程调用。

然而,通用协议或其实现有时无法很好地扩展。就像我们不会使用通用 HTTP 服务器来交换大文件、电子邮件消息以及接近实时消息(例如财务信息和多人游戏数据)。需要的是专门用于特殊用途的高度优化的协议实现。例如,您可能希望实现一个针对基于 AJAX 的聊天应用程序、媒体流或大文件传输进行优化的 HTTP 服务器。您甚至可以设计并实现一个完全针对您的需求量身定制的新协议。

另一个不可避免的情况是,当您必须处理遗留专有协议以确保与旧系统的互操作性时。在这种情况下,重要的是我们如何在不牺牲最终应用程序的稳定性和性能的情况下快速实现该协议。

解决方案

Netty 项目 致力于提供一个异步事件驱动的网络应用程序框架和工具,用于快速开发可维护的高性能 · 高可扩展性协议服务器和客户端。

换句话说,Netty 是一个 NIO 客户端服务器框架,它支持快速轻松地开发网络应用程序,例如协议服务器和客户端。它极大地简化和精简了网络编程,例如 TCP 和 UDP 套接字服务器开发。

“快速轻松”并不意味着最终应用程序将遭受可维护性或性能问题。Netty 经过精心设计,吸取了从 FTP、SMTP、HTTP 以及各种基于二进制和文本的遗留协议的实现中获得的经验。因此,Netty 成功地找到了一种方法,在不妥协的情况下实现易于开发、性能、稳定性和灵活性。

一些用户可能已经找到了其他声称具有相同优势的网络应用程序框架,您可能想知道是什么让 Netty 与它们如此不同。答案是它所基于的理念。Netty 的设计从第一天起就旨在在 API 和实现方面为您提供最舒适的体验。这不是什么有形的东西,但您会意识到,当您阅读本指南并使用 Netty 时,这种理念将使您的生活变得更加轻松。

入门

本章将围绕 Netty 的核心构造进行介绍,并通过简单的示例帮助你快速入门。在本章结束时,你将能够在 Netty 之上编写一个客户端和一个服务器。

如果你更喜欢自上而下的学习方式,你可以从第 2 章“架构概述”开始,然后回到这里。

开始之前

运行本章中介绍的示例的最低要求只有两个:最新版本的 Netty 和 JDK 1.6 或更高版本。最新版本的 Netty 可在 项目下载页面 中获得。要下载正确的 JDK 版本,请参阅你首选的 JDK 供应商的网站。

在阅读过程中,你可能会对本章中介绍的类有更多疑问。每当你想要了解有关它们的更多信息时,请参阅 API 参考。为了方便起见,本文档中的所有类名都链接到了在线 API 参考。此外,请不要犹豫,联系 Netty 项目社区,并让我们知道是否存在任何不正确的信息、语法和拼写错误,以及你是否有改进文档的好主意。

编写丢弃服务器

世界上最简单的协议不是“Hello, World!”而是 DISCARD。这是一个丢弃任何接收到的数据而不进行任何响应的协议。

要实现 DISCARD 协议,你唯一需要做的事情就是忽略所有接收到的数据。让我们直接从处理程序实现开始,它处理 Netty 生成的 I/O 事件。

package io.netty.example.discard;

import io.netty.buffer.ByteBuf;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelHandlerAdapter;

/**
 * Handles a server-side channel.
 */
public class DiscardServerHandler extends ChannelHandlerAdapter { // (1)

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) { // (2)
        // Discard the received data silently.
        ((ByteBuf) msg).release(); // (3)
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    }
}
  1. DiscardServerHandler 扩展了 ChannelHandlerAdapter,它是 ChannelHandler 的实现。ChannelHandler 提供了你可以覆盖的各种事件处理程序方法。现在,仅仅扩展 ChannelHandlerAdapter 就足够了,而不是自己实现处理程序接口。
  2. 我们在这里覆盖 channelRead() 事件处理程序方法。每当从客户端收到新数据时,都会使用接收到的消息调用此方法。在此示例中,接收到的消息的类型是 ByteBuf
  3. 要实现 DISCARD 协议,处理程序必须忽略接收到的消息。ByteBuf 是一个引用计数对象,必须通过 release() 方法显式释放。请记住,释放传递给处理程序的任何引用计数对象是处理程序的责任。通常,channelRead() 处理程序方法的实现如下
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    try {
        // Do something with msg
    } finally {
        ReferenceCountUtil.release(msg);
    }
}
  1. exceptionCaught() 事件处理程序方法在 Netty 由于 I/O 错误或处理事件时抛出异常的处理程序实现而引发 Throwable 时调用。在大多数情况下,应在此处记录捕获的异常并关闭其关联的通道,但此方法的实现可能因您希望如何处理异常情况而异。例如,您可能希望在关闭连接之前发送带有错误代码的响应消息。

到目前为止一切顺利。我们已经实现了 DISCARD 服务器的前半部分。现在剩下的就是编写 main() 方法,该方法使用 DiscardServerHandler 启动服务器。

package io.netty.example.discard;
    
import io.netty.bootstrap.ServerBootstrap;

import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
    
/**
 * Discards any incoming data.
 */
public class DiscardServer {
    
    private int port;
    
    public DiscardServer(int port) {
        this.port = port;
    }
    
    public void run() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap(); // (2)
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class) // (3)
             .childHandler(new ChannelInitializer<SocketChannel>() { // (4)
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ch.pipeline().addLast(new DiscardServerHandler());
                 }
             })
             .option(ChannelOption.SO_BACKLOG, 128)          // (5)
             .childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
    
            // Bind and start to accept incoming connections.
            ChannelFuture f = b.bind(port).sync(); // (7)
    
            // Wait until the server socket is closed.
            // In this example, this does not happen, but you can do that to gracefully
            // shut down your server.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
    
    public static void main(String[] args) throws Exception {
        int port;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        } else {
            port = 8080;
        }
        new DiscardServer(port).run();
    }
}
  1. NioEventLoopGroup 是处理 I/O 操作的多线程事件循环。Netty 为不同类型的传输提供了各种 EventLoopGroup 实现。在此示例中,我们正在实现一个服务器端应用程序,因此将使用两个 NioEventLoopGroup。第一个通常称为“boss”,它接受传入的连接。第二个通常称为“worker”,它在 boss 接受连接并将接受的连接注册到 worker 后处理接受的连接的流量。使用多少个线程以及如何将它们映射到创建的 Channel 取决于 EventLoopGroup 实现,甚至可以通过构造函数进行配置。
  2. ServerBootstrap 是一个用于设置服务器的帮助程序类。您可以直接使用 Channel 设置服务器。但是,请注意这是一个繁琐的过程,在大多数情况下您不需要这样做。
  3. 在此,我们指定使用 NioServerSocketChannel 类,该类用于实例化新的 Channel 以接受传入的连接。
  4. 此处指定的处理器将始终由新接受的 Channel 评估。 ChannelInitializer 是一个特殊处理器,其目的是帮助用户配置新的 Channel。您很可能希望通过添加某些处理器(例如 DiscardServerHandler)来配置新 ChannelChannelPipeline,以实现您的网络应用程序。随着应用程序变得复杂,您很可能会向管道中添加更多处理器,并最终将此匿名类提取到顶级类中。
  5. 您还可以设置特定于 Channel 实现的参数。我们正在编写 TCP/IP 服务器,因此我们被允许设置套接字选项,例如 tcpNoDelaykeepAlive。请参阅 ChannelOption 的 apidocs 和特定的 ChannelConfig 实现,以获取对受支持 ChannelOption 的概述。
  6. 您注意到 option()childOption() 了吗? option() 用于接受传入连接的 NioServerSocketChannelchildOption() 用于父 ServerChannel 接受的 Channel,在本例中为 NioServerSocketChannel
  7. 我们现在可以开始了。剩下的就是绑定到端口并启动服务器。在此,我们绑定到计算机中所有网卡 (NIC) 的端口 8080。您现在可以按需多次调用 bind() 方法(使用不同的绑定地址)。

恭喜!您刚刚在 Netty 之上完成了您的第一个服务器。

查看接收到的数据

现在我们已经编写了我们的第一个服务器,我们需要测试它是否真的有效。测试它的最简单方法是使用 telnet 命令。例如,您可以在命令行中输入 telnet localhost 8080 并输入一些内容。

但是,我们能说服务器工作正常吗?我们无法真正知道,因为它是一个丢弃服务器。您根本不会得到任何响应。为了证明它确实有效,让我们修改服务器以打印它收到的内容。

我们已经知道每当收到数据时都会调用 channelRead() 方法。让我们将一些代码放入 DiscardServerHandlerchannelRead() 方法中

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    ByteBuf in = (ByteBuf) msg;
    try {
        while (in.isReadable()) { // (1)
            System.out.print((char) in.readByte());
            System.out.flush();
        }
    } finally {
        ReferenceCountUtil.release(msg); // (2)
    }
}
  1. 这个低效的循环实际上可以简化为:System.out.println(in.toString(io.netty.util.CharsetUtil.US_ASCII))
  2. 或者,你也可以在这里执行in.release()

如果你再次运行telnet命令,你将看到服务器打印出它接收到的内容。

discard 服务器的完整源代码位于分发版的 io.netty.example.discard 包中。

编写回显服务器

到目前为止,我们一直在消耗数据,而没有做出任何响应。然而,服务器通常应该对请求做出响应。让我们学习如何通过实现 ECHO 协议来编写一个响应消息给客户端,其中任何接收到的数据都会被发回。

与我们在前几节中实现的 discard 服务器唯一的区别在于,它将接收到的数据发回,而不是将接收到的数据打印到控制台。因此,再次修改 channelRead() 方法就足够了

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ctx.write(msg); // (1)
        ctx.flush(); // (2)
    }
  1. 一个 ChannelHandlerContext 对象提供了各种操作,使你能够触发各种 I/O 事件和操作。在这里,我们调用 write(Object) 来逐字书写接收到的消息。请注意,与我们在 DISCARD 示例中所做不同,我们没有释放接收到的消息。这是因为 Netty 在将消息写入网络时会为你释放它。
  2. ctx.write(Object) 不会将消息写入网络。它在内部被缓冲,然后由 ctx.flush() 刷新到网络。或者,你可以调用 ctx.writeAndFlush(msg) 以简化操作。

如果你再次运行telnet命令,你将看到服务器发回你发送给它的任何内容。

回显服务器的完整源代码位于分发版的 io.netty.example.echo 包中。

编写时间服务器

本节要实现的协议是 TIME 协议。它与之前的示例不同,它发送一条包含 32 位整数的消息,而不接收任何请求,并在发送消息后关闭连接。在本示例中,你将学习如何构造和发送消息,以及如何在完成时关闭连接。

由于我们打算忽略任何接收到的数据,但在建立连接后立即发送一条消息,因此这次我们不能使用 channelRead() 方法。相反,我们应该覆盖 channelActive() 方法。以下是实现

package io.netty.example.time;

public class TimeServerHandler extends ChannelHandlerAdapter {

    @Override
    public void channelActive(final ChannelHandlerContext ctx) { // (1)
        final ByteBuf time = ctx.alloc().buffer(4); // (2)
        time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));
        
        final ChannelFuture f = ctx.writeAndFlush(time); // (3)
        f.addListener(new ChannelFutureListener() {

            @Override
            public void operationComplete(ChannelFuture future) {
                assert f == future;
                ctx.close();
            }
        }); // (4)
    }
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
  1. 如上所述,当建立连接并准备生成流量时,将调用 channelActive() 方法。让我们在这个方法中写入一个表示当前时间的 32 位整数。

  2. 要发送新消息,我们需要分配一个将包含消息的新缓冲区。我们将写入一个 32 位整数,因此我们需要一个容量至少为 4 字节的 ByteBuf。通过 ChannelHandlerContext.alloc() 获取当前 ByteBufAllocator 并分配一个新缓冲区。

  3. 和往常一样,我们写入构造的消息。

    但是等等,翻转在哪里?在 NIO 中发送消息之前,我们不是习惯调用 java.nio.ByteBuffer.flip() 吗?ByteBuf 没有这种方法,因为它有两个指针;一个用于读取操作,另一个用于写入操作。当您向 ByteBuf 写入内容时,写入器索引会增加,而读取器索引不会改变。读取器索引和写入器索引分别表示消息的开始和结束位置。

    相比之下,NIO 缓冲区没有提供一种干净的方法来弄清楚消息内容的开始和结束位置,而不调用翻转方法。如果您忘记翻转缓冲区,您将遇到麻烦,因为什么数据都不会发送,或者发送了不正确的数据。在 Netty 中不会发生这样的错误,因为我们为不同的操作类型提供了不同的指针。您会发现当您习惯它时,它会让您的生活变得更加轻松——一种不用翻转的生活!

    需要注意的另一点是,ChannelHandlerContext.write()(和 writeAndFlush())方法返回一个 ChannelFuture。一个 ChannelFuture 表示一个尚未发生的 I/O 操作。这意味着,任何请求的操作可能尚未执行,因为 Netty 中的所有操作都是异步的。例如,以下代码可能在发送消息之前就关闭了连接

    Channel ch = ...;
    ch.writeAndFlush(message);
    ch.close();

    因此,你需要在 write() 方法返回的 ChannelFuture 完成后调用 close() 方法,它会在写操作完成后通知其侦听器。请注意,close() 也可能不会立即关闭连接,它会返回一个 ChannelFuture

  4. 那么,我们如何在写请求完成后得到通知呢?这和向返回的 ChannelFuture 添加一个 ChannelFutureListener 一样简单。这里,我们创建了一个新的匿名 ChannelFutureListener,它会在操作完成后关闭 Channel

    或者,你可以使用预定义的侦听器来简化代码

    f.addListener(ChannelFutureListener.CLOSE);

要测试我们的时间服务器是否按预期工作,你可以使用 UNIX rdate 命令

$ rdate -o <port> -p <host>

其中 <port> 是你在 main() 方法中指定的端口号,<host> 通常是 localhost

编写时间客户端

DISCARDECHO 服务器不同,我们需要一个 TIME 协议的客户端,因为人类无法将 32 位二进制数据转换成日历上的日期。在本节中,我们将讨论如何确保服务器正常工作,并学习如何使用 Netty 编写客户端。

Netty 中服务器和客户端之间最大也是唯一的区别是使用了不同的 BootstrapChannel 实现。请查看以下代码

package io.netty.example.time;

public class TimeClient {
    public static void main(String[] args) throws Exception {
        String host = args[0];
        int port = Integer.parseInt(args[1]);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        
        try {
            Bootstrap b = new Bootstrap(); // (1)
            b.group(workerGroup); // (2)
            b.channel(NioSocketChannel.class); // (3)
            b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new TimeClientHandler());
                }
            });
            
            // Start the client.
            ChannelFuture f = b.connect(host, port).sync(); // (5)

            // Wait until the connection is closed.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }
}
  1. BootstrapServerBootstrap 类似,但它适用于非服务器通道,例如客户端或无连接通道。
  2. 如果您只指定一个 EventLoopGroup,它将同时用作 boss 组和 worker 组。不过,boss worker 不用于客户端。
  3. 使用 NioSocketChannel(而不是 NioServerSocketChannel)来创建客户端 Channel
  4. 请注意,我们在这里不使用 childOption(),这与我们使用 ServerBootstrap 时不同,因为客户端 SocketChannel 没有父级。
  5. 我们应该调用 connect() 方法,而不是 bind() 方法。

如您所见,它与服务器端代码并没有什么不同。那么 ChannelHandler 实现如何呢?它应该从服务器接收一个 32 位整数,将其转换为人类可读格式,打印转换后的时间,然后关闭连接

package io.netty.example.time;

import java.util.Date;

public class TimeClientHandler extends ChannelHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf m = (ByteBuf) msg; // (1)
        try {
            long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
            System.out.println(new Date(currentTimeMillis));
            ctx.close();
        } finally {
            m.release();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
  1. 在 TCP/IP 中,Netty 将从对等方发送的数据读入 [ByteBuf] 中。

它看起来非常简单,与服务器端示例没有任何不同。但是,此处理程序有时会拒绝工作,引发 IndexOutOfBoundsException。我们将在下一节讨论为什么会发生这种情况。

处理基于流的传输

套接字缓冲区的一个小警告

在基于流的传输(例如 TCP/IP)中,接收到的数据将存储在套接字接收缓冲区中。不幸的是,基于流的传输的缓冲区不是数据包的队列,而是字节的队列。这意味着,即使您将两条消息作为两个独立的数据包发送,操作系统也不会将它们视为两条消息,而只是将它们视为一堆字节。因此,无法保证您读取的内容与远程对等方写入的内容完全相同。例如,让我们假设操作系统的 TCP/IP 协议栈收到了三个数据包

Three packets received as they were sent

由于基于流的协议的这种一般特性,您的应用程序很有可能以以下片段形式读取它们

Three packets split and merged into four buffers

因此,接收方(无论它是服务器端还是客户端)都应将接收到的数据解包成一个或多个有意义的帧,以便应用程序逻辑能够轻松理解。对于上面的示例,接收到的数据应像下面一样进行分帧

Four buffers defragged into three

第一个解决方案

现在让我们回到TIME客户端示例。我们在这里遇到了同样的问题。32 位整数的数据量非常小,不太可能经常被分段。但是,问题在于它可以被分段,并且随着流量的增加,分段的可能性会增加。

简单的解决方案是创建一个内部累积缓冲区,并等到所有 4 个字节都接收到了内部缓冲区中。以下是修改后的 TimeClientHandler 实现,它解决了这个问题

package io.netty.example.time;

import java.util.Date;

public class TimeClientHandler extends ChannelHandlerAdapter {
    private ByteBuf buf;
    
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        buf = ctx.alloc().buffer(4); // (1)
    }
    
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) {
        buf.release(); // (1)
        buf = null;
    }
    
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf m = (ByteBuf) msg;
        buf.writeBytes(m); // (2)
        m.release();
        
        if (buf.readableBytes() >= 4) { // (3)
            long currentTimeMillis = (buf.readUnsignedInt() - 2208988800L) * 1000L;
            System.out.println(new Date(currentTimeMillis));
            ctx.close();
        }
    }
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
  1. 一个 ChannelHandler 有两个生命周期侦听器方法:handlerAdded()handlerRemoved()。只要它不会长时间阻塞,你就可以执行任意(解)初始化任务。
  2. 首先,所有接收到的数据都应累积到 buf 中。
  3. 然后,处理程序必须检查 buf 是否有足够的数据(本例中为 4 个字节),并继续执行实际业务逻辑。否则,当更多数据到达时,Netty 将再次调用 channelRead() 方法,最终将累积所有 4 个字节。

第二个解决方案

虽然第一个解决方案解决了 TIME 客户端的问题,但修改后的处理程序看起来并不那么干净。想象一个更复杂的协议,它由多个字段组成,例如可变长度字段。你的 ChannelHandler 实现将很快变得难以维护。

正如你可能注意到的,你可以将多个 ChannelHandler 添加到 ChannelPipeline 中,因此,你可以将一个单一的 ChannelHandler 拆分为多个模块化的处理程序,以降低应用程序的复杂性。例如,你可以将 TimeClientHandler 拆分为两个处理程序

  • TimeDecoder 处理碎片问题,以及
  • TimeClientHandler 的最初简单版本。

幸运的是,Netty 提供了一个可扩展类,它可以帮助你编写第一个开箱即用的

package io.netty.example.time;

public class TimeDecoder extends ByteToMessageDecoder { // (1)
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { // (2)
        if (in.readableBytes() < 4) {
            return; // (3)
        }
        
        out.add(in.readBytes(4)); // (4)
    }
}
  1. ByteToMessageDecoderChannelHandler 的实现,它可以轻松处理碎片问题。
  2. ByteToMessageDecoder 在接收到新数据时,使用内部维护的累积缓冲区调用 decode() 方法。
  3. 当累积缓冲区中没有足够的数据时,decode() 可以决定不向 out 添加任何内容。当接收到更多数据时,ByteToMessageDecoder 将再次调用 decode()
  4. 如果 decode()out 添加了一个对象,则表示解码器成功解码了一条消息。 ByteToMessageDecoder 将丢弃累积缓冲区的已读部分。请记住,你不需要解码多条消息。 ByteToMessageDecoder 将继续调用 decode() 方法,直到它不再向 out 添加任何内容。

现在我们有了另一个要插入到 ChannelPipeline 中的处理程序,我们应该修改 TimeClient 中的 ChannelInitializer 实现

b.handler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler());
    }
});

如果你是一个喜欢冒险的人,你可能想尝试 ReplayingDecoder,它进一步简化了解码器。不过,你需要查阅 API 参考以获取更多信息。

public class TimeDecoder extends ReplayingDecoder<Void> {
    @Override
    protected void decode(
            ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        out.add(in.readBytes(4));
    }
}

此外,Netty 提供了开箱即用的解码器,使你能够非常轻松地实现大多数协议,并帮助你避免最终使用不可维护的单片处理程序实现。请参阅以下软件包以获取更详细的示例

使用 POJO 而不是 ByteBuf

到目前为止,我们回顾的所有示例都将 ByteBuf 用作协议消息的主要数据结构。在本节中,我们将改进 TIME 协议客户端和服务器示例,以使用 POJO 而不是 ByteBuf

ChannelHandler 中使用 POJO 的优点显而易见;通过将从 ByteBuf 中提取信息的代码从处理程序中分离出来,您的处理程序变得更易于维护和重复使用。在 TIME 客户端和服务器示例中,我们只读取一个 32 位整数,直接使用 ByteBuf 并不是什么大问题。但是,您会发现随着实现真实世界的协议,有必要进行分离。

首先,让我们定义一个名为 UnixTime 的新类型。

package io.netty.example.time;

import java.util.Date;

public class UnixTime {

    private final long value;
    
    public UnixTime() {
        this(System.currentTimeMillis() / 1000L + 2208988800L);
    }
    
    public UnixTime(long value) {
        this.value = value;
    }
        
    public long value() {
        return value;
    }
        
    @Override
    public String toString() {
        return new Date((value() - 2208988800L) * 1000L).toString();
    }
}

现在,我们可以修改 TimeDecoder 以生成 UnixTime 而不是 ByteBuf

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
    if (in.readableBytes() < 4) {
        return;
    }

    out.add(new UnixTime(in.readUnsignedInt()));
}

使用更新的解码器,TimeClientHandler 不再使用 ByteBuf

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    UnixTime m = (UnixTime) msg;
    System.out.println(m);
    ctx.close();
}

更简单、更优雅,对吧?同样的技术可以应用于服务器端。让我们这次先更新 TimeServerHandler

@Override
public void channelActive(ChannelHandlerContext ctx) {
    ChannelFuture f = ctx.writeAndFlush(new UnixTime());
    f.addListener(ChannelFutureListener.CLOSE);
}

现在,唯一缺少的部分是编码器,它是 ChannelHandler 的实现,它将 UnixTime 转换回 ByteBuf。它比编写解码器简单得多,因为在对消息进行编码时无需处理数据包分段和组装。

package io.netty.example.time;

public class TimeEncoder extends ChannelHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        UnixTime m = (UnixTime) msg;
        ByteBuf encoded = ctx.alloc().buffer(4);
        encoded.writeInt((int) m.value());
        ctx.write(encoded, promise); // (1)
    }
}
  1. 在这个处理程序方法中有一些重要事项需要注意

    首先,我们按原样传递原始 ChannelPromise,以便 Netty 在将编码数据实际写入线路时将其标记为成功或失败。

    其次,我们没有调用 ctx.flush()。有一个单独的处理程序方法 void flush(ChannelHandlerContext ctx),其目的是覆盖 flush() 操作。

为了进一步简化,您可以使用 MessageToByteEncoder

public class TimeEncoder extends MessageToByteEncoder<UnixTime> {
    @Override
    protected void encode(ChannelHandlerContext ctx, UnixTime msg, ByteBuf out) {
        out.writeInt((int) msg.value());
    }
}

剩下的最后任务是在服务器端将一个 TimeEncoder 插入到 ChannelPipeline 中,在 TimeServerHandler 之前,这是一个简单的练习。

关闭应用程序

关闭 Netty 应用程序通常就像关闭所有通过 shutdownGracefully() 创建的 EventLoopGroup 一样简单。它返回一个 Future,当 EventLoopGroup 完全终止并且属于该组的所有 Channel 都已关闭时通知您。

总结

在本章中,我们快速浏览了 Netty,并演示了如何在 Netty 之上编写一个完全可用的网络应用程序。

后续章节中将提供有关 Netty 的更多详细信息。我们还鼓励您查看 io.netty.example 包中的 Netty 示例。

另请注意,社区 始终等待您的问题和想法,以帮助您并根据您的反馈不断改进 Netty 及其文档。

上次检索时间为 2024 年 7 月 19 日