集群通信组件tribes之通道拦截器

最后更新于:2022-04-01 07:10:37

拦截器应该可以说是一个很经典的设计模式,它有点类似于过滤器,当某信息从一个地方流向目的地的过程中,可能需要统一对信息进行处理,如果考虑到系统的可扩展性和灵活性通常就会使用拦截器模式,它就像一个个关卡被设置在信息流动的通道中,并且可以按照实际需要添加和减少关卡。Tribes为了在应用层提供对源消息统一处理的渠道引入通道拦截器,用户在应用层只需要根据自己需要添加拦截器即可,例如,压缩解压拦截器、消息输出输入统计拦截器、异步消息发送器等等。 拦截器的数据流向示意图可以参考前面的tribes简介章节,数据从IO层流向应用层,中间就会经过一个拦截器栈,应用层处理完就会返回一个ack给发送端表示已经接收并处理完毕(消息可靠级别为SYNC_ACK),下面尝试用最简单一些代码和伪代码说明tribes的拦截器实现,旨在领会拦截器如何设计而并非具体的实现。最终实现的功能如图所示,最底层的协调者ChannelCoordinator永远作为第一个加入拦截器栈的拦截器,往上则是按照添加顺序排列,且每个拦截器的previous、next分别指向前一个拦截器和下一个拦截器。  ![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-01-15_5698bd8e2e5f8.jpg) ① 定义拦截器接口 ~~~ public interface ChannelInterceptor{     public void setNext(ChannelInterceptor next) ;     public ChannelInterceptor getPrevious();     public void sendMessage(ChannelMessage msg);     public void messageReceived(ChannelMessage msg); } ~~~ ② 定义一个基础拦截器,提供一些公共的操作,由于拦截器执行完后要触发下个拦截器,所以把触发工作统一抽离到基础类里面完成,当然里面必须包含前一个和后一个拦截器的引用。 ~~~ public class ChannelInterceptorBase implements ChannelInterceptor {     private ChannelInterceptor next;     private ChannelInterceptor previous;     public ChannelInterceptorBase() {     }     public final void setNext(ChannelInterceptor next) {         this.next = next;     }     public final ChannelInterceptor getNext() {         return next;     }     public final void setPrevious(ChannelInterceptor previous) {         this.previous = previous;     }     public final ChannelInterceptor getPrevious() {         return previous;     }     public void sendMessage(ChannelMessage msg) {         if (getNext() != null) getNext().sendMessage(msg,);     }     public void messageReceived(ChannelMessage msg) {         if (getPrevious() != null) getPrevious().messageReceived(msg);     } } ~~~ ③ 压缩解压拦截器,此拦截器负责按一定算法压缩和解压处理。 ~~~ public class GzipInterceptor extends ChannelInterceptorBase {     public void sendMessage(ChannelMessage msg){             compress the msg;             getNext().sendMessage(msg);     }     public void messageReceived(ChannelMessage msg) {             decompress the msg;             getPrevious().messageReceived(msg);     } } ~~~ ④ 最底层的协调器,直接与网络IO做交互 ~~~ public class ChannelCoordinator extends ChannelInterceptorBase{     public ChannelCoordinator() {     }     public void sendMessage(ChannelMessage msg) throws ChannelException {         Network IO Send     } public void messageReceived(ChannelMessage msg) {     Network IO READ         super.messageReceived(msg);     } } ~~~ ⑤ 测试类 ~~~ public class Test{ public void main(String[] args){ ChannelCoordinator coordinator = new ChannelCoordinator(); GzipInterceptor gzipInterceptor = new GzipInterceptor(); coordinator.setNext(null); coordinator.setPrevious(gzipInterceptor); gzipInterceptor.setPrevious(null); gzipInterceptor .setNext(coordinator); gzipInterceptor.sendMessage(msg); coordinator.messageReceived(msg); }  } ~~~     Tribes的拦截器整体设计就如上面,整个拦截器的执行顺序如下,当执行写操作时,数据流向GzipInterceptor -> ChannelCoordinator -> Network IO;当执行读操作时,数据流向则为Network IO -> ChannelCoordinator -> GzipInterceptor。理解了整个设计原理后对于tribes的整体把握将会更加深入。 对java有兴趣的朋友可以交个朋友 ![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-01-15_5698bd8d22882.jpg)
';