Jafka 源码阅读之通信协议及相关类解析 03 August 2012
本文讲解Jafka的通信协议,其实就是传输数据的约定格式。
Jafka的通信发生在producer和broker、consumer和broker之间,目前producer和broker之间通信是单向的(producer->broker),consumer和broker之间通信是双向的(consumer<->broker)。主要涉及到的类有Message MessageSet Request Send Receive,下面分别介绍这些类。
Message(com.sohu.jafka.message)
Message类是具体的消息数据,在传递的过程中,其字节序列的组成及含义如下:
字节大小(Byte) | 含义 |
---|---|
4 | length,该条消息总长度 |
1 | version(magic byte),消息版本,适应后面更改通信协议,目前只有一种协议 |
1 | attribute,目前主要用于指明压缩算法:0–NoCompression 1–GzipCompression |
4 | crc32,消息完整性校验 |
x | 实际数据,x=length-1-1-4 |
Message类即对这个字节序列作了封装,其主要的属性和方法如下:
1 //消息总长度length
2 private final int messageSize;
3 public int getSizeInBytes() {
4 return messageSize;
5 }
6
7
8 //存储字节序列length之后的数据
9 final ByteBuffer buffer;
10
11 //自定义version类型,目前只有这一个
12 private static final byte MAGIC_VERSION2 = 1;
13 //当前version值
14 public static final byte CurrentMagicValue = 1;
15 //version值的偏移和长度
16 public static final byte MAGIC_OFFSET = 0;
17 public static final byte MAGIC_LENGTH = 1;
18 public byte magic() {
19 return buffer.get(MAGIC_OFFSET);
20 }
21
22
23 //属性值的偏移和长度
24 public static final byte ATTRIBUTE_OFFSET = MAGIC_OFFSET + MAGIC_LENGTH;
25 public static final byte ATTRIBUT_ELENGTH = 1;
26 public byte attributes() {
27 return buffer.get(ATTRIBUTE_OFFSET);
28 }
29
30
31 //属性中只用最后两位指明压缩算法
32 public static final int CompressionCodeMask = 0x03; //
33 //0表明不使用压缩算法
34 public static final int NoCompression = 0;
35
36 //crc长度
37 public static final byte CrcLength = 4;
38
39 //计算crc的偏移
40 public static int crcOffset(byte magic) {
41 switch (magic) {
42 case MAGIC_VERSION2:
43 return ATTRIBUTE_OFFSET + ATTRIBUT_ELENGTH;
44 }
45 throw new UnknownMagicByteException(format("Magic byte value of %d is unknown", magic));
46 }
47
48
49 //消息数据的偏移
50 public static int payloadOffset(byte magic) {
51 return crcOffset(magic) + CrcLength;
52 }
53 //获取实际的消息数据
54 public ByteBuffer payload() {
55 ByteBuffer payload = buffer.duplicate();
56 payload.position(headerSize(magic()));
57 payload = payload.slice();
58 payload.limit(payloadSize());
59 payload.rewind();
60 return payload;
61 }
从上面的注释中我们可以看到,Message类就是对定义的字节序列格式进行了一个封装,对外提供了方便的调用函数,其主要的一个构造函数如下:
1 //这里的bytes为实际的消息数据,该构造函数会根据传入的参数,自动生成消息的header数据
2 public Message(long checksum, byte[] bytes, CompressionCodec compressionCodec) {
3 //初始化buffer的大小=headerSize+messageLength
4 this(ByteBuffer.allocate(Message.headerSize(Message.CurrentMagicValue) + bytes.length));
5 //存储version,1Byte
6 buffer.put(CurrentMagicValue);
7 byte attributes = 0;
8 if (compressionCodec.codec > 0) {
9 attributes = (byte) (attributes | (CompressionCodeMask & compressionCodec.codec));
10 }
11 //存储attribute,1Byte
12 buffer.put(attributes);
13 //存储crc32
14 Utils.putUnsignedInt(buffer, checksum);
15 //存储消息数据
16 buffer.put(bytes);
17 //定位buffer到头部,以备buffer被读取使用
18 buffer.rewind();
19 }
MessageSet(com.sohu.jafka.message)
从MessageSet的类名中,大家也可以猜到这个类是多个Message的集合,它有两个子类:ByteBufferMessageSet和FileMessageSet,前者供producer和consumer使用,后者供broker使用。首先我们来看下MessageSet的源码。
1 public abstract class MessageSet implements Iterable<MessageAndOffset> {
2
3 public static final MessageSet Empty = new ByteBufferMessageSet(ByteBuffer.allocate(0));
4 //代表消息长度占用字节数:4个
5 public static final int LogOverhead = 4;
6
7 //将多条message封装到一个ByteBuffer中
8 public static ByteBuffer createByteBuffer(CompressionCodec compressionCodec, Message... messages) {
9 if (compressionCodec == CompressionCodec.NoCompressionCodec) {
10 ByteBuffer buffer = ByteBuffer.allocate(messageSetSize(messages));
11 for (Message message : messages) {
12 message.serializeTo(buffer);
13 }
14 buffer.rewind();
15 return buffer;
16 }
17 //
18 if (messages.length == 0) {
19 ByteBuffer buffer = ByteBuffer.allocate(messageSetSize(messages));
20 buffer.rewind();
21 return buffer;
22 }
23 //
24 Message message = CompressionUtils.compress(messages, compressionCodec);
25 ByteBuffer buffer = ByteBuffer.allocate(message.serializedSize());
26 message.serializeTo(buffer);
27 buffer.rewind();
28 return buffer;
29 }
30
31 //每条消息的长度,就是上面讲到Message的字节序列,包含length
32 public static int entrySize(Message message) {
33 return LogOverhead + message.getSizeInBytes();
34 }
35
36 public static int messageSetSize(Iterable<Message> messages) {
37 int size = 0;
38 for (Message message : messages) {
39 size += entrySize(message);
40 }
41 return size;
42 }
43
44 public static int messageSetSize(Message... messages) {
45 int size = 0;
46 for (Message message : messages) {
47 size += entrySize(message);
48 }
49 return size;
50 }
51
52 public abstract long getSizeInBytes();
53
54 public void validate() {
55 for (MessageAndOffset messageAndOffset : this)
56 if (!messageAndOffset.message.isValid()) {
57 throw new InvalidMessageException();
58 }
59 }
60 //将消息数据写入channel中
61 public abstract long writeTo(GatheringByteChannel channel, long offset, long maxSize) throws IOException;
62 }
由上面的代码可以知道,MessageSet封装产生的ByteBuffer就是多个Message首尾相连构造而成。这里要注意一点:这些Message并不一定是单条消息数据,还可能是多条消息数据经过压缩后组成的一条Message,将该Message解压后得到的其实也是一个MessageSet。这一点大家在阅读ByteBufferMessageAndOffset遍历部分代码的时候会看到。
该类可以看作是message的工具类,负责消息数据的批量读和写。另外此类实现了Iterable
1 //获取消息数据集合
2 ByteBufferMessageSet messageSet = consumer.fetch(fetch);
3 //遍历消息数据集合
4 for(MessageAndOffset messageAndOffset:messageSet){
5 ...
6 }
MessageSet的两个子类从名字上可以看出它们封装消息的来源:一个来自ByteBuffer,一个来自File(即jafka文件)。下面分别介绍下这两个类。
ByteBufferMessageSet
该类主要被Producer和Consumer使用,前者将要传送的Message封装成ByteBufferMessageSet,然后传送到broker;后者将fetch到的Message封装成ByteBufferMessageSet,然后遍历消费。那么我们先来看下其提供的封装入口:
1 public ByteBufferMessageSet(ByteBuffer buffer) {
2 this(buffer,0L,ErrorMapping.NoError);
3 }
4 public ByteBufferMessageSet(ByteBuffer buffer,long initialOffset,ErrorMapping errorCode) {
5 this.buffer = buffer;
6 this.initialOffset = initialOffset;
7 this.errorCode = errorCode;
8 this.validBytes = shallowValidBytes();
9 }
10
11 public ByteBufferMessageSet(CompressionCodec compressionCodec,Message...messages) {
12 this(MessageSet.createByteBuffer(compressionCodec, messages),0L,ErrorMapping.NoError);
13 }
14 public ByteBufferMessageSet(Message...messages) {
15 this(CompressionCodec.NoCompressionCodec,messages);
16 }
由传入参数可知,前两个构造函数为consumer使用,后两个为producer使用。在讲解producer的使用时,有讲到一个配置参数serializer.class,它的作用是将ProducerData<K,V>中的K类对象转化为Message,也就是这里构造函数的传入参数Message。
ByteBufferMessageSet的一个重要接口是遍历消息数据,即其iterator()方法,其实现这里不详细讲了,原理简单和大家说一下。前面提到过一个Message可能是多条消息数据缩后构成的,所以在遍历的时候便存在一个是否要遍历压缩的Message中每条消息数据的问题,其由isShallow参数决定:true不遍历,false遍历。ByteBufferMessageSet的iterator方法是调用的是return internalIterator(false);
,是会遍历包括压缩Message中的所有消息数据的。实现方式是通过topIter遍历一级Message,当遇到压缩的Message时,将其解压缩并且用innerIter记录其遍历情况,当遍历结束后,回到topIter继续遍历。
ByteBufferMessageSet的writeTo(Channel)的方法代码如下,将数据写入指定的channel。这里的channel是FileChannel,即该方法的调用时机是broker写数据文件,读者独到源码时可以看到。
1 public long writeTo(GatheringByteChannel channel, long offset, long maxSize) throws IOException {
2 buffer.mark();
3 int written = channel.write(buffer);
4 buffer.reset();
5 return written;
6 }
FileMessageSet
该类主要由broker使用,我们来看下它的构造函数:
1 public FileMessageSet(FileChannel channel, long offset, long limit, //
2 boolean mutable, AtomicBoolean needRecover) throws IOException {
3 super();
4 this.channel = channel;
5 this.offset = offset;
6 this.mutable = mutable;
7 this.needRecover = needRecover;
8 if (mutable) {
9 if (limit < Long.MAX_VALUE || offset > 0) throw new IllegalArgumentException(
10 "Attempt to open a mutable message set with a view or offset, which is not allowed.");
11
12 if (needRecover.get()) {
13 // set the file position to the end of the file for appending messages
14 long startMs = System.currentTimeMillis();
15 long truncated = recover();
16 logger.info("Recovery succeeded in " + (System.currentTimeMillis() - startMs) / 1000 + " seconds. " + truncated + " bytes truncated.");
17 } else {
18 setSize.set(channel.size());
19 setHighWaterMark.set(getSizeInBytes());
20 channel.position(channel.size());
21 }
22 } else {
23 setSize.set(Math.min(channel.size(), limit) - offset);
24 setHighWaterMark.set(getSizeInBytes());
25 }
26 }
27
28 public FileMessageSet(FileChannel channel, boolean mutable) throws IOException {
29 this(channel, 0, Long.MAX_VALUE, mutable, new AtomicBoolean(false));
30 }
31
32 public FileMessageSet(File file, boolean mutable) throws IOException {
33 this(Utils.openChannel(file, mutable), mutable);
34 }
35
36 public FileMessageSet(FileChannel channel, boolean mutable, AtomicBoolean needRecover) throws IOException {
37 this(channel, 0, Long.MAX_VALUE, mutable, needRecover);
38 }
39
40 public FileMessageSet(File file, boolean mutable, AtomicBoolean needRecover) throws IOException {
41 this(Utils.openChannel(file, mutable), mutable, needRecover);
42 }
由第一个构造函数可知,构造一个FileMessageSet需要以下几点:
- FileChannel,即打开一个文件,并且指明是否是mutable(可写)。
- offset,读入文件的起始位置
- limit,读入文件的大小
- mutable,是否可写
fileChannel打开的文件即为jafka文件,其中存储着message,存储的格式与MessageSet是相同的,也是message首尾相连存储。FileMessageSet的遍历比较简单,顺序从channel中读取出来组装成MessageAndOffset即可,这里没有考虑Message是否压缩,原因应该是没有使用的需求。代码就不贴了,大家可以自己去阅读iterator方法。
FileMessageSet的writeTo方法要特别强调一下,之前我们有提到jafka使用了sendfile这个高级系统调用,大大提升了传输效率,对应代码就在这里。
1 public long writeTo(GatheringByteChannel destChannel, long writeOffset, long maxSize) throws IOException {
2 return channel.transferTo(offset + writeOffset, Math.min(maxSize, getSizeInBytes()), destChannel);
3 }
FileMessageSet还有两个方法与消息数据的持久化有关。
1 //将messages添加如当前的messageOffset
2 public long[] append(MessageSet messages) throws IOException {
3 checkMutable();
4 long written = 0L;
5 while (written < messages.getSizeInBytes())
6 written += messages.writeTo(channel, 0, messages.getSizeInBytes());
7 long beforeOffset = setSize.getAndAdd(written);
8 return new long[] { written,beforeOffset };
9 }
10
11 //flush消息到磁盘
12 public void flush() throws IOException {
13 checkMutable();
14 long startTime = System.currentTimeMillis();
15 channel.force(true);
16 long elapsedTime = System.currentTimeMillis() - startTime;
17 LogFlushStats.recordFlushRequest(elapsedTime);
18 logger.debug("flush time " + elapsedTime);
19 setHighWaterMark.set(getSizeInBytes());
20 logger.debug("flush high water mark:" + highWaterMark());
21 }
第一个函数的作用便是将producer传递过来的messages添加到当前messageset对象(channel)中,虽然调用了writeTo方法,但是由于操作系统缓冲的存在,数据可能还没有真正写入磁盘,而flush方法的作用便是强制写磁盘。这两个方法便完成了消息数据持久化到磁盘的操作。 另外FileMessageSet还提供了read方法,可以读取指定offset到offset+limit的所有消息,这里就不贴代码了。
Request(com.sohu.jafka.network)
Request是producer consumer向broker发出请求的封装类,下图是其简单的类图:
用户只要实现Request接口,便可以添加自己的Request类,来实现自己的需求。我们来看下Request相关的源码为:
1 public interface Request extends ICalculable {
2
3 //请求类型,enum
4 RequestKeys getRequestKey();
5 //将该request写入buffer,是序列化的过程,从这里可以看到request的协议格式
6 void writeTo(ByteBuffer buffer);
7 }
8 //request 的所有类型
9 public enum RequestKeys {
10 PRODUCE, //0
11 FETCH, //1
12 MULTIFETCH, //2
13 MULTIPRODUCE, //3
14 OFFSETS,//4
15 CREATE,//5
16 DELETE;//6
17
18 public int value = ordinal();
19
20 final static int size = values().length;
21
22 public static RequestKeys valueOf(int ordinal) {
23 if (ordinal < 0 || ordinal >= size) return null;
24 return values()[ordinal];
25 }
26 }
从上面可以看到RequestKeys的类型与类图中Request的子类是一一对应的,Request在传送时通过writeTo(ByteBuffer)
将自身序列化,有自己的协议格式,这里以ProducerRequest举例,其发送时,ProducerRequest会被封装在一个BoundedByteBufferSend类中,该类会在字节序列中添加消息总长度和request类型这两个基本信息,最终producerRequest的协议格式如下:
字节数(Byte) | 含义 |
---|---|
4 | 消息长度,length |
2 | Request类型,对应RequestKeys |
2 | topic length |
x | topic |
4 | partition |
4 | messageset length |
x | messageset |
当request传递到broker时,在上一篇文章中我们分析processor源码时,曾经提到过的Send handle(SelectionKey key, Receive request)
方法,其中有以下代码:
final short requestTypeId = request.buffer().getShort();
final RequestKeys requestType = RequestKeys.valueOf(requestTypeId);
其中request封装了客户端传递来的request字节序列,此处先读取2个字节,获取request的类型,然后选取对应的handler来处理,感兴趣的读者可以自行去查看相应的代码。
其他Request的协议类型,大家可以自行去查看其writeTo方法,这里就不逐个列举了。
Send(com.sohu.jafka.network)
Send类用于封装发送数据,调用自身的write方法,将数据发送到目的地。其类图如下:
我们先来看下Send的源码:
1 public interface Send extends Transmission {
2
3 //将数据写入channel,返回写入的数据大小。
4 int writeTo(java.nio.channels.GatheringByteChannel channel) throws IOException;
5
6 //数据过大时,要分多次写才能完整发送数据,这便是此方法的作用。
7 int writeCompletely(java.nio.channels.GatheringByteChannel channel) throws IOException;
8 }
9
10 public abstract class AbstractSend extends AbstractTransmission implements Send {
11 public int writeCompletely(GatheringByteChannel channel) throws IOException {
12 int written = 0;
13 while(!complete()) {
14 written += writeTo(channel);
15 }
16 return written;
17 }
18
19 }
AbstractSend
实现了writeCompletely方法,实现也很简单,循环检测是否complete()
,直到全部写完再返回,这也是实际使用中调用的方法,setCompleted()
实际在writeTo方法中被调用。
从Send子类的类名中我们可以看出其send的类,比如MessageSetSend
是封装并发送MessageSet的,ByteBufferSend是发送ByteBuffer的,BoundedByteBufferSend类会在传入ByteBuffer对象前面添加4个字节的length,它也可以封装Request对象,前面讲解ProducerRequest格式的时候有提到这一点,此时它还会添加2个字节的request类型数据。send的使用是很简单的,producer发送数据时会用到如下的代码:
1 BoundedByteBufferSend send = new BoundedByteBufferSend(request);
2 ...
3 getOrMakeConnection();
4 int written = -1;
5 try {
6 written = send.writeCompletely(channel);
7 } catch (IOException e) {
8 // no way to tell if write succeeded. Disconnect and re-throw exception to let client handle retry
9 disconnect();
10 throw new RuntimeException(e);
11 } finally {
12 if (logger.isDebugEnabled()) {
13 logger.debug(format("write %d bytes data to %s:%d", written, host, port));
14 }
15 }
上述代码可以看到,使用简单,先构造一个Send对象,然后调用其writeCompletely方法即可将数据写入到对应的channel了。Send每个子类这里就不详细介绍了,留给大家自己去研究。
Receive(com.sohu.jafka.network)
Receive类负责从socket中接收数据,其类图如下:
我们来看看Receive的源码。
1 public interface Receive extends Transmission {
2 //返回读取到的数据,不包括表示length的4个Byte
3 ByteBuffer buffer();
4 //由channel读取数据
5 int readFrom(ReadableByteChannel channel) throws IOException;
6 //数据过多时,要多次read才可以,此方法保证一次性读取所有数据
7 int readCompletely(ReadableByteChannel channel) throws IOException;
8 }
具体方法的含义,大家可以看代码中的注释,其方法与Send接口正好是相对应的,一个读,一个写。BoundedByteBufferReceive是实际中使用的类,其实现也很简单,一个4字节的sizeBuffer读取消息长度,一个contentBuffer用于读取实际的数据,代码这里就不贴了。
小结
本文主要讲解了Jafka运行中producer consumer与broker之间通信所遵循的协议及其相关的类,希望对大家理解有所帮助。