引用计数对象
自 Netty 4 版本以来,某些对象的声明周期由其引用计数管理,以便 Netty 可以将其(或其共享资源)返回到对象池(或对象分配器)中,只要不再使用它。垃圾回收和引用队列无法提供高效的实时不可达性保证,而引用计数以轻微的不便为代价提供了一种替代机制。
ByteBuf
是利用引用计数来提高分配和释放性能的最显着类型,本页面将使用 ByteBuf
解释 Netty 中的引用计数如何工作。
引用计数对象的初始引用计数为 1
ByteBuf buf = ctx.alloc().directBuffer();
assert buf.refCnt() == 1;
当您释放引用计数对象时,其引用计数将减少 1。如果引用计数达到 0,则引用计数对象将被释放或返回到其来源的对象池
assert buf.refCnt() == 1;
// release() returns true only if the reference count becomes 0.
boolean destroyed = buf.release();
assert destroyed;
assert buf.refCnt() == 0;
尝试访问引用计数为 0 的引用计数对象将触发 IllegalReferenceCountException
assert buf.refCnt() == 0;
try {
buf.writeLong(0xdeadbeef);
throw new Error("should not reach here");
} catch (IllegalReferenceCountExeception e) {
// Expected
}
引用计数还可通过 retain()
操作增加,只要它尚未被销毁
ByteBuf buf = ctx.alloc().directBuffer();
assert buf.refCnt() == 1;
buf.retain();
assert buf.refCnt() == 2;
boolean destroyed = buf.release();
assert !destroyed;
assert buf.refCnt() == 1;
一般经验法则是,最后访问引用计数对象的参与方也负责销毁该引用计数对象。具体来说
- 如果 [发送] 组件应该将引用计数对象传递给另一个 [接收] 组件,则发送组件通常不需要销毁它,而是将该决定推迟到接收组件。
- 如果一个组件使用了一个引用计数对象,并且知道没有其他任何东西会再访问它(即,不会将引用传递给另一个组件),则该组件应该销毁它。
这是一个简单的示例
public ByteBuf a(ByteBuf input) {
input.writeByte(42);
return input;
}
public ByteBuf b(ByteBuf input) {
try {
output = input.alloc().directBuffer(input.readableBytes() + 1);
output.writeBytes(input);
output.writeByte(42);
return output;
} finally {
input.release();
}
}
public void c(ByteBuf input) {
System.out.println(input);
input.release();
}
public void main() {
...
ByteBuf buf = ...;
// This will print buf to System.out and destroy it.
c(b(a(buf)));
assert buf.refCnt() == 0;
}
操作 | 谁应该释放? | 谁释放了? |
---|---|---|
1. main() 创建 buf |
buf →main() |
|
2. main() 使用 buf 调用 a() |
buf →a() |
|
3. a() 仅仅返回 buf 。 |
buf →main() |
|
4. main() 使用 buf 调用 b() |
buf →b() |
|
5. b() 返回 buf 的副本 |
buf →b() ,copy →main() |
b() 释放 buf |
6. main() 使用 copy 调用 c() |
copy →c() |
|
7. c() 吞下 copy |
copy →c() |
c() 释放 copy |
ByteBuf.duplicate()
、ByteBuf.slice()
和 ByteBuf.order(ByteOrder)
创建一个派生缓冲区,该缓冲区与父缓冲区共享内存区域。派生缓冲区没有自己的引用计数,而是共享父缓冲区的引用计数。
ByteBuf parent = ctx.alloc().directBuffer();
ByteBuf derived = parent.duplicate();
// Creating a derived buffer does not increase the reference count.
assert parent.refCnt() == 1;
assert derived.refCnt() == 1;
相反,ByteBuf.copy()
和 ByteBuf.readBytes(int)
不是派生缓冲区。返回的 ByteBuf
已分配,需要释放。
请注意,父缓冲区及其派生缓冲区共享相同的引用计数,并且在创建派生缓冲区时引用计数不会增加。因此,如果您要将派生缓冲区传递给应用程序的其他组件,则必须首先对其调用 retain()
。
ByteBuf parent = ctx.alloc().directBuffer(512);
parent.writeBytes(...);
try {
while (parent.isReadable(16)) {
ByteBuf derived = parent.readSlice(16);
derived.retain();
process(derived);
}
} finally {
parent.release();
}
...
public void process(ByteBuf buf) {
...
buf.release();
}
有时,ByteBuf
由缓冲区持有者包含,例如 DatagramPacket
、HttpContent
和 WebSocketframe
。这些类型扩展了一个名为 ByteBufHolder
的通用接口。
缓冲区持有者共享其包含缓冲区的引用计数,就像派生缓冲区一样。
当事件循环将数据读入 ByteBuf
并使用它触发 channelRead()
事件时,相应管道中的 ChannelHandler
负责释放缓冲区。因此,使用接收数据的处理程序应在其 channelRead()
处理程序方法中对数据调用 release()
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf buf = (ByteBuf) msg;
try {
...
} finally {
buf.release();
}
}
正如本文档的“谁销毁?”部分中所解释的,如果您的处理程序将缓冲区(或任何引用计数对象)传递给下一个处理程序,则无需释放它
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf buf = (ByteBuf) msg;
...
ctx.fireChannelRead(buf);
}
请注意,ByteBuf
并不是 Netty 中唯一的引用计数类型。如果您正在处理解码器生成的消息,则该消息很可能也具有引用计数
// Assuming your handler is placed next to `HttpRequestDecoder`
public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (msg instanceof HttpRequest) {
HttpRequest req = (HttpRequest) msg;
...
}
if (msg instanceof HttpContent) {
HttpContent content = (HttpContent) msg;
try {
...
} finally {
content.release();
}
}
}
如果您有疑问或想简化释放消息,可以使用 ReferenceCountUtil.release()
public void channelRead(ChannelHandlerContext ctx, Object msg) {
try {
...
} finally {
ReferenceCountUtil.release(msg);
}
}
或者,您可以考虑扩展 SimpleChannelHandler
,它会对您收到的所有消息调用 ReferenceCountUtil.release(msg)
。
与入站消息不同,出站消息是由您的应用程序创建的,并且 Netty 负责在将它们写入网络后释放它们。但是,拦截您的写入请求的处理程序应确保正确释放任何中间对象。(例如编码器)
// Simple-pass through
public void write(ChannelHandlerContext ctx, Object message, ChannelPromise promise) {
System.err.println("Writing: " + message);
ctx.write(message, promise);
}
// Transformation
public void write(ChannelHandlerContext ctx, Object message, ChannelPromise promise) {
if (message instanceof HttpContent) {
// Transform HttpContent to ByteBuf.
HttpContent content = (HttpContent) message;
try {
ByteBuf transformed = ctx.alloc().buffer();
....
ctx.write(transformed, promise);
} finally {
content.release();
}
} else {
// Pass non-HttpContent through.
ctx.write(message, promise);
}
}
引用计数的缺点是容易泄漏引用计数对象。由于 JVM 不知道 Netty 实现的引用计数,因此一旦它们变得不可达,JVM 就会自动对它们进行垃圾回收,即使它们的引用计数不为零。一旦垃圾回收,对象就不能复活,因此无法返回到它所在的池中,从而会产生内存泄漏。
幸运的是,尽管查找泄漏很困难,但 Netty 默认会对约 1% 的缓冲区分配进行采样,以检查您的应用程序中是否存在泄漏。如果发生泄漏,您将找到以下日志消息
泄漏:在垃圾回收之前未调用 ByteBuf.release()。启用高级泄漏报告以找出泄漏发生的位置。要启用高级泄漏报告,请指定 JVM 选项“-Dio.netty.leakDetectionLevel=advanced”或调用 ResourceLeakDetector.setLevel()
使用上述 JVM 选项重新启动您的应用程序,您将看到泄漏缓冲区被访问的应用程序的最近位置。以下输出显示了我们的单元测试(XmlFrameDecoderTest.testDecodeWithXml()
)的泄漏
Running io.netty.handler.codec.xml.XmlFrameDecoderTest
15:03:36.886 [main] ERROR io.netty.util.ResourceLeakDetector - LEAK: ByteBuf.release() was not called before it's garbage-collected.
Recent access records: 1
#1:
io.netty.buffer.AdvancedLeakAwareByteBuf.toString(AdvancedLeakAwareByteBuf.java:697)
io.netty.handler.codec.xml.XmlFrameDecoderTest.testDecodeWithXml(XmlFrameDecoderTest.java:157)
io.netty.handler.codec.xml.XmlFrameDecoderTest.testDecodeWithTwoMessages(XmlFrameDecoderTest.java:133)
...
Created at:
io.netty.buffer.UnpooledByteBufAllocator.newDirectBuffer(UnpooledByteBufAllocator.java:55)
io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:155)
io.netty.buffer.UnpooledUnsafeDirectByteBuf.copy(UnpooledUnsafeDirectByteBuf.java:465)
io.netty.buffer.WrappedByteBuf.copy(WrappedByteBuf.java:697)
io.netty.buffer.AdvancedLeakAwareByteBuf.copy(AdvancedLeakAwareByteBuf.java:656)
io.netty.handler.codec.xml.XmlFrameDecoder.extractFrame(XmlFrameDecoder.java:198)
io.netty.handler.codec.xml.XmlFrameDecoder.decode(XmlFrameDecoder.java:174)
io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:227)
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:140)
io.netty.channel.ChannelHandlerInvokerUtil.invokeChannelReadNow(ChannelHandlerInvokerUtil.java:74)
io.netty.channel.embedded.EmbeddedEventLoop.invokeChannelRead(EmbeddedEventLoop.java:142)
io.netty.channel.DefaultChannelHandlerContext.fireChannelRead(DefaultChannelHandlerContext.java:317)
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
io.netty.channel.embedded.EmbeddedChannel.writeInbound(EmbeddedChannel.java:176)
io.netty.handler.codec.xml.XmlFrameDecoderTest.testDecodeWithXml(XmlFrameDecoderTest.java:147)
io.netty.handler.codec.xml.XmlFrameDecoderTest.testDecodeWithTwoMessages(XmlFrameDecoderTest.java:133)
...
如果您使用 Netty 5 或更高版本,则会提供其他信息来帮助您找出哪个处理程序最后处理了泄漏的缓冲区。以下示例显示泄漏的缓冲区由名称为 EchoServerHandler#0
的处理程序处理,然后被垃圾回收,这意味着 EchoServerHandler#0
很可能忘记释放缓冲区
12:05:24.374 [nioEventLoop-1-1] ERROR io.netty.util.ResourceLeakDetector - LEAK: ByteBuf.release() was not called before it's garbage-collected.
Recent access records: 2
#2:
Hint: 'EchoServerHandler#0' will handle the message from this point.
io.netty.channel.DefaultChannelHandlerContext.fireChannelRead(DefaultChannelHandlerContext.java:329)
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:133)
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:485)
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:452)
io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:346)
io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:794)
java.lang.Thread.run(Thread.java:744)
#1:
io.netty.buffer.AdvancedLeakAwareByteBuf.writeBytes(AdvancedLeakAwareByteBuf.java:589)
io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:208)
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:125)
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:485)
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:452)
io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:346)
io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:794)
java.lang.Thread.run(Thread.java:744)
Created at:
io.netty.buffer.UnpooledByteBufAllocator.newDirectBuffer(UnpooledByteBufAllocator.java:55)
io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:155)
io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:146)
io.netty.buffer.AbstractByteBufAllocator.ioBuffer(AbstractByteBufAllocator.java:107)
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:123)
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:485)
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:452)
io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:346)
io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:794)
java.lang.Thread.run(Thread.java:744)
目前有 4 个级别的泄漏检测
-
DISABLED
- 完全禁用泄漏检测。不推荐。 -
SIMPLE
- 告知 1% 的缓冲区是否存在泄漏。默认。 -
ADVANCED
- 告知 1% 的缓冲区中泄漏缓冲区的访问位置。 -
PARANOID
- 与ADVANCED
相同,但适用于每个缓冲区。适用于自动化测试阶段。如果构建输出包含“LEAK:
”,则可以使构建失败。
您可以将泄漏检测级别指定为 JVM 选项 -Dio.netty.leakDetection.level
java -Dio.netty.leakDetection.level=advanced ...
注意:此属性过去称为 io.netty.leakDetectionLevel
。
- 在
PARANOID
泄漏检测级别以及SIMPLE
级别运行您的单元测试和集成测试。 - 在
SIMPLE
级别将您的应用程序部署到整个集群之前,对其进行一段时间的预发布,以查看是否存在泄漏。 - 如果存在泄漏,请在
ADVANCED
级别再次进行预发布,以获取有关泄漏来源的一些提示。 - 不要将存在泄漏的应用程序部署到整个集群。
在单元测试中忘记释放缓冲区或消息非常容易。它将生成泄漏警告,但这并不一定意味着您的应用程序存在泄漏。您可以使用 ReferenceCountUtil.releaseLater()
实用程序方法,而不是使用 try-finally
块包装单元测试以释放所有缓冲区
import static io.netty.util.ReferenceCountUtil.*;
@Test
public void testSomething() throws Exception {
// ReferenceCountUtil.releaseLater() will keep the reference of buf,
// and then release it when the test thread is terminated.
ByteBuf buf = releaseLater(Unpooled.directBuffer(512));
...
}
外部链接