Reading the Jafka Broker Source: LogManager (09 August 2012)
This article covers the LogManager class in the Jafka broker and explains how Jafka organizes its data files so that message writes complete in O(1) time and reads stay fast.
LogManager startup code
In the earlier walkthrough of the Jafka broker code, the part involving LogManager looked like this:
// Initialize LogManager, the class that manages message data, and load the metadata
// for all message files into memory (not the message contents themselves)
this.logManager = new LogManager(config,//
        scheduler,//
        1000L * 60 * config.getLogCleanupIntervalMinutes(),//
        1000L * 60 * 60 * config.getLogRetentionHours(),//
        needRecovery);
this.logManager.setRollingStategy(config.getRollingStrategy());
logManager.load();

...

// If the zookeeper connection is enabled, register this broker with zookeeper
// and start the thread that periodically flushes message data
logManager.startup();
The code above touches three LogManager methods: the constructor, load, and startup. Let's look at each of them in turn.
The LogManager constructor
public class LogManager implements PartitionChooser, Closeable {

    final ServerConfig config;
    // scheduler that periodically cleans up old data files
    private final Scheduler scheduler;

    final long logCleanupIntervalMs;

    final long logCleanupDefaultAgeMs;

    final boolean needRecovery;

    final int numPartitions;

    final File logDir;

    final int flushInterval;

    private final Object logCreationLock = new Object();

    final Random random = new Random();

    final CountDownLatch startupLatch;

    // all jafka data files, organized as <topic, <partitionNum, Log>>
    private final Pool<String, Pool<Integer, Log>> logs = new Pool<String, Pool<Integer, Log>>();

    // scheduler that periodically flushes jafka files
    private final Scheduler logFlusherScheduler = new Scheduler(1, "jafka-logflusher-", false);

    private final LinkedBlockingQueue<TopicTask> topicRegisterTasks = new LinkedBlockingQueue<TopicTask>();

    private volatile boolean stopTopicRegisterTasks = false;

    final Map<String, Integer> logFlushIntervalMap;

    final Map<String, Long> logRetentionMSMap;

    final int logRetentionSize;
    // object responsible for registering this broker with zookeeper
    private ServerRegister serverRegister;

    // configured <topic, totalNumberOfPartitions> mapping
    private final Map<String, Integer> topicPartitionsMap;

    private RollingStrategy rollingStategy;

    public LogManager(ServerConfig config, Scheduler scheduler, long logCleanupIntervalMs, long logCleanupDefaultAgeMs, boolean needRecovery) {
        super();
        // configuration object
        this.config = config;
        // scheduler that runs the log cleanup task
        this.scheduler = scheduler;
        // this.time = time;
        // remaining configuration parameters
        this.logCleanupIntervalMs = logCleanupIntervalMs;
        this.logCleanupDefaultAgeMs = logCleanupDefaultAgeMs;
        this.needRecovery = needRecovery;
        //
        this.logDir = Utils.getCanonicalFile(new File(config.getLogDir()));
        this.numPartitions = config.getNumPartitions();
        this.flushInterval = config.getFlushInterval();
        this.topicPartitionsMap = config.getTopicPartitionsMap();
        this.startupLatch = config.getEnableZookeeper() ? new CountDownLatch(1) : null;
        this.logFlushIntervalMap = config.getFlushIntervalMap();
        this.logRetentionSize = config.getLogRetentionSize();
        this.logRetentionMSMap = getLogRetentionMSMap(config.getLogRetentionHoursMap());
        //
    }

}
Among LogManager's fields, logs is the one that organizes all of the jafka data files. The scheme is simple: a nested map in which each <topic, partition> pair resolves to one Log object, and a Log object in turn represents a set of jafka files. The constructor's main job is to store the configuration parameters; their meanings were covered in the earlier article on using the broker.
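To make the nested layout concrete, here is a minimal sketch of how a <topic, partition> pair resolves to a Log through two map lookups; the class name, the plain Map types, and the null handling are illustrative assumptions rather than Jafka's actual Pool-based code:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch only: mimics the <topic, <partition, Log>> structure of the logs field.
class LogLookupSketch {
    private final Map<String, Map<Integer, Object /* Log */>> logs =
            new ConcurrentHashMap<String, Map<Integer, Object>>();

    Object getLog(String topic, int partition) {
        Map<Integer, Object> parts = logs.get(topic);            // first level: topic
        return parts == null ? null : parts.get(partition);      // second level: partition
    }
}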
LogManager.load
public void load() throws IOException {
    if (this.rollingStategy == null) {
        this.rollingStategy = new FixedSizeRollingStrategy(config.getLogFileSize());
    }

    // check whether the directory configured as log.dir exists; create it if not
    if (!logDir.exists()) {
        logger.info("No log directory found, creating '" + logDir.getAbsolutePath() + "'");
        logDir.mkdirs();
    }
    if (!logDir.isDirectory() || !logDir.canRead()) {
        throw new IllegalArgumentException(logDir.getAbsolutePath() + " is not a readable log directory.");
    }
    File[] subDirs = logDir.listFiles();
    // iterate over the subdirectories, which are named topic-partition
    if (subDirs != null) {
        for (File dir : subDirs) {
            if (!dir.isDirectory()) {
                logger.warn("Skipping unexplainable file '" + dir.getAbsolutePath() + "'--should it be there?");
            } else {
                logger.info("Loading log from " + dir.getAbsolutePath());
                final String topicNameAndPartition = dir.getName();
                // verify the name matches the topic-partition format
                if (-1 == topicNameAndPartition.indexOf('-')) {
                    throw new IllegalArgumentException("error topic directory: " + dir.getAbsolutePath());
                }
                // extract the topic and the partition number from the directory name
                final KV<String, Integer> topicPartion = Utils.getTopicPartition(topicNameAndPartition);
                final String topic = topicPartion.k;
                final int partition = topicPartion.v;
                // create a new Log object
                Log log = new Log(dir, partition, this.rollingStategy, flushInterval, needRecovery);

                // put the Log for this topic-partition directory into logs,
                // establishing the topic-partition mapping
                logs.putIfNotExists(topic, new Pool<Integer, Log>());
                Pool<Integer, Log> parts = logs.get(topic);

                parts.put(partition, log);
                int configPartition = getPartition(topic);
                if (configPartition < partition) {
                    topicPartitionsMap.put(topic, partition);
                }
            }
        }
    }

    /* Schedule the cleanup task to delete old logs */
    // start the thread that periodically deletes old logs
    if (this.scheduler != null) {
        logger.info("starting log cleaner every " + logCleanupIntervalMs + " ms");
        this.scheduler.scheduleWithRate(new Runnable() {

            public void run() {
                try {
                    cleanupLogs();
                } catch (IOException e) {
                    logger.error("cleanup log failed.", e);
                }
            }

        }, 60 * 1000, logCleanupIntervalMs);
    }
    //
    if (config.getEnableZookeeper()) {
        // register this broker with zookeeper
        this.serverRegister = new ServerRegister(config, this);
        // establish the connection to zookeeper
        serverRegister.startup();
        // start a topic-registration thread that takes tasks from topicRegisterTasks in a
        // blocking manner; whenever a new topic appears it is registered with zk immediately
        TopicRegisterTask task = new TopicRegisterTask();
        task.setName("jafka.topicregister");
        task.setDaemon(true);
        task.start();
    }
}
The meaning of the load code is given in the comments. Its main job is to walk every subdirectory under log.dir; these directories are named topic-partition, and the jafka files live inside them. For each one, load builds a Log object for that topic and partition, holding handles to all of the jafka files underneath, and records the mapping from topic and partition to that Log in the logs field. Finally it starts the thread that periodically cleans up old logs and the thread that registers topics with zk.
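As an aside, the directory-name convention is easy to illustrate. The following sketch mimics what Utils.getTopicPartition has to do; splitting on the last '-' (so that topic names containing dashes still parse) is an assumption here, and the real Jafka implementation is authoritative:

// Illustrative sketch of the topic-partition directory naming convention only.
public class TopicPartitionNameSketch {
    static String[] parse(String dirName) {
        int idx = dirName.lastIndexOf('-');   // assumed: split on the last dash
        if (idx <= 0 || idx == dirName.length() - 1) {
            throw new IllegalArgumentException("error topic directory: " + dirName);
        }
        return new String[] { dirName.substring(0, idx), dirName.substring(idx + 1) };
    }

    public static void main(String[] args) {
        String[] kv = parse("demo-0");
        System.out.println("topic=" + kv[0] + ", partition=" + kv[1]); // topic=demo, partition=0
    }
}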
The reason for organizing the jafka data files as <topic, <partition, Log>> is obvious: producer and consumer requests are always addressed by topic and partition. As for the Log class itself, it is easiest to understand as a stack of layers, each containing the next (the original post illustrates this with a figure):

Log
  contains SegmentList
    contains LogSegment (one per jafka file)
      contains FileMessageSet

We will work our way up from the bottom. At the very bottom sits FileMessageSet, which was covered in detail in the earlier article on the wire protocol: we know it opens a jafka file through a FileChannel and performs the actual reads and writes, making it the lowest-level class. One level up is LogSegment, part of whose source follows:
public class LogSegment implements Range, Comparable<LogSegment> {
    // the underlying jafka file
    private final File file;

    private final FileMessageSet messageSet;
    // the starting offset of this file, taken from the file name
    private final long start;
    // marks whether this file may be deleted
    private volatile boolean deleted;

    public LogSegment(File file, FileMessageSet messageSet, long start) {
        super();
        this.file = file;
        this.messageSet = messageSet;
        this.start = start;
        this.deleted = false;
    }

    // value is an offset; this method tells whether this jafka file contains that offset
    public boolean contains(long value) {
        long size = size();
        long start = start();
        return ((size == 0 && value == start) //
                || (size > 0 && value >= start && value <= start + size - 1));
    }
}
As the source shows, LogSegment is a thin wrapper that holds a FileMessageSet plus a bit of bookkeeping; for example, a segment starting at offset 1000 with 500 bytes of data contains offsets 1000 through 1499. Next comes SegmentList, which, as the name suggests, holds a list of LogSegments. Its key source code follows:
public class SegmentList {

    // all of the jafka files under one topic-partition directory
    private final AtomicReference<List<LogSegment>> contents;

    private final String name;

    /**
     * create the messages segments
     *
     * @param name the message topic name
     * @param segments exist segments
     */
    public SegmentList(final String name, List<LogSegment> segments) {
        this.name = name;
        contents = new AtomicReference<List<LogSegment>>(segments);
    }

    // append a new jafka file to the end of the list; note the copy-on-write style
    // used here to avoid conflicts between readers and writers
    public void append(LogSegment segment) {
        while (true) {
            List<LogSegment> curr = contents.get();
            List<LogSegment> updated = new ArrayList<LogSegment>(curr);
            updated.add(segment);
            if (contents.compareAndSet(curr, updated)) {
                return;
            }
        }
    }

    // cut off the LogSegments before a given position and return them so they can be deleted
    public List<LogSegment> trunc(int newStart) {
        if (newStart < 0) {
            throw new IllegalArgumentException("Starting index must be positive.");
        }
        while (true) {
            List<LogSegment> curr = contents.get();
            int newLength = Math.max(curr.size() - newStart, 0);
            List<LogSegment> updatedList = new ArrayList<LogSegment>(curr.subList(Math.min(newStart, curr.size() - 1),
                    curr.size()));
            if (contents.compareAndSet(curr, updatedList)) {
                return curr.subList(0, curr.size() - newLength);
            }
        }
    }

    // get the last LogSegment, which is the writable one
    public LogSegment getLastView() {
        List<LogSegment> views = getView();
        return views.get(views.size() - 1);
    }

}
The code above uses AtomicReference together with a while(true) CAS loop, a copy-on-write approach to thread safety that is well worth learning from. SegmentList therefore maintains a list of LogSegments and offers append, get, and trunc operations. A minimal sketch of the pattern follows.
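The stand-alone snippet below isolates the copy-on-write idea on a plain list of strings; it illustrates the pattern only and is not code taken from Jafka:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Illustration of the AtomicReference + CAS-loop copy-on-write pattern used by SegmentList.
class CopyOnWriteListSketch {
    private final AtomicReference<List<String>> contents =
            new AtomicReference<List<String>>(new ArrayList<String>());

    // readers just grab the current immutable snapshot; no locking needed
    List<String> view() {
        return contents.get();
    }

    // writers copy the list, modify the copy, and swap it in with CAS, retrying on contention
    void append(String item) {
        while (true) {
            List<String> curr = contents.get();
            List<String> updated = new ArrayList<String>(curr);
            updated.add(item);
            if (contents.compareAndSet(curr, updated)) {
                return;
            }
        }
    }
}

With SegmentList covered, we finally reach the Log class, which is an important one. First, its constructor: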
public Log(File dir, //
        int partition,//
        RollingStrategy rollingStategy,//
        int flushInterval, //
        boolean needRecovery) throws IOException {
    super();
    // store the configuration
    this.dir = dir;
    this.partition = partition;
    this.rollingStategy = rollingStategy;
    this.flushInterval = flushInterval;
    this.needRecovery = needRecovery;
    this.name = dir.getName();
    this.logStats.setMbeanName("jafka:type=jafka.logs." + name);
    Utils.registerMBean(logStats);
    // load all of the jafka files
    segments = loadSegments();
}

private SegmentList loadSegments() throws IOException {
    List<LogSegment> accum = new ArrayList<LogSegment>();
    File[] ls = dir.listFiles(new FileFilter() {

        public boolean accept(File f) {
            return f.isFile() && f.getName().endsWith(FileSuffix);
        }
    });
    logger.info("loadSegments files from [" + dir.getAbsolutePath() + "]: " + ls.length);
    int n = 0;
    // iterate over all of the jafka files in this directory
    for (File f : ls) {
        n++;
        String filename = f.getName();
        // the starting offset is encoded in the file name
        long start = Long.parseLong(filename.substring(0, filename.length() - FileSuffix.length()));
        final String logFormat = "LOADING_LOG_FILE[%2d], start(offset)=%d, size=%d, path=%s";
        logger.info(String.format(logFormat, n, start, f.length(), f.getAbsolutePath()));
        // create a FileMessageSet, i.e. open the file; false means read-only
        FileMessageSet messageSet = new FileMessageSet(f, false);
        accum.add(new LogSegment(f, messageSet, start));
    }
    if (accum.size() == 0) {
        // no jafka files yet, so create a new one opened for read-write
        File newFile = new File(dir, Log.nameFromOffset(0));
        FileMessageSet fileMessageSet = new FileMessageSet(newFile, true);
        accum.add(new LogSegment(newFile, fileMessageSet, 0));
    } else {
        // sort the jafka files in accum by their start offset
        Collections.sort(accum);
        // validate the integrity of the log data
        validateSegments(accum);
    }
    // reopen the last file in read-write mode so that new messages can be appended to it
    LogSegment last = accum.remove(accum.size() - 1);
    last.getMessageSet().close();
    logger.info("Loading the last segment " + last.getFile().getAbsolutePath() + " in mutable mode, recovery " + needRecovery);
    LogSegment mutable = new LogSegment(last.getFile(), new FileMessageSet(last.getFile(), true, new AtomicBoolean(
            needRecovery)), last.start());
    accum.add(mutable);
    return new SegmentList(name, accum);
}
After finishing its configuration, Log's constructor calls loadSegments to load all of the jafka files and sorts them in ascending order of their starting offsets, so that the file containing a given offset can be located quickly with a binary search. The last jafka file is reopened in read-write mode while the others stay read-only, which keeps writes strictly sequential appends and therefore lets a message write complete in O(1) time. Each segment's file name encodes its starting offset; a naming sketch follows, after which we look at the read path.
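As a concrete illustration of the naming scheme, a helper along the lines of Log.nameFromOffset might look like the following; the 20-digit zero padding and the ".jafka" suffix are assumptions for illustration, not verified constants, and the real Log.nameFromOffset in the Jafka source is authoritative:

import java.text.NumberFormat;

// Illustrative only: shows how a segment file name can encode its starting offset.
public class SegmentNameSketch {
    static String nameFromOffset(long offset) {
        NumberFormat nf = NumberFormat.getInstance();
        nf.setMinimumIntegerDigits(20); // assumed padding width
        nf.setMaximumFractionDigits(0);
        nf.setGroupingUsed(false);
        return nf.format(offset) + ".jafka"; // assumed suffix
    }

    public static void main(String[] args) {
        System.out.println(nameFromOffset(0));       // 00000000000000000000.jafka
        System.out.println(nameFromOffset(1048576)); // 00000000000001048576.jafka
    }
}

Log provides methods for both reading and writing messages. The read method is shown below: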
// read all messages starting at offset, up to at most length bytes
public MessageSet read(long offset, int length) throws IOException {
    List<LogSegment> views = segments.getView();
    // binary search for the log file that contains this offset
    LogSegment found = findRange(views, offset, views.size());
    if (found == null) {
        if (logger.isTraceEnabled()) {
            logger.trace(format("NOT FOUND MessageSet from Log[%s], offset=%d, length=%d", name, offset, length));
        }
        return MessageSet.Empty;
    }
    // delegate to FileMessageSet.read to fetch the message data
    return found.getMessageSet().read(offset - found.start(), length);
}

public static <T extends Range> T findRange(List<T> ranges, long value, int arraySize) {
    if (ranges.size() < 1) return null;
    T first = ranges.get(0);
    T last = ranges.get(arraySize - 1);
    // check out of bounds
    if (value < first.start() || value > last.start() + last.size()) {
        throw new OffsetOutOfRangeException("offset " + value + " is out of range");
    }

    // check at the end
    if (value == last.start() + last.size()) return null;

    // the binary search itself
    int low = 0;
    int high = arraySize - 1;
    while (low <= high) {
        int mid = (high + low) / 2;
        T found = ranges.get(mid);

        if (found.contains(value)) {
            return found;
        } else if (value < found.start()) {
            high = mid - 1;
        } else {
            low = mid + 1;
        }
    }
    return null;
}
That is the read side; with the comments it should be easy to follow, and the binary search is the highlight. FileMessageSet.read has quite a few details of its own worth noting as well, such as what happens when the position implied by length does not fall on a message boundary; readers who are curious can look at how the source handles these cases.
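To make the offset arithmetic concrete, suppose a partition has segments starting at offsets 0, 1048576, and 2097152 (made-up numbers, purely for illustration). A request for offset 1500000 binary-searches to the second segment and is then read at the file-relative position offset - start:

// Hypothetical numbers, only to illustrate the offset-to-position arithmetic in Log.read.
public class OffsetArithmeticSketch {
    public static void main(String[] args) {
        long[] segmentStarts = { 0L, 1048576L, 2097152L }; // assumed segment start offsets
        long offset = 1500000L;                            // requested offset
        long segmentStart = segmentStarts[1];              // segment the binary search would find
        long positionInFile = offset - segmentStart;       // 451424: where FileMessageSet begins reading
        System.out.println("read from file position " + positionInFile);
    }
}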
Now let's look at the code that writes messages.
public List<Long> append(ByteBufferMessageSet messages) {
    // validate the messages
    int numberOfMessages = 0;
    for (MessageAndOffset messageAndOffset : messages) {
        if (!messageAndOffset.message.isValid()) {
            throw new InvalidMessageException();
        }
        numberOfMessages += 1;
    }

    ByteBuffer validByteBuffer = messages.getBuffer().duplicate();
    long messageSetValidBytes = messages.getValidBytes();
    if (messageSetValidBytes > Integer.MAX_VALUE || messageSetValidBytes < 0) throw new InvalidMessageSizeException(
            "Illegal length of message set " + messageSetValidBytes + " Message set cannot be appended to log. Possible causes are corrupted produce requests");

    validByteBuffer.limit((int) messageSetValidBytes);
    ByteBufferMessageSet validMessages = new ByteBufferMessageSet(validByteBuffer);

    // they are valid, insert them in the log
    synchronized (lock) {
        try {
            // grab the last LogSegment and simply append the data to it
            LogSegment lastSegment = segments.getLastView();
            long[] writtenAndOffset = lastSegment.getMessageSet().append(validMessages);
            if (logger.isTraceEnabled()) {
                logger.trace(String.format("[%s,%s] save %d messages, bytes %d", name, lastSegment.getName(),
                        numberOfMessages, writtenAndOffset[0]));
            }
            // check whether the data should be flushed to disk
            maybeFlush(numberOfMessages);
            // check whether this file has reached the configured size; if so, roll over to a new file
            maybeRoll(lastSegment);

        } catch (IOException e) {
            logger.fatal("Halting due to unrecoverable I/O error while handling producer request", e);
            Runtime.getRuntime().halt(1);
        } catch (RuntimeException re) {
            throw re;
        }
    }
    return (List<Long>) null;
}
The write path is just as simple: fetch the last LogSegment, append to it, and then check whether a flush or a roll is needed.
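For intuition, the rolling decision amounts to something like the sketch below, written as a method inside Log. It is a simplified assumption of how such a check could work, not Jafka's actual maybeRoll implementation; in particular the rollingStategy.check call is assumed here:

// Simplified sketch of the rolling idea, not the actual Jafka maybeRoll code.
// Assumed: rollingStategy decides when a segment is "full" (e.g. a fixed byte size),
// and the new segment's file is named after the next offset to be written.
private void maybeRollSketch(LogSegment lastSegment) throws IOException {
    if (rollingStategy.check(lastSegment)) {                       // segment reached its limit? (assumed API)
        long newStart = lastSegment.start() + lastSegment.size();  // next offset continues where this one ends
        File newFile = new File(dir, Log.nameFromOffset(newStart));
        lastSegment.getMessageSet().flush();                       // make sure the old segment is on disk
        segments.append(new LogSegment(newFile,
                new FileMessageSet(newFile, true),                 // open the new file for read-write
                newStart));
    }
}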
The class also provides markDeletedWhile and getOffsetsBefore, which respectively mark jafka files as deletable and return the offsets before a given time; they are not covered here, and interested readers can explore them on their own.
Summary
This article walked through the classes around LogManager and how the jafka data files are organized; hopefully it helps you make sense of this part of the source.