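This article walks through the implementation of Jafka's SocketServer. From the overview in the previous article we know that the main entry point of SocketServer is its startup method, so that is what we will study here. Before doing so, let's look at the constructor.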

    public SocketServer(RequestHandlerFactory handlerFactory, ServerConfig serverConfig) {
        super();
        // broker configuration
        this.serverConfig = serverConfig;
        // factory for the handlers that process message data
        this.handlerFactory = handlerFactory;
        // maximum size of a single request packet, set by max.socket.request.bytes
        this.maxRequestSize = serverConfig.getMaxSocketRequestSize();
        // worker threads that handle the actual socket reads and writes
        this.processors = new Processor[serverConfig.getNumThreads()];
        // broker statistics monitor
        this.stats = new SocketServerStats(1000L * 1000L * 1000L * serverConfig.getMonitoringPeriodSecs());
        // acceptor that handles incoming connection requests
        this.acceptor = new Acceptor(serverConfig.getPort(), //
                processors, //
                serverConfig.getSocketSendBuffer(), //
                serverConfig.getSocketReceiveBuffer());
    }

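The comments explain what the constructor does: it only performs initialization and does not start any threads. Attentive readers will nevertheless have noticed the names acceptor, processor and handlerFactory, whose roles can largely be guessed from the names themselves.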

SocketServer.startup

Let's see what these objects really do by looking at the startup method.

    public void startup() throws InterruptedException {
        // maximum number of connections each processor may handle
        final int maxCacheConnectionPerThread = serverConfig.getMaxConnections() / processors.length;
        logger.info("start " + processors.length + " Processor threads");
        // create and start all Processor threads; their number defaults to the CPU count
        // and can be configured through num.threads
        for (int i = 0; i < processors.length; i++) {
            processors[i] = new Processor(handlerFactory, stats, maxRequestSize, maxCacheConnectionPerThread);
            Utils.newThread("jafka-processor-" + i, processors[i], false).start();
        }
        // create and start the acceptor thread
        Utils.newThread("jafka-acceptor", acceptor, false).start();
        acceptor.awaitStartup();
    }

The startup method is simple: it starts one acceptor thread and several processor threads. The figure below shows the class diagram of Acceptor and Processor.

Acceptor class diagram

AbstractServerThread

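Let's first look at AbstractServerThread, the parent class of Acceptor and Processor. This abstract class implements the Runnable and Closeable interfaces: the former provides the run method so that the object can be executed by a thread, and the latter provides the close method so that shutdown is invoked in a uniform way. The point of a parent class is to factor out the code that its subclasses share, and AbstractServerThread does exactly that. Its source is as follows: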

    public abstract class AbstractServerThread implements Runnable, Closeable {

        private Selector selector;
        // latches for startup and shutdown
        protected final CountDownLatch startupLatch = new CountDownLatch(1);
        protected final CountDownLatch shutdownLatch = new CountDownLatch(1);
        // flag that records whether the thread is alive
        protected final AtomicBoolean alive = new AtomicBoolean(false);

        final protected Logger logger = Logger.getLogger(getClass());

        /**
         * @return the selector
         */
        public Selector getSelector() {
            if (selector == null) {
                try {
                    selector = Selector.open();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
            return selector;
        }

        protected void closeSelector() {
            Closer.closeQuietly(selector, logger);
        }

        // shuts the thread down
        public void close() {
            alive.set(false);
            // wake up the thread blocked in selector.select()
            selector.wakeup();
            try {
                // wait until the remaining resources have been released
                shutdownLatch.await();
            } catch (InterruptedException e) {
                logger.error(e.getMessage(), e);
            }
        }

        // called to signal that the thread has fully started
        protected void startupComplete() {
            alive.set(true);
            startupLatch.countDown();
        }

        // called to signal that the thread has fully shut down
        protected void shutdownComplete() {
            shutdownLatch.countDown();
        }

        protected boolean isRunning() {
            return alive.get();
        }

        // blocks until the thread has fully started, that is, until startupComplete has been called
        public void awaitStartup() throws InterruptedException {
            startupLatch.await();
        }
    }

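The code above uses classes from the java.nio and java.util.concurrent packages. How to use them is beyond the scope of this article; readers who are unsure can look up the relevant material. AbstractServerThread factors out the thread startup and shutdown methods together with the selector. Selector is a class from Java NIO; readers who have never used it should read up on NIO before continuing, otherwise the source code will be hard to follow.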
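To make the startup and shutdown handshake concrete, here is a minimal sketch of the same latch pattern outside of Jafka. The class names LatchLifecycleDemo and Worker are made up for this illustration, and Thread.sleep stands in for the selector loop; it is not Jafka's code, only the idea behind startupComplete, awaitStartup, close and shutdownComplete.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

final class LatchLifecycleDemo {

    /** Hypothetical worker that mimics the startup/shutdown handshake. */
    static final class Worker implements Runnable {
        private final CountDownLatch startupLatch = new CountDownLatch(1);
        private final CountDownLatch shutdownLatch = new CountDownLatch(1);
        private final AtomicBoolean alive = new AtomicBoolean(false);

        @Override
        public void run() {
            alive.set(true);
            startupLatch.countDown();      // same role as startupComplete()
            while (alive.get()) {
                try {
                    Thread.sleep(10);      // stands in for selector.select(500)
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            shutdownLatch.countDown();     // same role as shutdownComplete()
        }

        void awaitStartup() throws InterruptedException {
            startupLatch.await();
        }

        void close() throws InterruptedException {
            alive.set(false);
            shutdownLatch.await();         // returns only after the loop has exited
        }

        boolean isRunning() {
            return alive.get();
        }
    }

    /** Starts a worker, waits for it, shuts it down; returns {runningAfterStartup, runningAfterClose}. */
    static boolean[] runOnce() {
        Worker worker = new Worker();
        Thread thread = new Thread(worker, "demo-worker");
        thread.start();
        try {
            worker.awaitStartup();
            boolean afterStartup = worker.isRunning();
            worker.close();
            boolean afterClose = worker.isRunning();
            thread.join();
            return new boolean[] { afterStartup, afterClose };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] states = runOnce();
        System.out.println("running after startup: " + states[0]);
        System.out.println("running after close:   " + states[1]);
    }
}
```

Because the worker sets its alive flag before counting down the startup latch, a caller that has returned from awaitStartup can safely call close without racing against the worker's own initialization.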

Acceptor

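Now for the Acceptor code. As its name suggests, it is mainly responsible for accept work, that is, handling socket connection requests. Its run method is as follows: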

    public void run() {
        final ServerSocketChannel serverChannel;
        try {
            // start the socket server and register the accept event with the selector
            serverChannel = ServerSocketChannel.open();
            serverChannel.configureBlocking(false);
            serverChannel.socket().bind(new InetSocketAddress(port));
            serverChannel.register(getSelector(), SelectionKey.OP_ACCEPT);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }

        logger.info("Awaiting connection on port " + port);
        startupComplete();

        int currentProcessor = 0;
        // start waiting for connection events
        while (isRunning()) {
            int ready = -1;
            try {
                // block until a connection request arrives or the 500 ms timeout expires
                ready = getSelector().select(500L);
            } catch (IOException e) {
                throw new IllegalStateException(e);
            }
            if (ready <= 0) continue;
            // iterate over all pending connection requests
            Iterator<SelectionKey> iter = getSelector().selectedKeys().iterator();
            while (iter.hasNext() && isRunning()) {
                try {
                    SelectionKey key = iter.next();
                    iter.remove();
                    if (key.isAcceptable()) {
                        // handle the connection request; this is the key call
                        accept(key, processors[currentProcessor]);
                    } else {
                        throw new IllegalStateException("Unrecognized key state for acceptor thread.");
                    }
                    // pick the next processor in round-robin fashion
                    currentProcessor = (currentProcessor + 1) % processors.length;
                } catch (Throwable t) {
                    logger.error("Error in acceptor", t);
                }
            }
        }
        // run over
        logger.info("Closing server socket and selector.");
        Closer.closeQuietly(serverChannel, logger);
        Closer.closeQuietly(getSelector(), logger);
        shutdownComplete();
    }
 Friendly reminder:
 	If the code above leaves you confused, please brush up on Java NIO first. I was also puzzled for quite a while when I first read it.

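The run method mainly does two things:

  • It starts a socket server, binds it to port, and waits for connection events.
  • When a connection event occurs, it picks a processor in round-robin fashion and calls the accept method, which hands the connection to that processor.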
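The round-robin choice in the second point is just a counter advanced modulo the number of processors. The sketch below isolates that arithmetic; RoundRobinDemo and assign are names invented for this illustration, and plain integers stand in for the Processor objects.

```java
import java.util.ArrayList;
import java.util.List;

final class RoundRobinDemo {

    /** Returns the processor index chosen for each of `connections` consecutive accepts. */
    static List<Integer> assign(int processorCount, int connections) {
        List<Integer> chosen = new ArrayList<>();
        int currentProcessor = 0;
        for (int i = 0; i < connections; i++) {
            chosen.add(currentProcessor);
            // same update as in the acceptor loop: wrap around after the last processor
            currentProcessor = (currentProcessor + 1) % processorCount;
        }
        return chosen;
    }

    public static void main(String[] args) {
        System.out.println(assign(3, 7)); // prints [0, 1, 2, 0, 1, 2, 0]
    }
}
```

With this scheme, connections are spread evenly over the processors by count, regardless of how busy each connection later turns out to be.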

The accept method is therefore the key part. Its code is as follows:

    private void accept(SelectionKey key, Processor processor) throws IOException {
        ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
        serverSocketChannel.socket().setReceiveBufferSize(receiveBufferSize);
        // accept the connection request and obtain the socketChannel
        SocketChannel socketChannel = serverSocketChannel.accept();
        // switch the channel to non-blocking mode
        socketChannel.configureBlocking(false);
        socketChannel.socket().setTcpNoDelay(true);
        socketChannel.socket().setSendBufferSize(sendBufferSize);
        // hand the channel over to the processor
        processor.accept(socketChannel);
    }

    // processor.accept
    public void accept(SocketChannel socketChannel) {
        // add the channel to the newConnections blocking queue; the processor picks it up in its next loop iteration
        newConnections.add(socketChannel);
        // wake up the selector so that the newly added channel is handled as soon as possible
        getSelector().wakeup();
    }

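The logic of accept is also clear: it accepts the connection request to obtain the corresponding channel, and the processor then adds that channel to its own queue, where it waits to be handled.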

That covers the main role of the Acceptor class. Next, let's look at the implementation of the Processor class.

Processor

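The Processor class handles the actual read and write requests, so its implementation is somewhat more complex. Its run method is as follows: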

    public void run() {
        startupComplete();
        while (isRunning()) {
            try {
                // handle channels newly added to the connection queue, i.e. register their read events
                configureNewConnections();
                // wait for read and write events
                final Selector selector = getSelector();
                int ready = selector.select(500);
                if (ready <= 0) continue;
                Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                while (iter.hasNext() && isRunning()) {
                    SelectionKey key = null;
                    try {
                        key = iter.next();
                        iter.remove();
                        if (key.isReadable()) {
                            // read request
                            read(key);
                        } else if (key.isWritable()) {
                            // write request
                            write(key);
                        } else if (!key.isValid()) {
                            close(key);
                        } else {
                            throw new IllegalStateException("Unrecognized key state for processor thread.");
                        }
                    } catch (EOFException eofe) {
                        Socket socket = channelFor(key).socket();
                        logger.debug(format("connection closed by %s:%d.", socket.getInetAddress(), socket.getPort()));
                        close(key);
                    } catch (InvalidRequestException ire) {
                        Socket socket = channelFor(key).socket();
                        logger.info(format("Closing socket connection to %s:%d due to invalid request: %s", socket.getInetAddress(), socket.getPort(),
                                ire.getMessage()));
                        close(key);
                    } catch (Throwable t) {
                        Socket socket = channelFor(key).socket();
                        final String msg = "Closing socket for %s:%d because of error";
                        if (logger.isDebugEnabled()) {
                            logger.error(format(msg, socket.getInetAddress(), socket.getPort()), t);
                        } else {
                            logger.error(format(msg, socket.getInetAddress(), socket.getPort()));
                        }
                        close(key);
                    }
                }
            } catch (IOException e) {
                logger.error(e.getMessage(), e);
            }
        }
        logger.info("Closing selector while shutting down");
        closeSelector();
        shutdownComplete();
    }

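In the Processor loop, new connections in the connection queue are handled first by configureNewConnections, whose source is shown below. All it does is register each channel's read event with this processor's selector. After that, the processor waits for read and write events and handles them accordingly.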

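There is one programming detail worth noting here: the processor's accept method shown earlier calls selector.wakeup. Its purpose is to wake up the blocked selector.select(500) call so that the processor thread continues immediately and handles the newly connected channel as soon as possible.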

    private void configureNewConnections() throws ClosedChannelException {
        while (newConnections.size() > 0) {
            SocketChannel channel = newConnections.poll();
            if (logger.isDebugEnabled()) {
                logger.debug("Listening to new connection from " + channel.socket().getRemoteSocketAddress());
            }
            channel.register(getSelector(), SelectionKey.OP_READ);
        }
    }
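The effect of the wakeup call can be observed in isolation. The following sketch is not Jafka code: WakeupHandoffDemo, handoffLatencyMillis and the string placed on the queue are made up for this illustration, and no sockets are involved. One thread alternates between draining a queue and blocking in select(timeout), as the processor does, while the caller plays the acceptor and adds an item with or without waking the selector.

```java
import java.io.IOException;
import java.nio.channels.Selector;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

final class WakeupHandoffDemo {

    /**
     * Hands one item to a thread that blocks in select(timeoutMs) and returns how many
     * milliseconds passed until that thread picked it up, or -1 if it never did.
     */
    static long handoffLatencyMillis(long timeoutMs, boolean wakeup) {
        try (Selector selector = Selector.open()) {
            Queue<String> pending = new ConcurrentLinkedQueue<>();
            CountDownLatch picked = new CountDownLatch(1);
            Thread worker = new Thread(() -> {
                try {
                    while (picked.getCount() > 0) {
                        if (pending.poll() != null) {
                            picked.countDown();          // the "new connection" was noticed
                        } else {
                            selector.select(timeoutMs);  // blocks until timeout or wakeup()
                        }
                    }
                } catch (IOException e) {
                    throw new IllegalStateException(e);
                }
            }, "demo-processor");
            worker.start();
            Thread.sleep(100);                           // give the worker time to block in select()
            long start = System.nanoTime();
            pending.add("new-connection");
            if (wakeup) {
                selector.wakeup();                       // makes the blocked select() return at once
            }
            boolean ok = picked.await(timeoutMs * 2 + 1000, TimeUnit.MILLISECONDS);
            long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
            worker.join();
            return ok ? elapsed : -1;
        } catch (IOException | InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("with wakeup:    " + handoffLatencyMillis(500, true) + " ms");
        System.out.println("without wakeup: " + handoffLatencyMillis(500, false) + " ms");
    }
}
```

Without the wakeup call the item is still picked up, but only after the current select timeout has run out, which is why the handoff in the sketch takes noticeably longer in that case.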

Read requests

Let's look at how read requests are handled.

    private void read(SelectionKey key) throws IOException {
        SocketChannel socketChannel = channelFor(key);
        Receive request = null;
        if (key.attachment() == null) {
            // first read for this request
            request = new BoundedByteBufferReceive(maxRequestSize);
            key.attach(request);
        } else {
            // on subsequent reads, take the request from the key's attachment
            request = (Receive) key.attachment();
        }
        // read data from the channel
        int read = request.readFrom(socketChannel);
        stats.recordBytesRead(read);
        if (read < 0) {
            // no message data, close the connection
            close(key);
        } else if (request.complete()) {
            // the message has been read completely; pass it to handle
            Send maybeResponse = handle(key, request);
            key.attach(null);
            // if there is a response, register the write event
            if (maybeResponse != null) {
                key.attach(maybeResponse);
                key.interestOps(SelectionKey.OP_WRITE);
            }
        } else {
            // a large request has to be read in several rounds, so register the read event again
            key.interestOps(SelectionKey.OP_READ);
            getSelector().wakeup();
            if (logger.isTraceEnabled()) {
                logger.trace("reading request not been done. " + request);
            }
        }
    }

    private Send handle(SelectionKey key, Receive request) {
        final short requestTypeId = request.buffer().getShort();
        // determine the request type
        final RequestKeys requestType = RequestKeys.valueOf(requestTypeId);
        // look up the RequestHandler for this type
        RequestHandler handlerMapping = requesthandlerFactory.mapping(requestType, request);
        if (handlerMapping == null) {
            throw new InvalidRequestException("No handler found for request");
        }
        long start = System.nanoTime();
        // call the handler and return its result
        Send maybeSend = handlerMapping.handler(requestType, request);
        stats.recordRequest(requestType, System.nanoTime() - start);
        return maybeSend;
    }

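See the comments for the detailed logic. In short, the method gets the channel and tries to read data from it. If no data is obtained, it closes the connection. If the data is incomplete and needs further reads, it registers the read event again. If all the data has arrived, it passes the request to the handle method, which invokes the matching RequestHandler; if the return value is not null, it registers the write event so that the result is sent back to the client.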
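The "read in several rounds" branch is easier to follow with a concrete receive object. The sketch below assumes a simple framing of a 4-byte length header followed by the payload; SizePrefixedReceive, PartialReadDemo and receiveInChunks are hypothetical names for this illustration and are not taken from Jafka's BoundedByteBufferReceive. Each chunk plays the role of one read event on the same key.

```java
import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;

final class PartialReadDemo {

    /** Hypothetical receive object: a 4-byte length header followed by the payload. */
    static final class SizePrefixedReceive {
        private final ByteBuffer sizeBuffer = ByteBuffer.allocate(4);
        private final int maxSize;
        private ByteBuffer content;   // allocated once the header is complete

        SizePrefixedReceive(int maxSize) {
            this.maxSize = maxSize;
        }

        /** Reads what is available; several calls may be needed before complete() is true. */
        int readFrom(ReadableByteChannel channel) throws IOException {
            int total = 0;
            if (sizeBuffer.hasRemaining()) {
                int n = channel.read(sizeBuffer);
                if (n < 0) throw new EOFException("channel closed while reading size");
                total += n;
                if (sizeBuffer.hasRemaining()) return total;   // header still incomplete
                sizeBuffer.flip();
                int size = sizeBuffer.getInt();
                if (size <= 0 || size > maxSize) {
                    throw new IllegalArgumentException("invalid request size " + size);
                }
                content = ByteBuffer.allocate(size);
            }
            if (content.hasRemaining()) {
                int n = channel.read(content);
                if (n < 0) throw new EOFException("channel closed while reading payload");
                total += n;
            }
            return total;
        }

        boolean complete() {
            return content != null && !content.hasRemaining();
        }

        byte[] payload() {
            return content.array().clone();
        }
    }

    /** Delivers the chunks one per simulated read event; returns the payload, or null if incomplete. */
    static byte[] receiveInChunks(byte[]... chunks) {
        SizePrefixedReceive receive = new SizePrefixedReceive(1024);
        try {
            for (byte[] chunk : chunks) {
                ReadableByteChannel channel = Channels.newChannel(new ByteArrayInputStream(chunk));
                receive.readFrom(channel);
                if (receive.complete()) return receive.payload();
                // not complete yet: a processor would keep the object attached and wait for the next read event
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return null;
    }

    public static void main(String[] args) {
        byte[] first = {0, 0, 0, 5, 'h', 'e'};   // header announces 5 bytes, only 2 arrive
        byte[] second = {'l', 'l', 'o'};
        System.out.println(new String(receiveInChunks(first, second), StandardCharsets.US_ASCII)); // prints hello
    }
}
```

Keeping the half-filled object on the key's attachment is what lets the next read event continue where the previous one stopped, instead of starting a new request.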

Note: Request, Send and RequestHandler will be analyzed in separate articles. This article focuses on the processing logic of SocketServer and does not go into them.

Write requests

Now let's look at how write requests are handled.

    private void write(SelectionKey key) throws IOException {
        Send response = (Send) key.attachment();
        SocketChannel socketChannel = channelFor(key);
        // write the response to the channel
        int written = response.writeTo(socketChannel);
        stats.recordBytesWritten(written);
        if (response.complete()) {
            key.attach(null);
            key.interestOps(SelectionKey.OP_READ);
        } else {
            key.interestOps(SelectionKey.OP_WRITE);
            getSelector().wakeup();
        }
    }

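Write handling is similar to read handling: if the response cannot be written in one go, the write event is registered again and the rest is written on the next round.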
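As a rough illustration of why one write may not be enough, the sketch below pushes a response through a channel that accepts only a few bytes per call, much like a socket whose send buffer is nearly full. ThrottledChannel, BufferSend and writeEventsNeeded are names invented for this example; they are not Jafka's Send implementations.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

final class PartialWriteDemo {

    /** Hypothetical channel that accepts at most `limit` bytes per write call. */
    static final class ThrottledChannel implements WritableByteChannel {
        private final ByteArrayOutputStream sink = new ByteArrayOutputStream();
        private final int limit;

        ThrottledChannel(int limit) {
            this.limit = limit;
        }

        @Override
        public int write(ByteBuffer src) {
            int n = Math.min(limit, src.remaining());
            for (int i = 0; i < n; i++) {
                sink.write(src.get());
            }
            return n;
        }

        @Override
        public boolean isOpen() {
            return true;
        }

        @Override
        public void close() {
        }

        byte[] written() {
            return sink.toByteArray();
        }
    }

    /** Hypothetical response object; the buffer position remembers how much is still unsent. */
    static final class BufferSend {
        private final ByteBuffer buffer;

        BufferSend(byte[] data) {
            this.buffer = ByteBuffer.wrap(data);
        }

        int writeTo(WritableByteChannel channel) throws IOException {
            return channel.write(buffer);
        }

        boolean complete() {
            return !buffer.hasRemaining();
        }
    }

    /** Returns how many write events are needed to flush `data`; `limit` must be positive. */
    static int writeEventsNeeded(byte[] data, int limit) {
        ThrottledChannel channel = new ThrottledChannel(limit);
        BufferSend response = new BufferSend(data);
        int events = 0;
        try {
            while (!response.complete()) {
                response.writeTo(channel);   // one simulated write event
                events++;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        if (!Arrays.equals(data, channel.written())) {
            throw new IllegalStateException("bytes were lost or reordered");
        }
        return events;
    }

    public static void main(String[] args) {
        byte[] data = "0123456789".getBytes(StandardCharsets.US_ASCII);
        System.out.println(writeEventsNeeded(data, 4)); // prints 3
    }
}
```

In the processor, the loop in writeEventsNeeded is replaced by the selector: each pass through write handles one event, and the response stays attached to the key until complete() returns true.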

Summary

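After the analysis above, you should have a general picture of SocketServer: it consists of one acceptor and several processors, where the former handles connection requests and the latter handle read and write requests. The implementation is simple and flexible, the number of processors is adjustable, and its performance has been tested accordingly.

Overall, SocketServer uses NIO to implement a high-performance socket server. Interested readers may want to look at current NIO frameworks such as Mina, Netty and Taobao's open-source Gecko, which make it faster and easier to build a high-performance socket server. Although Jafka implements its own socket server, NIO programming has many pitfalls and points that need care, and even experienced developers often stumble over them, so using a framework is still the recommended route. NIO programming involves many details, shutdown handling among them. The code listed in this article contains the corresponding shutdown logic, and it is worth studying how Jafka closes resources properly while catching the various exceptions.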