(九)– 扩展Thrift框架来实现Attachable的RPC调用

最后更新于:2022-04-01 16:20:16

最近有一个分布式调用跟踪系统的项目,需要给基于Thrift的RPC调用添加调用链上下文信息,从而可以生成一次RPC调用的调用链信息。这篇讲讲如何扩展Thrift框架来实现RPC过程中无侵入地添加额外attachment信息的场景。 Thrift框架本身提供了很多机制来支持扩展,比如 1. 扩展TProtocol来实现自定义的序列化类 2. 扩展TTransport来实现自定义的流 3. 采用装饰器模式来装饰Processor,从而在服务实现方法被调用的前后插入自定义逻辑 4. 构建Client和Server时,可以把自定义的Protocol, Transport, Processor作为参数Args传入,从而使用自定义的这些类来处理请求 下图是Thrfit RPC调用涉及到的主要组件,RPC框架都大同小异,基本的结构差不多。绿色部分是可以扩展的点。比如在Client端包一层,可以增加服务寻址,负载均衡等分布式集群的功能,在Server端包一层,可以实现服务端的配置管理,监控等等。 ![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-02-19_56c6c62c4389e.jpg) 在这个简化的例子中,只需要扩展TProtocol和Processor,就可以实现在RPC调用时添加额外的attachment。 TProtocol表示了RPC调用的序列化过程,更多可以看这篇[Thrift源码分析(二)-- 协议和编解码](http://blog.csdn.net/iter_zc/article/details/39497863) 。TProtocol将序列化过程分为几步 1. write/read Message,读写消息头,消息头包含了方法名,序列号等信息 2. write/read Struct,将RPC方法的参数/返回值封装成结构体,读写结构体即表示要读写RPC方法参数了 3. write/read Field,每一个参数都被抽象成Field,Field主要包含了字段的索引信息,类型信息等 4. write/read Type,即读写各种具体的数据 TBinaryProtocol是使用地比较多的一种基于二进制流的协议,它实现了上述所有的write/read方法。 ~~~ public void writeMessageBegin(TMessage message) throws TException { if (strictWrite_) { int version = VERSION_1 | message.type; writeI32(version); writeString(message.name); writeI32(message.seqid); } else { writeString(message.name); writeByte(message.type); writeI32(message.seqid); } } public void writeMessageEnd() {} public void writeStructBegin(TStruct struct) {} public void writeStructEnd() {} public void writeFieldBegin(TField field) throws TException { writeByte(field.type); writeI16(field.id); } public void writeFieldEnd() {} ~~~ 看一下上面TBinaryProtocol几个方法实现可以发现,它的write/read Struct是空实现,也即写完Message消息头之后直接开始写Field。具体一个Thrift服务生成的客户端中包含了一个服务方法所有的结构信息,比如所有的参数都被创建了相应的TFiled对象,TField都是从1开始往后编号,并且生成了如何序列化一个具体参数的方法,可以看这篇[Thrift源码分析(三)-- IDL和生成代码分析](http://blog.csdn.net/iter_zc/article/details/39522531) 所以基于TBinaryProtocol协议生成的RPC调用字节流大致如下: ![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-02-19_56c6c62c54b67.jpg) Thrift生成的读这个字节流的代码流程大致如下 ~~~ readMessageBegin(); readStructBegin(); while(true){ field = readField(); if(field.type == Type.stop){ break; } switch(field.id){ case 1: readXXXX(); break; ..... case n: readXXXX(); break; default: TProtocolUtil.skip(iprotocol, field.type); } readFieldEnd(); } readStructEnd(); readMessageEnd(); ~~~ 从这个流程中,可以看到几点: 1. Thrift生成的代码在读写字节流时,都是按照生成的TField的索引号去判断,然后读取的 2. Thrift提供了skip和stop解析Filed的机制 我们可以从TFiled的索引号入手,通过下列方法来添加Attachment 1. 扩展TBinaryProtocol, 将我们想往字节流里插入的数据通过特定编号写入字节流 2. 然后在正常解析字节流之前,先将插入的特定编号的TFiled读取出来 3. 将字节流复位,交给后续的Processor处理 第2,3步的处理都是在装饰后的Processor中处理的。 最后生成的字节流如下 ![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-02-19_56c6c62c67ef8.jpg) 先看一下AttachableBinaryProtocol的实现 1. 定义了一个私有的Map类型的attachment字段,支持Key-Vaule结构的attachment 2. 扩展了writeMessageBegin方法,在写完message头之后,判断是否有attachment,如果有,就调用writeFieldZero方法讲attachment写入到字节流 3. writeFieldZero方法将attachment作为0号字段写入到字节流。Thrift本身支持Map类型,按照Thrift的规范,将attachment写入字节流 4. readFieldZero方法会从字节流中读取0号索引的Map类型的数据,写入到attachment 5. resetTFramedTransport,将字节流复位。在使用NIO类型的Thrift server的时候,默认使用TFramedTransport作为流实现,TFramedTransport是基于缓冲区的流实现,它内部使用了TMemoryInputTrasport流来存储读入的字节流。而TMemoryInputTrasport提供了reset方法来复位流的position。 ~~~ import java.lang.reflect.Field; import java.util.HashMap; import java.util.Map; import org.apache.thrift.TException; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.protocol.TField; import org.apache.thrift.protocol.TMap; import org.apache.thrift.protocol.TMessage; import org.apache.thrift.protocol.TProtocol; import org.apache.thrift.protocol.TProtocolFactory; import org.apache.thrift.protocol.TType; import org.apache.thrift.transport.TMemoryInputTransport; import org.apache.thrift.transport.TTransport; public class AttachableBinaryProtocol extends TBinaryProtocol { private Map<String, String> attachment; public AttachableBinaryProtocol(TTransport trans) { super(trans); attachment = new HashMap<String, String>(); } public AttachableBinaryProtocol(TTransport trans, boolean strictRead, boolean strictWrite) { super(trans); strictRead_ = strictRead; strictWrite_ = strictWrite; attachment = new HashMap<String, String>(); } /** * Factory */ public static class Factory implements TProtocolFactory { protected boolean strictRead_ = false; protected boolean strictWrite_ = true; protected int readLength_; public Factory() { this(false, true); } public Factory(boolean strictRead, boolean strictWrite) { this(strictRead, strictWrite, 0); } public Factory(boolean strictRead, boolean strictWrite, int readLength) { strictRead_ = strictRead; strictWrite_ = strictWrite; readLength_ = readLength; } public TProtocol getProtocol(TTransport trans) { AttachableBinaryProtocol proto = new AttachableBinaryProtocol( trans, strictRead_, strictWrite_); if (readLength_ != 0) { proto.setReadLength(readLength_); } return proto; } } public void writeMessageBegin(TMessage message) throws TException { super.writeMessageBegin(message); if(attachment.size() > 0){ writeFieldZero(); } } public void writeFieldZero() throws TException{ TField RTRACE_ATTACHMENT = new TField("rtraceAttachment", TType.MAP, (short) 0); this.writeFieldBegin(RTRACE_ATTACHMENT); { this.writeMapBegin(new TMap(TType.STRING, TType.STRING, attachment .size())); for (Map.Entry<String, String> entry : attachment.entrySet()) { this.writeString(entry.getKey()); this.writeString(entry.getValue()); } this.writeMapEnd(); } this.writeFieldEnd(); } public boolean readFieldZero() throws Exception { TField schemeField = this.readFieldBegin(); if (schemeField.id == 0 && schemeField.type == org.apache.thrift.protocol.TType.MAP) { TMap _map = this.readMapBegin(); attachment = new HashMap<String, String>(2 * _map.size); for (int i = 0; i < _map.size; ++i) { String key = this.readString(); String value = this.readString(); attachment.put(key, value); } this.readMapEnd(); } this.readFieldEnd(); return attachment.size() > 0 ? true: false; } public Map<String, String> getAttachment() { return attachment; } /* * 重置TFramedTransport流,不影响Thrift原有流程 */ public void resetTFramedTransport(TProtocol in) { try { Field readBuffer_ = TFramedTransportFieldsCache.getInstance() .getTFramedTransportReadBuffer(); Field buf_ = TFramedTransportFieldsCache.getInstance() .getTMemoryInputTransportBuf(); if (readBuffer_ == null || buf_ == null) { return; } TMemoryInputTransport stream = (TMemoryInputTransport) readBuffer_ .get(in.getTransport()); byte[] buf = (byte[]) (buf_.get(stream)); stream.reset(buf, 0, buf.length); } catch (Exception e) { e.printStackTrace(); } } private static class TFramedTransportFieldsCache { private static TFramedTransportFieldsCache instance; private final Field readBuffer_; private final Field buf_; private final String TFramedTransport_readBuffer_ = "readBuffer_"; private final String TMemoryInputTransport_buf_ = "buf_"; private TFramedTransportFieldsCache() throws Exception { readBuffer_ = org.apache.thrift.transport.TFramedTransport.class .getDeclaredField(TFramedTransport_readBuffer_); readBuffer_.setAccessible(true); buf_ = org.apache.thrift.transport.TMemoryInputTransport.class .getDeclaredField(TMemoryInputTransport_buf_); buf_.setAccessible(true); } public static TFramedTransportFieldsCache getInstance() throws Exception { if (instance == null) { synchronized (TFramedTransportFieldsCache.class) { if (instance == null) { instance = new TFramedTransportFieldsCache(); } } } return instance; } public Field getTFramedTransportReadBuffer() { return readBuffer_; } public Field getTMemoryInputTransportBuf() { return buf_; } } } ~~~ 来具体说下resetTFramedTransport这个方法,它采用了反射机制来从传入的TProtocol中复位字节流。由于TMemoryInputTransport是TFramedTransport的私有属性,只有通过反射机制才能访问到这个readBuffer属性。而真正的字节流存储在TMemoryInputTransport的私有属性buf中,还需要再次通过反射机制来访问TMemoryInputTransport的私有属性buf,TMemoryInputTransport提供了公有的reset方法,可以直接被调用。 resetTFramedTransport方法演示了如何通过反射机制来访问一个对象的私有属性。Filed.get是线程安全的,它最后落脚在Unsafe类上,通过Unsafe类的getObject方法,根据传入的对象和字段的偏移量来直接从内存中读取对应偏移量上属性值。 ~~~ public void resetTFramedTransport(TProtocol in) { try { Field readBuffer_ = TFramedTransportFieldsCache.getInstance() .getTFramedTransportReadBuffer(); Field buf_ = TFramedTransportFieldsCache.getInstance() .getTMemoryInputTransportBuf(); if (readBuffer_ == null || buf_ == null) { return; } TMemoryInputTransport stream = (TMemoryInputTransport) readBuffer_ .get(in.getTransport()); byte[] buf = (byte[]) (buf_.get(stream)); stream.reset(buf, 0, buf.length); } catch (Exception e) { e.printStackTrace(); } } public class TFramedTransport extends TTransport { private TMemoryInputTransport readBuffer_ = new TMemoryInputTransport(new byte[0]); } public final class TMemoryInputTransport extends TTransport { private byte[] buf_; private int pos_; private int endPos_; public TMemoryInputTransport() { } public TMemoryInputTransport(byte[] buf) {    reset(buf); } public TMemoryInputTransport(byte[] buf, int offset, int length) {    reset(buf, offset, length); } public void reset(byte[] buf) {    reset(buf, 0, buf.length); } public void reset(byte[] buf, int offset, int length) {    buf_ = buf;    pos_ = offset;    endPos_ = offset + length; } } ~~~ 再来看看装饰的Processor类, TraceProcessor类,这是一个典型的装饰器模式,实现TProcessor接口,并且维护了一个TProcessor对象。 1. 在process方法中,先将输入流转化成AttachableProtocol,然后读取消息头 readMessageBegin,然后readFieldZero读0号索引的Map字段。 2. 调用resetTFramedProtocol将输入流复位,然后交给实际的realProcessor处理,在readProcessor中最终会调用到Thrift服务的实现类。 ~~~ import java.util.Map; import org.apache.thrift.TException; import org.apache.thrift.TProcessor; import org.apache.thrift.protocol.TMessage; import org.apache.thrift.protocol.TProtocol; public class TraceProcessor implements TProcessor { private TProcessor realProcessor; private String serviceId; private String serviceName; private int port; public TraceProcessor(TProcessor realProcessor, int port) { this(realProcessor, "", "", port); } public TraceProcessor(TProcessor realProcessor, String serviceName, int port) { this(realProcessor, serviceName, serviceName, port); } public TraceProcessor(TProcessor realProcessor, String serviceId, String serviceName, int port) { this.realProcessor = realProcessor; this.serviceId = serviceId; this.serviceName = serviceName; this.port = port; } @Override public boolean process(TProtocol in, TProtocol out) throws TException { Map<String, String> attachment = null; if(in instanceof AttachableBinaryProtocol){ AttachableBinaryProtocol inProtocal = (AttachableBinaryProtocol)in; TMessage message = inProtocal.readMessageBegin(); // 先读MessageBegin来获得Attachment boolean isAttachableRequest = false; try { isAttachableRequest = inProtocal.readFieldZero(); } catch (Exception e) { } // 重置TramedTransport内部的流,不影响Thrift框架的正常执行流程 inProtocal.resetTFramedTransport(in); if(isAttachableRequest){ attachment = ((AttachableBinaryProtocol)in).getAttachment(); XXXX = attachment.get(XXXX); XXXX = attachment.get(XXXX); } } boolean result = realProcessor.process(in, out); return result; } } ~~~ 采用插入特定索引号的字段到Thrift生成的字节流有个好处是兼容性比较好,因为Thrift反序列化对象时,会按照生成的特定索引号去读取,一旦读到不是指定的索引号,就会skip到,继续读取下一个字段。这样就不会影响Thrift框架原有的序列化机制。
';

(八)–总结加一个完整的可运行的Thrift例子

最后更新于:2022-04-01 16:20:14

前面七篇文章分析了Thrfit的方方面面,看到这里时应该对Thrift有了深入的理解。 [Thrift源码分析(一)-- 基本概念](http://blog.csdn.net/iter_zc/article/details/39496439) [Thrift源码分析(二)-- 协议和编解码](http://blog.csdn.net/iter_zc/article/details/39497863) [Thrift源码分析(三)-- IDL和生成代码分析](http://blog.csdn.net/iter_zc/article/details/39522531) [Thrift源码分析(四)-- 方法调用模型分析](http://blog.csdn.net/iter_zc/article/details/39692951) [Thrift源码分析(五)-- FrameBuffer类分析](http://blog.csdn.net/iter_zc/article/details/39694129) [Thrift源码分析(六)-- Transport传输层分析](http://blog.csdn.net/iter_zc/article/details/39695187) [Thrift源码分析(七)-- TServer服务器分析](http://blog.csdn.net/iter_zc/article/details/39697401) 下面通过一个实际可以运行的例子来跑一跑Thrift,结束这个主题 ![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-02-19_56c6c62be2f7b.jpg) 1. 通过IDL来定义接口:   DemoService.thrift ~~~ namespace java com.thrift.test service DemoService{ string sayHi(1:string name); } ~~~ 2. 根据IDL自动生成代码 ~~~ thrift -r --gen java DemoService.thrift ~~~ 3. 生成的代码供服务器和客户端调用 ~~~ /** * Autogenerated by Thrift Compiler (0.8.0) * * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING * @generated */ package com.thrift.test; import org.apache.thrift.scheme.IScheme; import org.apache.thrift.scheme.SchemeFactory; import org.apache.thrift.scheme.StandardScheme; import org.apache.thrift.scheme.TupleScheme; import org.apache.thrift.protocol.TTupleProtocol; import java.util.List; import java.util.ArrayList; import java.util.Map; import java.util.HashMap; import java.util.EnumMap; import java.util.Set; import java.util.HashSet; import java.util.EnumSet; import java.util.Collections; import java.util.BitSet; import java.nio.ByteBuffer; import java.util.Arrays; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class DemoService { public interface Iface { public String sayHi(String name) throws org.apache.thrift.TException; } public interface AsyncIface { public void sayHi(String name, org.apache.thrift.async.AsyncMethodCallback<AsyncClient.sayHi_call> resultHandler) throws org.apache.thrift.TException; } public static class Client extends org.apache.thrift.TServiceClient implements Iface { public static class Factory implements org.apache.thrift.TServiceClientFactory<Client> { public Factory() {} public Client getClient(org.apache.thrift.protocol.TProtocol prot) { return new Client(prot); } public Client getClient(org.apache.thrift.protocol.TProtocol iprot, org.apache.thrift.protocol.TProtocol oprot) { return new Client(iprot, oprot); } } public Client(org.apache.thrift.protocol.TProtocol prot) { super(prot, prot); } public Client(org.apache.thrift.protocol.TProtocol iprot, org.apache.thrift.protocol.TProtocol oprot) { super(iprot, oprot); } public String sayHi(String name) throws org.apache.thrift.TException { send_sayHi(name); return recv_sayHi(); } public void send_sayHi(String name) throws org.apache.thrift.TException { sayHi_args args = new sayHi_args(); args.setName(name); sendBase("sayHi", args); } public String recv_sayHi() throws org.apache.thrift.TException { sayHi_result result = new sayHi_result(); receiveBase(result, "sayHi"); if (result.isSetSuccess()) { return result.success; } throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "sayHi failed: unknown result"); } } public static class AsyncClient extends org.apache.thrift.async.TAsyncClient implements AsyncIface { public static class Factory implements org.apache.thrift.async.TAsyncClientFactory<AsyncClient> { private org.apache.thrift.async.TAsyncClientManager clientManager; private org.apache.thrift.protocol.TProtocolFactory protocolFactory; public Factory(org.apache.thrift.async.TAsyncClientManager clientManager, org.apache.thrift.protocol.TProtocolFactory protocolFactory) { this.clientManager = clientManager; this.protocolFactory = protocolFactory; } public AsyncClient getAsyncClient(org.apache.thrift.transport.TNonblockingTransport transport) { return new AsyncClient(protocolFactory, clientManager, transport); } } public AsyncClient(org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.async.TAsyncClientManager clientManager, org.apache.thrift.transport.TNonblockingTransport transport) { super(protocolFactory, clientManager, transport); } public void sayHi(String name, org.apache.thrift.async.AsyncMethodCallback<sayHi_call> resultHandler) throws org.apache.thrift.TException { checkReady(); sayHi_call method_call = new sayHi_call(name, resultHandler, this, ___protocolFactory, ___transport); this.___currentMethod = method_call; ___manager.call(method_call); } public static class sayHi_call extends org.apache.thrift.async.TAsyncMethodCall { private String name; public sayHi_call(String name, org.apache.thrift.async.AsyncMethodCallback<sayHi_call> resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException { super(client, protocolFactory, transport, resultHandler, false); this.name = name; } public void write_args(org.apache.thrift.protocol.TProtocol prot) throws org.apache.thrift.TException { prot.writeMessageBegin(new org.apache.thrift.protocol.TMessage("sayHi", org.apache.thrift.protocol.TMessageType.CALL, 0)); sayHi_args args = new sayHi_args(); args.setName(name); args.write(prot); prot.writeMessageEnd(); } public String getResult() throws org.apache.thrift.TException { if (getState() != org.apache.thrift.async.TAsyncMethodCall.State.RESPONSE_READ) { throw new IllegalStateException("Method call not finished!"); } org.apache.thrift.transport.TMemoryInputTransport memoryTransport = new org.apache.thrift.transport.TMemoryInputTransport(getFrameBuffer().array()); org.apache.thrift.protocol.TProtocol prot = client.getProtocolFactory().getProtocol(memoryTransport); return (new Client(prot)).recv_sayHi(); } } } public static class Processor<I extends Iface> extends org.apache.thrift.TBaseProcessor<I> implements org.apache.thrift.TProcessor { private static final Logger LOGGER = LoggerFactory.getLogger(Processor.class.getName()); public Processor(I iface) { super(iface, getProcessMap(new HashMap<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>>())); } protected Processor(I iface, Map<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> processMap) { super(iface, getProcessMap(processMap)); } private static <I extends Iface> Map<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> getProcessMap(Map<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> processMap) { processMap.put("sayHi", new sayHi()); return processMap; } private static class sayHi<I extends Iface> extends org.apache.thrift.ProcessFunction<I, sayHi_args> { public sayHi() { super("sayHi"); } protected sayHi_args getEmptyArgsInstance() { return new sayHi_args(); } protected sayHi_result getResult(I iface, sayHi_args args) throws org.apache.thrift.TException { sayHi_result result = new sayHi_result(); result.success = iface.sayHi(args.name); return result; } } } public static class sayHi_args implements org.apache.thrift.TBase<sayHi_args, sayHi_args._Fields>, java.io.Serializable, Cloneable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("sayHi_args"); private static final org.apache.thrift.protocol.TField NAME_FIELD_DESC = new org.apache.thrift.protocol.TField("name", org.apache.thrift.protocol.TType.STRING, (short)1); private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>(); static { schemes.put(StandardScheme.class, new sayHi_argsStandardSchemeFactory()); schemes.put(TupleScheme.class, new sayHi_argsTupleSchemeFactory()); } public String name; // required /**The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ public enum _Fields implements org.apache.thrift.TFieldIdEnum { NAME((short)1, "name"); private static final Map<String, _Fields> byName = new HashMap<String, _Fields>(); static { for (_Fields field : EnumSet.allOf(_Fields.class)) { byName.put(field.getFieldName(), field); } } /** * Find the _Fields constant that matches fieldId, or null if its not found. */ public static _Fields findByThriftId(int fieldId) { switch(fieldId) { case 1: // NAME return NAME; default: return null; } } /** * Find the _Fields constant that matches fieldId, throwing an exception * if it is not found. */ public static _Fields findByThriftIdOrThrow(int fieldId) { _Fields fields = findByThriftId(fieldId); if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!"); return fields; } /** * Find the _Fields constant that matches name, or null if its not found. */ public static _Fields findByName(String name) { return byName.get(name); } private final short _thriftId; private final String _fieldName; _Fields(short thriftId, String fieldName) { _thriftId = thriftId; _fieldName = fieldName; } public short getThriftFieldId() { return _thriftId; } public String getFieldName() { return _fieldName; } } // isset id assignments public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; static { Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); tmpMap.put(_Fields.NAME, new org.apache.thrift.meta_data.FieldMetaData("name", org.apache.thrift.TFieldRequirementType.DEFAULT, new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); metaDataMap = Collections.unmodifiableMap(tmpMap); org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(sayHi_args.class, metaDataMap); } public sayHi_args() { } public sayHi_args( String name) { this(); this.name = name; } /** * Performs a deep copy on <i>other</i>. */ public sayHi_args(sayHi_args other) { if (other.isSetName()) { this.name = other.name; } } public sayHi_args deepCopy() { return new sayHi_args(this); } @Override public void clear() { this.name = null; } public String getName() { return this.name; } public sayHi_args setName(String name) { this.name = name; return this; } public void unsetName() { this.name = null; } /**Returns true if field name is set (has been assigned a value) and false otherwise */ public boolean isSetName() { return this.name != null; } public void setNameIsSet(boolean value) { if (!value) { this.name = null; } } public void setFieldValue(_Fields field, Object value) { switch (field) { case NAME: if (value == null) { unsetName(); } else { setName((String)value); } break; } } public Object getFieldValue(_Fields field) { switch (field) { case NAME: return getName(); } throw new IllegalStateException(); } /**Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */ public boolean isSet(_Fields field) { if (field == null) { throw new IllegalArgumentException(); } switch (field) { case NAME: return isSetName(); } throw new IllegalStateException(); } @Override public boolean equals(Object that) { if (that == null) return false; if (that instanceof sayHi_args) return this.equals((sayHi_args)that); return false; } public boolean equals(sayHi_args that) { if (that == null) return false; boolean this_present_name = true && this.isSetName(); boolean that_present_name = true && that.isSetName(); if (this_present_name || that_present_name) { if (!(this_present_name && that_present_name)) return false; if (!this.name.equals(that.name)) return false; } return true; } @Override public int hashCode() { return 0; } public int compareTo(sayHi_args other) { if (!getClass().equals(other.getClass())) { return getClass().getName().compareTo(other.getClass().getName()); } int lastComparison = 0; sayHi_args typedOther = (sayHi_args)other; lastComparison = Boolean.valueOf(isSetName()).compareTo(typedOther.isSetName()); if (lastComparison != 0) { return lastComparison; } if (isSetName()) { lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.name, typedOther.name); if (lastComparison != 0) { return lastComparison; } } return 0; } public _Fields fieldForId(int fieldId) { return _Fields.findByThriftId(fieldId); } public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException { schemes.get(iprot.getScheme()).getScheme().read(iprot, this); } public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException { schemes.get(oprot.getScheme()).getScheme().write(oprot, this); } @Override public String toString() { StringBuilder sb = new StringBuilder("sayHi_args("); boolean first = true; sb.append("name:"); if (this.name == null) { sb.append("null"); } else { sb.append(this.name); } first = false; sb.append(")"); return sb.toString(); } public void validate() throws org.apache.thrift.TException { // check for required fields } private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException { try { write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out))); } catch (org.apache.thrift.TException te) { throw new java.io.IOException(te); } } private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException { try { read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in))); } catch (org.apache.thrift.TException te) { throw new java.io.IOException(te); } } private static class sayHi_argsStandardSchemeFactory implements SchemeFactory { public sayHi_argsStandardScheme getScheme() { return new sayHi_argsStandardScheme(); } } private static class sayHi_argsStandardScheme extends StandardScheme<sayHi_args> { public void read(org.apache.thrift.protocol.TProtocol iprot, sayHi_args struct) throws org.apache.thrift.TException { org.apache.thrift.protocol.TField schemeField; iprot.readStructBegin(); while (true) { schemeField = iprot.readFieldBegin(); if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { break; } switch (schemeField.id) { case 1: // NAME if (schemeField.type == org.apache.thrift.protocol.TType.STRING) { struct.name = iprot.readString(); struct.setNameIsSet(true); } else { org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; default: org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } iprot.readFieldEnd(); } iprot.readStructEnd(); // check for required fields of primitive type, which can't be checked in the validate method struct.validate(); } public void write(org.apache.thrift.protocol.TProtocol oprot, sayHi_args struct) throws org.apache.thrift.TException { struct.validate(); oprot.writeStructBegin(STRUCT_DESC); if (struct.name != null) { oprot.writeFieldBegin(NAME_FIELD_DESC); oprot.writeString(struct.name); oprot.writeFieldEnd(); } oprot.writeFieldStop(); oprot.writeStructEnd(); } } private static class sayHi_argsTupleSchemeFactory implements SchemeFactory { public sayHi_argsTupleScheme getScheme() { return new sayHi_argsTupleScheme(); } } private static class sayHi_argsTupleScheme extends TupleScheme<sayHi_args> { @Override public void write(org.apache.thrift.protocol.TProtocol prot, sayHi_args struct) throws org.apache.thrift.TException { TTupleProtocol oprot = (TTupleProtocol) prot; BitSet optionals = new BitSet(); if (struct.isSetName()) { optionals.set(0); } oprot.writeBitSet(optionals, 1); if (struct.isSetName()) { oprot.writeString(struct.name); } } @Override public void read(org.apache.thrift.protocol.TProtocol prot, sayHi_args struct) throws org.apache.thrift.TException { TTupleProtocol iprot = (TTupleProtocol) prot; BitSet incoming = iprot.readBitSet(1); if (incoming.get(0)) { struct.name = iprot.readString(); struct.setNameIsSet(true); } } } } public static class sayHi_result implements org.apache.thrift.TBase<sayHi_result, sayHi_result._Fields>, java.io.Serializable, Cloneable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("sayHi_result"); private static final org.apache.thrift.protocol.TField SUCCESS_FIELD_DESC = new org.apache.thrift.protocol.TField("success", org.apache.thrift.protocol.TType.STRING, (short)0); private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>(); static { schemes.put(StandardScheme.class, new sayHi_resultStandardSchemeFactory()); schemes.put(TupleScheme.class, new sayHi_resultTupleSchemeFactory()); } public String success; // required /**The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ public enum _Fields implements org.apache.thrift.TFieldIdEnum { SUCCESS((short)0, "success"); private static final Map<String, _Fields> byName = new HashMap<String, _Fields>(); static { for (_Fields field : EnumSet.allOf(_Fields.class)) { byName.put(field.getFieldName(), field); } } /** * Find the _Fields constant that matches fieldId, or null if its not found. */ public static _Fields findByThriftId(int fieldId) { switch(fieldId) { case 0: // SUCCESS return SUCCESS; default: return null; } } /** * Find the _Fields constant that matches fieldId, throwing an exception * if it is not found. */ public static _Fields findByThriftIdOrThrow(int fieldId) { _Fields fields = findByThriftId(fieldId); if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!"); return fields; } /** * Find the _Fields constant that matches name, or null if its not found. */ public static _Fields findByName(String name) { return byName.get(name); } private final short _thriftId; private final String _fieldName; _Fields(short thriftId, String fieldName) { _thriftId = thriftId; _fieldName = fieldName; } public short getThriftFieldId() { return _thriftId; } public String getFieldName() { return _fieldName; } } // isset id assignments public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; static { Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); tmpMap.put(_Fields.SUCCESS, new org.apache.thrift.meta_data.FieldMetaData("success", org.apache.thrift.TFieldRequirementType.DEFAULT, new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); metaDataMap = Collections.unmodifiableMap(tmpMap); org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(sayHi_result.class, metaDataMap); } public sayHi_result() { } public sayHi_result( String success) { this(); this.success = success; } /** * Performs a deep copy on <i>other</i>. */ public sayHi_result(sayHi_result other) { if (other.isSetSuccess()) { this.success = other.success; } } public sayHi_result deepCopy() { return new sayHi_result(this); } @Override public void clear() { this.success = null; } public String getSuccess() { return this.success; } public sayHi_result setSuccess(String success) { this.success = success; return this; } public void unsetSuccess() { this.success = null; } /**Returns true if field success is set (has been assigned a value) and false otherwise */ public boolean isSetSuccess() { return this.success != null; } public void setSuccessIsSet(boolean value) { if (!value) { this.success = null; } } public void setFieldValue(_Fields field, Object value) { switch (field) { case SUCCESS: if (value == null) { unsetSuccess(); } else { setSuccess((String)value); } break; } } public Object getFieldValue(_Fields field) { switch (field) { case SUCCESS: return getSuccess(); } throw new IllegalStateException(); } /**Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */ public boolean isSet(_Fields field) { if (field == null) { throw new IllegalArgumentException(); } switch (field) { case SUCCESS: return isSetSuccess(); } throw new IllegalStateException(); } @Override public boolean equals(Object that) { if (that == null) return false; if (that instanceof sayHi_result) return this.equals((sayHi_result)that); return false; } public boolean equals(sayHi_result that) { if (that == null) return false; boolean this_present_success = true && this.isSetSuccess(); boolean that_present_success = true && that.isSetSuccess(); if (this_present_success || that_present_success) { if (!(this_present_success && that_present_success)) return false; if (!this.success.equals(that.success)) return false; } return true; } @Override public int hashCode() { return 0; } public int compareTo(sayHi_result other) { if (!getClass().equals(other.getClass())) { return getClass().getName().compareTo(other.getClass().getName()); } int lastComparison = 0; sayHi_result typedOther = (sayHi_result)other; lastComparison = Boolean.valueOf(isSetSuccess()).compareTo(typedOther.isSetSuccess()); if (lastComparison != 0) { return lastComparison; } if (isSetSuccess()) { lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.success, typedOther.success); if (lastComparison != 0) { return lastComparison; } } return 0; } public _Fields fieldForId(int fieldId) { return _Fields.findByThriftId(fieldId); } public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException { schemes.get(iprot.getScheme()).getScheme().read(iprot, this); } public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException { schemes.get(oprot.getScheme()).getScheme().write(oprot, this); } @Override public String toString() { StringBuilder sb = new StringBuilder("sayHi_result("); boolean first = true; sb.append("success:"); if (this.success == null) { sb.append("null"); } else { sb.append(this.success); } first = false; sb.append(")"); return sb.toString(); } public void validate() throws org.apache.thrift.TException { // check for required fields } private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException { try { write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out))); } catch (org.apache.thrift.TException te) { throw new java.io.IOException(te); } } private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException { try { read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in))); } catch (org.apache.thrift.TException te) { throw new java.io.IOException(te); } } private static class sayHi_resultStandardSchemeFactory implements SchemeFactory { public sayHi_resultStandardScheme getScheme() { return new sayHi_resultStandardScheme(); } } private static class sayHi_resultStandardScheme extends StandardScheme<sayHi_result> { public void read(org.apache.thrift.protocol.TProtocol iprot, sayHi_result struct) throws org.apache.thrift.TException { org.apache.thrift.protocol.TField schemeField; iprot.readStructBegin(); while (true) { schemeField = iprot.readFieldBegin(); if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { break; } switch (schemeField.id) { case 0: // SUCCESS if (schemeField.type == org.apache.thrift.protocol.TType.STRING) { struct.success = iprot.readString(); struct.setSuccessIsSet(true); } else { org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; default: org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } iprot.readFieldEnd(); } iprot.readStructEnd(); // check for required fields of primitive type, which can't be checked in the validate method struct.validate(); } public void write(org.apache.thrift.protocol.TProtocol oprot, sayHi_result struct) throws org.apache.thrift.TException { struct.validate(); oprot.writeStructBegin(STRUCT_DESC); if (struct.success != null) { oprot.writeFieldBegin(SUCCESS_FIELD_DESC); oprot.writeString(struct.success); oprot.writeFieldEnd(); } oprot.writeFieldStop(); oprot.writeStructEnd(); } } private static class sayHi_resultTupleSchemeFactory implements SchemeFactory { public sayHi_resultTupleScheme getScheme() { return new sayHi_resultTupleScheme(); } } private static class sayHi_resultTupleScheme extends TupleScheme<sayHi_result> { @Override public void write(org.apache.thrift.protocol.TProtocol prot, sayHi_result struct) throws org.apache.thrift.TException { TTupleProtocol oprot = (TTupleProtocol) prot; BitSet optionals = new BitSet(); if (struct.isSetSuccess()) { optionals.set(0); } oprot.writeBitSet(optionals, 1); if (struct.isSetSuccess()) { oprot.writeString(struct.success); } } @Override public void read(org.apache.thrift.protocol.TProtocol prot, sayHi_result struct) throws org.apache.thrift.TException { TTupleProtocol iprot = (TTupleProtocol) prot; BitSet incoming = iprot.readBitSet(1); if (incoming.get(0)) { struct.success = iprot.readString(); struct.setSuccessIsSet(true); } } } } } ~~~ 4. 接口实现类 ~~~ package com.thrift.test; import org.apache.thrift.TException; public class DemoServiceImpl implements DemoService.Iface{ @Override public String sayHi(String name) throws TException { return "Hi " + name + ", from Thrift Server"; } } ~~~ 5. 客户端 ~~~ package com.thrift.test.client; import org.apache.thrift.protocol.TCompactProtocol; import org.apache.thrift.protocol.TProtocol; import org.apache.thrift.transport.TFramedTransport; import org.apache.thrift.transport.TSocket; import org.apache.thrift.transport.TTransport; import com.thrift.test.DemoService; public class Client { public static void main(String[] args) throws Exception{ TSocket socket = new TSocket("127.0.0.1", 9090); socket.setTimeout(3000); TTransport transport = new TFramedTransport(socket); TProtocol protocol = new TCompactProtocol(transport); transport.open(); System.out.println("Connected to Thrfit Server"); DemoService.Client client = new DemoService.Client.Factory() .getClient(protocol); String result = client.sayHi("ITer_ZC"); System.out.println(result); } } ~~~ 6. 服务器端 ~~~ package com.thrift.test.server; import org.apache.thrift.TProcessor; import org.apache.thrift.protocol.TCompactProtocol; import org.apache.thrift.server.TNonblockingServer; import org.apache.thrift.server.TServer; import org.apache.thrift.transport.TNonblockingServerSocket; import com.thrift.test.DemoService; import com.thrift.test.DemoService.Iface; import com.thrift.test.DemoServiceImpl; public class Server {  public static void main(String[] args){      TNonblockingServerSocket socket;      try {          socket = new TNonblockingServerSocket(9090);          TNonblockingServer.Args options = new TNonblockingServer.Args(socket);          TProcessor processor = new DemoService.Processor<Iface>(new DemoServiceImpl());          options.processor(processor);          options.protocolFactory(new TCompactProtocol.Factory());          TServer server = new TNonblockingServer(options);          System.out.println("Thrift Server is running at 9090 port");          server.serve();              } catch (Exception e) {          throw new RuntimeException(e);      }  } } ~~~ 7. 服务器端运行截图 ![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-02-19_56c6c62c01c99.jpg) 8. 客户端运行截图 ![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-02-19_56c6c62c13fe7.jpg) 9. 依赖的jar包 ![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-02-19_56c6c62c2b018.jpg)
';

(七)– TServer服务器分析

最后更新于:2022-04-01 16:20:11

Thrift采用了TServer来作为服务器的抽象,提供了多种类型的服务器实现。用TServerTransport作为服务器的Acceptor抽象,来监听端口,创建客户端Socket连接 先来看看TServerTransport。主要有两类 1. TNonblockingServerTransport和TNonblockingServerSocket作为非阻塞IO的Acceptor,封装了ServerSocketChannel 2. TServerSocket作为阻塞同步IO的Acceptor,封装了ServerSocket ![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-02-19_56c6c62bc0db8.jpg) ~~~ public class TNonblockingServerSocket extends TNonblockingServerTransport { private ServerSocketChannel serverSocketChannel = null; } protected TNonblockingSocket acceptImpl() throws TTransportException { if (serverSocket_ == null) { throw new TTransportException(TTransportException.NOT_OPEN, "No underlying server socket."); } try { SocketChannel socketChannel = serverSocketChannel.accept(); if (socketChannel == null) { return null; } TNonblockingSocket tsocket = new TNonblockingSocket(socketChannel); tsocket.setTimeout(clientTimeout_); return tsocket; } catch (IOException iox) { throw new TTransportException(iox); } } public void registerSelector(Selector selector) { try { // Register the server socket channel, indicating an interest in // accepting new connections serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); } catch (ClosedChannelException e) { // this shouldn't happen, ideally... // TODO: decide what to do with this. } } public class TServerSocket extends TServerTransport { private ServerSocket serverSocket_ = null; } protected TSocket acceptImpl() throws TTransportException { if (serverSocket_ == null) { throw new TTransportException(TTransportException.NOT_OPEN, "No underlying server socket."); } try { Socket result = serverSocket_.accept(); TSocket result2 = new TSocket(result); result2.setTimeout(clientTimeout_); return result2; } catch (IOException iox) { throw new TTransportException(iox); } } ~~~ 再看TServer的类层次结构,主要也是两类,非阻塞IO和同步IO 非阻塞IO的Server有: 1. TNonblockingServer是单线程的,只有一个SelectAcceptThread线程来轮询IO就绪事件,调用就绪的channel来相应Accept, Read, Write事件,并且还是使用这个线程来同步调用实际的方法实现。 2. THsHaServer是所谓的半同步半异步的服务器。所谓半同步是说使用一个SelectAcceptThread线程来轮询IO就绪事件,调用就绪的channel来相应Accept, Read, Write事件。所谓的半异步是说方法的调用是封装成一个Runnable交给线程池来执行的,交给线程池立刻返回,不同步等待方法执行完毕,方法执行完毕的写返回是有线程池中的线程来做的,实现了所谓的异步访问的模式。 3. TThreadSelectorServer,这个服务器类比较有意思,是多线程Reactor模式的一种实现。 3.1 采用了一个AcceptorThread来专门监听端口,处理Accept事件,然后创建SocketChannel。创建完成之后交给一个线程池来处理后续动作,将SocketChannel放到SelecotrThread的阻塞队列acceptedQueue中 3.2 采用多个SelectorThread来处理创建好的SocketChannel。每个SelectorThread绑定一个Selector,这样将SocketChannel分给多个Selector。同时SelectorThread又维护了一个阻塞队列acceptedQueue,从acceptedQueue中拿新创建好的SocketChannel,来注册读事件 同步的TServer有TThreadPoolServer,关联一个TServerSocket,采用同步IO的方式来Accept,然后交给一个线程池来处理后续动作 ![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-02-19_56c6c62bd0e7d.jpg) 这里有一篇老外写的文章比较各种服务器的性能,[https://github.com/m1ch1/mapkeeper/wiki/Thrift-Java-Servers-Compared](https://github.com/m1ch1/mapkeeper/wiki/Thrift-Java-Servers-Compared) 结论是TThreadSelectorServer在吞吐量和服务器响应时间的表现都是最优的
';

(六)– Transport传输层分析

最后更新于:2022-04-01 16:20:09

RPC作为一种特殊的网络编程,会封装一层传输层来支持底层的网络通信。Thrift使用了Transport来封装传输层,但Transport不仅仅是底层网络传输,它还是上层流的封装。 关于Transport的设计,从架构上看,IO流和网络流都是IO的范畴,用一个统一的接口来抽象并无不可,但是个人感觉看Thrift的代码时,都用的Transport来表示流,不知道是普通IO流还是底层的网络流。还不如用Java的方式,把普通IO和网络接口用不同抽象隔离,至少代码逻辑比较清晰 废话不多说,看看Trasport的类结构。 TTransport作为顶层的抽象,使用了抽象类,没有使用接口。个人感觉这种做法还是没有使用接口作为顶层抽象来得好,接口扩展性更好。 有几个关注点: 1. TIOStreamTransport和TSocket这两个类的结构对应着阻塞同步IO, TSocket封装了Socket接口 2. TNonblockingTrasnsort,TNonblockingSocket这两个类对应着非阻塞IO 3. TMemoryInputTransport封装了一个字节数组byte[]来做输入流的封装 4. TMemoryBuffer使用字节数组输出流ByteArrayOutputStream做输出流的封装 5. TFramedTransport则封装了TMemoryInputTransport做输入流,封装了TByteArryOutPutStream做输出流,作为内存读写缓冲区的一个封装。TFramedTransport的flush方法时,会先写4个字节的输出流的长度作为消息头,然后写消息体。和FrameBuffer的读消息对应起来。FrameBuffer对消息时,先读4个字节的长度,再读消息体 6. TFastFramedTransport是内存利用率更高的一个内存读写缓存区,它使用自动增长的byte[](不够长度才new),而不是每次都new一个byte[],提高了内存的使用率。其他和TFramedTransport一样,flush时也会写4个字节的消息头表示消息长度。 ![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-02-19_56c6c62baa4ab.jpg) 和Java的IO一样,Thrift的Transport也采用了装饰器模式实现了所谓的包装流。我们也使用包装流和节点流的概念来区分一下各个Transport。 节点流表示自身采用byte[]来提供IO读写的类: AutoExpandingBufferReadTransport AutoExpandingBufferWriteTransport TMemoryInputTransport TByteArrayOutputStream TMemoryBuffer 两个网络相关的比较特殊,我们也可以认为它们是节点流,它们是直接操作网络读写的对象 TNonblockingSocket TSocket 包装流表示封装了其他Transport,流来提供IO读写的类: TFramedTransport TFastFramedTransport Thrift提供的包装流主要就是两个以TFrame开头的Transort,这两个Transport在写完消息flush的时候,会加上4字节表示长度的消息头,读消息是会先读4字节表示长度的消息头。 既然Thrift的NIO服务器端读消息时,使用了FrameBuffer来做缓冲区,并且解码时先读4字节长度的消息头,那么可以推断出,客户端发消息时,是使用TFramedXXXTransport包装流来传输数据的。 我们来一个实际的客户端对象构造情况 ~~~ TSocket socket = new TSocket(host, port); socket.setTimeout(timeout); TTransport transport = new TFramedTransport(socket); TProtocol protocol = new TCompactProtocol(transport); transport.open(); ~~~ 另外我们在讲Thrift协议的时候说了,Thrift的协议是和具体的传输对象绑定的,协议使用具体的Transport来读写数据
';

(五)– FrameBuffer类分析

最后更新于:2022-04-01 16:20:07

FrameBuffer是ThriftNIO服务器端的一个核心组件,它一方面承担了NIO编程中的缓冲区的功能,另一方面还承担了RPC方法调用的职责。 ![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-02-19_56c6c62b9330f.jpg) FrameBufferState定义了FrameBuffer作为缓冲区的读写状态 ~~~ private enum FrameBufferState { // in the midst of reading the frame size off the wire // 读Frame消息头,实际是4字节表示Frame长度  READING_FRAME_SIZE, // reading the actual frame data now, but not all the way done yet // 读Frame消息体  READING_FRAME, // completely read the frame, so an invocation can now happen // 读满包  READ_FRAME_COMPLETE, // waiting to get switched to listening for write events // 等待注册写  AWAITING_REGISTER_WRITE, // started writing response data, not fully complete yet // 写半包  WRITING, // another thread wants this framebuffer to go back to reading // 等待注册读  AWAITING_REGISTER_READ, // we want our transport and selection key invalidated in the selector // thread // 等待关闭  AWAITING_CLOSE } ~~~ 值得注意的是,FrameBuffer读数据时, 1. 先读4字节的Frame消息头, 2. 然后改变FrameBufferState,从READING_FRMAE_SIZE到READING_FRAME,并根据读到的Frame长度修改Buffer的长度 3. 再次读Frame消息体,如果读完就修改状态到READ_FRAME_COMPLETE,否则还是把FrameBuffer绑定到SelectionKey,下次继续读 ~~~ public boolean read() { if (state_ == FrameBufferState.READING_FRAME_SIZE) { // try to read the frame size completely if (!internalRead()) { return false; } // if the frame size has been read completely, then prepare to read the // actual frame. if (buffer_.remaining() == 0) { // pull out the frame size as an integer. int frameSize = buffer_.getInt(0); if (frameSize <= 0) { LOGGER.error("Read an invalid frame size of " + frameSize + ". Are you using TFramedTransport on the client side?"); return false; } // if this frame will always be too large for this server, log the // error and close the connection. if (frameSize > MAX_READ_BUFFER_BYTES) { LOGGER.error("Read a frame size of " + frameSize + ", which is bigger than the maximum allowable buffer size for ALL connections."); return false; } // if this frame will push us over the memory limit, then return. // with luck, more memory will free up the next time around. if (readBufferBytesAllocated.get() + frameSize > MAX_READ_BUFFER_BYTES) { return true; } // increment the amount of memory allocated to read buffers readBufferBytesAllocated.addAndGet(frameSize); // reallocate the readbuffer as a frame-sized buffer buffer_ = ByteBuffer.allocate(frameSize); state_ = FrameBufferState.READING_FRAME; } else { // this skips the check of READING_FRAME state below, since we can't // possibly go on to that state if there's data left to be read at // this one. return true; } } // it is possible to fall through from the READING_FRAME_SIZE section // to READING_FRAME if there's already some frame data available once // READING_FRAME_SIZE is complete. if (state_ == FrameBufferState.READING_FRAME) { if (!internalRead()) { return false; } // since we're already in the select loop here for sure, we can just // modify our selection key directly. if (buffer_.remaining() == 0) { // get rid of the read select interests selectionKey_.interestOps(0); state_ = FrameBufferState.READ_FRAME_COMPLETE; } return true; } // if we fall through to this point, then the state must be invalid. LOGGER.error("Read was called but state is invalid (" + state_ + ")"); return false; } ~~~ internalRead方法实际调用了SocketChannel来读数据。注意SocketChannel返回值小于0的情况: n 有数据的时候返回读取到的字节数。 0 没有数据并且没有达到流的末端时返回0。 -1 当达到流末端的时候返回-1。 当Channel有数据时并且是最后的数据 时,实际会读两次,第一次返回字节数,第二次返回-1。这个是底层Selector实现的。 ~~~ private boolean internalRead() { try { if (trans_.read(buffer_) < 0) { return false; } return true; } catch (IOException e) { LOGGER.warn("Got an IOException in internalRead!", e); return false; } } ~~~ 在看写缓冲时的情况 1. 写之前必须把FrameBuffer的状态改成WRITING,后面会有具体例子 2. 如果没写任何数据,就返回false 3. 如果写完了,就需要把SelectionKey注册的写事件取消。Thrift是直接把SelectionKey注册事件改成读了,而常用的做法一般是把写事件取消就行了。关于更多NIO写事件的注册问题,看这篇:[http://blog.csdn.net/iter_zc/article/details/39291129](http://blog.csdn.net/iter_zc/article/details/39291129) ~~~ public boolean write() { if (state_ == FrameBufferState.WRITING) { try { if (trans_.write(buffer_) < 0) { return false; } } catch (IOException e) { LOGGER.warn("Got an IOException during write!", e); return false; } // we're done writing. now we need to switch back to reading. if (buffer_.remaining() == 0) { prepareRead(); } return true; } LOGGER.error("Write was called, but state is invalid (" + state_ + ")"); return false; } ~~~ FrameBuffer可以根据SelectionKey的状态来切换自身状态,也可以根据自身状态来选择注册的Channel事件 ~~~ public void changeSelectInterests() { if (state_ == FrameBufferState.AWAITING_REGISTER_WRITE) { // set the OP_WRITE interest selectionKey_.interestOps(SelectionKey.OP_WRITE); state_ = FrameBufferState.WRITING; } else if (state_ == FrameBufferState.AWAITING_REGISTER_READ) { prepareRead(); } else if (state_ == FrameBufferState.AWAITING_CLOSE) { close(); selectionKey_.cancel(); } else { LOGGER.error("changeSelectInterest was called, but state is invalid (" + state_ + ")"); } } ~~~ 说完了FrameBuffer作为NIO缓冲区的功能,再看看它作为RPC方法调用模型的重要组件的功能。 FrameBuffer提供了invoker方法,当读满包时,从消息头拿到要调用的方法,然后通过它管理的Processor来完成实际方法调用。然后切换到写模式来写消息体 具体的调用模型看这篇: [http://blog.csdn.net/iter_zc/article/details/39692951](http://blog.csdn.net/iter_zc/article/details/39692951) ~~~ public void invoke() { TTransport inTrans = getInputTransport(); TProtocol inProt = inputProtocolFactory_.getProtocol(inTrans); TProtocol outProt = outputProtocolFactory_.getProtocol(getOutputTransport()); try { processorFactory_.getProcessor(inTrans).process(inProt, outProt); responseReady(); return; } catch (TException te) { LOGGER.warn("Exception while invoking!", te); } catch (Throwable t) { LOGGER.error("Unexpected throwable while invoking!", t); } // This will only be reached when there is a throwable. state_ = FrameBufferState.AWAITING_CLOSE; requestSelectInterestChange(); } public void responseReady() {      // the read buffer is definitely no longer in use, so we will decrement      // our read buffer count. we do this here as well as in close because      // we'd like to free this read memory up as quickly as possible for other      // clients.      readBufferBytesAllocated.addAndGet(-buffer_.array().length);      if (response_.len() == 0) {        // go straight to reading again. this was probably an oneway method        state_ = FrameBufferState.AWAITING_REGISTER_READ;        buffer_ = null;      } else {        buffer_ = ByteBuffer.wrap(response_.get(), 0, response_.len());        // set state that we're waiting to be switched to write. we do this        // asynchronously through requestSelectInterestChange() because there is        // a possibility that we're not in the main thread, and thus currently        // blocked in select(). (this functionality is in place for the sake of        // the HsHa server.)        state_ = FrameBufferState.AWAITING_REGISTER_WRITE;      }      requestSelectInterestChange();    } ~~~ 写消息体responseReday()方法时,我们看到Thrift是如何处理写的 1. 创建ByteBuffer 2. 修改状态到AWAITING_REGISTER_WRITE 3. 调用requestSelecInteresetChange()方法来注册Channel的写事件 4. 当Selector根据isWriteable状态来调用要写的Channel时,会调用FrameBuffer的write方法,上面说了write方法写满包后,会取消注册的写事件。
';

(四)– 方法调用模型分析

最后更新于:2022-04-01 16:20:05

RPC调用本质上就是一种网络编程,客户端向服务器发送消息,服务器拿到消息之后做后续动作。只是RPC这种消息比较特殊,它封装了方法调用,包括方法名,方法参数。服务端拿到这个消息之后,解码消息,然后要通过方法调用模型来完成实际服务器端业务方法的调用。 这篇讲讲Thrfit的方法调用模型。Thrift的方法调用模型很简单,就是通过方法名和实际方法实现类的注册完成,没有使用反射机制,类加载机制。 和方法调用相关的几个核心类: 1. 自动生成的Iface接口,是远程方法的顶层接口 2. 自动生成的Processor类及相关父类,包括TProcessor接口,TBaseProcess抽象类 3. ProcessFunction抽象类,抽象了一个具体的方法调用,包含了方法名信息,调用方法的抽象过程等 4. TNonblcokingServer,是NIO服务器的默认实现,通过Args参数来配置Processor等信息 5. FrameBuffer类,服务器NIO的缓冲区对象,这个对象在服务器端收到全包并解码后,会调用Processor去完成实际的方法调用 6. 服务器端的方法的具体实现类,实现Iface接口 ![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-02-19_56c6c62b7baa8.jpg) 下面逐个来分析相关的类。 Iface接口是自动生成的,描述了方法的接口。 服务器端服务提供方DemoService要实现Iface接口 ~~~ public class DemoService { public interface Iface { public int demoMethod(String param1, Parameter param2, Map<String,String> param3) throws org.apache.thrift.TException; } } public class DemoServiceImpl implements DemoService.Iface{  @Override  public int demoMethod(String param1, Parameter param2,          Map<String, String> param3) throws TException {            return 0;  } } ~~~ 来看TProcess相关类和接口 1. TProcessor就定义了一个顶层的调用方法process,参数是输入流和输出流 2. 抽象类TBaseProcessor提供了TProcessor的process的默认实现,先读消息头,拿到要调用的方法名,然后从维护的一个Map中取ProcessFunction对象。ProcessFunction对象是实际方法的抽象,调用它的process方法,实际是调用了实际的方法。 3. Processor类是自动生成了,它依赖Iface接口,负责把实际的方法实现和方法的key关联起来,放到Map中维护 ~~~ public interface TProcessor { public boolean process(TProtocol in, TProtocol out) throws TException; } public abstract class TBaseProcessor<I> implements TProcessor { private final I iface; private final Map<String,ProcessFunction<I, ? extends TBase>> processMap; protected TBaseProcessor(I iface, Map<String, ProcessFunction<I, ? extends TBase>> processFunctionMap) {    this.iface = iface;    this.processMap = processFunctionMap; } @Override public boolean process(TProtocol in, TProtocol out) throws TException {    TMessage msg = in.readMessageBegin();    ProcessFunction fn = processMap.get(msg.name);    if (fn == null) {      TProtocolUtil.skip(in, TType.STRUCT);      in.readMessageEnd();      TApplicationException x = new TApplicationException(TApplicationException.UNKNOWN_METHOD, "Invalid method name: '"+msg.name+"'");      out.writeMessageBegin(new TMessage(msg.name, TMessageType.EXCEPTION, msg.seqid));      x.write(out);      out.writeMessageEnd();      out.getTransport().flush();      return true;    }    fn.process(msg.seqid, in, out, iface);    return true; } } ~~~ ~~~ public static class Processor<I extends Iface> extends org.apache.thrift.TBaseProcessor<I> implements org.apache.thrift.TProcessor { public Processor(I iface) { super(iface, getProcessMap(new HashMap<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>>())); } protected Processor(I iface, Map<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> processMap) { super(iface, getProcessMap(processMap)); } private static <I extends Iface> Map<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> getProcessMap(Map<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> processMap) { processMap.put("demoMethod", new demoMethod()); return processMap; } private static class demoMethod<I extends Iface> extends org.apache.thrift.ProcessFunction<I, demoMethod_args> { public demoMethod() { super("demoMethod"); } protected demoMethod_args getEmptyArgsInstance() { return new demoMethod_args(); } protected demoMethod_result getResult(I iface, demoMethod_args args) throws org.apache.thrift.TException { demoMethod_result result = new demoMethod_result(); result.success = iface.demoMethod(args.param1, args.param2, args.param3); result.setSuccessIsSet(true); return result; } } } ~~~ 自动生成的demoMethod类继承了ProcessFunction类,它负载把方法参数,iface, 方法返回值这些抽象的概念组合在一起,通过抽象模型来完成实际方法的调用。实际方法的实现者实现了Iface接口。 TNonblockingServer是NIO服务器的实现,它通过Selector来检查IO就绪状态,进而调用相关的Channel。就方法调用而言,它处理的是读事件,用handelRead()来进一步处理 ~~~ private void select() { try { // wait for io events. selector.select(); // process the io events we received Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator(); while (!stopped_ && selectedKeys.hasNext()) { SelectionKey key = selectedKeys.next(); selectedKeys.remove(); // skip if not valid if (!key.isValid()) { cleanupSelectionKey(key); continue; } // if the key is marked Accept, then it has to be the server // transport. if (key.isAcceptable()) { handleAccept(); } else if (key.isReadable()) { // deal with reads handleRead(key); } else if (key.isWritable()) { // deal with writes handleWrite(key); } else { LOGGER.warn("Unexpected state in select! " + key.interestOps()); } } } catch (IOException e) { LOGGER.warn("Got an IOException while selecting!", e); } } protected void handleRead(SelectionKey key) {      FrameBuffer buffer = (FrameBuffer) key.attachment();      if (!buffer.read()) {        cleanupSelectionKey(key);        return;      }      // if the buffer's frame read is complete, invoke the method.      <strong>if (buffer.isFrameFullyRead()) {        if (!requestInvoke(buffer)) {          cleanupSelectionKey(key);        }      }</strong>    } protected boolean requestInvoke(FrameBuffer frameBuffer) {    frameBuffer.invoke();    return true; } ~~~ 非阻塞同步IO的NIO服务器都会使用缓冲区来存放读写的中间状态。FrameBuffer就是这样的一个缓冲区,它由于涉及到方法调用,所以提供了invoke()方法来实现对Processor的调用。 ~~~ public void invoke() { TTransport inTrans = getInputTransport(); TProtocol inProt = inputProtocolFactory_.getProtocol(inTrans); TProtocol outProt = outputProtocolFactory_.getProtocol(getOutputTransport()); try { processorFactory_.getProcessor(inTrans).process(inProt, outProt); responseReady(); return; } catch (TException te) { LOGGER.warn("Exception while invoking!", te); } catch (Throwable t) { LOGGER.error("Unexpected throwable while invoking!", t); } // This will only be reached when there is a throwable. state_ = FrameBufferState.AWAITING_CLOSE; requestSelectInterestChange(); } ~~~ FrameBuffer使用了processorFactory来获得Processor。ProcessorFactory是在创建服务器的时候传递过来的,只是对Processor的简单封装。 ~~~ protected TServer(AbstractServerArgs args) { processorFactory_ = args.processorFactory; serverTransport_ = args.serverTransport; inputTransportFactory_ = args.inputTransportFactory; outputTransportFactory_ = args.outputTransportFactory; inputProtocolFactory_ = args.inputProtocolFactory; outputProtocolFactory_ = args.outputProtocolFactory; } public class TProcessorFactory { private final TProcessor processor_; public TProcessorFactory(TProcessor processor) {    processor_ = processor; } public TProcessor getProcessor(TTransport trans) {    return processor_; } } public T processor(TProcessor processor) {      this.processorFactory = new TProcessorFactory(processor);      return (T) this;    } ~~~ 下面是一个实际的TNonblockingServer的配置实例 除了配置服务器运行的基本参数,最重要的就是把实际的服务提供者通过服务器参数的方式作为Processor传递给TNonblockingServer,供FrameBuffer调用。 ~~~ public class DemoServiceImpl implements DemoService.Iface{ @Override public int demoMethod(String param1, Parameter param2, Map<String, String> param3) throws TException { return 0; } public static void main(String[] args){ TNonblockingServerSocket socket; try { socket = new TNonblockingServerSocket(9090); TNonblockingServer.Args options = new TNonblockingServer.Args(socket); TProcessor processor = new DemoService.Processor(new DemoServiceImpl()); options.processor(processor); options.protocolFactory(new TCompactProtocol.Factory()); TServer server = new TNonblockingServer(options); server.serve(); } catch (Exception e) { throw new RuntimeException(e); } } } ~~~
';

(三)– IDL和生成代码分析

最后更新于:2022-04-01 16:20:02

IDL是很多RPC框架用来支持跨语言环境调用的一个服务描述组件,一般都是采用文本格式来定义。 更多IDL的思考查看[《理解WSDL, IDL》](http://blog.csdn.net/iter_zc/article/details/39338367) Thrift的不同版本定义IDL的语法也不太相同,这里使用Thrift-0.8.0这个版本来介绍Java下的IDL定义 1. namespace 定义包名 2. struct 定义服务接口的参数,返回值使用到的类结构。如果接口的参数都是基本类型,则不需要定义struct 3. service 定义接口 一个简单的例子,IDL文件以.thrift为后缀名。  demo.thrift 1. 定义了生成的Java文件的包名为com.thrift.test 2. 定义了一个struct类结构作为参数 3.定义了一个service接口,返回值是int,方法名叫demoMethod,参数有三个,第一个是字符串类型,第二个是上面定义的类Parameter,第三个是Map类型 ~~~ namespace java com.thrift.test struct Parameter{ 1: required i32 id; 2: required string name; } service DemoService{ i32 demoMethod(1:string param1, 2:Parameter param2, 3:map<string,string> param3); } ~~~ IDL支持的数据类型包括以下部分 ~~~ bool 布尔型 byte 8位整数 i16 16位整数 i32 32位整数 i64 64位整数 double 双精度浮点数 string 字符串 binary 字节数组 list<i16> List集合,必须指明泛型 map<string, string> Map类型,必须指明泛型 set<i32> Set集合,必须指明泛型 ~~~ 有了IDL之后,就可以使用thrift来自动生成辅助代码,包括客户端代码和序列化接口的代码 ~~~ thrift -r --gen java demo.thrift ~~~ 生成的代码如下 ![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-02-19_56c6c62b14d87.jpg) 每个Struct会单独生成一个类,每个Service会生成一个类。 看一下生成类的具体结构 ![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-02-19_56c6c62b25fe8.jpg) 生成的类主要有5个部分 **1. 接口类型**,默认名称都是Iface。这个接口类型被服务器和客户端共同使用。服务器端使用它来做顶层接口,编写实现类。客户端代码使用它作为生成代理的服务接口。 自动生成的接口有两个,一个是同步调用的Iface,一个是异步调用的AsyncIface。异步调用的接口多了一个回调参数。 ~~~ public interface Iface { public int demoMethod(String param1, Parameter param2, Map<String,String> param3) throws org.apache.thrift.TException; } public interface AsyncIface { public void demoMethod(String param1, Parameter param2, Map<String,String> param3, org.apache.thrift.async.AsyncMethodCallback<AsyncClient.demoMethod_call> resultHandler) throws org.apache.thrift.TException; } ~~~ **2. 客户端类型**,一个同步调用的客户端Client,一个异步调用的客户端AsyncClient **3. Processor**,用来支持方法调用,每个服务的实现类都要使用Processor来注册,这样最后服务器端调用接口实现时能定位到具体的实现类。后面会有专门的文章介绍 **4.方法参数的封装类**,以"方法名_args"命名 **5.方法返回值的封装类**,以"方法名_result"命名 看一下生成的同步调用客户端Client的具体代码 1. 提供一个工厂方法来创建Client对象 2.接口方法的客户端代理,只做了两件事,发送方法调用请求;接收返回值 发送方法调用请求做了2件事 1. 创建方法参数对象,封装方法参数 2. 调用父类的sendBase方法来发送消息。发送消息时先通过writeMessageBegin发送消息头,再调用方法参数对象的write(TProtocol)方法发送消息体,最后结束发送 接受调用返回值做了2件事 1. 创建方法返回值对象,封装方法返回值 2. 调用父类的receiveBase方法接收方法返回值。先通过receiveMessage接收消息体,处理异常,然后调用方法参数对象的read(TProtocol)方法来接收消息体,最后结束接收 ~~~ public static class Client extends org.apache.thrift.TServiceClient implements Iface { public static class Factory implements org.apache.thrift.TServiceClientFactory<Client> { public Factory() {} public Client getClient(org.apache.thrift.protocol.TProtocol prot) { return new Client(prot); } public Client getClient(org.apache.thrift.protocol.TProtocol iprot, org.apache.thrift.protocol.TProtocol oprot) { return new Client(iprot, oprot); } }   public int demoMethod(String param1, Parameter param2, Map<String,String> param3) throws org.apache.thrift.TException { send_demoMethod(param1, param2, param3); return recv_demoMethod(); } public void send_demoMethod(String param1, Parameter param2, Map<String,String> param3) throws org.apache.thrift.TException { demoMethod_args args = new demoMethod_args(); args.setParam1(param1); args.setParam2(param2); args.setParam3(param3); sendBase("demoMethod", args); } public int recv_demoMethod() throws org.apache.thrift.TException { demoMethod_result result = new demoMethod_result(); receiveBase(result, "demoMethod"); if (result.isSetSuccess()) { return result.success; } throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "demoMethod failed: unknown result"); } } //org.apache.thrift.TServiceClient.sendBase,客户端的父类方法 protected void sendBase(String methodName, TBase args) throws TException {   // 发送消息头    oprot_.writeMessageBegin(new TMessage(methodName, TMessageType.CALL, ++seqid_)); // 发送消息体,由方法参数对象自己处理编解码    args.write(oprot_);    oprot_.writeMessageEnd();    oprot_.getTransport().flush(); } protected void receiveBase(TBase result, String methodName) throws TException { // 接收消息头    TMessage msg = iprot_.readMessageBegin();    if (msg.type == TMessageType.EXCEPTION) {      TApplicationException x = TApplicationException.read(iprot_);      iprot_.readMessageEnd();      throw x;    }    if (msg.seqid != seqid_) {      throw new TApplicationException(TApplicationException.BAD_SEQUENCE_ID, methodName + " failed: out of sequence response");    } //由返回值对象自己处理编解码    result.read(iprot_);    iprot_.readMessageEnd(); } ~~~ 看一下方法参数对象 方法参数实现了TBase接口,TBase接口定义了一个对象在某种协议下的编解码接口。 ~~~ public interface TBase<T extends TBase<?,?>, F extends TFieldIdEnum> extends Comparable<T>, Serializable { public void read(TProtocol iprot) throws TException; public void write(TProtocol oprot) throws TException; } ~~~ 方法参数对象主要做了2件事 1. 创建每个参数的元数据,包括参数类型,顺序号。顺序号是在IDL定义的时候设置的,用来识别参数的位置,在编解码的时候有用 2. **实现自己的编解码方法, read(TProtocol), write(TProtocol)。这里又把具体的编解码功能委托给了XXXScheme类** ~~~ public static class demoMethod_args implements org.apache.thrift.TBase<demoMethod_args, demoMethod_args._Fields>, java.io.Serializable, Cloneable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("demoMethod_args"); private static final org.apache.thrift.protocol.TField PARAM1_FIELD_DESC = new org.apache.thrift.protocol.TField("param1", org.apache.thrift.protocol.TType.STRING, (short)1); private static final org.apache.thrift.protocol.TField PARAM2_FIELD_DESC = new org.apache.thrift.protocol.TField("param2", org.apache.thrift.protocol.TType.STRUCT, (short)2); private static final org.apache.thrift.protocol.TField PARAM3_FIELD_DESC = new org.apache.thrift.protocol.TField("param3", org.apache.thrift.protocol.TType.MAP, (short)3); private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>(); static { schemes.put(StandardScheme.class, new demoMethod_argsStandardSchemeFactory()); schemes.put(TupleScheme.class, new demoMethod_argsTupleSchemeFactory()); } public String param1; // required public Parameter param2; // required public Map<String,String> param3; // required /**The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ public enum _Fields implements org.apache.thrift.TFieldIdEnum { PARAM1((short)1, "param1"), PARAM2((short)2, "param2"), PARAM3((short)3, "param3"); private static final Map<String, _Fields> byName = new HashMap<String, _Fields>(); static { for (_Fields field : EnumSet.allOf(_Fields.class)) { byName.put(field.getFieldName(), field); } }   // 对象自己负责解码  public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {      schemes.get(iprot.getScheme()).getScheme().read(iprot, this);    } <pre name="code" class="java">  // 对象自己负责编码    public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {      schemes.get(oprot.getScheme()).getScheme().write(oprot, this);    } ~~~ 再来看XXXScheme类,这也是自动生成的,是方法参数XXX_args的内部类。 Scheme有两类,一个是StandardScheme,使用消息头+消息体的方式来编解码对象。一个是TupleScheme,直接采用写消息体的方式编解码,编码字节流更小。 ![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-02-19_56c6c62b69bd2.jpg) 拿demoMethod_argsStandardScheme举例, 1. 它的编码方法就是从writeStructBegin开始逐个写字段,每个字段写之前会writeFieldBegin开始,写字段类型和字段的顺序号。如果字段是一个类Struct,就调用这个类自己的编码方法write(TProtocol)。Thrift会给每个Struct生成类,这些类里面定义了这个类的编解码方法。最后写完之后以writeStructEnd结束 2. 它的解码方法从readStructBegin开始,然后读字段元数据readFieldBegin,读1个字节的字段类型,2个字段的字节顺序号,然后根据字段类型,来读相应类型长度的数据。直到读完用readStructEnd结束。 ~~~ private static class demoMethod_argsStandardScheme extends StandardScheme<demoMethod_args> { public void read(org.apache.thrift.protocol.TProtocol iprot, demoMethod_args struct) throws org.apache.thrift.TException { org.apache.thrift.protocol.TField schemeField; iprot.readStructBegin(); while (true) { schemeField = iprot.readFieldBegin(); if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { break; } switch (schemeField.id) { case 1: // PARAM1 if (schemeField.type == org.apache.thrift.protocol.TType.STRING) { struct.param1 = iprot.readString(); struct.setParam1IsSet(true); } else { org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; case 2: // PARAM2 if (schemeField.type == org.apache.thrift.protocol.TType.STRUCT) { struct.param2 = new Parameter(); struct.param2.read(iprot); struct.setParam2IsSet(true); } else { org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; case 3: // PARAM3 if (schemeField.type == org.apache.thrift.protocol.TType.MAP) { { org.apache.thrift.protocol.TMap _map0 = iprot.readMapBegin(); struct.param3 = new HashMap<String,String>(2*_map0.size); for (int _i1 = 0; _i1 < _map0.size; ++_i1) { String _key2; // required String _val3; // required _key2 = iprot.readString(); _val3 = iprot.readString(); struct.param3.put(_key2, _val3); } iprot.readMapEnd(); } struct.setParam3IsSet(true); } else { org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; default: org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } iprot.readFieldEnd(); } iprot.readStructEnd(); // check for required fields of primitive type, which can't be checked in the validate method struct.validate(); } public void write(org.apache.thrift.protocol.TProtocol oprot, demoMethod_args struct) throws org.apache.thrift.TException { struct.validate(); oprot.writeStructBegin(STRUCT_DESC); if (struct.param1 != null) { oprot.writeFieldBegin(PARAM1_FIELD_DESC); oprot.writeString(struct.param1); oprot.writeFieldEnd(); } if (struct.param2 != null) { oprot.writeFieldBegin(PARAM2_FIELD_DESC); struct.param2.write(oprot); oprot.writeFieldEnd(); } if (struct.param3 != null) { oprot.writeFieldBegin(PARAM3_FIELD_DESC); { oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.STRING, struct.param3.size())); for (Map.Entry<String, String> _iter4 : struct.param3.entrySet()) { oprot.writeString(_iter4.getKey()); oprot.writeString(_iter4.getValue()); } oprot.writeMapEnd(); } oprot.writeFieldEnd(); } oprot.writeFieldStop(); oprot.writeStructEnd(); } } ~~~ ~~~ private static class demoMethod_argsTupleScheme extends TupleScheme<demoMethod_args> { @Override public void write(org.apache.thrift.protocol.TProtocol prot, demoMethod_args struct) throws org.apache.thrift.TException { TTupleProtocol oprot = (TTupleProtocol) prot; BitSet optionals = new BitSet(); if (struct.isSetParam1()) { optionals.set(0); } if (struct.isSetParam2()) { optionals.set(1); } if (struct.isSetParam3()) { optionals.set(2); } oprot.writeBitSet(optionals, 3); if (struct.isSetParam1()) { oprot.writeString(struct.param1); } if (struct.isSetParam2()) { struct.param2.write(oprot); } if (struct.isSetParam3()) { { oprot.writeI32(struct.param3.size()); for (Map.Entry<String, String> _iter5 : struct.param3.entrySet()) { oprot.writeString(_iter5.getKey()); oprot.writeString(_iter5.getValue()); } } } } @Override public void read(org.apache.thrift.protocol.TProtocol prot, demoMethod_args struct) throws org.apache.thrift.TException { TTupleProtocol iprot = (TTupleProtocol) prot; BitSet incoming = iprot.readBitSet(3); if (incoming.get(0)) { struct.param1 = iprot.readString(); struct.setParam1IsSet(true); } if (incoming.get(1)) { struct.param2 = new Parameter(); struct.param2.read(iprot); struct.setParam2IsSet(true); } if (incoming.get(2)) { { org.apache.thrift.protocol.TMap _map6 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.STRING, iprot.readI32()); struct.param3 = new HashMap<String,String>(2*_map6.size); for (int _i7 = 0; _i7 < _map6.size; ++_i7) { String _key8; // required String _val9; // required _key8 = iprot.readString(); _val9 = iprot.readString(); struct.param3.put(_key8, _val9); } } struct.setParam3IsSet(true); } } } } ~~~ 方法返回值封装类的结构和方法参数封装类的结构和原理完全一致,这里不再赘述。 [   ](http://blog.csdn.net/iter_zc/article/details/39338367)
';

(二)– 协议和编解码

最后更新于:2022-04-01 16:20:00

协议和编解码是一个网络应用程序的核心问题之一,客户端和服务器通过约定的协议来传输消息(数据),通过特定的格式来编解码字节流,并转化成业务消息,提供给上层框架调用。 Thrift的协议比较简单,它把协议和编解码整合在了一起。抽象类TProtocol定义了协议和编解码的顶层接口。个人感觉采用抽象类而不是接口的方式来定义顶层接口并不好,TProtocol关联了一个TTransport传输对象,而不是提供一个类似getTransport()的接口,导致抽象类的扩展性比接口差。 TProtocol主要做了两个事情: 1. 关联TTransport对象 2.定义一系列读写消息的编解码接口,包括两类,一类是复杂数据结构比如readMessageBegin, readMessageEnd,  writeMessageBegin, writMessageEnd.还有一类是基本数据结构,比如readI32, writeI32, readString, writeString ~~~ public abstract class TProtocol { /** * Transport */ protected TTransport trans_;  public abstract void writeMessageBegin(TMessage message) throws TException; public abstract void writeMessageEnd() throws TException; public abstract void writeStructBegin(TStruct struct) throws TException; public abstract void writeStructEnd() throws TException; public abstract void writeFieldBegin(TField field) throws TException; public abstract void writeFieldEnd() throws TException; public abstract void writeFieldStop() throws TException; public abstract void writeMapBegin(TMap map) throws TException; public abstract void writeMapEnd() throws TException; public abstract void writeListBegin(TList list) throws TException; public abstract void writeListEnd() throws TException; public abstract void writeSetBegin(TSet set) throws TException; public abstract void writeSetEnd() throws TException; public abstract void writeBool(boolean b) throws TException; public abstract void writeByte(byte b) throws TException; public abstract void writeI16(short i16) throws TException; public abstract void writeI32(int i32) throws TException; public abstract void writeI64(long i64) throws TException; public abstract void writeDouble(double dub) throws TException; public abstract void writeString(String str) throws TException; public abstract void writeBinary(ByteBuffer buf) throws TException; /** * Reading methods. */ public abstract TMessage readMessageBegin() throws TException; public abstract void readMessageEnd() throws TException; public abstract TStruct readStructBegin() throws TException; public abstract void readStructEnd() throws TException; public abstract TField readFieldBegin() throws TException; public abstract void readFieldEnd() throws TException; public abstract TMap readMapBegin() throws TException; public abstract void readMapEnd() throws TException; public abstract TList readListBegin() throws TException; public abstract void readListEnd() throws TException; public abstract TSet readSetBegin() throws TException; public abstract void readSetEnd() throws TException; public abstract boolean readBool() throws TException; public abstract byte readByte() throws TException; public abstract short readI16() throws TException; public abstract int readI32() throws TException; public abstract long readI64() throws TException; public abstract double readDouble() throws TException; public abstract String readString() throws TException; public abstract ByteBuffer readBinary() throws TException; /** * Reset any internal state back to a blank slate. This method only needs to * be implemented for stateful protocols. */ public void reset() {} /** * Scheme accessor */ public Class<? extends IScheme> getScheme() {    return StandardScheme.class; } } ~~~ 所谓协议就是客户端和服务器端约定传输什么数据,如何解析传输的数据。对于一个RPC调用的协议来说,要传输的数据主要有: 调用方 1. 方法的名称,包括类的名称和方法的名称 2. 方法的参数,包括类型和参数值 3.一些附加的数据,比如附件,超时事件,自定义的控制信息等等 返回方 1. 调用的返回码 2. 返回值 3.异常信息 从TProtocol的定义我们可以看出Thrift的协议约定如下事情: 1. 先writeMessageBegin表示开始传输消息了,写消息头。Message里面定义了方法名,调用的类型,版本号,消息seqId 2.接下来是写方法的参数,实际就是写消息体。如果参数是一个类,就writeStructBegin 3. 接下来写字段,writeFieldBegin, 这个方法会写接下来的字段的数据类型和顺序号。这个顺序号是Thrfit对要传输的字段的一个编码,从1开始 4. 如果是一个集合就writeListBegin/writeMapBegin,如果是一个基本数据类型,比如int, 就直接writeI32 5. 每个复杂数据类型写完都调用writeXXXEnd,直到writeMessageEnd结束 6. 读消息时根据数据类型读取相应的长度 每个writeXXX都是采用消息头+消息体的方式。我们来看TBinaryProtocol的实现。 1.writeMessgeBegin方法写了消息头,包括4字节的版本号和类型信息,字符串类型的方法名,4字节的序列号seqId 2. writeFieldBegin,写了1个字节的字段数据类型,和2个字节字段的顺序号 3. writeI32,写了4个字节的字节数组 4. writeString,先写4字节消息头表示字符串长度,再写字符串字节 5. writeBinary,先写4字节消息头表示字节数组长度,再写字节数组内容 6.readMessageBegin时,先读4字节版本和类型信息,再读字符串,再读4字节序列号 7.readFieldBegin,先读1个字节的字段数据类型,再读2个字节的字段顺序号 8. readString时,先读4字节字符串长度,再读字符串内容。**字符串统一采用UTF-8编码** ~~~ public void writeMessageBegin(TMessage message) throws TException { if (strictWrite_) { int version = VERSION_1 | message.type; writeI32(version); writeString(message.name); writeI32(message.seqid); } else { writeString(message.name); writeByte(message.type); writeI32(message.seqid); } } public void writeFieldBegin(TField field) throws TException {    writeByte(field.type);    writeI16(field.id); } private byte[] i32out = new byte[4]; public void writeI32(int i32) throws TException {    i32out[0] = (byte)(0xff & (i32 >> 24));    i32out[1] = (byte)(0xff & (i32 >> 16));    i32out[2] = (byte)(0xff & (i32 >> 8));    i32out[3] = (byte)(0xff & (i32));    trans_.write(i32out, 0, 4); } public void writeString(String str) throws TException {    try {      byte[] dat = str.getBytes("UTF-8");      writeI32(dat.length);      trans_.write(dat, 0, dat.length);    } catch (UnsupportedEncodingException uex) {      throw new TException("JVM DOES NOT SUPPORT UTF-8");    } } public void writeBinary(ByteBuffer bin) throws TException {    int length = bin.limit() - bin.position();    writeI32(length);    trans_.write(bin.array(), bin.position() + bin.arrayOffset(), length); } public TMessage readMessageBegin() throws TException {    int size = readI32();    if (size < 0) {      int version = size & VERSION_MASK;      if (version != VERSION_1) {        throw new TProtocolException(TProtocolException.BAD_VERSION, "Bad version in readMessageBegin");      }      return new TMessage(readString(), (byte)(size & 0x000000ff), readI32());    } else {      if (strictRead_) {        throw new TProtocolException(TProtocolException.BAD_VERSION, "Missing version in readMessageBegin, old client?");      }      return new TMessage(readStringBody(size), readByte(), readI32());    } } public TField readFieldBegin() throws TException {    byte type = readByte();    short id = type == TType.STOP ? 0 : readI16();    return new TField("", type, id); } public String readString() throws TException {    int size = readI32();    if (trans_.getBytesRemainingInBuffer() >= size) {      try {        String s = new String(trans_.getBuffer(), trans_.getBufferPosition(), size, "UTF-8");        trans_.consumeBuffer(size);        return s;      } catch (UnsupportedEncodingException e) {        throw new TException("JVM DOES NOT SUPPORT UTF-8");      }    }    return readStringBody(size); } ~~~ TProtocol定义了基本的协议信息,包括传输什么数据,如何解析传输的数据的基本方法。 ![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-02-19_56c6c620781cd.jpg) 还存在一个问题,就是服务器端如何知道客户端发送过来的数据是怎么组合的,比如第一个字段是字符串类型,第二个字段是int。这个信息是在IDL生成客户端时生成的代码时提供了。Thrift生成的客户端代码提供了读写参数的方法,这两个方式是一一对应的,包括字段的序号,类型等等。客户端使用写参数的方法,服务器端使用读参数的方法。 关于IDL生成的客户端代码会在后面的文章具体描述。下面简单看一下自动生成的代码 1. 方法的调用从writeMessageBegin开始,发送了消息头信息 2. 写方法的参数,也就是写消息体。方法参数由一个统一的接口TBase描述,提供了read和write的统一接口。自动生成的代码提供了read, write方法参数的具体实现 3. 写完结束  ~~~ public void write_args(org.apache.thrift.protocol.TProtocol prot) throws org.apache.thrift.TException { prot.writeMessageBegin(new org.apache.thrift.protocol.TMessage("handle", org.apache.thrift.protocol.TMessageType.CALL, 0)); handle_args args = new handle_args(); args.setIdentity(identity); args.setUid(uid); args.setSid(sid); args.setType(type); args.setMessage(message); args.setParams(params); args.write(prot); prot.writeMessageEnd(); } public interface TBase<T extends TBase<?,?>, F extends TFieldIdEnum> extends Comparable<T>,  Serializable { public void read(TProtocol iprot) throws TException; public void write(TProtocol oprot) throws TException; } public static class handle_args <strong>implements org.apache.thrift.TBase</strong><handle_args, handle_args._Fields>, java.io.Serializable, Cloneable   {    private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("handle_args");    private static final org.apache.thrift.protocol.TField IDENTITY_FIELD_DESC = new org.apache.thrift.protocol.TField("identity", org.apache.thrift.protocol.TType.STRING, (short)1);    private static final org.apache.thrift.protocol.TField UID_FIELD_DESC = new org.apache.thrift.protocol.TField("uid", org.apache.thrift.protocol.TType.I64, (short)2);    private static final org.apache.thrift.protocol.TField SID_FIELD_DESC = new org.apache.thrift.protocol.TField("sid", org.apache.thrift.protocol.TType.STRING, (short)3);    private static final org.apache.thrift.protocol.TField TYPE_FIELD_DESC = new org.apache.thrift.protocol.TField("type", org.apache.thrift.protocol.TType.I32, (short)4);    private static final org.apache.thrift.protocol.TField MESSAGE_FIELD_DESC = new org.apache.thrift.protocol.TField("message", org.apache.thrift.protocol.TType.STRING, (short)5);    private static final org.apache.thrift.protocol.TField PARAMS_FIELD_DESC = new org.apache.thrift.protocol.TField("params", org.apache.thrift.protocol.TType.MAP, (short)6);    private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();    static {      schemes.put(StandardScheme.class, new handle_argsStandardSchemeFactory());      schemes.put(TupleScheme.class, new handle_argsTupleSchemeFactory());    }    public String identity; // required    public long uid; // required    public String sid; // required    public int type; // required    public String message; // required    public Map<String,String> params; // required    /**The set of fields this struct contains, along with convenience methods for finding and manipulating them. */    public enum _Fields implements org.apache.thrift.TFieldIdEnum {      IDENTITY((short)1, "identity"),      UID((short)2, "uid"),      SID((short)3, "sid"),      TYPE((short)4, "type"),      MESSAGE((short)5, "message"),      PARAMS((short)6, "params"); } //  自动生成的写方法参数的方法,按照字段顺序写,给客户端代码使用      public void write(org.apache.thrift.protocol.TProtocol oprot, handle_args struct) throws org.apache.thrift.TException {        struct.validate();        oprot.writeStructBegin(STRUCT_DESC);        if (struct.identity != null) {          oprot.writeFieldBegin(IDENTITY_FIELD_DESC);          oprot.writeString(struct.identity);          oprot.writeFieldEnd();        }        oprot.writeFieldBegin(UID_FIELD_DESC);        oprot.writeI64(struct.uid);        oprot.writeFieldEnd();        if (struct.sid != null) {          oprot.writeFieldBegin(SID_FIELD_DESC);          oprot.writeString(struct.sid);          oprot.writeFieldEnd();        }        oprot.writeFieldBegin(TYPE_FIELD_DESC);        oprot.writeI32(struct.type);        oprot.writeFieldEnd();        if (struct.message != null) {          oprot.writeFieldBegin(MESSAGE_FIELD_DESC);          oprot.writeString(struct.message);          oprot.writeFieldEnd();        } } ~~~ <pre name="code" class="java">//  自动生成的读方法参数的方法,按照字段顺序读,给服务器端代码使用 ~~~ public void read(org.apache.thrift.protocol.TProtocol iprot, handle_args struct) throws org.apache.thrift.TException {        org.apache.thrift.protocol.TField schemeField;        iprot.readStructBegin();        while (true)        {          schemeField = iprot.readFieldBegin();          if (schemeField.type == org.apache.thrift.protocol.TType.STOP) {            break;          }          switch (schemeField.id) {            case 1: // IDENTITY              if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {                struct.identity = iprot.readString();                struct.setIdentityIsSet(true);              } else {                org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);              }              break;            case 2: // UID              if (schemeField.type == org.apache.thrift.protocol.TType.I64) {                struct.uid = iprot.readI64();                struct.setUidIsSet(true);              } else {                org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);              }              break;            case 3: // SID              if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {                struct.sid = iprot.readString();                struct.setSidIsSet(true);              } else {                org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);              }              break;            case 4: // TYPE              if (schemeField.type == org.apache.thrift.protocol.TType.I32) {                struct.type = iprot.readI32();                struct.setTypeIsSet(true);              } else {                org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);              }              break; } ~~~
';

(一)– 基本概念

最后更新于:2022-04-01 16:19:58

我所在的公司使用Thrift作为基础通信组件,相当一部分的RPC服务基于Thrift框架。公司的日UV在千万级别,Thrift很好地支持了高并发访问,并且Thrift相对简单地编程模型也提高了服务地开发效率。 Thrift源于Facebook,目前已经作为开源项目提交给了Apahce。Thrift解决了Facebook各系统的大数据量传输通信和内部不同语言环境的跨平台调用。 Thrift的官方网站: [http://thrift.apache.org/](http://thrift.apache.org/) 作为一个高性能的RPC框架,Thrift的主要特点有 1. 基于二进制的高性能的编解码框架 2. 基于NIO的底层通信 3. 相对简单的服务调用模型 4. 使用IDL支持跨平台调用 这张图来自[《深入浅出RPC - 深入篇》](http://blog.csdn.net/mindfloating/article/details/39474123) 描述了一个RPC框架的基本组件,包括服务器端发布和调用服务组件,NIO组件,协议和编解码组件,客户端调用组件,客户端代理组件等等 ![](image/d41d8cd98f00b204e9800998ecf8427e.svg) 对照这个模型,Thrift的核心组件有: TProtocol 协议和编解码组件 TTransport 传输组件 TProcessor 服务调用组件 TServer,Client 服务器和客户端组件 IDL 服务描述组件,负责生产跨平台客户端 这个系列会结合源码,深入分析Thrfit的RPC调用模型和核心组件
';

前言

最后更新于:2022-04-01 16:19:55

> 原文出处:[Thrift源码分析](http://blog.csdn.net/column/details/thrift.html) 作者:[iter_zc](http://blog.csdn.net/iter_zc) **本系列文章经作者授权在看云整理发布,未经作者允许,请勿转载!** # Thrift源码分析 > 本专栏从RPC模型的角度分析了Thrift的方方面面,包括Thrift的基本概念,协议和编解码,网络传输,客户端,服务器,IDL,方法调用模型,缓冲区等等,并结合代码分析Thrift的设计和实现
';