Producer使用示例与配置简要说明 23 July 2012
本文主要讲解Jafka中Producer的详细使用,还会讲解其配置的用处,在看本文前,读者请确认已经对Jafka的使用有了初步的了解,如果还没有,狂点这里。另外,本文使用整合了zookeeper的Jafka。 简单来讲,生产者发送消息到服务端包含两个步骤:构造消息和发送消息。 先上段简单的代码:
1 Properties props = new Properties();
2 //指明获取发送地点的地址
3 props.setProperty("zk.connect","localhost:2181");
4 props.setProperty("serializer.class", StringEncoder.class.getName());
5 Producer<String,String> producer = new Producer<String, String>(new ProducerConfig(props));
6 for(int i=0;i<1000;i++){
7 //构造消息并发送
8 producer.send(new StringProducerData("hehe","hehe-data"+i));
9 }
10 producer.close();
上面的代码完成的功能如下: 1.构造信息,使用StringProducerData类,该类继承了ProducerData类,该类内容如下:
class ProducerData<K, V> {
private String topic;
private K key;
private List<V> data;
........
}
ProducerData类由topic(消息主题名称)、key(消息分配到固定partition的依据,后面会有介绍)、data(数据)组成。用户完全可以继承该类实现自己的消息构造类。
2.发送消息。发送前首先要获取服务端(broker)的地址,该信息是由zookeeper获取的,所以在配置信息中要指明zookeeper的连接地址,然后调用send函数即可将消息发送出去。 如上就是Jafka中Producer的使用,简单明了。实际使用中推荐单例模式。producer是线程安全的,新建producer时会连接zookeeper来获取broker等信息,这个过程比较耗费时间的,使用单例只连接一次即可。
Producer配置简要说明
Producer的所有配置见这里,其中有一个配置为producer.type=sync|async,是指Producer有同步和异步两种发送形式,区别在于前者立即发送,而后者延迟发送。显而易见的是,异步发送对于使用producer的程序影响更小,不至于因为发送消息过慢而影响当前程序的响应。这两种发送方式在使用上的区别在于配置文件的使用,下面会对二者的使用进行说明,但我们首先来看下二者使用上相同的配置。
参数名 | 默认值 | 参数意义 |
---|---|---|
serializer.class | com.sohu.jafka.producer.serializer.DefaultEncoders | 其实现了com.sohu.jafka.producer.serializer.Encoder/Decoder接口,serializer只要实现Encoder即可,常用的如StringEncoder。将传送的数据封装为Message,其实就是一个序列化的过程。 |
artitioner.class | com.sohu.jafka.producer.DefaultPartitioner | 根据用户为消息指定的key来判断消息应该存储在哪一个分区(partition)中。 |
Serializer.class
Serializer.class让用户自定义类,可以将要发送的数据封装为Message,只要用户实现com.sohu.jafka.producer.serializer.Encoder接口即可,该接口如下:
1 public interface Encoder<T> {
2 Message toMessage(T event);
3 }
T即为消息的类型,Message的构造方法为new Message(byte[]),所以这个方法要做的就是将T对象序列化为byte数组。
Partitioner.class
Partitioner.class是为发送的消息选择分区,这里首先说明下分区的概念。Jafka以分区(partition)的形式存储每个topic,分区的个数可以在服务端设定,或者后期通过脚本来修改,这些以后会涉及。分区的意义有两个:
-
一是使得consumer可以多线程消费消息,后面讲到consumer的时候有详细讲解;
-
二是用户可以限定同一topic下具有某些特性(key)的消息发送到同一个partition下面,这样consumer可以通过fetch指定partition来获取这一组数据(在consumer章节会详细讲解)。 用户自定义Partitioner,只要实现
1 public interface Partitioner<T> {
2 int partition(T key, int numPartitions);
3 }
只要实现partition方法就可以,key是在ProducerData里面指定的如:
StringProducerData data = new StringProducerData("topic","data");
//key如果不设置的话,不会调用partitioner
data.setKey("hehe");
key的类型也是可以任意指定的,ProducerData<K, V>中的K就是key的类型,而V是数据的类型。下面的代码简单地实现传送用户自定义bean对象的消息。
用户自定义代码举例
1 //用户自定义类
2 public class Person implements Serializable{
3 private String name;
4 private int id;
5 Person(String name, int id) {
6 this.name = name;
7 this.id = id;
8 }
9 // ……
10 }
11 //用户自定义bean的serializer类
12 public class PersonDataEncoder implements Encoder<Person> {
13 @Override
14 public Message toMessage(Person event) {
15 try {
16 //序列化对象
17 ByteArrayOutputStream baos = new ByteArrayOutputStream();
18 ObjectOutputStream oos = new ObjectOutputStream(baos);
19 oos.writeObject(event);
20 oos.close();
21 byte[] tmpBytes = baos.toByteArray();
22 baos.close();
23 return new Message(tmpBytes);
24 } catch (IOException e) {
25 e.printStackTrace();
26 }
27 return null;
28 }
29 }
30 //Partitioner类
31 public class PersonDataPartitioner implements Partitioner<String>{
32 @Override
33 public int partition(String key, int numPartitions) {
34 return key.length()%numPartitions;
35 }
36 }
37 //构造producer发送消息
38 Properties props = new Properties();
39 props.setProperty("zk.connect","localhost:2181");
40 props.setProperty("serializer.class",PersonDataEncoder.class.getName());
41 //只有在zookeeper时,可以使用
42 props.setProperty("partitioner.class",PersonDataPartitioner.class.getName());
43 Producer<String,Person> producer = new Producer<String, Person>(new ProducerConfig(props));
44 for(int i=0;i<100000;i++){
45 PersonProducerData data = new PersonProducerData("person",new Person("name",i));
46 data.setKey("haha"+i);
47 producer.send(data);
48 }
49 producer.close();
同步发送配置
上面的producer代码就是同步发送的示例。同步发送的常用配置如下:
参数名 | 默认值 | 参数意义 |
---|---|---|
buffer.size | 102400 | socket通信缓冲区大小,byte |
connect.timeout.ms | 5000 | 连接broker的超时时间,超过时报错 |
socket.timeout.ms | 30000 | socket连接超时时间 |
reconnect.interval | 30000 | producer的请求数达到该数目后,重新建立到broker的连接。 |
max.message.size | 1000000 | producer发送一条消息的最大字节数 |
异步发送配置
参数名 | 默认值 | 参数意义 |
---|---|---|
queue.time | 5000 | 在该段时间内没有新消息到来的话,发送消息到broker |
queue.size | 10000 | 指定异步发送队列的大小 |
batch.size | 200 | 指定异步发送每次批量发送的消息个数 |
queue.enqueueTimeout.ms | 0 | 指定入队的方法,0表示队列未满时成功入队,否则丢弃。小于0表示当队列满时,阻塞直到成功入队,大于0表示等待这些ms都无法成功入队,舍弃 |
异步发送的代码与同步发送差异不大,只需添加一行配置:
props.setProperty(“producer.type”,”async”);
异步发送的实现机制也并不复杂,它维护一个队列(queue),该队列的长度可以配置(queue.size),每一次调用producer的send方法,都是将消息封装一下,然后入队,另外会新开一个sendThread线程,不断地从这个queue中拿数据(poll),当获取的数据达到batch.size时,就批量发送给broker。queue是一个BlockingQueue,具体实现在后续源码阅读章节会给大家详细讲解。
小结
Producer的使用整体来讲还是很简单的,对于实时性要求较高的信息,采取同步发送的方法好,而对于像日志这种数据,可以采取异步发送的形式,减小对当前程序的压力。