This article walks through the LogManager class in the Jafka broker and explains how Jafka organizes its data files so that message writes complete in O(1) time and reads stay fast.

LogManager startup code

In the earlier article on the Jafka broker code, the LogManager-related code looked like this:

    // Initialize the message data manager, LogManager, and read metadata about
    // all message files into memory (not the message contents themselves)
    this.logManager = new LogManager(config,//
            scheduler,//
            1000L * 60 * config.getLogCleanupIntervalMinutes(),//
            1000L * 60 * 60 * config.getLogRetentionHours(),//
            needRecovery);
    this.logManager.setRollingStategy(config.getRollingStrategy());
    logManager.load();

    ...

    // If ZooKeeper is enabled, register this broker with ZooKeeper and
    // start the thread that periodically flushes message data
    logManager.startup();

The code above involves three LogManager methods: the constructor, load(), and startup(). Let's look at each of them.

The LogManager constructor

    public class LogManager implements PartitionChooser, Closeable {

        final ServerConfig config;
        // scheduler that periodically cleans up old data files
        private final Scheduler scheduler;

        final long logCleanupIntervalMs;

        final long logCleanupDefaultAgeMs;

        final boolean needRecovery;

        final int numPartitions;

        final File logDir;

        final int flushInterval;

        private final Object logCreationLock = new Object();

        final Random random = new Random();

        final CountDownLatch startupLatch;

        // holds all jafka data files, organized as <topic, <partitionNum, Log>>
        private final Pool<String, Pool<Integer, Log>> logs = new Pool<String, Pool<Integer, Log>>();

        // scheduler that periodically flushes jafka files
        private final Scheduler logFlusherScheduler = new Scheduler(1, "jafka-logflusher-", false);

        private final LinkedBlockingQueue<TopicTask> topicRegisterTasks = new LinkedBlockingQueue<TopicTask>();

        private volatile boolean stopTopicRegisterTasks = false;

        final Map<String, Integer> logFlushIntervalMap;

        final Map<String, Long> logRetentionMSMap;

        final int logRetentionSize;

        // object responsible for registering this broker with ZooKeeper
        private ServerRegister serverRegister;

        // configured <topic, totalNumberOfPartitions> mapping
        private final Map<String, Integer> topicPartitionsMap;

        private RollingStrategy rollingStategy;

        public LogManager(ServerConfig config, Scheduler scheduler, long logCleanupIntervalMs,
                          long logCleanupDefaultAgeMs, boolean needRecovery) {
            super();
            // server configuration
            this.config = config;
            // scheduler that runs the log cleanup task
            this.scheduler = scheduler;
            // assorted configuration parameters
            this.logCleanupIntervalMs = logCleanupIntervalMs;
            this.logCleanupDefaultAgeMs = logCleanupDefaultAgeMs;
            this.needRecovery = needRecovery;
            this.logDir = Utils.getCanonicalFile(new File(config.getLogDir()));
            this.numPartitions = config.getNumPartitions();
            this.flushInterval = config.getFlushInterval();
            this.topicPartitionsMap = config.getTopicPartitionsMap();
            this.startupLatch = config.getEnableZookeeper() ? new CountDownLatch(1) : null;
            this.logFlushIntervalMap = config.getFlushIntervalMap();
            this.logRetentionSize = config.getLogRetentionSize();
            this.logRetentionMSMap = getLogRetentionMSMap(config.getLogRetentionHoursMap());
        }

    }

Among LogManager's fields, logs is the one that organizes all the jafka files. The scheme is simple: a two-level map, so that each <topic, partition> pair maps to one Log object, and each Log object in turn represents a batch of jafka files. The constructor itself mostly just initializes configuration parameters, whose meanings were covered in the earlier article on using the broker.
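To make the two-level lookup concrete, here is a minimal sketch; getLog is a hypothetical helper, not a method from the Jafka source, and Pool is assumed to behave like a concurrent map (consistent with the get/put calls we see later in load()):

    // Hypothetical lookup helper illustrating the <topic, <partitionNum, Log>> layout
    private Log getLog(String topic, int partition) {
        Pool<Integer, Log> parts = logs.get(topic); // first level: topic
        if (parts == null) {
            return null;                            // topic unknown to this broker
        }
        return parts.get(partition);                // second level: partition number
    }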

LogManager.load

    public void load() throws IOException {
        if (this.rollingStategy == null) {
            this.rollingStategy = new FixedSizeRollingStrategy(config.getLogFileSize());
        }

        // check that the directory configured by log.dir exists; create it if not
        if (!logDir.exists()) {
            logger.info("No log directory found, creating '" + logDir.getAbsolutePath() + "'");
            logDir.mkdirs();
        }
        if (!logDir.isDirectory() || !logDir.canRead()) {
            throw new IllegalArgumentException(logDir.getAbsolutePath() + " is not a readable log directory.");
        }
        File[] subDirs = logDir.listFiles();
        // iterate over the subdirectories, which are named topic-partition
        if (subDirs != null) {
            for (File dir : subDirs) {
                if (!dir.isDirectory()) {
                    logger.warn("Skipping unexplainable file '" + dir.getAbsolutePath() + "'--should it be there?");
                } else {
                    logger.info("Loading log from " + dir.getAbsolutePath());
                    final String topicNameAndPartition = dir.getName();
                    // check that the name matches the topic-partition format
                    if(-1 == topicNameAndPartition.indexOf('-')) {
                        throw new IllegalArgumentException("error topic directory: "+dir.getAbsolutePath());
                    }
                    // extract the topic and partition number from the directory name
                    final KV<String, Integer> topicPartion = Utils.getTopicPartition(topicNameAndPartition);
                    final String topic = topicPartion.k;
                    final int partition = topicPartion.v;
                    // create a Log object for this directory
                    Log log = new Log(dir, partition, this.rollingStategy, flushInterval, needRecovery);

                    // put the Log into logs, establishing the topic-partition mapping
                    logs.putIfNotExists(topic, new Pool<Integer, Log>());
                    Pool<Integer, Log> parts = logs.get(topic);

                    parts.put(partition, log);
                    int configPartition = getPartition(topic);
                    if(configPartition < partition) {
                        topicPartitionsMap.put(topic, partition);
                    }
                }
            }
        }

        /* Schedule the cleanup task to delete old logs */
        // start the periodic log-cleanup task
        if (this.scheduler != null) {
            logger.info("starting log cleaner every " + logCleanupIntervalMs + " ms");
            this.scheduler.scheduleWithRate(new Runnable() {

                public void run() {
                    try {
                        cleanupLogs();
                    } catch (IOException e) {
                        logger.error("cleanup log failed.", e);
                    }
                }

            }, 60 * 1000, logCleanupIntervalMs);
        }
        //
        if (config.getEnableZookeeper()) {
            // register this broker with ZooKeeper
            this.serverRegister = new ServerRegister(config, this);
            // establish the connection to ZooKeeper
            serverRegister.startup();
            // start a topic-registration thread: it takes tasks from topicRegisterTasks
            // (blocking) and registers each new topic with ZooKeeper as soon as it appears
            TopicRegisterTask task = new TopicRegisterTask();
            task.setName("jafka.topicregister");
            task.setDaemon(true);
            task.start();
        }
    }

The comments explain load() line by line. Its main job is to walk every subdirectory of log.dir; each subdirectory is named topic-partition and holds that partition's jafka files. For each one, load() builds a Log object holding handles to all the jafka files underneath it, then records the Log in the logs variable keyed by topic and partition. Finally it starts the periodic log-cleanup task and the thread that registers topics with ZooKeeper. A made-up example of the directory layout follows.
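For instance, a log.dir holding a topic named demo with two partitions might look like the tree below. Each file is named after the offset of its first byte; the 20-digit zero padding and the .jafka suffix follow Kafka's convention and are an assumption here, not verified against the Jafka source:

    log.dir/
    ├── demo-0/
    │   ├── 00000000000000000000.jafka
    │   └── 00000000000536870912.jafka
    └── demo-1/
        └── 00000000000000000000.jafka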

The reason for organizing the jafka data files as <topic, <partition, Log>> is obvious: producer and consumer requests are all addressed by topic and partition. As for the Log class itself, the figure below shows how its pieces fit together.

(Figure: the containment hierarchy: LogManager contains Logs; each Log contains a SegmentList; a SegmentList contains LogSegments; each LogSegment wraps a FileMessageSet)

The layers contain one another from top to bottom; let's read the figure from the bottom up. At the very bottom is FileMessageSet, covered in detail in the previous article on the wire protocol: we know it opens a jafka file through a FileChannel and performs the actual reads and writes, making it the lowest-level class. One level up is LogSegment; part of its source follows:

    public class LogSegment implements Range, Comparable<LogSegment> {
        // the jafka file backing this segment
        private final File file;

        private final FileMessageSet messageSet;
        // the start offset of this file, parsed from the file name
        private final long start;
        // marks whether this file may be deleted
        private volatile boolean deleted;

        public LogSegment(File file, FileMessageSet messageSet, long start) {
            super();
            this.file = file;
            this.messageSet = messageSet;
            this.start = start;
            this.deleted = false;
        }

        // value is an offset; this method decides whether this jafka file contains it
        public boolean contains(long value) {
            long size = size();
            long start = start();
            return ((size == 0 && value == start) //
            || (size > 0 && value >= start && value <= start + size - 1));
        }
    }
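To make the contains() check concrete, take a hedged example with made-up numbers: a segment whose file name encodes start offset 1000 and whose file currently holds 500 bytes covers offsets 1000 through 1499, so:

    // Illustrative only: a segment with start = 1000 and size() = 500
    segment.contains(999);   // false: before the segment's start
    segment.contains(1000);  // true:  first byte of the segment
    segment.contains(1499);  // true:  last byte (start + size - 1)
    segment.contains(1500);  // false: first byte of the next segment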

As the source shows, LogSegment is a thin wrapper around a FileMessageSet plus some bookkeeping. Next comes SegmentList, which, as the name suggests, holds a list of LogSegments. Its key source is:

    public class SegmentList {

        // all the jafka files under one topic-partition directory
        private final AtomicReference<List<LogSegment>> contents;

        private final String name;

        /**
         * create the messages segments
         *
         * @param name the message topic name
         * @param segments exist segments
         */
        public SegmentList(final String name, List<LogSegment> segments) {
            this.name = name;
            contents = new AtomicReference<List<LogSegment>>(segments);
        }

        // append a new jafka file to the end of the list; note the
        // copy-on-write style used here to avoid read/write conflicts
        public void append(LogSegment segment) {
            while (true) {
                List<LogSegment> curr = contents.get();
                List<LogSegment> updated = new ArrayList<LogSegment>(curr);
                updated.add(segment);
                if (contents.compareAndSet(curr, updated)) {
                    return;
                }
            }
        }

        // cut off the LogSegments before a given position and return them,
        // so they can be deleted (note: the argument is a list index)
        public List<LogSegment> trunc(int newStart) {
            if (newStart < 0) {
                throw new IllegalArgumentException("Starting index must be positive.");
            }
            while (true) {
                List<LogSegment> curr = contents.get();
                int newLength = Math.max(curr.size() - newStart, 0);
                List<LogSegment> updatedList = new ArrayList<LogSegment>(curr.subList(Math.min(newStart, curr.size() - 1),
                        curr.size()));
                if (contents.compareAndSet(curr, updatedList)) {
                    return curr.subList(0, curr.size() - newLength);
                }
            }
        }

        // get the last LogSegment, the one that is writable
        public LogSegment getLastView() {
            List<LogSegment> views = getView();
            return views.get(views.size() - 1);
        }

    }

The code above combines AtomicReference with a while(true) retry loop, a copy-on-write approach to thread safety that is well worth learning from: readers always see an immutable snapshot, while writers copy, mutate, and publish atomically. SegmentList wraps a list of LogSegments and provides append, get, and trunc operations. And with that we finally arrive at Log, a very important class; first, its constructor.
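As a standalone illustration of that pattern (this is my sketch in the style of SegmentList, not Jafka code), here is a minimal copy-on-write list:

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.atomic.AtomicReference;

    // Minimal copy-on-write list in the style of SegmentList
    final class CowList<T> {
        private final AtomicReference<List<T>> contents =
                new AtomicReference<List<T>>(Collections.<T>emptyList());

        // Writers copy the current list, mutate the copy, and publish it
        // with compareAndSet; if another writer won the race, retry.
        void add(T item) {
            while (true) {
                List<T> curr = contents.get();
                List<T> updated = new ArrayList<T>(curr);
                updated.add(item);
                if (contents.compareAndSet(curr, updated)) {
                    return;
                }
            }
        }

        // Readers just grab the current snapshot; it is never mutated,
        // so no lock is needed on the read path.
        List<T> view() {
            return contents.get();
        }
    }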

    public Log(File dir, //
            int partition,//
            RollingStrategy rollingStategy,//
            int flushInterval, //
            boolean needRecovery) throws IOException {
        super();
        // assorted configuration
        this.dir = dir;
        this.partition = partition;
        this.rollingStategy = rollingStategy;
        this.flushInterval = flushInterval;
        this.needRecovery = needRecovery;
        this.name = dir.getName();
        this.logStats.setMbeanName("jafka:type=jafka.logs." + name);
        Utils.registerMBean(logStats);
        // load all the jafka files in this directory
        segments = loadSegments();
    }

    private SegmentList loadSegments() throws IOException {
        List<LogSegment> accum = new ArrayList<LogSegment>();
        File[] ls = dir.listFiles(new FileFilter() {

            public boolean accept(File f) {
                return f.isFile() && f.getName().endsWith(FileSuffix);
            }
        });
        logger.info("loadSegments files from [" + dir.getAbsolutePath() + "]: " + ls.length);
        int n = 0;
        // iterate over all the jafka files in this directory
        for (File f : ls) {
            n++;
            String filename = f.getName();
            // parse the start offset from the file name
            long start = Long.parseLong(filename.substring(0, filename.length() - FileSuffix.length()));
            final String logFormat = "LOADING_LOG_FILE[%2d], start(offset)=%d, size=%d, path=%s";
            logger.info(String.format(logFormat, n, start, f.length(), f.getAbsolutePath()));
            // create a FileMessageSet, i.e. open the file; false means read-only
            FileMessageSet messageSet = new FileMessageSet(f, false);
            accum.add(new LogSegment(f, messageSet, start));
        }
        if (accum.size() == 0) {
            // no jafka file yet: create a new one, opened for read-write
            File newFile = new File(dir, Log.nameFromOffset(0));
            FileMessageSet fileMessageSet = new FileMessageSet(newFile, true);
            accum.add(new LogSegment(newFile, fileMessageSet, 0));
        } else {
            // sort the jafka files by their start offsets
            Collections.sort(accum);
            // verify the integrity of the log data
            validateSegments(accum);
        }
        // reopen the last file in read-write mode so new messages can be appended
        LogSegment last = accum.remove(accum.size() - 1);
        last.getMessageSet().close();
        logger.info("Loading the last segment " + last.getFile().getAbsolutePath() + " in mutable mode, recovery " + needRecovery);
        LogSegment mutable = new LogSegment(last.getFile(), new FileMessageSet(last.getFile(), true, new AtomicBoolean(
                needRecovery)), last.start());
        accum.add(mutable);
        return new SegmentList(name, accum);
    }

After finishing its configuration, Log's constructor calls loadSegments() to load all the jafka files, sorted by start offset in ascending order (Collections.sort), so that the file containing any given offset can be located quickly with a binary search. Only the last jafka file is opened for read-write; all the others are read-only. Every write therefore goes to the tail of the last file, which is what makes message writes strictly sequential and O(1). Log provides methods for both reading and writing messages; the read path first:
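Before reading the code, a quick worked example with made-up numbers shows the search in action:

    // Illustrative numbers only. Segments as (start, size):
    //   (0, 1000), (1000, 1500), (2500, 300)  -> the log ends at offset 2800
    // Reading up to 200 bytes starting at offset 1800:
    MessageSet ms = log.read(1800, 200);
    // findRange probes the sorted segment list: with low=0, high=2 the first
    // mid is segment (1000, 1500), whose range [1000, 2500) contains 1800.
    // The read is then served by that segment's FileMessageSet starting at
    // file position 1800 - 1000 = 800.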

    // read at most length bytes of messages, starting at offset
    public MessageSet read(long offset, int length) throws IOException {
        List<LogSegment> views = segments.getView();
        // binary search for the log file containing this offset
        LogSegment found = findRange(views, offset, views.size());
        if (found == null) {
            if (logger.isTraceEnabled()) {
                logger.trace(format("NOT FOUND MessageSet from Log[%s], offset=%d, length=%d", name, offset, length));
            }
            return MessageSet.Empty;
        }
        // delegate to FileMessageSet.read to fetch the message data
        return found.getMessageSet().read(offset - found.start(), length);
    }

    public static <T extends Range> T findRange(List<T> ranges, long value, int arraySize) {
        if (ranges.size() < 1) return null;
        T first = ranges.get(0);
        T last = ranges.get(arraySize - 1);
        // check out of bounds
        if (value < first.start() || value > last.start() + last.size()) {
            throw new OffsetOutOfRangeException("offset " + value + " is out of range");
        }

        // check at the end
        if (value == last.start() + last.size()) return null;

        // the binary search proper
        int low = 0;
        int high = arraySize - 1;
        while (low <= high) {
            int mid = (high + low) / 2;
            T found = ranges.get(mid);

            if (found.contains(value)) {
                return found;
            } else if (value < found.start()) {
                high = mid - 1;
            } else {
                low = mid + 1;
            }
        }
        return null;
    }

That is the read path; with the comments it should be easy to follow, and the binary search is the highlight. FileMessageSet.read also handles many subtle details, for example what happens when offset + length does not land exactly on a message boundary; interested readers can check the source for how those cases are resolved. A hedged sketch of the usual boundary-trimming technique follows below.
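The standard technique (this sketch is my illustration of the idea, not the FileMessageSet source; it assumes each message is stored as a 4-byte size header followed by the payload) is to walk the length-prefixed messages in the returned byte range and cut the range back to the last complete message:

    import java.nio.ByteBuffer;

    // Given a raw byte range, keep only the bytes up to the last complete message.
    static ByteBuffer trimToCompleteMessages(ByteBuffer buf) {
        int validUpTo = 0;
        ByteBuffer scan = buf.duplicate();
        while (scan.remaining() >= 4) {
            int size = scan.getInt();              // read the message size header
            if (size < 0 || scan.remaining() < size) {
                break;                             // truncated final message: stop here
            }
            scan.position(scan.position() + size); // skip over the payload
            validUpTo = scan.position();           // end of a complete message
        }
        ByteBuffer trimmed = buf.duplicate();
        trimmed.limit(validUpTo);                  // cut off the partial tail
        return trimmed;
    }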

Now for the write path.

    public List<Long> append(ByteBufferMessageSet messages) {
        // validate the messages
        int numberOfMessages = 0;
        for (MessageAndOffset messageAndOffset : messages) {
            if (!messageAndOffset.message.isValid()) {
                throw new InvalidMessageException();
            }
            numberOfMessages += 1;
        }

        ByteBuffer validByteBuffer = messages.getBuffer().duplicate();
        long messageSetValidBytes = messages.getValidBytes();
        if (messageSetValidBytes > Integer.MAX_VALUE || messageSetValidBytes < 0) throw new InvalidMessageSizeException(
                "Illegal length of message set " + messageSetValidBytes + " Message set cannot be appended to log. Possible causes are corrupted produce requests");

        validByteBuffer.limit((int) messageSetValidBytes);
        ByteBufferMessageSet validMessages = new ByteBufferMessageSet(validByteBuffer);

        // they are valid, insert them in the log
        synchronized (lock) {
            try {
                // fetch the last LogSegment and simply append the data to it
                LogSegment lastSegment = segments.getLastView();
                long[] writtenAndOffset = lastSegment.getMessageSet().append(validMessages);
                if (logger.isTraceEnabled()) {
                    logger.trace(String.format("[%s,%s] save %d messages, bytes %d", name, lastSegment.getName(),
                            numberOfMessages, writtenAndOffset[0]));
                }
                // check whether the data should be flushed to disk
                maybeFlush(numberOfMessages);
                // check whether this file has reached the configured size;
                // if so, roll over to a new file
                maybeRoll(lastSegment);

            } catch (IOException e) {
                logger.fatal("Halting due to unrecoverable I/O error while handling producer request", e);
                Runtime.getRuntime().halt(1);
            } catch (RuntimeException re) {
                throw re;
            }
        }
        return (List<Long>) null;
    }

The write path is equally simple: take the last LogSegment, append to it, then check whether a flush or a roll is due. Hedged sketches of those two checks follow.
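Based on the fields we have already seen (flushInterval, rollingStategy), the two checks plausibly look something like the sketch below. This is a reconstruction of the idea, not the actual Jafka implementation; in particular, the unflushed counter and the check(...) method on RollingStrategy are stand-ins for whatever the real code uses:

    // Plausible sketch, reconstructed from the fields shown earlier
    private int unflushed = 0; // hypothetical: messages appended since last flush

    private void maybeFlush(int numberOfMessages) throws IOException {
        unflushed += numberOfMessages;
        // flushInterval: force data to disk once this many messages accumulate
        if (unflushed >= flushInterval) {
            segments.getLastView().getMessageSet().flush();
            unflushed = 0;
        }
    }

    private void maybeRoll(LogSegment lastSegment) throws IOException {
        // the RollingStrategy decides when a segment is full; for
        // FixedSizeRollingStrategy, when it reaches log.file.size bytes
        if (rollingStategy.check(lastSegment)) {
            // start a new segment whose file name encodes the next offset,
            // i.e. the current end of the log
            long newOffset = lastSegment.start() + lastSegment.getMessageSet().highWaterMark();
            File newFile = new File(dir, Log.nameFromOffset(newOffset));
            segments.append(new LogSegment(newFile, new FileMessageSet(newFile, true), newOffset));
        }
    }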

The class also provides markDeletedWhile and getOffsetsBefore, which respectively mark jafka files as deletable and return the offsets recorded before a given time; they are not expanded on here, and interested readers can explore them in the source.

Summary

This article covered the classes around LogManager and how jafka data files are organized; I hope it helps you understand this part of the source.