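Reading the Jafka Broker Source: SocketServer 02 August 2012
This article explains how Jafka's SocketServer is implemented. The previous overview article showed that startup is the main method called on SocketServer, so we will examine what it does. First, though, let's look at the constructor.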
```java
public SocketServer(RequestHandlerFactory handlerFactory, ServerConfig serverConfig) {
    super();
    // broker configuration
    this.serverConfig = serverConfig;
    // factory for the handlers that process message data
    this.handlerFactory = handlerFactory;
    // maximum size of a single request, set by max.socket.request.bytes
    this.maxRequestSize = serverConfig.getMaxSocketRequestSize();
    // worker thread group that performs the actual socket reads and writes
    this.processors = new Processor[serverConfig.getNumThreads()];
    // broker monitoring statistics
    this.stats = new SocketServerStats(1000L * 1000L * 1000L * serverConfig.getMonitoringPeriodSecs());
    // acceptor that handles connection requests
    this.acceptor = new Acceptor(serverConfig.getPort(), //
            processors, //
            serverConfig.getSocketSendBuffer(), //
            serverConfig.getSocketReceiveBuffer());
}
```
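The comments describe what the constructor does. It only performs initialization and starts no threads. Attentive readers will already have noticed the names acceptor, processor and handlerFactory, and can roughly guess each one's role from its name.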
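The stats line in the constructor multiplies the monitoring period by 1000L three times, which converts seconds into nanoseconds. This is consistent with the System.nanoTime() timings that handle() records later. Below is a minimal sketch of that arithmetic next to the equivalent java.util.concurrent.TimeUnit call. The class and method names are hypothetical, and the sketch assumes getMonitoringPeriodSecs() returns an int. The long literal matters: computed in int arithmetic, the same product overflows for any period above 2 seconds.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper; the arithmetic is copied from the SocketServer constructor
final class MonitoringPeriod {
    private MonitoringPeriod() {}

    // 1000L * 1000L * 1000L: seconds -> ms -> us -> ns, evaluated in long arithmetic
    static long periodNanos(int monitoringPeriodSecs) {
        return 1000L * 1000L * 1000L * monitoringPeriodSecs;
    }

    // The same conversion expressed with the JDK's TimeUnit
    static long periodNanosViaTimeUnit(int monitoringPeriodSecs) {
        return TimeUnit.SECONDS.toNanos(monitoringPeriodSecs);
    }

    // Without the L suffix the product is computed in int and wraps around
    static long periodNanosIntOverflow(int monitoringPeriodSecs) {
        return 1000 * 1000 * 1000 * monitoringPeriodSecs;
    }
}
```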
SocketServer.startup
Now let's see what these objects do by reading the startup method.
```java
public void startup() throws InterruptedException {
    // maximum number of connections each processor may handle
    final int maxCacheConnectionPerThread = serverConfig.getMaxConnections() / processors.length;
    logger.info("start " + processors.length + " Processor threads");
    // create and start all Processor threads; the count defaults to the number of CPUs and is set by num.threads
    for (int i = 0; i < processors.length; i++) {
        processors[i] = new Processor(handlerFactory, stats, maxRequestSize, maxCacheConnectionPerThread);
        Utils.newThread("jafka-processor-" + i, processors[i], false).start();
    }
    // create and start the acceptor thread
    Utils.newThread("jafka-acceptor", acceptor, false).start();
    // block until the acceptor reports that it has started
    acceptor.awaitStartup();
}
```
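The startup method is simple. It starts one acceptor thread and several processor threads, then blocks in acceptor.awaitStartup() until the acceptor is listening. The class diagram of Acceptor and Processor is shown below.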
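Two details of startup are worth making concrete. First, the per-processor cap uses integer division, so any remainder is dropped. Second, every processor runs on its own named thread. This is a minimal sketch under stated assumptions, not Jafka code:
- the class and method names are hypothetical;
- plain java.lang.Thread stands in for Jafka's Utils.newThread;
- the third argument of Utils.newThread is assumed to be the daemon flag.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

// Hypothetical illustration of the arithmetic and thread setup in startup()
final class StartupSketch {
    private StartupSketch() {}

    // Integer division: the remainder of maxConnections / numProcessors is dropped
    static int maxConnectionsPerThread(int maxConnections, int numProcessors) {
        return maxConnections / numProcessors;
    }

    // The source comment says num.threads defaults to the CPU count
    static int defaultNumThreads() {
        return Runtime.getRuntime().availableProcessors();
    }

    // Starts one named, non-daemon thread per processor; each task just counts down the latch
    static List<Thread> startProcessors(int numProcessors, CountDownLatch started) {
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < numProcessors; i++) {
            Thread t = new Thread(started::countDown, "jafka-processor-" + i);
            t.setDaemon(false); // assumed equivalent of Utils.newThread(..., false)
            t.start();
            threads.add(t);
        }
        return threads;
    }
}
```

For example, if getMaxConnections() returns 1000 and there are 3 processors, each processor gets 333, and the per-processor caps add up to 999 rather than 1000.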
AbstractServerThread
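First, let's look at AbstractServerThread, the parent class of Acceptor and Processor. This abstract class implements the Runnable and Closeable interfaces. Runnable supplies the run method that the thread executes. Closeable supplies a close method, so both kinds of object are shut down the same way. A parent class exists to collect code that its subclasses share, and AbstractServerThread does exactly that. Its source is as follows: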
```java
public abstract class AbstractServerThread implements Runnable, Closeable {

    private Selector selector;
    // latches signalling startup and shutdown
    protected final CountDownLatch startupLatch = new CountDownLatch(1);
    protected final CountDownLatch shutdownLatch = new CountDownLatch(1);
    // whether the thread is running
    protected final AtomicBoolean alive = new AtomicBoolean(false);

    final protected Logger logger = Logger.getLogger(getClass());

    /**
     * @return the selector
     */
    public Selector getSelector() {
        if (selector == null) {
            try {
                selector = Selector.open();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
        return selector;
    }

    protected void closeSelector() {
        Closer.closeQuietly(selector, logger);
    }

    // shuts the thread down
    public void close() {
        alive.set(false);
        // wake up the thread blocked in selector.select()
        selector.wakeup();
        try {
            // wait until the thread has released its resources
            shutdownLatch.await();
        } catch (InterruptedException e) {
            logger.error(e.getMessage(), e);
        }
    }

    // called once the thread has fully started
    protected void startupComplete() {
        alive.set(true);
        startupLatch.countDown();
    }

    // called once the thread has fully shut down
    protected void shutdownComplete() {
        shutdownLatch.countDown();
    }

    protected boolean isRunning() {
        return alive.get();
    }

    // blocks until the thread has fully started, i.e. until startupComplete() has been called
    public void awaitStartup() throws InterruptedException {
        startupLatch.await();
    }
}
```
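The code above uses classes from the java.nio and java.util.concurrent packages. How to use them is not the focus of this article, so readers who are unsure can look them up. AbstractServerThread moves the thread startup/shutdown logic and the selector into the parent class. The latches work as follows:
- startupComplete() releases startupLatch, which lets awaitStartup() return.
- close() clears the alive flag, wakes the selector, and then blocks on shutdownLatch.
- The run loop calls shutdownComplete() when it finishes, which releases close().

Selector is a java.nio class. If you have never used it, read up on NIO before continuing, otherwise the source will be hard to follow.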
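The latch handshake in AbstractServerThread can be exercised on its own. Below is a minimal sketch, not Jafka's class:
- MiniServerThread and IdleLoop are hypothetical names.
- The logger and the Closer helper are left out.
- The selector is opened eagerly in the constructor instead of lazily in getSelector(). This avoids the case where close() dereferences a selector that was never opened.

```java
import java.io.IOException;
import java.nio.channels.Selector;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, trimmed-down version of the AbstractServerThread pattern
abstract class MiniServerThread implements Runnable {
    protected final Selector selector;
    private final CountDownLatch startupLatch = new CountDownLatch(1);
    private final CountDownLatch shutdownLatch = new CountDownLatch(1);
    private final AtomicBoolean alive = new AtomicBoolean(false);

    MiniServerThread() throws IOException {
        this.selector = Selector.open(); // eager, so close() never sees null
    }

    protected void startupComplete() { alive.set(true); startupLatch.countDown(); }
    protected void shutdownComplete() { shutdownLatch.countDown(); }
    protected boolean isRunning() { return alive.get(); }
    void awaitStartup() throws InterruptedException { startupLatch.await(); }

    // Same order as the original: clear the flag, wake select(), wait for run() to finish
    void close() throws InterruptedException {
        alive.set(false);
        selector.wakeup();
        shutdownLatch.await();
    }
}

// A worker whose loop does nothing but wait on its selector
final class IdleLoop extends MiniServerThread {
    IdleLoop() throws IOException { super(); }

    @Override
    public void run() {
        startupComplete();
        try {
            while (isRunning()) {
                selector.select(500L); // returns early when close() calls wakeup()
            }
            selector.close();
        } catch (IOException e) {
            throw new RuntimeException(e);
        } finally {
            shutdownComplete(); // always release close(), even on error
        }
    }
}
```

close() does not have to wait out the 500 ms timeout. If the worker is not yet inside select() when wakeup() is called, the next select() returns immediately.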
Acceptor
Now for the Acceptor code. As its name suggests, it is mainly responsible for accepting, that is, handling socket connection requests. Its run method is:
```java
public void run() {
    final ServerSocketChannel serverChannel;
    try {
        // open the server socket and register the accept event on the selector
        serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);
        serverChannel.socket().bind(new InetSocketAddress(port));
        serverChannel.register(getSelector(), SelectionKey.OP_ACCEPT);
    } catch (IOException e) {
        throw new RuntimeException(e);
    }

    logger.info("Awaiting connection on port " + port);
    startupComplete();

    int currentProcessor = 0;
    // wait for connection events
    while (isRunning()) {
        int ready = -1;
        try {
            // block until a connection request arrives or 500 ms elapse
            ready = getSelector().select(500L);
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        if (ready <= 0) continue;
        // iterate over all pending connection requests
        Iterator<SelectionKey> iter = getSelector().selectedKeys().iterator();
        while (iter.hasNext() && isRunning()) {
            try {
                SelectionKey key = iter.next();
                iter.remove();
                if (key.isAcceptable()) {
                    // handle the connection request: the key step
                    accept(key, processors[currentProcessor]);
                } else {
                    throw new IllegalStateException("Unrecognized key state for acceptor thread.");
                }
                // pick the next processor round-robin
                currentProcessor = (currentProcessor + 1) % processors.length;
            } catch (Throwable t) {
                logger.error("Error in acceptor", t);
            }
        }
    }
    // loop exited: release resources
    logger.info("Closing server socket and selector.");
    Closer.closeQuietly(serverChannel, logger);
    Closer.closeQuietly(getSelector(), logger);
    shutdownComplete();
}
```
A friendly reminder: if the code above leaves you confused, brush up on Java NIO first. I was puzzled for quite a while the first time I read it.
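The run method does two main things:
- It starts a socket server bound to port and waits for connection events.
- When a connection event occurs, it picks a processor in round-robin fashion and calls accept, which hands the connection to that processor.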
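The two steps above can be reproduced on the loopback interface. This is a minimal sketch that assumes nothing about Jafka's Processor:
- RoundRobinAcceptor is a hypothetical name.
- Instead of handing each connection over, it counts how many connections each of numProcessors slots would receive.
- It stops after a fixed number of connections so that it can run as a test.
- The caller binds the server channel; port 0 lets the OS pick a free port.

```java
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

// Hypothetical sketch of the Acceptor loop: accept, then pick a processor round-robin
final class RoundRobinAcceptor {
    private RoundRobinAcceptor() {}

    // Accepts `expected` connections and returns how many each processor slot received
    static int[] acceptRoundRobin(ServerSocketChannel server, int numProcessors, int expected)
            throws IOException {
        int[] perProcessor = new int[numProcessors];
        int currentProcessor = 0;
        int accepted = 0;
        try (Selector selector = Selector.open()) {
            server.configureBlocking(false); // required before register()
            server.register(selector, SelectionKey.OP_ACCEPT);
            while (accepted < expected) {
                // block until a connection is pending or 500 ms pass, as in Acceptor.run
                if (selector.select(500L) <= 0) continue;
                Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                while (iter.hasNext()) {
                    SelectionKey key = iter.next();
                    iter.remove(); // the selector never removes selected keys by itself
                    if (!key.isAcceptable()) {
                        throw new IllegalStateException("Unrecognized key state");
                    }
                    SocketChannel channel = ((ServerSocketChannel) key.channel()).accept();
                    if (channel == null) continue; // non-blocking accept(): nothing pending
                    channel.configureBlocking(false);
                    channel.socket().setTcpNoDelay(true);
                    // a real server would call processors[currentProcessor].accept(channel) here
                    perProcessor[currentProcessor]++;
                    channel.close();
                    accepted++;
                    currentProcessor = (currentProcessor + 1) % numProcessors;
                }
            }
        }
        return perProcessor;
    }
}
```

With 5 connections and 2 processor slots, the assignment goes 0, 1, 0, 1, 0, so slot 0 gets three connections and slot 1 gets two.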
The accept method is therefore the key piece. Its code is:
```java
private void accept(SelectionKey key, Processor processor) throws IOException {
    ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
    serverSocketChannel.socket().setReceiveBufferSize(receiveBufferSize);
    // accept the connection request and obtain the SocketChannel
    SocketChannel socketChannel = serverSocketChannel.accept();
    // put the channel into non-blocking mode
    socketChannel.configureBlocking(false);
    socketChannel.socket().setTcpNoDelay(true);
    socketChannel.socket().setSendBufferSize(sendBufferSize);
    // hand the channel to the processor
    processor.accept(socketChannel);
}

// Processor.accept
public void accept(SocketChannel socketChannel) {
    // add the channel to the newConnections blocking queue; the processor handles it in its next loop iteration
    newConnections.add(socketChannel);
    // wake up the selector so the new channel is handled promptly
    getSelector().wakeup();
}
```
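The logic of accept is also clear. It first accepts the connection request to obtain the corresponding channel. The processor then adds that channel to its own queue, where it waits to be processed.
That covers the main role of the Acceptor class. Next, let's look at how Processor is implemented.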
Processor
The Processor class handles the actual read and write requests, so its implementation is somewhat more involved. Its run method is:
```java
public void run() {
    startupComplete();
    while (isRunning()) {
        try {
            // handle channels newly added to the connection queue, i.e. register their read events
            configureNewConnections();
            // wait for read/write events
            final Selector selector = getSelector();
            int ready = selector.select(500);
            if (ready <= 0) continue;
            Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
            while (iter.hasNext() && isRunning()) {
                SelectionKey key = null;
                try {
                    key = iter.next();
                    iter.remove();
                    if (key.isReadable()) {
                        // read request
                        read(key);
                    } else if (key.isWritable()) {
                        // write request
                        write(key);
                    } else if (!key.isValid()) {
                        close(key);
                    } else {
                        throw new IllegalStateException("Unrecognized key state for processor thread.");
                    }
                } catch (EOFException eofe) {
                    Socket socket = channelFor(key).socket();
                    logger.debug(format("connection closed by %s:%d.", socket.getInetAddress(), socket.getPort()));
                    close(key);
                } catch (InvalidRequestException ire) {
                    Socket socket = channelFor(key).socket();
                    logger.info(format("Closing socket connection to %s:%d due to invalid request: %s", socket.getInetAddress(), socket.getPort(),
                            ire.getMessage()));
                    close(key);
                } catch (Throwable t) {
                    Socket socket = channelFor(key).socket();
                    final String msg = "Closing socket for %s:%d because of error";
                    if (logger.isDebugEnabled()) {
                        logger.error(format(msg, socket.getInetAddress(), socket.getPort()), t);
                    } else {
                        logger.error(format(msg, socket.getInetAddress(), socket.getPort()));
                    }
                    close(key);
                }
            }
        } catch (IOException e) {
            logger.error(e.getMessage(), e);
        }
    }
    logger.info("Closing selector while shutting down");
    closeSelector();
    shutdownComplete();
}
```
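In each iteration of its loop, the Processor first handles the new connections in its queue through configureNewConnections, whose source is below. All it does is register each channel's read event on this processor's selector. The processor then waits for read and write events and performs the corresponding operation.
One programming detail deserves attention. The processor's accept method, shown earlier, calls selector.wakeup. This wakes the pending selector.select(500), so the processor thread continues immediately and handles the newly connected channel as soon as possible, instead of waiting up to 500 ms for the timeout.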
```java
private void configureNewConnections() throws ClosedChannelException {
    while (newConnections.size() > 0) {
        SocketChannel channel = newConnections.poll();
        if (logger.isDebugEnabled()) {
            logger.debug("Listening to new connection from " + channel.socket().getRemoteSocketAddress());
        }
        channel.register(getSelector(), SelectionKey.OP_READ);
    }
}
```
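Processor.accept and configureNewConnections form a hand-off. The acceptor thread only enqueues the channel and calls wakeup(). The processor thread does the register() call itself, so registration never runs while another thread is blocked in select() on the same selector. In older JDKs, such as those current when this article was written, a register() from another thread could block until that select() returned.

The sketch below uses hypothetical names and is not Jafka's Processor. The comment in Processor.accept says newConnections is a BlockingQueue; any thread-safe queue works for this pattern, and the sketch uses ConcurrentLinkedQueue. Pipe source channels stand in for sockets so no network is needed. The select() timeout is deliberately long, so only wakeup() can make the hand-off prompt.

```java
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of the enqueue + wakeup hand-off between acceptor and processor
final class HandoffProcessor implements Runnable {
    private final Selector selector;
    // thread-safe hand-off queue filled by the acceptor thread
    private final ConcurrentLinkedQueue<SelectableChannel> newConnections = new ConcurrentLinkedQueue<>();
    private final AtomicBoolean alive = new AtomicBoolean(true);
    // counts down once per channel registered on this processor's selector
    final CountDownLatch registered;

    HandoffProcessor(int expectedChannels) throws IOException {
        this.selector = Selector.open();
        this.registered = new CountDownLatch(expectedChannels);
    }

    // runs on the acceptor thread: enqueue, then interrupt a blocked select()
    void accept(SelectableChannel channel) {
        newConnections.add(channel);
        selector.wakeup();
    }

    // runs only on the processor thread, the same thread that calls select()
    private void configureNewConnections() throws ClosedChannelException {
        SelectableChannel channel;
        while ((channel = newConnections.poll()) != null) {
            channel.register(selector, SelectionKey.OP_READ);
            registered.countDown();
        }
    }

    @Override
    public void run() {
        try {
            while (alive.get()) {
                configureNewConnections();
                selector.select(60_000L); // long timeout: only wakeup() makes the hand-off prompt
                selector.selectedKeys().clear(); // this sketch reads nothing
            }
            selector.close();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    void shutdown() {
        alive.set(false);
        selector.wakeup();
    }
}
```

No wakeup is lost. If accept() calls wakeup() while the processor is between configureNewConnections() and select(), the next select() returns immediately.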
Read requests
Let's look at how read requests are handled.
```java
private void read(SelectionKey key) throws IOException {
    SocketChannel socketChannel = channelFor(key);
    Receive request = null;
    if (key.attachment() == null) {
        // first read for this request: create a new receive buffer
        request = new BoundedByteBufferReceive(maxRequestSize);
        key.attach(request);
    } else {
        // follow-up read: continue the partial request stored as the key's attachment
        request = (Receive) key.attachment();
    }
    // read data from the channel
    int read = request.readFrom(socketChannel);
    stats.recordBytesRead(read);
    if (read < 0) {
        // negative: end of stream, the client has closed the connection
        close(key);
    } else if (request.complete()) {
        // the whole request has arrived: pass it to handle()
        Send maybeResponse = handle(key, request);
        key.attach(null);
        // if there is a response, register the write event
        if (maybeResponse != null) {
            key.attach(maybeResponse);
            key.interestOps(SelectionKey.OP_WRITE);
        }
    } else {
        // large requests arrive over several reads, so register the read event again
        key.interestOps(SelectionKey.OP_READ);
        getSelector().wakeup();
        if (logger.isTraceEnabled()) {
            logger.trace("reading request not been done. " + request);
        }
    }
}

private Send handle(SelectionKey key, Receive request) {
    final short requestTypeId = request.buffer().getShort();
    // look up the request type
    final RequestKeys requestType = RequestKeys.valueOf(requestTypeId);
    // find the RequestHandler for this type
    RequestHandler handlerMapping = requesthandlerFactory.mapping(requestType, request);
    if (handlerMapping == null) {
        throw new InvalidRequestException("No handler found for request");
    }
    long start = System.nanoTime();
    // invoke the handler and return its result
    Send maybeSend = handlerMapping.handler(requestType, request);
    stats.recordRequest(requestType, System.nanoTime() - start);
    return maybeSend;
}
```
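See the comments for the details. In short, read gets the channel and tries to read from it, then takes one of three paths:
- A negative return value signals end of stream, meaning the client closed the connection, so the key is closed.
- If the request is incomplete and needs more reads, the read event is registered again.
- Once all the data has arrived, it is passed to handle, which runs the matching RequestHandler. If that returns a non-null response, the write event is registered so the result can be sent back to the client.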
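The accumulate-until-complete pattern behind read() can be tried without a network. Below is a minimal sketch of a receive object in the spirit of BoundedByteBufferReceive. SizePrefixedReceive is a hypothetical name, and its wire format is an assumption, not taken from Jafka's source: a 4-byte big-endian size followed by that many bytes. A size above maxSize is rejected. The payload in the test starts with a 2-byte request type, matching the getShort() call in handle().

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;

// Hypothetical stand-in for a bounded, size-prefixed receive buffer
final class SizePrefixedReceive {
    private final int maxSize;
    private final ByteBuffer sizeBuffer = ByteBuffer.allocate(4); // assumed 4-byte length prefix
    private ByteBuffer contentBuffer; // allocated once the size is known
    private boolean complete;

    SizePrefixedReceive(int maxSize) { this.maxSize = maxSize; }

    // Reads whatever is available; returns bytes read, or -1 at end of stream
    int readFrom(ReadableByteChannel channel) throws IOException {
        if (complete) return 0; // never overwrite a finished request
        int total = 0;
        if (sizeBuffer.hasRemaining()) {
            int n = channel.read(sizeBuffer);
            if (n < 0) return -1;
            total += n;
            if (sizeBuffer.hasRemaining()) return total; // header still incomplete
            sizeBuffer.flip();
            int size = sizeBuffer.getInt();
            if (size <= 0 || size > maxSize) {
                throw new IOException("invalid request size: " + size);
            }
            contentBuffer = ByteBuffer.allocate(size);
        }
        int n = channel.read(contentBuffer);
        if (n < 0) return -1;
        total += n;
        if (!contentBuffer.hasRemaining()) {
            contentBuffer.flip(); // ready for the handler to read from position 0
            complete = true;
        }
        return total;
    }

    boolean complete() { return complete; }

    ByteBuffer buffer() { return contentBuffer; }
}
```

In the test, a 10-byte message (4-byte size, 2-byte type, 4-byte body) arrives in chunks of 3, 3 and 4 bytes. Only the third call completes the request, which mirrors the "register OP_READ again" branch of read().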
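Note: Request, Send and RequestHandler will be analyzed in separate articles. This article focuses on SocketServer's processing logic, so they are not covered further here.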
Write requests
Next, the method that handles write requests.
```java
private void write(SelectionKey key) throws IOException {
    Send response = (Send) key.attachment();
    SocketChannel socketChannel = channelFor(key);
    // write the response to the channel
    int written = response.writeTo(socketChannel);
    stats.recordBytesWritten(written);
    if (response.complete()) {
        // fully written: drop the response and wait for the next request
        key.attach(null);
        key.interestOps(SelectionKey.OP_READ);
    } else {
        // partially written: keep waiting for the channel to become writable
        key.interestOps(SelectionKey.OP_WRITE);
        getSelector().wakeup();
    }
}
```
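Writing mirrors reading. If the response cannot be written in one go, the write event is registered again and writing resumes on the next round. Once the response is complete, the key goes back to waiting for reads.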
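The partial-write loop can be simulated with a channel that accepts only a few bytes per call, much like a socket whose send buffer is full. This is a minimal sketch, not Jafka's Send:
- BufferSend, ThrottledChannel and WriteStep are hypothetical names.
- nextInterestOps only returns the interest set that write() would choose next; it has no real SelectionKey.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.WritableByteChannel;

// Hypothetical stand-in for a Send: writes one buffer, possibly over several calls
final class BufferSend {
    private final ByteBuffer buffer;
    BufferSend(byte[] payload) { this.buffer = ByteBuffer.wrap(payload); }
    int writeTo(WritableByteChannel channel) throws IOException { return channel.write(buffer); }
    boolean complete() { return !buffer.hasRemaining(); }
}

// Accepts at most `limit` bytes per write, like a socket with a full send buffer
final class ThrottledChannel implements WritableByteChannel {
    final ByteArrayOutputStream sink = new ByteArrayOutputStream();
    private final int limit;
    ThrottledChannel(int limit) { this.limit = limit; }

    @Override
    public int write(ByteBuffer src) {
        int n = Math.min(limit, src.remaining());
        for (int i = 0; i < n; i++) sink.write(src.get());
        return n;
    }

    @Override public boolean isOpen() { return true; }
    @Override public void close() {}
}

final class WriteStep {
    private WriteStep() {}

    // Mirrors the decision in Processor.write(): which interest set comes next
    static int nextInterestOps(BufferSend response, WritableByteChannel channel) throws IOException {
        response.writeTo(channel);
        return response.complete() ? SelectionKey.OP_READ : SelectionKey.OP_WRITE;
    }
}
```

A 10-byte response through a 4-byte-per-call channel takes three rounds. The first two leave the key on OP_WRITE, and the third switches it back to OP_READ.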
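Summary
From the analysis above you should now have a rough picture of SocketServer. It consists of one acceptor and multiple processors: the acceptor handles connection requests, and the processors handle reads and writes. The design is simple and flexible, the number of processors is configurable, and its performance has been tested accordingly.
Overall, SocketServer uses NIO to implement a high-performance socket server. Interested readers may want to look at current NIO frameworks such as mina, netty and Taobao's open-source gecko, which make it quicker and easier to build high-performance socket servers. Jafka implements its own socket server, but NIO programming is full of pitfalls that trip up even experienced developers, so I still recommend using a framework where possible. Shutdown is one such detail. The code listed in this article contains the corresponding close logic, and it is worth studying how Jafka closes resources properly while catching the various exceptions.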