本文讲解Jafka的通信协议,其实就是传输数据的约定格式。

Jafka的通信发生在producer和broker、consumer和broker之间,目前producer和broker之间通信是单向的(producer->broker),consumer和broker之间通信是双向的(consumer<->broker)。主要涉及到的类有Message MessageSet Request Send Receive,下面分别介绍这些类。

Message(com.sohu.jafka.message)

Message类是具体的消息数据,在传递的过程中,其字节序列的组成及含义如下:

字节大小(Byte) 含义
4 length,该条消息总长度
1 version(magic byte),消息版本,适应后面更改通信协议,目前只有一种协议
1 attribute,目前主要用于指明压缩算法:0–NoCompression 1–GzipCompression
4 crc32,消息完整性校验
x 实际数据,x=length-1-1-4

Message类即对这个字节序列作了封装,其主要的属性和方法如下:

 1 //消息总长度length
 2 private final int messageSize;
 3 public int getSizeInBytes() {
 4         return messageSize;
 5 }
 6 
 7 
 8 //存储字节序列length之后的数据
 9 final ByteBuffer buffer;
10 
11 //自定义version类型,目前只有这一个
12 private static final byte MAGIC_VERSION2 = 1;
13 //当前version值
14 public static final byte CurrentMagicValue = 1;
15 //version值的偏移和长度
16 public static final byte MAGIC_OFFSET = 0;
17 public static final byte MAGIC_LENGTH = 1;
18 public byte magic() {
19         return buffer.get(MAGIC_OFFSET);
20 }
21 
22 
23 //属性值的偏移和长度
24 public static final byte ATTRIBUTE_OFFSET = MAGIC_OFFSET + MAGIC_LENGTH;
25 public static final byte ATTRIBUT_ELENGTH = 1;
26 public byte attributes() {
27         return buffer.get(ATTRIBUTE_OFFSET);
28 }
29 
30 
31 //属性中只用最后两位指明压缩算法
32 public static final int CompressionCodeMask = 0x03; //
33 //0表明不使用压缩算法
34 public static final int NoCompression = 0;
35 
36 //crc长度
37 public static final byte CrcLength = 4;
38 
39 //计算crc的偏移
40 public static int crcOffset(byte magic) {
41         switch (magic) {
42             case MAGIC_VERSION2:
43                 return ATTRIBUTE_OFFSET + ATTRIBUT_ELENGTH;
44         }
45         throw new UnknownMagicByteException(format("Magic byte value of %d is unknown", magic));
46     }
47 
48 
49 //消息数据的偏移
50 public static int payloadOffset(byte magic) {
51         return crcOffset(magic) + CrcLength;
52 }
53 //获取实际的消息数据
54 public ByteBuffer payload() {
55         ByteBuffer payload = buffer.duplicate();
56         payload.position(headerSize(magic()));
57         payload = payload.slice();
58         payload.limit(payloadSize());
59         payload.rewind();
60         return payload;
61     }

从上面的注释中我们可以看到,Message类就是对定义的字节序列格式进行了一个封装,对外提供了方便的调用函数,其主要的一个构造函数如下:

 1 //这里的bytes为实际的消息数据,该构造函数会根据传入的参数,自动生成消息的header数据
 2 public Message(long checksum, byte[] bytes, CompressionCodec compressionCodec) {
 3 	//初始化buffer的大小=headerSize+messageLength
 4         this(ByteBuffer.allocate(Message.headerSize(Message.CurrentMagicValue) + bytes.length));
 5         //存储version,1Byte
 6         buffer.put(CurrentMagicValue);
 7         byte attributes = 0;
 8         if (compressionCodec.codec > 0) {
 9             attributes = (byte) (attributes | (CompressionCodeMask & compressionCodec.codec));
10         }
11         //存储attribute,1Byte
12         buffer.put(attributes);
13         //存储crc32
14         Utils.putUnsignedInt(buffer, checksum);
15         //存储消息数据
16         buffer.put(bytes);
17         //定位buffer到头部,以备buffer被读取使用
18         buffer.rewind();
19     }

MessageSet(com.sohu.jafka.message)

从MessageSet的类名中,大家也可以猜到这个类是多个Message的集合,它有两个子类:ByteBufferMessageSet和FileMessageSet,前者供producer和consumer使用,后者供broker使用。首先我们来看下MessageSet的源码。

 1 public abstract class MessageSet implements Iterable<MessageAndOffset> {
 2 
 3     public static final MessageSet Empty = new ByteBufferMessageSet(ByteBuffer.allocate(0));
 4     //代表消息长度占用字节数:4个
 5     public static final int LogOverhead = 4;
 6 
 7 	//将多条message封装到一个ByteBuffer中
 8     public static ByteBuffer createByteBuffer(CompressionCodec compressionCodec, Message... messages) {
 9         if (compressionCodec == CompressionCodec.NoCompressionCodec) {
10             ByteBuffer buffer = ByteBuffer.allocate(messageSetSize(messages));
11             for (Message message : messages) {
12                 message.serializeTo(buffer);
13             }
14             buffer.rewind();
15             return buffer;
16         }
17         //
18         if (messages.length == 0) {
19             ByteBuffer buffer = ByteBuffer.allocate(messageSetSize(messages));
20             buffer.rewind();
21             return buffer;
22         }
23         //
24         Message message = CompressionUtils.compress(messages, compressionCodec);
25         ByteBuffer buffer = ByteBuffer.allocate(message.serializedSize());
26         message.serializeTo(buffer);
27         buffer.rewind();
28         return buffer;
29     }
30 
31 	//每条消息的长度,就是上面讲到Message的字节序列,包含length
32     public static int entrySize(Message message) {
33         return LogOverhead + message.getSizeInBytes();
34     }
35 
36     public static int messageSetSize(Iterable<Message> messages) {
37         int size = 0;
38         for (Message message : messages) {
39             size += entrySize(message);
40         }
41         return size;
42     }
43 
44     public static int messageSetSize(Message... messages) {
45         int size = 0;
46         for (Message message : messages) {
47             size += entrySize(message);
48         }
49         return size;
50     }
51 
52     public abstract long getSizeInBytes();
53 
54     public void validate() {
55         for (MessageAndOffset messageAndOffset : this)
56             if (!messageAndOffset.message.isValid()) {
57                 throw new InvalidMessageException();
58             }
59     }
60 	//将消息数据写入channel中
61     public abstract long writeTo(GatheringByteChannel channel, long offset, long maxSize) throws IOException;
62 }

由上面的代码可以知道,MessageSet封装产生的ByteBuffer就是多个Message首尾相连构造而成。这里要注意一点:这些Message并不一定是单条消息数据,还可能是多条消息数据经过压缩后组成的一条Message,将该Message解压后得到的其实也是一个MessageSet。这一点大家在阅读ByteBufferMessageAndOffset遍历部分代码的时候会看到。 该类可以看作是message的工具类,负责消息数据的批量读和写。另外此类实现了Iterable,可以遍历MessageAndOffset对象,该对象封装了message数据和下一条message的offset信息。在[consumer](/2012/07/24/jafka-consumer/)里,我们可以使用如下代码遍历获取的消息。

1 //获取消息数据集合
2 ByteBufferMessageSet messageSet = consumer.fetch(fetch);
3 //遍历消息数据集合
4 for(MessageAndOffset messageAndOffset:messageSet){
5 ...
6 }

MessageSet的两个子类从名字上可以看出它们封装消息的来源:一个来自ByteBuffer,一个来自File(即jafka文件)。下面分别介绍下这两个类。

ByteBufferMessageSet

该类主要被Producer和Consumer使用,前者将要传送的Message封装成ByteBufferMessageSet,然后传送到broker;后者将fetch到的Message封装成ByteBufferMessageSet,然后遍历消费。那么我们先来看下其提供的封装入口:

 1     public ByteBufferMessageSet(ByteBuffer buffer) {
 2         this(buffer,0L,ErrorMapping.NoError);
 3     }
 4     public ByteBufferMessageSet(ByteBuffer buffer,long initialOffset,ErrorMapping errorCode) {
 5         this.buffer = buffer;
 6         this.initialOffset = initialOffset;
 7         this.errorCode = errorCode;
 8         this.validBytes = shallowValidBytes();
 9     }
10 
11     public ByteBufferMessageSet(CompressionCodec compressionCodec,Message...messages) {
12         this(MessageSet.createByteBuffer(compressionCodec, messages),0L,ErrorMapping.NoError);
13     }
14     public ByteBufferMessageSet(Message...messages) {
15         this(CompressionCodec.NoCompressionCodec,messages);
16     }

由传入参数可知,前两个构造函数为consumer使用,后两个为producer使用。在讲解producer的使用时,有讲到一个配置参数serializer.class,它的作用是将ProducerData<K,V>中的K类对象转化为Message,也就是这里构造函数的传入参数Message。

ByteBufferMessageSet的一个重要接口是遍历消息数据,即其iterator()方法,其实现这里不详细讲了,原理简单和大家说一下。前面提到过一个Message可能是多条消息数据缩后构成的,所以在遍历的时候便存在一个是否要遍历压缩的Message中每条消息数据的问题,其由isShallow参数决定:true不遍历,false遍历。ByteBufferMessageSet的iterator方法是调用的是return internalIterator(false);,是会遍历包括压缩Message中的所有消息数据的。实现方式是通过topIter遍历一级Message,当遇到压缩的Message时,将其解压缩并且用innerIter记录其遍历情况,当遍历结束后,回到topIter继续遍历。

ByteBufferMessageSet的writeTo(Channel)的方法代码如下,将数据写入指定的channel。这里的channel是FileChannel,即该方法的调用时机是broker写数据文件,读者独到源码时可以看到。

1 public long writeTo(GatheringByteChannel channel, long offset, long maxSize) throws IOException {
2         buffer.mark();
3         int written = channel.write(buffer);
4         buffer.reset();
5         return written;
6     }

FileMessageSet

该类主要由broker使用,我们来看下它的构造函数:

 1 public FileMessageSet(FileChannel channel, long offset, long limit, //
 2             boolean mutable, AtomicBoolean needRecover) throws IOException {
 3         super();
 4         this.channel = channel;
 5         this.offset = offset;
 6         this.mutable = mutable;
 7         this.needRecover = needRecover;
 8         if (mutable) {
 9             if (limit < Long.MAX_VALUE || offset > 0) throw new IllegalArgumentException(
10                     "Attempt to open a mutable message set with a view or offset, which is not allowed.");
11 
12             if (needRecover.get()) {
13                 // set the file position to the end of the file for appending messages
14                 long startMs = System.currentTimeMillis();
15                 long truncated = recover();
16                 logger.info("Recovery succeeded in " + (System.currentTimeMillis() - startMs) / 1000 + " seconds. " + truncated + " bytes truncated.");
17             } else {
18                 setSize.set(channel.size());
19                 setHighWaterMark.set(getSizeInBytes());
20                 channel.position(channel.size());
21             }
22         } else {
23             setSize.set(Math.min(channel.size(), limit) - offset);
24             setHighWaterMark.set(getSizeInBytes());
25         }
26     }
27 
28     public FileMessageSet(FileChannel channel, boolean mutable) throws IOException {
29         this(channel, 0, Long.MAX_VALUE, mutable, new AtomicBoolean(false));
30     }
31 
32     public FileMessageSet(File file, boolean mutable) throws IOException {
33         this(Utils.openChannel(file, mutable), mutable);
34     }
35 
36     public FileMessageSet(FileChannel channel, boolean mutable, AtomicBoolean needRecover) throws IOException {
37         this(channel, 0, Long.MAX_VALUE, mutable, needRecover);
38     }
39 
40     public FileMessageSet(File file, boolean mutable, AtomicBoolean needRecover) throws IOException {
41         this(Utils.openChannel(file, mutable), mutable, needRecover);
42     }

由第一个构造函数可知,构造一个FileMessageSet需要以下几点:

  • FileChannel,即打开一个文件,并且指明是否是mutable(可写)。
  • offset,读入文件的起始位置
  • limit,读入文件的大小
  • mutable,是否可写

fileChannel打开的文件即为jafka文件,其中存储着message,存储的格式与MessageSet是相同的,也是message首尾相连存储。FileMessageSet的遍历比较简单,顺序从channel中读取出来组装成MessageAndOffset即可,这里没有考虑Message是否压缩,原因应该是没有使用的需求。代码就不贴了,大家可以自己去阅读iterator方法。

FileMessageSet的writeTo方法要特别强调一下,之前我们有提到jafka使用了sendfile这个高级系统调用,大大提升了传输效率,对应代码就在这里。

1 public long writeTo(GatheringByteChannel destChannel, long writeOffset, long maxSize) throws IOException {
2         return channel.transferTo(offset + writeOffset, Math.min(maxSize, getSizeInBytes()), destChannel);
3     }

FileMessageSet还有两个方法与消息数据的持久化有关。

 1 //将messages添加如当前的messageOffset
 2  public long[] append(MessageSet messages) throws IOException {
 3         checkMutable();
 4         long written = 0L;
 5         while (written < messages.getSizeInBytes())
 6             written += messages.writeTo(channel, 0, messages.getSizeInBytes());
 7         long beforeOffset = setSize.getAndAdd(written);
 8         return new long[] {  written,beforeOffset };
 9     }
10 
11 //flush消息到磁盘
12     public void flush() throws IOException {
13         checkMutable();
14         long startTime = System.currentTimeMillis();
15         channel.force(true);
16         long elapsedTime = System.currentTimeMillis() - startTime;
17         LogFlushStats.recordFlushRequest(elapsedTime);
18         logger.debug("flush time " + elapsedTime);
19         setHighWaterMark.set(getSizeInBytes());
20         logger.debug("flush high water mark:" + highWaterMark());
21     }

第一个函数的作用便是将producer传递过来的messages添加到当前messageset对象(channel)中,虽然调用了writeTo方法,但是由于操作系统缓冲的存在,数据可能还没有真正写入磁盘,而flush方法的作用便是强制写磁盘。这两个方法便完成了消息数据持久化到磁盘的操作。 另外FileMessageSet还提供了read方法,可以读取指定offset到offset+limit的所有消息,这里就不贴代码了。

Request(com.sohu.jafka.network)

Request是producer consumer向broker发出请求的封装类,下图是其简单的类图: request_clz

用户只要实现Request接口,便可以添加自己的Request类,来实现自己的需求。我们来看下Request相关的源码为:

 1 public interface Request extends ICalculable {
 2 
 3 	//请求类型,enum
 4     RequestKeys getRequestKey();
 5 	//将该request写入buffer,是序列化的过程,从这里可以看到request的协议格式
 6     void writeTo(ByteBuffer buffer);
 7 }
 8 //request 的所有类型
 9 public enum RequestKeys {
10     PRODUCE, //0
11     FETCH, //1
12     MULTIFETCH, //2
13     MULTIPRODUCE, //3
14     OFFSETS,//4
15     CREATE,//5
16     DELETE;//6
17 
18     public int value = ordinal();
19 
20     final static int size = values().length;
21 
22     public static RequestKeys valueOf(int ordinal) {
23         if (ordinal < 0 || ordinal >= size) return null;
24         return values()[ordinal];
25     }
26 }

从上面可以看到RequestKeys的类型与类图中Request的子类是一一对应的,Request在传送时通过writeTo(ByteBuffer)将自身序列化,有自己的协议格式,这里以ProducerRequest举例,其发送时,ProducerRequest会被封装在一个BoundedByteBufferSend类中,该类会在字节序列中添加消息总长度和request类型这两个基本信息,最终producerRequest的协议格式如下:

字节数(Byte) 含义
4 消息长度,length
2 Request类型,对应RequestKeys
2 topic length
x topic
4 partition
4 messageset length
x messageset

当request传递到broker时,在上一篇文章中我们分析processor源码时,曾经提到过的Send handle(SelectionKey key, Receive request)方法,其中有以下代码:

final short requestTypeId = request.buffer().getShort();

final RequestKeys requestType = RequestKeys.valueOf(requestTypeId);

其中request封装了客户端传递来的request字节序列,此处先读取2个字节,获取request的类型,然后选取对应的handler来处理,感兴趣的读者可以自行去查看相应的代码。

其他Request的协议类型,大家可以自行去查看其writeTo方法,这里就不逐个列举了。

Send(com.sohu.jafka.network)

Send类用于封装发送数据,调用自身的write方法,将数据发送到目的地。其类图如下:

send_clz_diagram

我们先来看下Send的源码:

 1 public interface Send extends Transmission {
 2 
 3 //将数据写入channel,返回写入的数据大小。
 4     int writeTo(java.nio.channels.GatheringByteChannel channel) throws IOException;
 5 
 6 //数据过大时,要分多次写才能完整发送数据,这便是此方法的作用。
 7     int writeCompletely(java.nio.channels.GatheringByteChannel channel) throws IOException;
 8 }
 9 
10 public abstract class AbstractSend extends AbstractTransmission implements Send {
11     public int writeCompletely(GatheringByteChannel channel) throws IOException {
12         int written = 0;
13         while(!complete()) {
14             written += writeTo(channel);
15         }
16         return written;
17     }
18 
19 }

AbstractSend实现了writeCompletely方法,实现也很简单,循环检测是否complete(),直到全部写完再返回,这也是实际使用中调用的方法,setCompleted()实际在writeTo方法中被调用。

从Send子类的类名中我们可以看出其send的类,比如MessageSetSend是封装并发送MessageSet的,ByteBufferSend是发送ByteBuffer的,BoundedByteBufferSend类会在传入ByteBuffer对象前面添加4个字节的length,它也可以封装Request对象,前面讲解ProducerRequest格式的时候有提到这一点,此时它还会添加2个字节的request类型数据。send的使用是很简单的,producer发送数据时会用到如下的代码:

 1 BoundedByteBufferSend send = new BoundedByteBufferSend(request);
 2 ...
 3     getOrMakeConnection();
 4     int written = -1;
 5     try {
 6         written = send.writeCompletely(channel);
 7     } catch (IOException e) {
 8         // no way to tell if write succeeded. Disconnect and re-throw exception to let client handle retry
 9         disconnect();
10         throw new RuntimeException(e);
11     } finally {
12         if (logger.isDebugEnabled()) {
13             logger.debug(format("write %d bytes data to %s:%d", written, host, port));
14         }
15     }

上述代码可以看到,使用简单,先构造一个Send对象,然后调用其writeCompletely方法即可将数据写入到对应的channel了。Send每个子类这里就不详细介绍了,留给大家自己去研究。

Receive(com.sohu.jafka.network)

Receive类负责从socket中接收数据,其类图如下:

receive_clz

我们来看看Receive的源码。

1 public interface Receive extends Transmission {
2 //返回读取到的数据,不包括表示length的4个Byte
3     ByteBuffer buffer();
4 //由channel读取数据
5     int readFrom(ReadableByteChannel channel) throws IOException;
6 //数据过多时,要多次read才可以,此方法保证一次性读取所有数据
7     int readCompletely(ReadableByteChannel channel) throws IOException;
8 }

具体方法的含义,大家可以看代码中的注释,其方法与Send接口正好是相对应的,一个读,一个写。BoundedByteBufferReceive是实际中使用的类,其实现也很简单,一个4字节的sizeBuffer读取消息长度,一个contentBuffer用于读取实际的数据,代码这里就不贴了。

小结

本文主要讲解了Jafka运行中producer consumer与broker之间通信所遵循的协议及其相关的类,希望对大家理解有所帮助。