过滤器
IoFilter 扮演着很重要角色,它是 MINA 的核心结构之一。它过滤 IoService 和 IoHandler 之间的所有 I/O 事件和请求。如果你有网络应用编程的经验,你完全可以把它当成 Servlet 过滤器的表兄弟。许多开箱即用的过滤器通过使用类似以下的开箱即用过滤器简化横切注入用来提升网络应用的开发速度:
- LoggingFilter 记录所有事件和请求
- ProtocolCodecFilter 将一个连入的 ByteBuffer 转化为消息 POJO,反之亦然
- CompressionFilter 压缩所有数据
- SSLFilter 添加 SSL - TLS - StartTLS 支持
- 更多!
本文我们将了解如何为一个真实案例实现一个 IoFilter。通常实现一个 IoFilter 是很简单的,但你也需要知道 MINA 的内部细节。本文将对这些相关内部细节进行解释。
现有的过滤器
我们已经有很多写好的过滤器了。下表列出了所有现有的过滤器,并在他们的用途方面进行简要说明。
过滤器 | 类 | 描述 |
---|---|---|
Blacklist | BlacklistFilter | 阻止列入黑名单的远程地址的连接 |
Buffered Write | BufferedWriteFilter | 缓存输出请求,就像 BufferedOutputStream 所做的那样 |
Compression | CompressionFilter | |
ConnectionThrottle | ConnectionThrottleFilter | |
ErrorGenerating | ErrorGeneratingFilter | |
Executor | ExecutorFilter | |
FileRegionWrite | FileRegionWriteFilter | |
KeepAlive | KeepAliveFilter | |
Logging | LoggingFilter | 日志事件消息,比如 MessageReceived、MessageSent、SessionOpened 等等 |
MDC Injection | MdcInjectionFilter | 将关键 IoSession 属性注入 MDC |
Noop | NoopFilter | 一个不作任何事情的过滤器。用于测试。 |
Profiler | ProfilerTimerFilter | 分析事件消息,比如 MessageReceived、MessageSent、SessionOpened 等等 |
ProtocolCodec | ProtocolCodecFilter | 负责对消息进行编码和解码的过滤器 |
Proxy | ProxyFilter | |
Reference counting | ReferenceCountingFilter | 跟踪本过滤器的使用次数 |
RequestResponse | RequestResponseFilter | |
SessionAttributeInitializing | SessionAttributeInitializingFilter | |
StreamWrite | StreamWriteFilter | |
SslFilter | SslFilter | |
WriteRequest | WriteRequestFilter |
选择性重写事件
你可以对 IoAdapter 重写以取代直接实现 IoFilter 的做法。除非重写,否则所有接收到的事件将被直接转发给下一个过滤器。
public class MyFilter extends IoFilterAdapter {
@Override
public void sessionOpened(NextFilter nextFilter, IoSession session) throws Exception {
// Some logic here...
nextFilter.sessionOpened(session);
// Some other logic here...
}
}
转换写请求
如果你要通过 IoSession.write() 对一个连入的写请求进行转换,事情可能会变得相当棘手。例如,假设你的过滤器在一个 HighLevelMessage 对象调用 IoSession.write() 时将 HighLevelMessage 转换为 LowLevelMessage。你可以在你的过滤器的 filterWrite() 方法里添加适当的转换代码并认为一切就这样了。但是你也需要注意 messageSent 事件,因为一个 IoHandler 或者任何之后的过滤器会期望 messageSent() 方法以 HighLevelMessage 作为参数调用,因为让调用者在写 HighLevelMessage 的时候被通知到 HighLevelMessage 已发送是不合理的。因此,如果你的过滤器负责转换时你最好同时实现 filterWrite() 和 messageSent()。
另外还要注意的是,你仍旧需要实现类似的机制,即使输入对象和输出对象的类型是一样的,因为 IoSession.write() 的调用者期望它的 messageSent() 处理器方法有一个具体对象。
假定你在实现一个将字符串转换为字符数组的过滤器。你的过滤器的 filterWrite() 将会类似于:
public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest request) {
nextFilter.filterWrite(
session, new DefaultWriteRequest(
((String) request.getMessage()).toCharArray(), request.getFuture(), request.getDestination()));
}
现在我们需要在 messageSent() 中做相反的事情:
public void messageSent(NextFilter nextFilter, IoSession session, Object message) {
nextFilter.messageSent(session, new String((char[]) message));
}
字符串到字节缓存的转换怎么样?这样我们会更加高效,我们不在需要重建原始消息 (字符串)。但是,这比前面的例子复杂:
public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest request) {
String m = (String) request.getMessage();
ByteBuffer newBuffer = new MyByteBuffer(m, ByteBuffer.wrap(m.getBytes());
nextFilter.filterWrite(
session, new WriteRequest(newBuffer, request.getFuture(), request.getDestination()));
}
public void messageSent(NextFilter nextFilter, IoSession session, Object message) {
if (message instanceof MyByteBuffer) {
nextFilter.messageSent(session, ((MyByteBuffer) message).originalValue);
} else {
nextFilter.messageSent(session, message);
}
}
private static class MyByteBuffer extends ByteBufferProxy {
private final Object originalValue;
private MyByteBuffer(Object originalValue, ByteBuffer encodedValue) {
super(encodedValue);
this.originalValue = originalValue;
}
}
如果你使用的是 MINA 2.0,这将跟 1.0 和 1.1 有所不同。同时也参考一下 CompressionFilter 和 RequestResponseFilter。
过滤 sessionCreated 事件时须谨慎
sessionCreated 是一个特殊事件,它必须在 I/O 处理程序中执行 (参考 线程模型的配置)。决不允许将 sessionCreated 事件转发给其他线程。
public void sessionCreated(NextFilter nextFilter, IoSession session) throws Exception {
// ...
nextFilter.sessionCreated(session);
}
// 不要这么干
public void sessionCreated(final NextFilter nextFilter, final IoSession session) throws Exception {
Executor executor = ...;
executor.execute(new Runnable() {
nextFilter.sessionCreated(session);
});
}
小心空缓存!
在一些案例中 MINA 使用了一个空的缓冲区作为一个内部信号。空缓存有时会成为一个问题,因为它可能会造成各种各样的异常,比如 IndexOutOfBoundsException。这里我们介绍如何避免类似于这种难以预料的情况。
ProtocolCodecFilter 使用了一个空缓存 (比如 buf.hasRemaining() = 0) 来标记消息的结束部分。如果你的过滤器放在 ProtocolCodecFilter 之前,如果你的过滤器实现在缓存为空时能抛出一个异常的话,请确认你的过滤器将空缓存转发给了下一个过滤器:
public void messageSent(NextFilter nextFilter, IoSession session, Object message) {
if (message instanceof ByteBuffer && !((ByteBuffer) message).hasRemaining()) {
nextFilter.messageSent(nextFilter, session, message);
return;
}
...
}
public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest request) {
Object message = request.getMessage();
if (message instanceof ByteBuffer && !((ByteBuffer) message).hasRemaining()) {
nextFilter.filterWrite(nextFilter, session, request);
return;
}
...
}
这样的话,我们是否总是要为每个过滤器插入 if 块?幸运的是,不需要。这是处理空缓存的黄金法则:
- 如果你的过滤器及时在缓存为空时也能正常工作,你就完全不需要添加 if 块了
- 如果你的过滤器放在 ProtocolCodecFilter 之后,你也不需要添加 if 块
- 除此之外的话你就需要 if 块了
如果你需要加 if 块,记着你不总是需要遵循上面例子所讲的。你可以在任何地方检查缓存是否为空,只要你的过滤器不会抛出异常。