Jafka源码阅读之Consumer 15 August 2012
本文将介绍Jafka中Consumer的源码框架,在之前consumer的使用教程中已经讲过,消息消费有同步和异步两种方式,我们针对这两种方式分别讲解其源码实现。
同步消费源码实现
同步消费的代码如下:
1 SimpleConsumer consumer = new SimpleConsumer("127.0.0.1",9092);
2 long offset = 0;
3 //指定fetch请求的参数:topic partition offset maxSize
4 FetchRequest fetch = new FetchRequest("person",0,offset,1024*1024);
5 try {
6 //获取消息数据集合
7 ByteBufferMessageSet messageSet = consumer.fetch(fetch);
8 //遍历消息数据集合
9 for(MessageAndOffset messageAndOffset:messageSet){
10 //从消息中获取代表实际数据的byte数组
11 ByteBuffer buffer = messageAndOffset.message.payload();
12 byte[] objBytes = new byte[buffer.remaining()];
13 buffer.get(objBytes);
14 //反序列化字节数组为对象
15 ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(objBytes));
16 Person tmpPerson = (Person)ois.readObject();
17 System.out.println("person:"+tmpPerson.getName()+","+tmpPerson.getId());
18 ois.close();
19 }
20 } catch (IOException e) {
21 e.printStackTrace();
22 } catch (ClassNotFoundException e) {
23 e.printStackTrace();
24 }
上述代码的逻辑很简单:
-
初始化一个SimpleConsumer对象,传入broker的url和端口号。
-
创建一个FetchRequest,传入topic、partition、起始offset值和要抓取的最大字节数。
-
调用consumer的fetch方法,返回ByteBufferMessageSet,该类中封装了获取的消息。
-
遍历ByteBufferMessageSet中的消息,进行处理
下面我们来看下consumer.fetch方法的时序图。
我们结合时序图来说明下consumer fetch的过程。
- SimpleConsumer继承了SimpleOperation,consumer.fetch会调用其继承下来的send方法(
2
),send方法的代码如下:
1 public KV<Receive, ErrorMapping> send(Request request) throws IOException {
2 return new SimpleCommand(request).run();
3 }
其创建了一个SimpleCommand对象(2.1
),并调用其run方法(2.2
),返回了一个<key,value>对,key是一个Receive对象,其中包含了取得的消息数据。我们来看下其run方法。
1 public KV<Receive, ErrorMapping> run() throws IOException {
2 synchronized (lock) {
3 //建立到broker的连接
4 getOrMakeConnection();
5 try {
6 //发送请求
7 sendRequest(request);
8 //获取返回值
9 return getResponse();
10 } catch (IOException e) {
11 logger.info("Reconnect in fetch request due to socket error:", e);
12 try {
13 channel = connect();
14 sendRequest(request);
15 return getResponse();
16 } catch (IOException e2) {
17 throw e2;
18 }
19 }
20 //
21 }
22 }
上述代码的逻辑很简单:建立到broker的连接;发送请求;获取返回值。getOrMakeConnection
在consumer和broker之间建立阻塞连接,这里就不呈现其源码了。我们来看下sendRequest的源码。
1 protected void sendRequest(Request request) throws IOException {
2 new BoundedByteBufferSend(request).writeCompletely(channel);
3 }
sendRequest方法新建了一个BoundedByteBufferSend对象,并调用其writeCompletely方法,将request的内容发送到了broker。关于send的知识,在这里讲过,就不再赘述了。至此,fetch的请求就发送出去了,那么来看getResponse的代码。
1 protected KV<Receive, ErrorMapping> getResponse() throws IOException {
2 BoundedByteBufferReceive response = new BoundedByteBufferReceive();
3 response.readCompletely(channel);
4 return new KV<Receive, ErrorMapping>(response, ErrorMapping.valueOf(response.buffer().getShort()));
5 }
getResponse首先创建了一个BoundedByteBufferReceive对象,然后调用其readCompletely方法,从channel中将收到的数据读取到response中,并作为key返回。fetch方法最终的返回值是一个ByteBufferMessageSet对象,它封装了response.k.buffer()
,buffer()返回的便是broker传回给consumer的结果。至此,consumer的fetch过程也就结束了。
异步消费源码实现
同步消费是单线程阻塞完成的,从其代码调用上,我们可以看到其api封装比较靠近底层,对使用者不友好,一般没有特殊需求不推荐使用。异步消费封装了对用户友好的api,在同步消费的基础上实现了多线程消费数据的功能。我们先来理一下它的相关代码:
1 Properties props = new Properties();
2 //指明zookeeper地址
3 props.setProperty("zk.connect","localhost:2181");
4 //指明consumer group的名字
5 props.setProperty("groupid","test_group");
6 ConsumerConfig config = new ConsumerConfig(props);
7 //创建了ZookeeperConsumerConnector,连接zookeeper,获取当前topic的数据信息
8 ConsumerConnector connector = Consumer.create(config);
9 //指明每一个topic的消费线程数
10 Map<String,Integer> topicCountMap = new HashMap<String, Integer>();
11 topicCountMap.put("hehe",2);
12 topicCountMap.put("hehe3",1);
13 //创建消费消息流,key为topic,value为MessageStream的list,大小为上面map中指定的大小
14 Map<String,List<MessageStream<String>>> streams = connector.createMessageStreams(topicCountMap,new StringDecoder());
15 List<MessageStream<String>> messageStreamList = streams.get("hehe");
16 messageStreamList.addAll(streams.get("hehe3"));
17 final AtomicInteger count = new AtomicInteger(0);
18 final AtomicInteger streamCount = new AtomicInteger(0);
19 //创建线程池,该线程池数目必须不小于上面所有的消费线程数
20 ExecutorService executor = Executors.newFixedThreadPool(3);
21 //提交消费任务,开始消费消息
22 for(final MessageStream<String> stream:messageStreamList){
23 executor.execute(new Runnable() {
24 @Override
25 public void run() {
26 int threadNum = streamCount.incrementAndGet();
27 //从stream中获取消息,此处为阻塞式消费,即当没有新消息到来时,阻塞直到新消息到来或者线程结束
28 //通过BlockingQueue实现,后续内容会详细讲解
29 for(String msg:stream){
30 System.out.println("stream#"+threadNum+":msg#"+count.incrementAndGet()+"=>"+msg);
31 }
32 }
33 });
34 }
35 try {
36 executor.awaitTermination(1, TimeUnit.HOURS);
37 } catch (InterruptedException e) {
38 e.printStackTrace();
39 }
其基本的流程如下:
-
构建配置信息,将consumer的配置写入其中,这里配置了zookeeper的连接地址和consumer group的名字,这是最基本的配置。
-
根据配置创建ConsumerConnector,由名字可猜测该对象负责consumer连接zk、broker并获取数据的工作。这里使用的是其子类ZookeeperConsumerConnector。
-
指明要消费的topic以及并行消费的线程数目,可以指定多个topic。
-
使用connector来创建消息消费流(MessageStream),这里
List<MessageStream>
的数目是由上面配置的线程数来决定的。 -
每一个MessageStream中包含了从broker传来的消息,遍历它便可以获取数据,进行自己的操作
这个流程虽然有些复杂,但使用者只要了解了并行消费的特点,应该可以很好地使用。我们来看下其时序图:
是不是看的有些晕?没关系,我们来一步步地看。
- Consumer.create方法创建了一个ZookeeperConsumerConnector对象,其初始化函数如下:
1 public ZookeeperConsumerConnector(ConsumerConfig config) {
2 this(config, true);
3 }
4
5 public ZookeeperConsumerConnector(ConsumerConfig config, boolean enableFetcher) {
6 this.config = config;
7 this.enableFetcher = enableFetcher;
8 //
9 this.topicRegistry = new Pool<String, Pool<Partition, PartitionTopicInfo>>();
10 this.queues = new Pool<StringTuple, BlockingQueue<FetchedDataChunk>>();
11 //
12 // 建立到zookeeper的连接
13 connectZk();
14 //创建一个Fetcher,用于抓取数据
15 //todo:此处fetcher在consumer确定要消费的partition后,调用 fetcher.startConnection,开始连接到broker,抓取数据
16 createFetcher();
17 if (this.config.isAutoCommit()) {
18 logger.info("starting auto committer every " + config.getAutoCommitIntervalMs() + " ms");
19 //启动自动提交消费offset的线程
20 scheduler.scheduleWithRate(new AutoCommitTask(), config.getAutoCommitIntervalMs(),
21 config.getAutoCommitIntervalMs());
22 }
23 }
这里有几个重要的变量介绍一下:
topicRegistry:记录了该consumer消费的所有topic对应的partition,从中可以获取消费的当前offset等等(存储在PartitionTopicInfo中),本身是一个map结构,内容为<topic,<partition,partitionTopicInfo»。
queues:StringTuple为一个二元组,这里就是<String,String>类型,queues也是一个map结构,其内容为<(topic,threadId),dataInfoList)。其value中存储了该topic下的消费线程threadId从broker拉取到的数据,在实时抓取的应用场景下,当有消息到达broker后,该consumer便会间隔一段时间后将数据获取,填入dataInfoList,之后返回给前台用户来消费就可以了。
该connector还建立了到zookeeper的连接,以备后面读取zk上注册的信息,又建立了一个fetcher,为之后建立到broker的连接做好准备。最后,如果用户配置了自动将消费的offset信息提交到zookeeper存储的话,这里会开启一个后台线程,定期地将本consumer的消费信息(offset值)保存到zookeeper。
- createMessageStreams是connector的关键函数,它负责创建consumer的多个消费线程,拉取数据。再讲解其源码之前,我们来考虑其实现的几个关键问题,然后带着这些问题去看源码,会大大提高效率。
1.consumer连接了zookeeper,它从zookeeper获取了哪些信息?
我们可以先来猜测下,每个topic的具体数据是存放在各个broker中的,并且以broker-partition的形式存储在broker中,那么consumer要消费某个topic就一定要获取这个topic存放的broker,以及这个broker上该topic的broker-partition列表,另外该consumer还应该直到它所在consumer group下有哪些个consumer,保证自己分配到的broker-partition不与其他consumer冲突。总结一下:broker信息、要消费topic的所有broker-partition、同一组的其他consumer。实际上是否如此哪?让我们到源码中去验证。
2.consumer是如何获取自己可以请求数据的broker-partition列表的?或者说consumer group是如何分配某topic下所有的broker-partition给其下多个consumer的?
3.consumer获取自己的broker-partition列表后是何时建立到broker的连接,以及如何实时获取新数据的?
下面我们在源码中找答案。createMessageStreams调用了自身的consume方法,这个方法有些长,如下:
1 private <T> Map<String, List<MessageStream<T>>> consume(Map<String, Integer> topicCountMap, Decoder<T> decoder) {
2 if (topicCountMap == null) {
3 throw new IllegalArgumentException("topicCountMap is null");
4 }
5 //初始化zk上consumer相关的路径名称
6 ZkGroupDirs dirs = new ZkGroupDirs(config.getGroupId());
7 //<topic,msgStreamList>
8 Map<String, List<MessageStream<T>>> ret = new HashMap<String, List<MessageStream<T>>>();
9 String consumerUuid = config.getConsumerId();
10 if (consumerUuid == null) {
11 //自动生成consumerUuid=> hostname-currenttime-uuid.sub(8)
12 consumerUuid = generateConsumerId();
13 }
14 logger.info(format("create message stream by consumerid [%s] with groupid [%s]", consumerUuid,
15 config.getGroupId()));
16 //
17 //consumerIdString => groupid_consumerid
18 final String consumerIdString = config.getGroupId() + "_" + consumerUuid;
19 final TopicCount topicCount = new TopicCount(consumerIdString, topicCountMap);
20 for (Map.Entry<String, Set<String>> e : topicCount.getConsumerThreadIdsPerTopic().entrySet()) {
21 final String topic = e.getKey();
22 final Set<String> threadIdSet = e.getValue();
23 final List<MessageStream<T>> streamList = new ArrayList<MessageStream<T>>();
24 for (String threadId : threadIdSet) {
25 LinkedBlockingQueue<FetchedDataChunk> stream = new LinkedBlockingQueue<FetchedDataChunk>(
26 config.getMaxQueuedChunks());
27 queues.put(new StringTuple(topic, threadId), stream);
28 streamList.add(new MessageStream<T>(topic, stream, config.getConsumerTimeoutMs(), decoder));
29 }
30 ret.put(topic, streamList);
31 logger.debug("adding topic " + topic + " and stream to map.");
32 }
33 //
34 //listener to consumer and partition changes
35 ZKRebalancerListener<T> loadBalancerListener = new ZKRebalancerListener<T>(config.getGroupId(),
36 consumerIdString, ret);
37 this.rebalancerListeners.add(loadBalancerListener);
38 loadBalancerListener.start();
39 registerConsumerInZK(dirs, consumerIdString, topicCount);
40
41 //register listener for session expired event
42 zkClient.subscribeStateChanges(new ZKSessionExpireListener<T>(dirs, consumerIdString, topicCount,
43 loadBalancerListener));
44 //监控consumer的变化
45 zkClient.subscribeChildChanges(dirs.consumerRegistryDir, loadBalancerListener);
46
47 for (String topic : ret.keySet()) {
48 //register on broker partition path changes
49 final String partitionPath = ZkUtils.BrokerTopicsPath + "/" + topic;
50 zkClient.subscribeChildChanges(partitionPath, loadBalancerListener);
51 }
52
53 loadBalancerListener.syncedRebalance();
54 return ret;
55 }
这个方法列出了异步消费的主干代码,我们来看看都做了什么:
-
创建ZkGroupDirs对象,内部封装了zookeeper的几个路径,如:/consumers/groups/[groupid]等,方便后面调用。之后获取consumerId,
-
获取或者系统生成consumerId,一般交由系统生成,格式为groupId_hostname-currenttime-uuid.sub(8)(uuid为调用函数库生成的uuid)。
-
创建TopicCount对象,调用其getConsumerThreadIdsPerTopic方法会根据topicCountMap为每个topic及其线程数生成每个consumerThreadId的名称,规则就是consumerId后面加”-“和线程数,以此作为这些消费线程的唯一性标识。其返回的数据是map类型,诸如<topic,[groupid_consumerid-0,groupid_consumerid-1,groupid_consumerid-2]>这种形式。
-
遍历上述map数据,将每个topic的consumerThreadId列表转换为MessageStream,并添加返回值ret中。大家注意,MessageStream出现了,这是API中出现过的类,我们来简单看下它的源码。
1 public class MessageStream<T> implements Iterable<T> {
2
3 final String topic;
4
5 final BlockingQueue<FetchedDataChunk> queue;
6
7 final int consumerTimeoutMs;
8
9 final Decoder<T> decoder;
10
11 private final ConsumerIterator<T> consumerIterator;
12
13 public MessageStream(String topic, BlockingQueue<FetchedDataChunk> queue, int consumerTimeoutMs, Decoder<T> decoder) {
14 super();
15 this.topic = topic;
16 this.queue = queue;
17 this.consumerTimeoutMs = consumerTimeoutMs;
18 this.decoder = decoder;
19 this.consumerIterator = new ConsumerIterator<T>(topic, queue, consumerTimeoutMs, decoder);
20 }
21
22 public Iterator<T> iterator() {
23 return consumerIterator;
24 }
25
26 /**
27 * This method clears the queue being iterated during the consumer
28 * rebalancing. This is mainly to reduce the number of duplicates
29 * received by the consumer
30 */
31 public void clear() {
32 consumerIterator.clearCurrentChunk();
33 }
34 }
可以看到每一个MessageStream对应一个topic,还有自己的数据队列queue,而迭代的实现是通过ConsumerIterator,这个迭代器的实现就不细讲了,它的功能就是不断地从queue中取出数据,这里T就是我们发送消息时指定的数据类型,一般都是用String,比如我们上面举的例子。有了这个迭代器,便有了代码中for(String msg:stream)
类似的代码。
MessageStream的实现是简单的,可是这最后的返回值都构造好了,我们上面提到的3个问题却还是没有答案,那我们就接着往下看吧。
-
35-51行是与zookeeper有关的代码,创建了一个负载均衡的listener,将该consumer注册到zookeeper上,监听consumer和broker的变化,变化时触发负载均衡的listener。
-
后面调用了loadbalancer的syncedRebalance方法,然后就return ret了。那我们那3个问题的答案就只能在这个方法里了。我们来看下这个方法的代码:
1 public void syncedRebalance() {
2 synchronized (rebalanceLock) {
3 for (int i = 0; i < config.getMaxRebalanceRetries(); i++) {
4 if (isShuttingDown.get()) {//do nothing while shutting down
5 return;
6 }
7 logger.info(format("[%s] rebalancing starting. try #%d", consumerIdString, i));
8 final long start = System.currentTimeMillis();
9 boolean done = false;
10 //读取所有的broker
11 Cluster cluster = ZkUtils.getCluster(zkClient);
12 try {
13 done = rebalance(cluster);
14 } catch (Exception e) {
15 logger.info("exception during rebalance ", e);
16 }
17 ...
18 }
19 }
20
21 private boolean rebalance(Cluster cluster) {
22 //以topic做key组织threadId---当前consumer
23 Map<String, Set<String>> myTopicThreadIdsMap = ZkUtils.getTopicCount(zkClient, group, consumerIdString)
24 .getConsumerThreadIdsPerTopic();
25 //以topic做key组织threadid---所有的consumer
26 //这里的consumer应该是每一个thread,即线程级别的
27 Map<String, List<String>> consumersPerTopicMap = ZkUtils.getConsumersPerTopic(zkClient, group);
28 //topic的broker-partition列表
29 Map<String, List<String>> brokerPartitionsPerTopicMap = ZkUtils.getPartitionsForTopics(zkClient,
30 myTopicThreadIdsMap.keySet());
31 //关闭当前抓取线程,否则会造成消息重复消费
32 closeFetchers(cluster, messagesStreams, myTopicThreadIdsMap);
33 //释放该consumer对topic对应broker-partition的绑定,因为重新分配后,该broker-partition可能由其他consumer处理
34 releasePartitionOwnership(topicRegistry);
35 Map<StringTuple, String> partitionOwnershipDecision = new HashMap<StringTuple, String>();
36 Pool<String, Pool<Partition, PartitionTopicInfo>> currentTopicRegistry = new Pool<String, Pool<Partition, PartitionTopicInfo>>();
37 //遍历当前consumer的topic下面的所有topicThread
38 for (Map.Entry<String, Set<String>> e : myTopicThreadIdsMap.entrySet()) {
39 final String topic = e.getKey();
40 currentTopicRegistry.put(topic, new Pool<Partition, PartitionTopicInfo>());
41 ZkGroupTopicDirs topicDirs = new ZkGroupTopicDirs(group, topic);
42 //消费该topic的所有consumer列表
43 List<String> curConsumers = consumersPerTopicMap.get(topic);
44 //该topic下所有的broker-partition列表
45 List<String> curBrokerPartitions = brokerPartitionsPerTopicMap.get(topic);
46 //计算每个consumer要分配的partition的数目
47 final int nPartsPerConsumer = curBrokerPartitions.size() / curConsumers.size();
48 //计算平均分配后多出的partition数目
49 final int nConsumersWithExtraPart = curBrokerPartitions.size() % curConsumers.size();
50
51 //consumerThreadId=> groupid_consumerid-index (index from count)
52 //遍历当前consumer的所有stream
53 for (String consumerThreadId : e.getValue()) {
54 final int myConsumerPosition = curConsumers.indexOf(consumerThreadId);
55 assert (myConsumerPosition >= 0);
56 final int startPart = nPartsPerConsumer * myConsumerPosition + Math.min(myConsumerPosition,
57 nConsumersWithExtraPart);
58 final int nParts = nPartsPerConsumer + ((myConsumerPosition + 1 > nConsumersWithExtraPart) ? 0 : 1);
59
60 if (nParts <= 0) {
61 logger.warn("No broker partitions consumed by consumer thread " + consumerThreadId + " for topic " + topic);
62 } else {
63 for (int i = startPart; i < startPart + nParts; i++) {
64 //获取对应的broker-partition
65 String brokerPartition = curBrokerPartitions.get(i);
66 logger.info("[" + consumerThreadId + "] ==> " + brokerPartition + " claimming");
67
68 addPartitionTopicInfo(currentTopicRegistry, topicDirs, brokerPartition, topic,
69 consumerThreadId);
70 partitionOwnershipDecision.put(new StringTuple(topic, brokerPartition), consumerThreadId);
71 }
72 }
73 }
74 }
75 //将任务分配的结果注册到zookeeper上
76 if (reflectPartitionOwnershipDecision(partitionOwnershipDecision)) {
77 logger.debug("Updating the cache");
78 logger.debug("Partitions per topic cache " + brokerPartitionsPerTopicMap);
79 logger.debug("Consumers per topic cache " + consumersPerTopicMap);
80 //记录当前consumer每个topic下面的其消费的broker-partition的信息<topic,<broker-partition,offset...>>
81 topicRegistry = currentTopicRegistry;
82 //启动fetcher抓取信息
83 updateFetcher(cluster, messagesStreams);
84 return true;
85 } else {
86 return false;
87 }
88 }
89
90
这个方法有些长,中间省去了非关键代码,我们来看下它的流程。
-
11行:ZkUtils.getCluster方法,从zookeeper中获取所有的broker信息,之后调用rebalance方法。
-
23行-24: ZkUtils.getTopicCount.getConsumerThreadIdsPerTopic方法,从zookeeper中获取之前注册的当前consumer下每个topic的消费threadId
-
27行: 从zookeeper上获取所有topic对应的consumerThreadId,即获取同group的其他consumer信息。
-
29-30行: 从zookeeper中获取每个topic的broker-partition列表,要注意的是此处getPartitionsForTopics方法中有
Collections.sort(partList);
,这就意味着该列表是有序的,其顺序如下:0-0,0-1,0-2,1-0,1-1,2-0,2-1;保证同broker的在也一起,这样分配的时候也能保证同一个broker的partition分配给一个consumer,减少连接数目。
至此,第一个问题显然已经解决了,与我们预料的基本相同。
-
接下来关闭当前所有的抓取线程,防止出现消息重复消费的情况。因为重新分配后,该broker-partition可能被其他consumer消费,而如果该fetcher不停的话,它依然会去消费该broker-partition的数据。之后释放该consumer关联的broker-partition。
-
38-74行:该循环即是负载均衡的主体,其实现理念可以参见前面consumer的文章,实现代码也清晰明了。这里简单说一下addPartitionTopicInfo方法,该方法将consumerThreadId与partition关联在一起,方法为构建一个PartitionTopiCInfo对象,该对象包含了topic partition queue(数据队列) offset等信息,offset的确定也是在该方法中进行的。如果是第一次读,根据用户的配置,是从最近的消息获取,还是从最早的消息获取,否则从上一次消费的位置继续消费。但这里也只是完成了分配,即第二个问题解决了,那取数据的操作是在哪里完成的?
-
76-83行:首先将分配的信息注册到zookeeper上,然后调用了
updateFetcher
方法,我们来看下它的源码。
1 private void updateFetcher(Cluster cluster, Map<String, List<MessageStream<T>>> messagesStreams2) {
2 if (fetcher != null) {
3 List<PartitionTopicInfo> allPartitionInfos = new ArrayList<PartitionTopicInfo>();
4 for (Pool<Partition, PartitionTopicInfo> p : topicRegistry.values()) {
5 allPartitionInfos.addAll(p.values());
6 }
7 //创立连接
8 fetcher.startConnections(allPartitionInfos, cluster, messagesStreams2);
9 }
10 }
11
12
该方法首先从topicRegistry中获取了该consumer下所有的PartitionTopicInfo对象,该对象中包含了拉取数据所需要的所有信息,如broker offset queue等等。之后调用fetch.startConnections方法。从名字上我们也知道,第三个问题的答案找到了,我们还是来看一下这个方法的源码吧!
1 public <T> void startConnections(Iterable<PartitionTopicInfo> topicInfos,Cluster cluster,//
2 Map<String,List<MessageStream<T>>> messageStreams){
3 if(topicInfos == null) {
4 return;
5 }
6
7 //re-arrange by broker id
8 //<brokerid,partition>
9 Map<Integer, List<PartitionTopicInfo>> m = new HashMap<Integer, List<PartitionTopicInfo>>();
10 for (PartitionTopicInfo info : topicInfos) {
11 if (cluster.getBroker(info.brokerId) == null) {
12 throw new IllegalStateException("Broker " + info.brokerId + " is unavailable, fetchers could not be started");
13 }
14 List<PartitionTopicInfo> list = m.get(info.brokerId);
15 if (list == null) {
16 list = new ArrayList<PartitionTopicInfo>();
17 m.put(info.brokerId, list);
18 }
19 list.add(info);
20 }
21 //
22 final List<FetcherRunnable> fetcherThreads = new ArrayList<FetcherRunnable>();
23 //创建到所有broker的连接,启动线程,开始定时抓取数据
24 for(Map.Entry<Integer, List<PartitionTopicInfo>> e:m.entrySet()) {
25 FetcherRunnable fetcherThread = new FetcherRunnable("FetchRunnable-"+e.getKey(), //
26 zkClient, //
27 config, //
28 cluster.getBroker(e.getKey()), //
29 e.getValue());
30 fetcherThreads.add(fetcherThread);
31 fetcherThread.start();
32 }
33 //
34 this.fetcherThreads = fetcherThreads;
35 }
36
该方法主要两个步骤:按brokerId划分所有的PartitionTopicInfo对象和为每个broker开启一个线程来拉取数据。FetcherRunnable的代码这里就不贴了,感兴趣的读者可以自己去看,它将PartitionTopicInfo对象列表中的信息封装成一个MultiFetchRequest对象,然后调用SimpleConsumer的multiFetch方法来拉取数据,获得数据后将其添加到PartitionTopicInfo的queue中(也就添加到了MessageStream的queue中,这样前台就能立即消费数据了),之后更新其fetchOffSet等信息,继续抓取数据,如果抓取不到数据,则睡眠一段时间再去抓取,这样便实现了即时消费的功能。
至此三个问题都解决了,大家对consumer的代码框架应该也了然于胸了吧。
小结
本文主要讲解了jafka中同步消费和异步消费的源码实现,异步消费是建立在同步消费基础上的高等api,实现了并行消费消息的功能,其实现方式值得大家好好研究。本文只是尝试讲解其代码的脉络,希望对大家有所帮助。