集群RPC通信
最后更新于:2022-04-01 07:10:53
RPC即远程过程调用,它的提出旨在消除通信细节、屏蔽繁杂且易错的底层网络通信操作,像调用本地服务一般地调用远程服务,让业务开发者更多关注业务开发而不必考虑网络、硬件、系统的异构复杂环境。
先看看集群中RPC的整个通信过程,假设从节点node1开始一个RPC调用,①先将待传递的数据放到NIO集群通信框架(这里使用的是tribes框架)中;②由于使用的是NIO模式,线程无需阻塞直接返回;③由于与集群其他节点通信需要花销若干时间,为了提高CPU使用率当前线程应该放弃CPU的使用权进行等待操作;④NIO集群通信框架tribes接收到node2节点的响应消息,并将消息封装成Response对象保存至响应数组;⑤tribes接收到node4节点的响应消息,由于是使用了并行通信,所以node4可能比node3先返回消息,并将消息封装成Response对象保存至响应数组;⑥tribes最后接收到node3节点的响应消息,并将消息封装成Response对象保存至响应数组;⑦现在所有节点的响应都已经收集完毕,是时候通知刚刚被阻塞的那条线程了,原来的线程被notify醒后拿到所有节点的响应Response[]进行处理,至此完成了整个集群RPC过程。
![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-01-15_5698bd91574dc.jpg)
上面整个过程是在只有一条线程的情况下,一切看起来没什么问题,但如果有多条线程并发调用则会导致一个问题:线程与响应的对应关系将被打乱,无法确定哪个线程对应哪几个响应。因为NIO通信框架不会每个线程都独自使用一个socket通道,为提高性能一般都是使用长连接,所有线程公用一个socket通道,这时就算线程一比线程二先放入tribes也不能保证响应一比响应二先接收到,所以接收到响应一后不知道该通知线程一还是线程二。只有解决了这个问题才能保证RPC调用的正确性。
要解决线程与响应对应的问题就需要维护一个线程响应关系列表,响应从关系列表中就能查找对应的线程,如下图,在发送之前生成一个UUID标识,此标识要保证同socket中唯一,再把UUID与线程对象关系对应起来,可使用Map数据结构实现,UUID的值作为key,线程对应的锁对象为value。接着制定一个协议报文,UUID作为报文的其中一部分,报文发往另一个节点node2后将响应信息message放入报文中并返回,node1对接收到的报文进行解包根据UUID去查找并唤起对应的线程,告诉它“你要的消息已经收到,往下处理吧”。但在集群环境下,我们更希望是集群中所有节点的消息都接收到了才往下处理,如下图下半部分,一个UUID1的请求报文会发往node2、node3和node4三个节点,这时假如只接收到一个响应则不唤起线程,直到node2、node3对应UUID1的响应报文都接收到后才唤起对应线程往下执行。同样地,UUID2、UUID3的报文消息都是如此处理,最后集群中对应的响应都能正确回到各自线程上。
![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-01-15_5698bd916e0fc.jpg)
用简单代码实现一个RPC例子,选择一个集群通信框架负责底层通信,这里使用tribes,接着往下:
①定义一个RPC接口,这些方法是预留提供给上层具体逻辑处理的入口,replyRequest方法用于处理响应逻辑,leftOver方法用于残留请求的逻辑处理。
~~~
public interface RpcCallback {
public Serializable replyRequest(Serializable msg, Member sender);
public void leftOver(Serializable msg, Member sender);
}
~~~
②定义通信消息协议,实现Externalizable接口自定义序列化和反序列化,message用于存放响应消息,uuid标识用于关联线程,rpcId用于标识RPC实例,reply表示是否回复。
~~~
public class RpcMessage implements Externalizable {
protected Serializable message;
protected byte[] uuid;
protected byte[] rpcId;
protected boolean reply = false;
public RpcMessage() {
}
public RpcMessage(byte[] rpcId, byte[] uuid, Serializable message) {
this.rpcId = rpcId;
this.uuid = uuid;
this.message = message;
}
~~~
@Override
~~~
public void readExternal(ObjectInput in) throws IOException,ClassNotFoundException {
reply = in.readBoolean();
int length = in.readInt();
uuid = new byte[length];
in.readFully(uuid);
length = in.readInt();
rpcId = new byte[length];
in.readFully(rpcId);
message = (Serializable) in.readObject();
}
~~~
@Override
~~~
public void writeExternal(ObjectOutput out) throws IOException {
out.writeBoolean(reply);
out.writeInt(uuid.length);
out.write(uuid, 0, uuid.length);
out.writeInt(rpcId.length);
out.write(rpcId, 0, rpcId.length);
out.writeObject(message);
}
}
~~~
③响应类型,提供多种唤起线程的条件,一共四种类型,分别表示接收到第一个响应就唤起线程、接收到集群中大多数节点的响应就唤起线程、接收到集群中所有节点的响应才唤起线程、无需等待响应的无响应模式。
~~~
public class RpcResponseType {
public static final int FIRST_REPLY = 1;
public static final int MAJORITY_REPLY = 2;
public static final int ALL_REPLY = 3;
public static final int NO_REPLY = 4;
}
~~~
④响应对象,用于封装接收到的消息,Member在通信框架tribes是节点的抽象,这里用来表示来源节点。
~~~
public class RpcResponse {
private Member source;
private Serializable message;
public RpcResponse() {
}
public RpcResponse(Member source, Serializable message) {
this.source = source;
this.message = message;
}
public void setSource(Member source) {
this.source = source;
}
public void setMessage(Serializable message) {
this.message = message;
}
public Member getSource() {
return source;
}
public Serializable getMessage() {
return message;
}
}
~~~
⑤RPC响应集,用于存放同个UUID的所有响应。
~~~
public class RpcCollector {
public ArrayList responses = new ArrayList();
public byte[] key;
public int options;
public int destcnt;
public RpcCollector(byte[] key, int options, int destcnt) {
this.key = key;
this.options = options;
this.destcnt = destcnt;
}
public void addResponse(Serializable message, Member sender){
RpcResponse resp = new RpcResponse(sender,message);
responses.add(resp);
}
public boolean isComplete() {
if ( destcnt <= 0 ) return true;
switch (options) {
case RpcResponseType.ALL_REPLY:
return destcnt == responses.size();
case RpcResponseType.MAJORITY_REPLY:
{
float perc = ((float)responses.size()) / ((float)destcnt);
return perc >= 0.50f;
}
case RpcResponseType.FIRST_REPLY:
return responses.size()>0;
default:
return false;
}
}
public RpcResponse[] getResponses() {
return responses.toArray(new RpcResponse[responses.size()]);
}
}
~~~
⑥RPC核心类,是整个RPC的抽象,它要实现tribes框架的ChannelListener接口,在messageReceived方法中处理接收到的消息。因为所有的消息都会通过此方法,所以它必须要根据key去处理对应的线程,同时它也要负责调用RpcCallback接口定义的相关的方法,例如响应请求的replyRequest方法和处理残留的响应leftOver方法,残留响应是指有时我们在接收到第一个响应后就唤起线程。
~~~
public class RpcChannel implements ChannelListener {
private Channel channel;
private RpcCallback callback;
private byte[] rpcId;
private int replyMessageOptions = 0;
private HashMap responseMap = new HashMap();
public RpcChannel(byte[] rpcId, Channel channel, RpcCallback callback) {
this.rpcId = rpcId;
this.channel = channel;
this.callback = callback;
channel.addChannelListener(this);
}
public RpcResponse[] send(Member[] destination, Serializable message,
int rpcOptions, int channelOptions, long timeout)
throws ChannelException {
int sendOptions = channelOptions& ~Channel.SEND_OPTIONS_SYNCHRONIZED_ACK;
byte[] key = UUIDGenerator.randomUUID(false);
RpcCollector collector = new RpcCollector(key, rpcOptions,
destination.length);
try {
synchronized (collector) {
if (rpcOptions != RpcResponseType.NO_REPLY)
responseMap.put(key, collector);
RpcMessage rmsg = new RpcMessage(rpcId, key, message);
channel.send(destination, rmsg, sendOptions);
if (rpcOptions != RpcResponseType.NO_REPLY)
collector.wait(timeout);
}
} catch (InterruptedException ix) {
Thread.currentThread().interrupt();
} finally {
responseMap.remove(key);
}
return collector.getResponses();
}
~~~
@Override
~~~
public void messageReceived(Serializable msg, Member sender) {
RpcMessage rmsg = (RpcMessage) msg;
byte[] key = rmsg.uuid;
if (rmsg.reply) {
RpcCollector collector = responseMap.get(key);
if (collector == null) {
callback.leftOver(rmsg.message, sender);
} else {
synchronized (collector) {
if (responseMap.containsKey(key)) {
collector.addResponse(rmsg.message, sender);
if (collector.isComplete())
collector.notifyAll();
} else {
callback.leftOver(rmsg.message, sender);
}
}
}
} else {
Serializable reply = callback.replyRequest(rmsg.message, sender);
rmsg.reply = true;
rmsg.message = reply;
try {
channel.send(new Member[] { sender }, rmsg, replyMessageOptions
& ~Channel.SEND_OPTIONS_SYNCHRONIZED_ACK);
} catch (Exception x) {
}
}
}
~~~
@Override
~~~
public boolean accept(Serializable msg, Member sender) {
if (msg instanceof RpcMessage) {
RpcMessage rmsg = (RpcMessage) msg;
return Arrays.equals(rmsg.rpcId, rpcId);
} else
return false;
}
}
~~~
⑦自定义一个RPC,它要实现RpcCallback接口,分别对请求处理和残留响应处理,这里请求处理仅仅是简单返回“hello,response for you!”作为响应消息,残留响应处理则是简单输出“receive a leftover message!”。假如整个集群有五个节点,由于接收模式设置成了FIRST_REPLY,所以每个只会接受一个响应消息,其他的响应都被当做残留响应处理。
~~~
public class MyRPC implements RpcCallback {
@Override
public Serializable replyRequest(Serializable msg, Member sender) {
RpcMessage mapmsg = (RpcMessage) msg;
mapmsg.message = "hello,response for you!";
return mapmsg;
}
@Override
public void leftOver(Serializable msg, Member sender) {
System.out.println("receive a leftover message!");
}
public static void main(String[] args) {
MyRPC myRPC = new MyRPC();
byte[] rpcId = new byte[] { 1, 1, 1, 1 };
byte[] key = new byte[] { 0, 0, 0, 0 };
String message = "hello";
int sendOptions = Channel.SEND_OPTIONS_SYNCHRONIZED_ACK
| Channel.SEND_OPTIONS_USE_ACK;
RpcMessage msg = new RpcMessage(rpcId, key, (Serializable) message);
RpcChannel rpcChannel = new RpcChannel(rpcId, channel, myRPC);
RpcResponse[] resp = rpcChannel.send(channel.getMembers(), msg,
RpcResponseType.FIRST_REPLY, sendOptions, 3000);
while(true)
Thread.currentThread().sleep(1000);
}
}
~~~
可以看到通过上面的RPC封装后,上层可以把更多的精力关注到消息逻辑处理上面了,而不必关注具体的网络IO如何实现,屏蔽了繁杂重复的网络传输操作,为上层提供了很大的方便。
从单机到集群会话的管理之集群模式二(更大的集群)
最后更新于:2022-04-01 07:10:51
《从单机到集群会话的管理之集群模式一》中讲到的全节点复制的网络流量随节点数量增加呈平方趋势增长,也正是因为这个因素导致无法构建较大规模的集群,为了使集群节点能更加大,首要解决的就是数据复制时流量增长的问题,下面将介绍另外一种会话管理方式,每个会话只会有一个备份,它使会话备份的网络流量随节点数量的增加呈线性趋势增长,大大减少了网络流量和逻辑操作,可构建较大的集群。
下面看看这种方式具体的工作机制,集群一般是通过负载均衡对外提供整体服务,所有节点被隐藏在后端组成一个整体。前面各种模式的实现都无需负载均衡协助,所以图中都把负载均衡省略了。最常见的负载方式是前面用apache拖所有节点,它支持将类似“326257DA6DB76F8D2E38F2C4540D1DEA.tomcat1”的会话id进行分解,定位到tomcat集群中以tomcat1命名的节点上(这种方式称为Session Stick,由apache jk模块实现)。每个会话存在一个原件和一个备份,且备份与原件不会保存在同一个节点上,如下图,例如当客户端发起请求后通过负载均衡被分发到tomcat1实例节点上,生成一个包含.tomcat1后缀的会话标识,并且tomcat1节点根据一定策略选出此次会话对象备份的节点,然后将包含了{会话id,备份ip}的信息发送给tomcat2、tomcat3、tomcat4,如图中虚线所示,这样每个节点都有一个会话id、备份ip列表,即每个节点都有每个会话的备份ip地址。
完成上面一步后就是将会话内容备份到备份节点上,假如tomcat1的s1、s2两个会话的备份地址为tomcat2,则把会话对象备份到tomcat2中,类似的有tomcat2把s3会话备份到tomcat4,tomcat4把s4、s5两个对话备份到tomcat3,这样集群中所有的会话都已经有了一份备份。当tomcat1一直不出故障,由于Session Stick技术客户端将一直访问到tomcat1节点上,保证一直能获取到会话。而当tomcat1出故障了,这时tomcat也提供了一个failover机制,apache感知到后端集群tomcat1节点被移除了,这时它会把请求随机分配到其他任意节点上,接下去会有两种情况:
①刚好分到了备份节点tomcat2上,此时仍能获取到s1会话,除此之外,tomcat2还要另外做的事是将这个s1会话标记为原件且继续选取一个备份地址备份s1会话,这样一来又有了备份。
②假如分到了非备份节点tomcat3,此时肯定找不到s1会话,于是它将向集群所有节点发问,“请问谁有s1会话的备份ip地址信息?”,因为只有tomcat2有s1的备份地址信息,它接收到询问后应答告知tomcat3节点s1会话的备份在tomcat2,根据这个信息就能查到s1会话了,并且tomcat3在自己本地生成s1会话并标为原件,tomcat2上的副本不变,这样一来同样能找到s1会话,正常完整整个请求处理。
![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-01-15_5698bd91307fb.jpg)
《从单机到集群会话的管理》系列文章从单机到集群分析了web服务器的会话管理的不同模型,包含了单机非持久化、单机文件持久化、单机数据库(缓存)持久化、集群数据库(缓存)、集群全节点复制、集群原件副本备份等等。分析了不同模型的工作原理及优点不足,深入理解各种会话管理模式对于实际项目的会话方案选型有很大的帮助。
从单机到集群会话的管理之集群模式一
最后更新于:2022-04-01 07:10:49
为什么要使用集群?主要有两方面原因:一是对于一些核心系统要求长期不能中断服务,为了提供高可用性我们需要由多台机器组成的集群;另外一方面,随着访问量越来越大且业务逻辑越来越复杂,单台机器的处理能力已经不足以处理如此多且复杂的逻辑,于是需要增加若干台机器使整个服务处理能力得到提升。
如果说一个web应用不涉及会话的话,那么做集群是相当简单的,因为节点都是无状态的,集群内各个节点无需互相通信,只需要将各个请求均匀分配到集群节点即可。但基本所有web应用都会使用会话机制,所以做web应用集群时整个难点在于会话数据的同步,当然你可以通过一些策略规避复杂的额数据同步操作,例如前面说到的把会话信息保存在分布式缓存或数据库中统一集中管理,如下图,每个tomcat实例只需去写入或读取数据库即可,避免了tomcat集群之间的通信。但这种方式也有不足,要额外引入数据库或缓存服务,同时也要保证它们的高可用性,增加了机器和维护成本。
![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-01-15_5698bd90b6d78.jpg)
鉴于以上存在的不足,提供另一种解决思路就是tomcat集群节点自身完成各自的数据同步,不管访问到哪个节点都能找到对应的会话,如下图,客户端第一次访问生成会话,tomcat自身会将会话信息同步到其他节点上,而且是每次请求完成都会同步此次请求过程中对session的所有操作,这样一来下一次请求到集群中任意节点都能找到响应的会话信息,且能保证信息的及时性。细看很容易发现集群的节点之间的会话是两两互相复制的,一旦集群节点数量及访问量大起来,将导致大量的会话信息需要互相复制同步,很容易导致网络阻塞,而且这些同步操作很可能会成为整体性能的瓶颈,根据经验,此种方案在实际生产上推荐的集群节点个数为3-6个,无法组建更大的集群,而且冗余了大量的数据,利用率不高。
![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-01-15_5698bd90f1a63.jpg)
全节点复制的网络流量随节点数量增加呈平方趋势增长,也正是因为这个因素导致无法构建较大规模的集群,为了使集群节点能更加大,首要解决的就是数据复制时流量增长的问题,下节将介绍另外一种会话管理方式,每个会话只会有一个备份,它使会话备份的网络流量随节点数量的增加呈线性趋势增长,大大减少了网络流量和逻辑操作,可构建较大的集群。
喜欢java的同学可以加个好友:
![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-01-15_5698bd8d22882.jpg)
从单机到集群会话的管理之单机模式
最后更新于:2022-04-01 07:10:46
单机时代对会话的管理主要有两种方式——非持久化方式和持久化方式。非持久化方式指会话直接由tomcat管理并保存在机器内存上,它是最简单的方式,如下图,所有的会话集合都保存在内存上,客户端访问时根据自己的会话id直接在服务器内存中寻找,查找简单且速度快,但同时也存在两个缺点:一是容量比较小,当数据量大时容易导致内存不足;一是机器意外停止会导致会话数据丢失缺点。
![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-01-15_5698bd905fd6c.jpg)
为了解决上面非持久化方式存在的缺陷,我们需要引入持久化机制,即持久化方式。可以将会话数据以文件形式持久化到硬盘中,也可以通过数据库持久化会话数据。首先看硬盘持久化,如下图,会话数据会以文件形式保存在硬盘中,由于硬盘比存储空间比内存大且机器意外关机都不会使数据丢失,所以硬盘存储解决了上面两个缺点,但是硬盘读取的速度比较慢,可能会影响整体的响应时间,硬盘持久化方式在实际中基本不会使用。
![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-01-15_5698bd907c45f.jpg)
Tomcat提供的另外一种默认的持久化方式就是将会话数据持久化到数据库上,所有会话数据交由数据库存储,tomcat通过jdbc数据库驱动并使用连接池技术去数据库指定表读取会话信息,此种方式解决了非持久化方式的所有缺点同时也对以文件方式存储方式的IO进行了优化,用数据库存储会话其实是一种集中管理模式,现在实际中更多是使用一个分布式缓存替代数据库,例如memcached、redis集群等,因为缓存的查询读取速度快,且集群解决了高可用的问题,但Tomcat官方版本是不提供会话保存到memcached或redis的支持,如要使用可自己编写一个会话管理器及一个阀门valve,或使用第三方jar包。需要说明的是集中管理模式不管是tomcat单机还是集群模式都可以使用。
![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-01-15_5698bd9095b51.jpg)
集群通信组件tribes之使用方法
最后更新于:2022-04-01 07:10:44
上面已经对tribes的内部实现机制及原理进行了深入的剖析,在理解它的设计原理后看看如何使用tribes,整个使用相当简单便捷,只需要四步:
① 定义一个消息对象,由于这个消息对象是要在网络之间传递的,网络传输涉及到序列化,所以需要实现Serializable接口。
~~~
public class MyMessage implements Serializable {
private String message;
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
}
~~~
② 定义一个ChannelListener监听器,对消息的处理逻辑放在messageReceived方法中。
~~~
public class MyMessageListener implements ChannelListener{
public boolean accept(Serializable myMessage, Member member) {
return true;
}
public void messageReceived(Serializable myMessage, Member member) {
System.out.println(((MyMessage)myMessage).getMessage()+" from "+member.getName());
}
}
~~~
③ 定义一个MembershipListener监听器,对集群成员的加入及失效的逻辑处理,在memberAdded中对成员加入事件逻辑处理,在memberDisappeared中对成员失效事件逻辑处理。
~~~
public class MyMemberListener implements MembershipListener {
public void memberAdded(Member member) {
System.out.println(member.getName()+" Added");
}
public void memberDisappeared(Member member) {
System.out.println(member.getName()+" Disappeared");
}
}
~~~
④ 主程序,分别实例化ChannelListener、MembershipListener并添加到channel中,然后启动channel,由于集群通信需要启动几个节点才可实现,为方便操作这里引入args参数,当参数值为”r”时表示只是启动一个节点并加入集群,而参数值为”s”时则表示启动节点加入集群后并且向集群所有成员发送Message,主程序使用循环睡眠是为了不让程序结束,一旦结束节点就不存在了。可以先带”r”参数运行两次,即意味着启动了两个节点,最后再带”s”参数运行,即第三个节点启动并向前两个成语节点发送消息,前两个节点分别输出了”hello from tcp://{169, 254, 75, 186}:4002”,而成员监听器则会在节点加入或失效时输出类似这样的消息”tcp://{169, 254, 75, 186}:4002 Added”、”tcp://{169, 254, 75, 186}:4000 Disappeared”。
~~~
public class TribesTest {
public static void main(String[] args) throws ChannelException,InterruptedException {
Channel myChannel = new GroupChannel();
ChannelListener msgListener = new MyMessageListener();
MembershipListener mbrListener = new MyMemberListener();
myChannel.addMembershipListener(mbrListener);
myChannel.addChannelListener(msgListener);
myChannel.start(Channel.DEFAULT);
switch (args[0]) {
case ("r"):
while (true)
Thread.currentThread().sleep(1000);
case ("s"):
MyMessage myMsg = new MyMessage();
myMsg.setMessage("hello");
Member[] group = myChannel.getMembers();
myChannel.send(group, myMsg, Channel.SEND_OPTIONS_DEFAULT);
while (true)
Thread.currentThread().sleep(1000);
}
}
}
~~~
喜欢java的可以交个朋友:
![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-01-15_5698bd8d22882.jpg)
集群通信组件tribes之应用程序处理入口
最后更新于:2022-04-01 07:10:42
Tribes为了更清晰更好地划分职责,它被设计成用IO层和应用层,IO层专心负责网络传输方面的逻辑处理,把接收到的数据往应用层传送,当然应用层发送的数据也是通过此IO层发送,数据传往应用层后必须要留一些处理入口供应用层进行逻辑处理,而考虑系统解耦,这个入口最好的方式是使用监听器模式,在底层发生各种事件时触发所有安装好的监听器,使之执行监听器里面的处理逻辑。这些事件主要包含了集群成员的加入和退出、消息报文接收完毕等信息,所以整个消息流转过程被分成两类监听器,一类是跟集群成员的变化相关的监听器MembershipListener,另外一类是跟集群消息接收发送相关的监听器ChannelListener。应用层只要关注这两个接口就行了,写好各种处理逻辑的监听器添加到通道中即可。
下面是这两个监听器的接口,从接口定义的方法可以很清晰地看到各个方法被调用的时机,MembershipListener类型中memberadd是有成员加入时调用的方法,memberDisappeared是成员退出时调用的方法,ChannelListener类型中accept用于判断是否接受消息,messageReceived用于对消息处理的方法,应用层把逻辑分别写到这几个方法就可以在对应时刻执行相应的逻辑。
~~~
public interface MembershipListener {
public void memberAdded(Member member);
public void memberDisappeared(Member member);
}
public interface ChannelListener {
public void messageReceived(Serializable msg, Member sender);
public boolean accept(Serializable msg, Member sender);
}
~~~
![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-01-15_5698bd903183a.jpg)
我们可以在应用层自定义若干监听器并且添加到GroupChannel中的两个监听器列表中,GroupChannel其实可以看成是一个封装了IO层的抽象容器,它会在各个适当的时期遍历监听器列表中的所有监听器并调用监听器对应的方法,即执行应用层定义的业务逻辑,至此完成了数据从IO层流向应用层并完成处理。两种类型的监听器给应用层提供了处理入口,应用层只需关注逻辑处理,而其他的IO操作则交由IO层,这两层通过监听器模式串联起来,优雅地将模块解耦。
喜欢java的可以交个朋友:
![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-01-15_5698bd8d22882.jpg)
内存数据网格hazelcast的一些机制原理
最后更新于:2022-04-01 07:10:39
hazelcast作为一个内存数据网格工具,还算比较优秀,听说有Apache顶级项目使用它,值得研究下,使用文档可以直接看官方文档,但机制原理相关的资料基本没有,本人硬撸源码写的一些东西,跟大家分享一下。
![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-01-15_5698bd8e44fb8.jpg)![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-01-15_5698bd8e5ba06.jpg)![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-01-15_5698bd8e6cd57.jpg)![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-01-15_5698bd8e810a8.jpg)![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-01-15_5698bd8e94dc7.jpg)![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-01-15_5698bd8eaac71.jpg)![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-01-15_5698bd8ec1d4a.jpg)![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-01-15_5698bd8edcf99.jpg)![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-01-15_5698bd8f03ff8.jpg)![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-01-15_5698bd8f19ad5.jpg)![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-01-15_5698bd8f2af11.jpg)![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-01-15_5698bd8f406d3.jpg)![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-01-15_5698bd8f576c0.jpg)![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-01-15_5698bd8f6c6f8.jpg)![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-01-15_5698bd8f7d1da.jpg)![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-01-15_5698bd8f92b80.jpg)![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-01-15_5698bd8fa5d2b.jpg)![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-01-15_5698bd8fc25e1.jpg)![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-01-15_5698bd8fdc4e0.jpg)![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-01-15_5698bd8fee4a2.jpg)
![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-01-15_5698bd901739f.jpg)
喜欢java的可以交个朋友:
![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-01-15_5698bd8d22882.jpg)
集群通信组件tribes之通道拦截器
最后更新于:2022-04-01 07:10:37
拦截器应该可以说是一个很经典的设计模式,它有点类似于过滤器,当某信息从一个地方流向目的地的过程中,可能需要统一对信息进行处理,如果考虑到系统的可扩展性和灵活性通常就会使用拦截器模式,它就像一个个关卡被设置在信息流动的通道中,并且可以按照实际需要添加和减少关卡。Tribes为了在应用层提供对源消息统一处理的渠道引入通道拦截器,用户在应用层只需要根据自己需要添加拦截器即可,例如,压缩解压拦截器、消息输出输入统计拦截器、异步消息发送器等等。
拦截器的数据流向示意图可以参考前面的tribes简介章节,数据从IO层流向应用层,中间就会经过一个拦截器栈,应用层处理完就会返回一个ack给发送端表示已经接收并处理完毕(消息可靠级别为SYNC_ACK),下面尝试用最简单一些代码和伪代码说明tribes的拦截器实现,旨在领会拦截器如何设计而并非具体的实现。最终实现的功能如图所示,最底层的协调者ChannelCoordinator永远作为第一个加入拦截器栈的拦截器,往上则是按照添加顺序排列,且每个拦截器的previous、next分别指向前一个拦截器和下一个拦截器。
![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-01-15_5698bd8e2e5f8.jpg)
① 定义拦截器接口
~~~
public interface ChannelInterceptor{
public void setNext(ChannelInterceptor next) ;
public ChannelInterceptor getPrevious();
public void sendMessage(ChannelMessage msg);
public void messageReceived(ChannelMessage msg);
}
~~~
② 定义一个基础拦截器,提供一些公共的操作,由于拦截器执行完后要触发下个拦截器,所以把触发工作统一抽离到基础类里面完成,当然里面必须包含前一个和后一个拦截器的引用。
~~~
public class ChannelInterceptorBase implements ChannelInterceptor {
private ChannelInterceptor next;
private ChannelInterceptor previous;
public ChannelInterceptorBase() {
}
public final void setNext(ChannelInterceptor next) {
this.next = next;
}
public final ChannelInterceptor getNext() {
return next;
}
public final void setPrevious(ChannelInterceptor previous) {
this.previous = previous;
}
public final ChannelInterceptor getPrevious() {
return previous;
}
public void sendMessage(ChannelMessage msg) {
if (getNext() != null) getNext().sendMessage(msg,);
}
public void messageReceived(ChannelMessage msg) {
if (getPrevious() != null) getPrevious().messageReceived(msg);
}
}
~~~
③ 压缩解压拦截器,此拦截器负责按一定算法压缩和解压处理。
~~~
public class GzipInterceptor extends ChannelInterceptorBase {
public void sendMessage(ChannelMessage msg){
compress the msg;
getNext().sendMessage(msg);
}
public void messageReceived(ChannelMessage msg) {
decompress the msg;
getPrevious().messageReceived(msg);
}
}
~~~
④ 最底层的协调器,直接与网络IO做交互
~~~
public class ChannelCoordinator extends ChannelInterceptorBase{
public ChannelCoordinator() {
}
public void sendMessage(ChannelMessage msg) throws ChannelException {
Network IO Send
}
public void messageReceived(ChannelMessage msg) {
Network IO READ
super.messageReceived(msg);
}
}
~~~
⑤ 测试类
~~~
public class Test{
public void main(String[] args){
ChannelCoordinator coordinator = new ChannelCoordinator();
GzipInterceptor gzipInterceptor = new GzipInterceptor();
coordinator.setNext(null);
coordinator.setPrevious(gzipInterceptor);
gzipInterceptor.setPrevious(null);
gzipInterceptor .setNext(coordinator);
gzipInterceptor.sendMessage(msg);
coordinator.messageReceived(msg);
}
}
~~~
Tribes的拦截器整体设计就如上面,整个拦截器的执行顺序如下,当执行写操作时,数据流向GzipInterceptor -> ChannelCoordinator -> Network IO;当执行读操作时,数据流向则为Network IO -> ChannelCoordinator -> GzipInterceptor。理解了整个设计原理后对于tribes的整体把握将会更加深入。
对java有兴趣的朋友可以交个朋友
![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-01-15_5698bd8d22882.jpg)
集群通信组件tribes之集群的消息接收通道
最后更新于:2022-04-01 07:10:35
与消息发送通道对应,发送的消息需要一个接收端接收消息,它就是ChannelReceiver。接收端负责接收处理其他节点从消息发送通道发送过来的消息,实际情况如图每个节点都有一个ChannelSender和ChannelReceiver,ChannelSender向其他节点的ChannelReceiver发送消息。本质是每个节点暴露一个端口作为服务端监听客户端,而每个节点又充当客户端连接其他节点的服务端,所以ChannelSender就是充当客户端的集合,ChannelReceiver充当服务端。
![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-01-15_5698bd8dd6aa8.jpg)
集群消息复制过程中,每个节点ChannelReceiver负责接收来自其他节点的消息,假设一个n节点的集群,一般情况下每个ChannelReceiver对应n-1个连接,因为集群之间的通信连接都是长连接,长连接有助于提高通信效率,如下图,4个节点的集群,node1的ChannelReceiver的客户端连接数为3,分别是node2、node3、node4三个节点作为客户端发起的socket连接。这三个节点产生的数据会通过此通道同步到node1节点,同样地,node2的ChannelReceiver拥有node1、node3、node4的客户端连接,这三个节点产生的数据也会同步到node2节点。Node3、node4也拥有三个客户端连接。为提高处理效率,此处还是使用NIO处理模型。
![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-01-15_5698bd8e06fc6.jpg)
除此之外,再接收操作中为了优化性能采取了很多措施,例如引入任务池,即是把接收任务提前定义好放入内存中,接收时可直接获取使用而不用再实例化;例如一次获取若干个报文进行处理,即使用nio模式读取消息到缓冲区后直接处理整个缓冲区的消息,它可能包含若干个报文;网络IO需要优化的地方及手段都比较多,tribes确实已经做了很多优化方面的工作。
喜欢java的同学可以交个朋友:
![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-01-15_5698bd8d22882.jpg)
集群通信组件tribes之集群的平行通信
最后更新于:2022-04-01 07:10:33
前面的集群成员维护服务为我们提供了集群内所有成员的地址端口等信息,可以通过MembershipService可以轻易从节点本地的成员列表获取集群所有的成员信息,有了这些成员信息后就可以使用可靠的TCP/IP协议进行通信了。这节讨论的正是实际中真正用于消息传送通道的相关机制及实现细节。
如下图,四个节点本地都拥有了一张集群成员的信息列表,这时节点1有这么一个需求:为了保证数据的安全可靠,在往自己的内存存放一份数据的同时还要同步到其他三个节点的内存中。节点1有一个专门负责发送的组件ChannelSender,首先从成员列表中获取其他三个节点的通信地址和端口,再分别向此三个节点建立TCP/IP通道发送数据,其他节点有一个负责接收数据的服务,将数据更新到内存中。
![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-01-15_5698bd8d866ff.jpg)
最理想的状态是发送给节点2、3、4都成功了,这样从整体看来ChannelSender像是提供了一个多通道的平行发送方式,所以也称为平行发送器。但现实中并不能保证同一批的消息往所有节点发送都成功,有可能发送到节点3时因为某种原因失败了,而节点2、4都成功了,这时通常要采取一些策略来应对,例如重新发送。Tribes所使用的策略是优先进行若干次尝试发送,若干次失败后将向上抛出异常信息,异常信息包含哪些节点发送失败及其原因,默认的尝试次数是1次。
为确保数据确实被节点接收到,需要在应用层引入一个协议保证传输的可靠性,即是通知机制,发送者发送消息给接收者,接受者接收到后返回一个ACK表示自己已经接收成功。Tribes中详细的协议报文定义如下:START_DATA(7bytes)+消息长度(4bytes)+消息长度(nbytes)+END_DATA(7bytes)。START_DATA为数据开始标识,为固定数组值70,76,84,50,48,48,50,END_DATA为数据结束标识,为固定数组值84,76,70,50,48,48,51,ACK_DATA表示通知报文,为固定数组值6,2,3。所以如果传输的是通知报文的话即为START_DATA+ACK_DATA的长度+ACK_DATA+END_DATA。所以整个集群的消息同步如下图,节点1通过ChannelSender发送消息给节点2、3、4,发送成功的判定标准就是节点2、3、4返回给节点1一个ack标识,节点1接收到了则认为发送成功。
![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-01-15_5698bd8db3672.jpg)
为提高通信效率这里默认使用了NIO模式而非BIO模式(也可设置为BIO模式),使用NIO模式能统一管理所有通信的channel,避免了等待一个通道发送完毕另一个通道才能发送,如果逐个通信将导致阻塞IO时间很长通信效率低下。另外平行发送的过程需要一个锁保证消息的正确发送,例如有data1、data2、data3三个数据需要发送,应该是一个接一个数据包发送的而不能data1发一部分data2发一部分。
这节介绍了Tribes如何向集群其他成员发送数据,通过本地获取其他成员节点的地址和端口,再通过平行消息发送通道发送给其他节点,其中有ack机制和重发机制保证数据成功接收。
喜欢java的同学可以交个朋友:
![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-01-15_5698bd8d22882.jpg)
集群通信组件Tribes之如何维护集群成员信息
最后更新于:2022-04-01 07:10:30
一个集群包含若干成员,要对这些成员进行管理就必须要有一张包含所有成员的列表,当要对某个节点做操作时通过这个列表可以准确找到该节点的地址进而对该节点发送操作消息。如何维护这张包含所有成员的列表是本节要讨论的主题。
成员维护是集群的基础功能,一般划分一个独立模块或层完成此功能,它提供成员列表查询、成员维护、成员列表改变事件通知等能力。由于tribes定位于基于同等节点之间的通信,所以并不存在主节点选举的问题,它所要具备的功能是自动发现节点,即新节点加入要通知集群其他成员更新成员列表,让每个节点都能及时更新成员列表,每个节点都维护一份集群成员表。如图,节点1、节点2、节点3使用组播通过交换机各自已经维护一份成员列表,且他们隔一段时间向交换机组播自己节点消息,即心跳操作。当第四个节点加入集群组,节点四向交换机组播自己的节点消息,原理三个节点接收到后各自把节点四加入到各自的成员列表中,而原来三个节点也不断向交换机发送节点消息,节点四接收到后依次更新成员列表信息,最终达到四个节点都拥有四个节点成员信息。
![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-01-15_5698bd8d625c9.jpg)
看下tribes的集群是如何设计实现以上功能的,其成员列表的创建维护是基于经典的组播方式实现,每个节点都创建一个节点信息发射器和节点信息接收器,让他们运行于独立的线程中。发射器用于向组内发送自己节点的消息,而接收器则用于接收其他节点发送过来的节点消息并进行处理。要使节点之间通信能被识别就需要定义一个语义,即约定报文协议的结构,tribes的成员报文是这样定义的,两个固定值用于表示报文的开始和结束,开始标识TRIBES_MBR_BEGIN 的值为字节数组84, 82, 73, 66, 69, 83, 45, 66, 1, 0,结束标识TRIBES_MBR_END的值为字节数组84, 82, 73, 66, 69, 83, 45, 69, 1, 0,整个协议包结构为:开始标识(10bytes)+包长度(4bytes)+存活时间(8bytes)+tcp端口(4bytes)+安全端口(4bytes)+udp端口(4bytes)+host长度(1byte)+host(nbytes)+命令长度(4bytes)+命令(nbytes)+域名长度(4bytes)+域名(nbytes)+唯一会话id(16bytes)+有效负载长度(4bytes)+有效负载(nbytes)+结束标识(10bytes)。成员发射器按照协议组织成包结构并组播,接收器接收包并按照协议进行解包,根据包信息维护成员表。
下面用一段代码简单展示实现过程,由于篇幅问题包的处理省略:
~~~
public class McastService {
private MulticastSocket socket;
private String address = "228.0.0.4";
private int port = 8000;
private InetAddress addr;
private byte[] buffer = new byte[2048];
private DatagramPacket receivePacket;
private final Object sendLock = new Object();
public void start() {
try {
addr = InetAddress.getByName(address);
receivePacket = new DatagramPacket(buffer, buffer.length, addr,port);
socket.joinGroup(addr);
new ReceiverThread().start();
new SenderThread().start();
} catch (IOException e) {
}
}
public class ReceiverThread extends Thread {
public void run() {
while (true) {
try {
receive();
} catch (ArrayIndexOutOfBoundsException ax) {
}
}
}
}
public class SenderThread extends Thread {
public void run() {
while (true) {
try {
send();
} catch (Exception x) {
}
try {
Thread.sleep(1000);
} catch (Exception ignore) {
}
}
}
}
public void send() {
byte[] data = 按照成员协议组织包结构;
DatagramPacket packet = new DatagramPacket(data, data.length, addr, port);
try {
socket.send(packet);
} catch (IOException e) {
}
}
public void receive() {
try {
socket.receive(receivePacket);
解析处理成员报文。
} catch (IOException e) {
}
}
}
~~~
第一步要先执行加入组播成员操作,接着分别启动接收器线程、发射器线程,一般接收器要优先启动。发射器每隔1秒组织协议包发送心跳,组播组内成员的接收器对接收到的协议报文进行解析,按照一定的逻辑更新各自节点本地成员列表,如果成员表已包含协议包的成员则只更新存活时间等消息。
Tribes利用上述原理维护集群成员,并且由独立模块MembershipService提供成员的相关服务,例如获取集群所有成员相关信息等。
喜欢java的同学可以交个朋友:
![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-01-15_5698bd8d22882.jpg)
集群通信组件Tribes之整体介绍
最后更新于:2022-04-01 07:10:28
接下来一系列文章会对集群通信框架tribes进行源码级别的分析,欢迎讨论。
把若干机器组合成一个集群,集群为了能协同工作,成员之间的通信是必不可少的,当然可以说这也是集群实现中重点需要解决的核心问题,一个强大的通信协同机制是集群的基础。
简约地说,Tribes是一个具备让你通过网络向组成员发送和接收信息、动态检测发现其他节点的组通信能力的高扩展性的独立的消息框架。在组成员之间进行信息复制及成员维护是一个相对复杂的事情,因为不仅要考虑各种通信协议还要有必要的机制提供不同的消息传输保证级别,且成员关系的维护要及时准确,同时针对IO不同场景需提供不同的IO模式,这些都是组成员消息传输要遇到的需要深入考虑的几点。而Tribes很好地将点对点、点对组的通信抽象得即简单又相对灵活。
Tribes拥有消息可靠的传输机制,它默认基于TCP协议传输,TCP拥有三次握手机制保证且有流量控制机制,另外在应用层面的消息可靠保证分为三个级别:
①NO_ACK级别,这是可靠级别最低的方式,使用此种级别时则认为Tribes一旦把消息发送给socket的发送队列则认为发送成功,尽管传输过程中发生异常导致接收方可能没有接收到,当然这种级别也是发送最快的方式。
②ACK级别,这是最推荐使用的一种方式,它能保证接收方肯定接能收到消息,Tribes向其他节点发送消息后只有接收到了接受者的确认消息才会认为发送成功,这种确认机制能在更高层面保证消息可靠性,不过发送效率会有影响,因为每个消息都需要确认,得不到确认的会重发。
③SYNC_ACK级别,这种方式不仅保证传输成功还保证执行成功,Tribes向其他节点发送消息后接收者接收到不马上返回ACK确认而是对接收到的消息进行处理,直到处理成功才返回ACK确认。如果接收成功处理失败接收者会返回ACK_FAIL给发送者,发送者将会重发。当然这种级别消息发送效率是最低最慢的。
![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-01-15_5698bd8d413a3.jpg)
整个Tribes的设计核心可以用上图表示,在IO层有三个重要的模块,其中MembershipService模块主要负责组成员关系的维护,包括维护现有成员及发现新成员,这些工作都是模块自动运行完成,你无需关心组成员的维护工作;ChannelSender模块负责向组内其他成员发送消息及其各种机制的详细实现;ChannelReceiver模块用于接收组内其他成员发送过来的消息及其各种机制的详细实现。消息的可靠性就是通过ChannelSender及ChannelReceiver的协同得到不同级别的保证的。拦截器栈提供了在消息传送到应用层之前对消息进行一些额外的操作,例如对某些信息进行过滤编码等等操作;最后到应用层,多数情况下我们只需关注应用层的东西即能使用起来,应用层面主要就是一些监听器,所以只要实现监听器里面指定的方法即可以对IO层传输上来的消息做逻辑处理。
拦截器、监听器的引入都是经典的模式,抽象一个底层作为数据处理层,实现各种复杂的通信及机制,而拦截器则是对底层数据的一种统一额外加工处理,监听器则作为接口提供应用层对数据做业务逻辑处理,组成了一个优雅的设计方案。
喜欢研究java的同学可以交个朋友:
![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-01-15_5698bd8d22882.jpg)
集群
最后更新于:2022-04-01 07:10:26
## 集群
* 现在如果要构造一个真正在生产环境上可使用的可靠的系统,基本都离不开集群的概念,总的来说集群是指由若干互相独立的机器通过高速网络组成的一个整体服务,整个集群的内部实现相对外部是透明的,对外部而言它就像一个独立的服务器。要使若干机器协同工作不是一件简单的事,其核心是如何在多机器之间进行通信及各种任务调度使之协同工作。
* 在工程上常见的两种集群是——负载均衡集群和高可用集群。
* 负载均衡集群(Load Balance Cluster),随着系统的处理量的不断增加,很快到达一台机器的处理极限,所以需要更多的机器来分担处理,负载均衡集群一般是通过一定的分发算法把访问流量均匀分布到集群里面的各个机器上进行处理,理想的集群是通过添加机器使处理能力达到线性增长,但现实中往往处理过程并非是无状态的,会涉及到一些共享状态变量,所以当集群数量达到一定程度后处理能力并不能按线性增长且可靠性可能也会降低。关于负载均衡集群如何协调分配分发请求的问题,即可以使用专门的负载均衡硬件如F5,也可以使用软件级别的方式实现负载均衡如LVS、HAProxy等。
* 高可用集群(High Available Cluster),高可用集群通过软件把若干机器连接起来,这种集群更偏重的是当集群中某个机器发生故障后能通过自动切换或流量转移等措施来保证整个集群对外的可用性。在实际运用中很经典的就是mysql数据库的主从节点,一般情况如果是一主多从我们会使用读写分离措施,主节点负责执行写操作而从节点执行读操作,一旦主节点发生故障后系统将自动从多个从节点中选举出一个节点作为主节点继续对外提供服务,保证了服务的高可用。高可用集群与负载均衡集群一般不会绝对地划分开,我们不会将一台机器什么也不做就放着就等其他机器出故障再去候补,为了使用地更加高效可以把这些备用(standby)的机器用于负载均衡,故障发生后再接管故障节点的职责。
喜欢java的同学可以交个朋友:
![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-01-15_5698bd8d22882.jpg)
java组播MulticastSocket
最后更新于:2022-04-01 07:10:24
在单播模式中有服务器端和客户端之分,而组播模式与单播模式不同,每个端都是以路由器或交换机做为中转广播站,任意一端向路由器或交换机发送消息,路由或交换机负责发送其他节点,每个节点都是同等的。所以在编程模式上用同一个类表示即可——MulticastSocket。
MulticastSocket属于jdk提供的类,类路径为java.net.MulticastSocket,利用此类可以很方便地实现组播功能,下面展示一个简单例子,两个节点之间通过组播传输消息。
①节点一,指定组播地址为228.0.0.4,端口为8000,节点一通过调用MulticastSocket的joinGroup方法申请将节点一加入到组播队伍中,接着使用一个无限循环往组里发“Hello from node1”消息,这是为了方便节点2加入后接收节点1的消息做准备,需要说明的是组播是通过DatagramPacket对象发送消息的,调用MulticastSocket的send方法即可把消息发送出去。这里为了缩减例子长度省去了退出组及关闭套接字的一些操作,实际使用中需完善。
~~~
public class Node1 {
private static int port = 8000;
private static String address = "228.0.0.4";
public static void main(String[] args) throws Exception {
try {
InetAddress group = InetAddress.getByName(address);
MulticastSocket mss = null;
mss = new MulticastSocket(port);
mss.joinGroup(group);
while (true) {
String message = "Hello from node1";
byte[] buffer = message.getBytes();
DatagramPacket dp = new DatagramPacket(buffer, buffer.length,
group, port);
mss.send(dp);
Thread.sleep(1000);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
~~~
②节点二,指定同样的组播地址与端口,申请加入与节点一相同的组播组,接着通过循环不断接收来自其他节点发送的消息,通过MulticastSocket的receive方法可读到消息,将不断接收到来自节点一发送的消息“receive from node1:Hello from node1”。当然节点2也可以往组播组发送消息,因为每个节点都是同等的,只要其他节点对组播消息进行接收。如果你还想增加其他节点,尽管申请加入组播组,所有节点都可以接收发送消息。
~~~
public class Node2 {
private static int port = 8000;
private static String address = "228.0.0.4";
public static void main(String[] args) throws Exception {
InetAddress group = InetAddress.getByName(address);
MulticastSocket msr = null;
try {
msr = new MulticastSocket(port);
msr.joinGroup(group);
byte[] buffer = new byte[1024];
while (true) {
DatagramPacket dp = new DatagramPacket(buffer, buffer.length);
msr.receive(dp);
String s = new String(dp.getData(), 0, dp.getLength());
System.out.println("receive from node1:"+s);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
~~~
组播通信
最后更新于:2022-04-01 07:10:21
考虑一个场景,某公司用远程视频会议软件开一个会议,BOSS在总部发言而其他分部员工接收视频,这时如果还是使用单播模式的话,总部的视频将通过网络传给每个分部员工,它有一个特点是有多少客户端就需要传送多少次,当客户端的数量越来越大时可能会导致网络阻塞,而且这种传送效率极低。于是引入了组播通信概念。
![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-01-15_5698bd8cf3da8.jpg)
如图,上为单播模式,S1向S2、S3和S4发送消息时必须发送三次,且每次都是从S1出发到各自目的地,传输效率低且浪费网络资源。下图为组播模式,S1向S2、S3和S4发送消息只需S1发送一次到路由器,连接S2、S3、S4客户端的路由器将负责往他们发送消息,解决了传输效率及浪费网络资源问题。
所以组播其实是为了优化单播在某些使用场景的局限性,它是一种一对多的传播方式,假如某个主机结点想接受相关的信息只需要向路由器或交换机申请加入某组即可,路由器或交换机在接受到相关信息后就会负责向组内所有成员发送信息,总结起来组播有以下特点:①节省不必要的网络资源;②有针对性地向组内成员传播;③可以在互联网上进行传播;④没有可靠传输协议导致数据不可靠。
组播中最重要的内容是如何维护路由器与主机之间的关系,其主要是通过IGMP协议进行维护,它主要维护不同路由器与不同主机之间的成员关系,具体的维护方式比较复杂,因为涉及到多个路由器且路由之间互相连接组成一个树状网络,而组内成员可能处于任何一个路由中,即树的任何树叶结点,所以需要复杂的算法去维护这些关系才知道信息要网哪发送。IGMP协议主要负责组成员的加入和退出、组内成员查询等功能,具体实现不再展开,只要明白使用组播就需要通过IGMP协议申请加入组成员才能接收组播出来的消息,而退出组后将接收不了消息。
组播相当于把主机与主机之间的通信压力转嫁到了路由器上面,所以要得到路由及网络的支持才能进行组播,整个传输过程中涉及的路由器或交换机都要支持组播,否则将无法使用组播通信。另外你的主机得支持组播,TCP/IP层面支持组播发送与接收。
在IP层面需要一个组播地址指定组播,它被称为D类地址,从224.0.0.0-239.255.255.255,这些地址根据范围大致分为局域网和因特网地址,224.0.0.0-244.0.0.255用于局域网,224.0.1.0-238.255.255.255用于因特网。Tomcat默认的组播地址为228.0.0.4,而Tomcat为何会涉及到组播则要归到集群的概念,因为集群涉及到内存的共享问题,所以需要使用组播进行内存同步,在集群相关章节将进行更加深入的探讨。
喜欢研究java的同学可以交个朋友,下面是本人的微信号:
![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-01-15_5698bd8d22882.jpg)
前言
最后更新于:2022-04-01 07:10:19
> 原文出处:[架构设计专栏文章](http://blog.csdn.net/column/details/cluster.html)
> 作者:[汪建](http://blog.csdn.net/wangyangzhizhou)
**本系列文章经作者授权在看云整理发布,未经作者允许,请勿转载!**
#集群
> 集群相关原理、技术的文章