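In this article I will walk you through the main flow of the producer source code. I hope it helps.

The article on using the producer contained the following code, so we will start from there.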

    Properties props = new Properties();
    // ZooKeeper address; the producer looks up the brokers to send to through it
    props.setProperty("zk.connect", "localhost:2181");
    props.setProperty("serializer.class", StringEncoder.class.getName());
    Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
    for (int i = 0; i < 1000; i++) {
        // Build a message and send it
        producer.send(new StringProducerData("hehe", "hehe-data" + i));
    }
    producer.close();

Here is the sequence diagram of producer.send:

[Figure jafka_producer: sequence diagram of producer.send]

Using the sequence diagram above and parts of the source, let's look at what happens after the producer calls send.

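  • After send is called, the producer calls either zkSend or configSend, depending on whether ZooKeeper is enabled (that is, whether zk.connect is set). Here we follow zkSend (1.1).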
    private void zkSend(ProducerData<K, V> data) {
        int numRetries = 0;
        Broker brokerInfoOpt = null;
        Partition brokerIdPartition = null;
        // Look up the available broker-partition list from ZooKeeper, refreshing the cache before each retry
        while (numRetries <= config.getZkReadRetries() && brokerInfoOpt == null) {
            if (numRetries > 0) {
                logger.info("Try #" + numRetries + " ZK producer cache is stale. Refreshing it by reading from ZK again");
                brokerPartitionInfo.updateInfo();
            }
            List<Partition> partitions = new ArrayList<Partition>(getPartitionListForTopic(data));
            // Choose the broker-partition to send to: at random, or via the user-specified partitioner
            brokerIdPartition = partitions.get(getPartition(data.getKey(), partitions.size()));
            if (brokerIdPartition != null) {
                brokerInfoOpt = brokerPartitionInfo.getBrokerInfo(brokerIdPartition.brokerId);
            }
            numRetries++;
        }
        if (brokerInfoOpt == null) {
            throw new NoBrokersForPartitionException("Invalid Zookeeper state. Failed to get partition for topic: " + data.getTopic() + " and key: "
                    + data.getKey());
        }
        // Wrap the data in a ProducerPoolData object
        ProducerPoolData<V> ppd = producerPool.getProducerPoolData(data.getTopic(),
                new Partition(brokerIdPartition.brokerId, brokerIdPartition.partId),
                data.getData());
        // Send the data through producerPool
        producerPool.send(ppd);
    }
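The `getPartition(data.getKey(), partitions.size())` call above returns an index into the partition list. The sketch below shows the selection rule as this article describes it (a configured partitioner decides, otherwise the choice is random), plus a range check on the result. It is a minimal illustration, not Jafka's code: `Partitioner`, `PartitionChooser` and their method signatures are hypothetical names chosen for the example.

```java
import java.util.Random;

/** Hypothetical stand-in for the class named by partitioner.class. */
interface Partitioner<K> {
    int partition(K key, int numPartitions);
}

/** Sketch of the rule described above: use the partitioner if one is configured, else pick at random. */
final class PartitionChooser<K> {
    private final Partitioner<K> partitioner; // null when partitioner.class is not set
    private final Random random;

    PartitionChooser(Partitioner<K> partitioner, Random random) {
        this.partitioner = partitioner;
        this.random = random;
    }

    /** Returns an index into the broker-partition list, in [0, numPartitions). */
    int getPartition(K key, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalStateException("no partitions available for this topic");
        }
        int p = (partitioner == null)
                ? random.nextInt(numPartitions)
                : partitioner.partition(key, numPartitions);
        // Reject out-of-range answers from a user partitioner instead of indexing past the list
        if (p < 0 || p >= numPartitions) {
            throw new IllegalArgumentException("partitioner returned " + p + ", expected [0, " + numPartitions + ")");
        }
        return p;
    }
}
```

Passing the `Random` in makes the random branch reproducible in tests. A hash-based partitioner keeps all messages with the same key on the same partition, which the random branch does not.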

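zkSend mainly does the following:

  1. It fetches from ZooKeeper the broker-partition list available for the topic (1.1.1), then calls getPartition (1.1.2) to pick a partition: if the user configured partitioner.class, that class makes the choice; otherwise a partition is chosen at random. If no broker is found for the chosen partition, it refreshes its cached view of ZooKeeper and retries while numRetries <= config.getZkReadRetries(), and throws NoBrokersForPartitionException if it still finds none.

  2. It wraps the data and the partition in a ProducerPoolData object (1.1.3) and then calls producerPool's send method (1.1.4), whose source is: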

    public void send(ProducerPoolData<V> ppd) {
        // Synchronous or asynchronous sending?
        if (sync) {
            // Pack the messages into a ByteBufferMessageSet so they can be serialized to bytes
            Message[] messages = new Message[ppd.data.size()];
            int index = 0;
            for (V v : ppd.data) {
                messages[index] = serializer.toMessage(v);
                index++;
            }
            ByteBufferMessageSet bbms = new ByteBufferMessageSet(config.getCompressionCodec(), messages);
            ProducerRequest request = new ProducerRequest(ppd.topic, ppd.partition.partId, bbms);
            SyncProducer producer = syncProducers.get(ppd.partition.brokerId);
            if (producer == null) {
                throw new UnavailableProducerException("Producer pool has not been initialized correctly. " + "Sync Producer for broker "
                        + ppd.partition.brokerId + " does not exist in the pool");
            }
            producer.send(request.topic, request.partition, request.messages);
        } else {
            // Asynchronous sending: hand each piece of data to the AsyncProducer one by one
            AsyncProducer<V> asyncProducer = asyncProducers.get(ppd.partition.brokerId);
            for (V v : ppd.data) {
                asyncProducer.send(ppd.topic, v, ppd.partition.partId);
            }
        }
    }

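ProducerPool.send uses the sync flag to decide between synchronous and asynchronous sending: in the synchronous case it ends up calling SyncProducer (1.1.4.1), otherwise AsyncProducer (1.1.4.2). sync is set when the Producer is initialized, from the producer.type setting; the value async selects asynchronous sending.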
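The producer.type switch can be shown with a tiny helper. This is a sketch under assumptions: `ProducerTypes.isSync` is a hypothetical name, and treating a missing producer.type as synchronous is assumed here; the article only states that the value `async` selects asynchronous sending.

```java
import java.util.Properties;

/** Hypothetical helper: reads producer.type the way the article describes. */
final class ProducerTypes {
    /** Returns false only for "async"; a missing key is assumed to mean "sync". */
    static boolean isSync(Properties props) {
        return !"async".equals(props.getProperty("producer.type", "sync"));
    }
}
```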

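That completes the message-sending path. Simple, isn't it?

When the SyncProducer is created

So far we have not said when the SyncProducer is created, or in other words when the producer sets up its connection to a broker. This is a key part: without a connection, your data has no channel to travel through. From the class name ProducerPool we can guess that it is a pool holding several producers, from which callers take a producer as needed and use it to send data. A reasonable design, then, is for each producer in the pool to correspond to one broker, so that callers fetch a producer according to the broker they are sending to. That is indeed how it is designed. The pool is initialized in the Producer constructor: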

    // Obtain the broker-partition information for all brokers
    this.zkEnabled = config.getZkConnect() != null;
    if (this.brokerPartitionInfo == null) {
        if (this.zkEnabled) {
            Properties zkProps = new Properties();
            zkProps.put("zk.connect", config.getZkConnect());
            zkProps.put("zk.sessiontimeout.ms", "" + config.getZkSessionTimeoutMs());
            zkProps.put("zk.connectiontimeout.ms", "" + config.getZkConnectionTimeoutMs());
            zkProps.put("zk.synctime.ms", "" + config.getZkSyncTimeMs());
            this.brokerPartitionInfo = new ZKBrokerPartitionInfo(new ZKConfig(zkProps), this);
        } else {
            this.brokerPartitionInfo = new ConfigBrokerPartitionInfo(config);
        }
    }
    // Create one producer (SyncProducer or AsyncProducer) per broker, i.e. connect to every broker
    if (this.populateProducerPool) {
        for (Map.Entry<Integer, Broker> e : this.brokerPartitionInfo.getAllBrokerInfo().entrySet()) {
            Broker b = e.getValue();
            producerPool.addProducer(new Broker(e.getKey(), b.host, b.host, b.port));
        }
    }

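The constructor first reads the broker information from ZooKeeper (or, when ZooKeeper is not enabled, from the configuration via ConfigBrokerPartitionInfo), then iterates over all brokers and calls producerPool.addProducer for each one, creating a producer and thereby the connection to that broker. The source of addProducer is: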

    public void addProducer(Broker broker) {
        Properties props = new Properties();
        props.put("host", broker.host);
        props.put("port", "" + broker.port);
        props.putAll(config.getProperties());
        // Create a sync or async producer according to the configuration
        if (sync) {
            SyncProducer producer = new SyncProducer(new SyncProducerConfig(props));
            logger.info("Creating sync producer for broker id = " + broker.id + " at " + broker.host + ":" + broker.port);
            syncProducers.put(broker.id, producer);
        } else {
            AsyncProducer<V> producer = new AsyncProducer<V>(new AsyncProducerConfig(props),
                    new SyncProducer(new SyncProducerConfig(props)),
                    serializer,
                    eventHandler,
                    config.getEventHandlerProperties(),
                    this.callbackHandler,
                    config.getCbkHandlerProperties());
            // Start the asynchronous sending thread
            producer.start();
            logger.info("Creating async producer for broker id = " + broker.id + " at " + broker.host + ":" + broker.port);
            asyncProducers.put(broker.id, producer);
        }
    }

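This code is easy to follow. I include it so that you notice one detail: after an AsyncProducer is created, its start method is called, which starts an asynchronous sending thread. We will come back to this later.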
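The one-producer-per-broker pool boils down to a map keyed by broker id. The sketch below is illustrative only: `BrokerSender` and `SimpleProducerPool` are hypothetical types standing in for SyncProducer/AsyncProducer and ProducerPool, and the missing-broker error uses `IllegalStateException` where Jafka throws `UnavailableProducerException`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for SyncProducer/AsyncProducer: sends to one fixed broker. */
interface BrokerSender {
    void send(String topic, int partition, byte[] payload);
}

/** Sketch of a ProducerPool-like registry: one sender per broker id. */
final class SimpleProducerPool {
    private final Map<Integer, BrokerSender> producers = new ConcurrentHashMap<>();

    /** Called once per broker at startup, like addProducer in the Producer constructor. */
    void addProducer(int brokerId, BrokerSender sender) {
        producers.put(brokerId, sender);
    }

    /** Routes a request to the sender of the broker that owns the chosen partition. */
    void send(int brokerId, String topic, int partition, byte[] payload) {
        BrokerSender sender = producers.get(brokerId);
        if (sender == null) {
            // Jafka throws UnavailableProducerException at this point
            throw new IllegalStateException("no producer for broker " + brokerId + " in the pool");
        }
        sender.send(topic, partition, payload);
    }
}
```

A concurrent map is used in the sketch because application threads may call send while the pool is being read; whether Jafka needs that depends on how it populates the pool.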

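Synchronous sending

Before calling SyncProducer, ProducerPool.send first packs the pieces of data into a ByteBufferMessageSet (a class covered in an earlier article, which unfamiliar readers may want to look up), wraps that in a ProducerRequest, and finally calls SyncProducer's send method. The call serializer.toMessage(v), which turns each piece of data into a Message, is where the user-defined serializer.class comes into play.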
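To make the serializer's role concrete, here is a sketch of the per-message encoding loop in ProducerPool.send. It assumes a simplified hook that returns raw bytes; Jafka's serializer returns a `Message` from `toMessage`, and `Encoder`, `Utf8StringEncoder` and `BatchEncoding` here are hypothetical names. Encoding strings as UTF-8 is also an assumption.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;

/** Hypothetical stand-in for the serializer.class hook: turns one user event into bytes. */
interface Encoder<T> {
    byte[] toBytes(T event);
}

/** Example encoder for String events, in the spirit of StringEncoder (UTF-8 is an assumption). */
final class Utf8StringEncoder implements Encoder<String> {
    @Override
    public byte[] toBytes(String event) {
        return event.getBytes(StandardCharsets.UTF_8);
    }
}

final class BatchEncoding {
    /** Mirrors the loop in ProducerPool.send: one encoded message per element of ppd.data, in order. */
    static <T> byte[][] encodeAll(List<T> data, Encoder<T> serializer) {
        byte[][] messages = new byte[data.size()][];
        int index = 0;
        for (T v : data) {
            messages[index++] = serializer.toBytes(v);
        }
        return messages;
    }
}
```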

The send method

Now let's look at the source of SyncProducer's send method.

    private void send(Request request) {
        // Build the Send object to be written
        BoundedByteBufferSend send = new BoundedByteBufferSend(request);
        synchronized (lock) {
            verifySendBuffer(send.getBuffer().slice());
            // Make sure a connection to the broker is available, connecting if necessary
            getOrMakeConnection();
            int written = -1;
            try {
                // Write the data
                written = send.writeCompletely(channel);
            } catch (IOException e) {
                // no way to tell if write succeeded. Disconnect and re-throw exception to let client handle retry
                disconnect();
                throw new RuntimeException(e);
            } finally {
                if (logger.isDebugEnabled()) {
                    logger.debug(format("write %d bytes data to %s:%d", written, host, port));
                }
            }
            // Count the requests sent on this connection and reconnect periodically
            sentOnConnection++;
            if (sentOnConnection >= config.reconnectInterval
                    || (config.reconnectTimeInterval >= 0 && System.currentTimeMillis() - lastConnectionTime >= config.reconnectTimeInterval)) {
                disconnect();
                channel = connect();
                sentOnConnection = 0;
                lastConnectionTime = System.currentTimeMillis();
            }
        }
    }

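The method is simple: build a Send object ready for sending, make sure the connection to the broker is usable, and write the data. If the write throws an IOException, it disconnects and rethrows, leaving any retry to the caller. After a successful write it counts the requests sent on this connection, and closes and reopens the connection once sentOnConnection reaches reconnectInterval or, when reconnectTimeInterval is non-negative, once that many milliseconds have passed since the connection was made.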
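The periodic reconnect bookkeeping at the end of send can be isolated into a small class, which makes its two triggers easier to see. This is a sketch: `ReconnectPolicy` and `recordSendAndCheck` are hypothetical names, and the current time is passed in as a parameter so the behavior can be tested without sleeping.

```java
/** Hypothetical sketch of SyncProducer's reconnect bookkeeping (count- and time-based). */
final class ReconnectPolicy {
    private final int reconnectInterval;        // reconnect after this many sends
    private final long reconnectTimeIntervalMs; // reconnect after this much time; negative disables it
    private int sentOnConnection = 0;
    private long lastConnectionTime;

    ReconnectPolicy(int reconnectInterval, long reconnectTimeIntervalMs, long nowMs) {
        this.reconnectInterval = reconnectInterval;
        this.reconnectTimeIntervalMs = reconnectTimeIntervalMs;
        this.lastConnectionTime = nowMs;
    }

    /** Records one successful send; returns true when the caller should disconnect and reconnect. */
    boolean recordSendAndCheck(long nowMs) {
        sentOnConnection++;
        boolean tooManySends = sentOnConnection >= reconnectInterval;
        boolean tooOld = reconnectTimeIntervalMs >= 0 && nowMs - lastConnectionTime >= reconnectTimeIntervalMs;
        if (tooManySends || tooOld) {
            sentOnConnection = 0;       // counters restart for the new connection
            lastConnectionTime = nowMs;
            return true;
        }
        return false;
    }
}
```

A caller would run `disconnect()` and `connect()` whenever the method returns `true`, exactly where the original code does.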

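The Send object was covered in an earlier article, which readers unfamiliar with it can consult. The synchronous sending logic is not complicated, so I will not go into more detail here.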
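`send.writeCompletely(channel)` has to loop because a single `write` on a channel may write only part of the buffer. The sketch below shows that loop together with a length-prefixed frame. The 4-byte big-endian size prefix is an assumption modeled loosely on how a Kafka-style `BoundedByteBufferSend` frames a request, not a statement about Jafka's exact wire format, and `FramedSend` is a hypothetical name.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;

/** Hypothetical sketch: length-prefixed framing plus a write-until-drained loop. */
final class FramedSend {
    /** Prepends a 4-byte big-endian length to the payload (assumed framing). */
    static ByteBuffer frame(byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(4 + payload.length);
        buf.putInt(payload.length); // ByteBuffer is big-endian by default
        buf.put(payload);
        buf.flip();                 // switch from filling the buffer to draining it
        return buf;
    }

    /** Keeps calling write until the buffer is drained, since one write() may be partial. */
    static int writeCompletely(ByteBuffer buf, WritableByteChannel channel) throws IOException {
        int written = 0;
        while (buf.hasRemaining()) {
            written += channel.write(buf);
        }
        return written;
    }

    /** Usage: frames "hehe" and writes it to an in-memory channel, returning the bytes produced. */
    static byte[] demo() {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (WritableByteChannel ch = Channels.newChannel(out)) {
            writeCompletely(frame("hehe".getBytes(StandardCharsets.UTF_8)), ch);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }
}
```

The loop assumes a blocking channel, as a `SocketChannel` in blocking mode is; on a non-blocking channel `write` can return 0 repeatedly and the loop would spin.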

Asynchronous sending

First, let's look at the AsyncProducer constructors.

    public AsyncProducer(AsyncProducerConfig config) {
        this(config,
                new SyncProducer(config),
                (Encoder<T>) Utils.getObject(config.getSerializerClass()),
                (EventHandler<T>) Utils.getObject(config.getEventHandler()),
                config.getEventHandlerProperties(),
                (CallbackHandler<T>) Utils.getObject(config.getCbkHandler()),
                config.getCbkHandlerProperties());
    }

    public AsyncProducer(AsyncProducerConfig config,
            SyncProducer producer,
            Encoder<T> serializer,
            EventHandler<T> eventHandler,
            Properties eventHandlerProperties,
            CallbackHandler<T> callbackHandler,
            Properties callbackHandlerProperties) {
        super();
        this.config = config;
        // The SyncProducer that does the actual sending
        this.producer = producer;
        this.serializer = serializer;
        // Handler invoked when a batch of messages is ready to be sent
        this.eventHandler = eventHandler;
        this.eventHandlerProperties = eventHandlerProperties;
        this.callbackHandler = callbackHandler;
        this.callbackHandlerProperties = callbackHandlerProperties;
        this.enqueueTimeoutMs = config.getEnqueueTimeoutMs();
        // Bounded message queue that buffers messages waiting to be sent
        this.queue = new LinkedBlockingQueue<QueueItem<T>>(config.getQueueSize());
        if (eventHandler != null) {
            eventHandler.init(eventHandlerProperties);
        }
        if (callbackHandler != null) {
            callbackHandler.init(callbackHandlerProperties);
        }
        // Create the sending thread
        this.sendThread = new ProducerSendThread<T>("ProducerSendThread-" + asyncProducerID,
                queue,
                serializer,
                producer,
                // Handler that sends each batch: DefaultEventHandler unless one is configured
                eventHandler != null ? eventHandler
                        : new DefaultEventHandler<T>(new ProducerConfig(config.getProperties()), callbackHandler),
                callbackHandler,
                config.getQueueTime(),
                config.getBatchSize());
        this.sendThread.setDaemon(false);
        AsyncProducerQueueSizeStats<T> stats = new AsyncProducerQueueSizeStats<T>(queue);
        stats.setMbeanName(ProducerQueueSizeMBeanName + "-" + asyncProducerID);
        Utils.registerMBean(stats);
    }

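For the meaning of AsyncProducer's fields, see the earlier article on using the producer; I will not repeat them here. The eventHandler is used when the AsyncProducer actually sends messages, as discussed below; unless one is configured, DefaultEventHandler is used. The callbackHandler is rarely used, so I leave it to interested readers. Note also that every AsyncProducer holds a SyncProducer, which does the final work of sending, while sendThread is responsible for sending the buffered messages periodically and in batches.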

After the AsyncProducer is initialized, its start method is called, which starts a thread. Let's see what that thread's run method does.

    public void start() {
        sendThread.start(); // sendThread is a ProducerSendThread
    }

    // ProducerSendThread.run
    public void run() {
        try {
            List<QueueItem<T>> remainingEvents = processEvents();
            // handle remaining events
            if (remainingEvents.size() > 0) {
                logger.debug(format("Dispatching last batch of %d events to the event handler", remainingEvents.size()));
                tryToHandle(remainingEvents);
            }
        } catch (Exception e) {
            logger.error("Error in sending events: ", e);
        } finally {
            shutdownLatch.countDown();
        }
    }

    private List<QueueItem<T>> processEvents() {
        long lastSend = System.currentTimeMillis();
        final List<QueueItem<T>> events = new ArrayList<QueueItem<T>>();
        boolean full = false;
        while (!shutdown) {
            try {
                // Take an item from the queue, waiting at most until queueTime has passed since the last send
                QueueItem<T> item = queue.poll(Math.max(0, (lastSend + queueTime) - System.currentTimeMillis()), TimeUnit.MILLISECONDS);
                long elapsed = System.currentTimeMillis() - lastSend;
                boolean expired = item == null;
                if (item != null) {
                    if (callbackHandler != null) {
                        events.addAll(callbackHandler.afterDequeuingExistingData(item));
                    } else {
                        events.add(item);
                    }
                    full = events.size() >= batchSize;
                }

                // Send when the batch is full or the wait timed out
                if (full || expired) {
                    if (logger.isDebugEnabled()) {
                        if (expired) {
                            logger.debug(elapsed + " ms elapsed. Queue time reached. Sending..");
                        } else {
                            logger.debug(format("Batch(%d) full. Sending..", batchSize));
                        }
                    }
                    tryToHandle(events);
                    lastSend = System.currentTimeMillis();
                    events.clear();
                }
            } catch (InterruptedException e) {
                logger.warn(e.getMessage(), e);
            }
        }
        if (queue.size() > 0) {
            throw new IllegalQueueStateException("Invalid queue state! After queue shutdown, " + queue.size() + " remaining items in the queue");
        }
        if (this.callbackHandler != null) {
            events.addAll(callbackHandler.lastBatchBeforeClose());
        }
        return events;
    }

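AsyncProducer.start calls start on sendThread, which is a ProducerSendThread. Its run method mainly delegates to processEvents, and once that returns it hands any remaining events to tryToHandle as a final batch. The while loop in processEvents does the following: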

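  • It takes an item from the message queue (queue) with poll, which blocks until an item arrives or until queue.time milliseconds have passed since the last send, whichever comes first.
  • If an item was obtained, it is added to the events list, and the loop checks whether the list has reached the configured maximum number of messages per send (batch.size). When either condition holds (the batch is full, or poll timed out), it calls tryToHandle. That method is also simple: it ends up calling the handle method of DefaultEventHandler, which calls syncProducer.multiSend to send the data wrapped in events. I will not paste that code here.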
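The batching rule in processEvents (flush when batch.size items have accumulated, or when queue.time has passed since the last send) can be reproduced in a self-contained loop. This is a simplified sketch, not Jafka's code: `BatchingSendLoop` is a hypothetical class, a `Consumer<List<T>>` stands in for the event handler, the callbackHandler hooks are left out, and on shutdown any items still in the queue are left there (Jafka instead throws `IllegalQueueStateException`).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical, simplified ProducerSendThread-style loop: flush on batchSize or after queueTimeMs. */
final class BatchingSendLoop<T> implements Runnable {
    private final BlockingQueue<T> queue;
    private final int batchSize;
    private final long queueTimeMs;
    private final Consumer<List<T>> handler; // stands in for the event handler
    private volatile boolean shutdown = false;

    BatchingSendLoop(BlockingQueue<T> queue, int batchSize, long queueTimeMs, Consumer<List<T>> handler) {
        this.queue = queue;
        this.batchSize = batchSize;
        this.queueTimeMs = queueTimeMs;
        this.handler = handler;
    }

    void shutdown() {
        shutdown = true;
    }

    @Override
    public void run() {
        long lastSend = System.currentTimeMillis();
        List<T> events = new ArrayList<>();
        while (!shutdown) {
            try {
                // Wait for the next item, but never past lastSend + queueTimeMs
                long waitMs = Math.max(0, lastSend + queueTimeMs - System.currentTimeMillis());
                T item = queue.poll(waitMs, TimeUnit.MILLISECONDS);
                boolean expired = item == null; // nothing arrived before the deadline
                if (item != null) {
                    events.add(item);
                }
                boolean full = events.size() >= batchSize;
                if (full || expired) {
                    if (!events.isEmpty()) {
                        handler.accept(new ArrayList<>(events)); // hand over a copy of the batch
                    }
                    lastSend = System.currentTimeMillis();
                    events.clear();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        if (!events.isEmpty()) {
            handler.accept(new ArrayList<>(events)); // last partial batch, like run()'s remainingEvents
        }
    }

    /** Usage: five pre-queued items, batchSize 2, queueTime 50 ms; returns the batch sizes seen. */
    static List<Integer> demoBatchSizes() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(10);
        for (String s : new String[] {"a", "b", "c", "d", "e"}) {
            queue.offer(s);
        }
        List<Integer> sizes = new CopyOnWriteArrayList<>();
        BatchingSendLoop<String> loop = new BatchingSendLoop<>(queue, 2, 50, batch -> sizes.add(batch.size()));
        Thread sendThread = new Thread(loop, "ProducerSendThread-demo");
        sendThread.start();
        long deadline = System.currentTimeMillis() + 5000;
        while (sizes.stream().mapToInt(Integer::intValue).sum() < 5 && System.currentTimeMillis() < deadline) {
            try {
                Thread.sleep(5);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        loop.shutdown();
        try {
            sendThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sizes;
    }
}
```

With five items already queued, batchSize 2 and queueTime 50 ms, the first four items go out as two full batches and the fifth goes out alone once the 50 ms wait expires, so the handler sees batch sizes 2, 2, 1.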

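Above we mentioned a message queue (queue) whose job is to buffer messages waiting to be sent; its capacity is set by queue.size. So where is data added to it? Sharp readers will already have guessed: in the send method, which arguably should have been covered first. Its code is: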

    public void send(String topic, T event, int partition) {
        AsyncProducerStats.recordEvent();
        if (closed.get()) {
            throw new QueueClosedException("Attempt to add event to a closed queue.");
        }
        // Wrap the data
        QueueItem<T> data = new QueueItem<T>(event, partition, topic);
        if (this.callbackHandler != null) {
            data = this.callbackHandler.beforeEnqueue(data);
        }

        // Add the item to the queue, following the enqueueTimeoutMs policy
        boolean added = false;
        try {
            if (enqueueTimeoutMs == 0) {
                added = queue.offer(data);
            } else if (enqueueTimeoutMs < 0) {
                queue.put(data);
                added = true;
            } else {
                added = queue.offer(data, enqueueTimeoutMs, TimeUnit.MILLISECONDS);
            }
        } catch (InterruptedException e) {
            throw new AsyncProducerInterruptedException(e);
        }
        if (this.callbackHandler != null) {
            this.callbackHandler.afterEnqueue(data, added);
        }
        if (!added) {
            AsyncProducerStats.recordDroppedEvents();
            throw new QueueFullException("Event queue is full of unsent messages, could not send event: " + event);
        }
    }

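The core logic of this code is simple: wrap the data, then add it to queue. Because queue is bounded (so that a backlog of unsent data cannot use up memory), adding follows a policy configured by queue.enqueueTimeout.ms, i.e. enqueueTimeoutMs:

  • Equal to 0: call offer, which returns immediately whether or not it succeeded; if queue is full, the message is dropped and offer returns false.
  • Less than 0: call put, which blocks until the item can be added to queue.
  • Greater than 0: call offer(e, time, unit), which waits up to the given time and returns false on timeout.

Whenever the item was not added, send records a dropped event and throws QueueFullException.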
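The three enqueueTimeoutMs cases map directly onto the `BlockingQueue` API. This sketch, built around the hypothetical helper `EnqueuePolicy.enqueue`, reproduces them; it reports interruption with `IllegalStateException` where Jafka throws `AsyncProducerInterruptedException`, and leaves it to the caller to count a dropped event and throw when the result is `false`.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper reproducing the queue.enqueueTimeout.ms policy. */
final class EnqueuePolicy {
    /** 0 = try once, negative = block until space frees up, positive = wait up to the timeout. */
    static <T> boolean enqueue(BlockingQueue<T> queue, T item, long enqueueTimeoutMs) {
        try {
            if (enqueueTimeoutMs == 0) {
                return queue.offer(item);   // fails immediately when the queue is full
            } else if (enqueueTimeoutMs < 0) {
                queue.put(item);            // blocks until the send thread makes room
                return true;
            } else {
                return queue.offer(item, enqueueTimeoutMs, TimeUnit.MILLISECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            // Jafka wraps this in AsyncProducerInterruptedException instead
            throw new IllegalStateException("interrupted while enqueuing", e);
        }
    }
}
```

With a capacity-1 queue that is already full, a timeout of 0 returns `false` at once, a timeout of 30 returns `false` after about 30 ms, and a negative timeout would block until the send thread takes an item.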

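By now the asynchronous sending logic should be clear. In short, send fills queue, and sendThread sends the data in batches, whenever a batch fills up or the queue time runs out. A simple sequence diagram: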

[Figure jafka_producer_async: simplified sequence diagram of asynchronous sending]

The diagram only sketches how AsyncProducer works and does not correspond to actual methods. It also shows a single AsyncProducer; at runtime there is one AsyncProducer per broker, each with its own queue and sendThread.

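Summary

This article covered what happens in Jafka after the Producer's send method is called, and explained how synchronous and asynchronous sending are implemented, with reference to the source. I hope it helps.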