Java里著名框架的源码往往不容易读,他们在功能上大而全,把多态用到极致,又有很多的性能优化;相应的由于著名所以网上的资源也非常多,慢慢读总还是能体会到整个框架的设计和思路。配合着网上的博客,书籍和源代码,花了三天时间阅读,有了一定的收获,做一下笔记。
看netty之前回顾一下Java NIO,列一下使用Java API开发NIO所需要的几个步骤
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| selector = Selector.open(); servChannel = ServerSocketChannel.open(); servChannel.configureBlocking(false); servChannel.socket().bind(new InetSocketAddress(port), 1024);
SelectionKey key = servChannel.register(selector, SelectionKey.OP_ACCEPT);
public void run() { while (!stop) { selector.select(); Set<SelectionKey> selectedKeys = selector.selectedKeys(); Iterator<SelectionKey> it = selectedKeys.iterator(); SelectionKey key = null; while (it.hasNext()) { key = it.next(); it.remove(); handle(key); } } }
|
我们可以看到在run()
方法里开启了我们的Reactor线程,不断的获取注册在selector上的有新事件的channel,这里我们只用一个线程就处理了所有的连接,远比BIO下一个连接一个线程优秀。在循环代码里我们看到channel具体的处理逻辑是一个一个串行执行的,但是channel间没有任何联系,这个过程我们是可以利用线程池让多个channel并行处理来进一步提高性能。讲到线程我们可以联系一下Reactor的线程模型。
Reactor单线程模型:一个Reactor线程处理所有的连接的接入和读写操作,我们上面给出的代码例子就是这个模型。

Reactor多线程模型:由于单线程模型在面对大并发的情况下就显得力不从心了,所以我们在这里引入了线程池,利用多个线程来处理。具体就是我们只用一组线程来处理读写请求,毕竟读写是网络编程里最重要的两个操作,性能好不好,能不能抗住大并发很大的关键在这里;然后我们利用一个线程专门负责处理客户端的连接操作,注意这里只用了一个线程,连接建立起来之后,将channel注册到线程池中的某一个线程的多路复用器上。好,我们一句话总结:前面一个Reacotr线程处理所有的连接建立操作,确认连接后将Channel注册到后面的Reactor线程池的某个线程上,这样大量的读写操作被平均分配到了多个线程上处理,提高了吞吐量。

主从Reactor多线程模型:在多线程模型中,我们只用了一个线程来处理客户端的登陆、握手和安全认证,在一些特殊场景下还是回存在一些性能问题,于是进一步改造把acceptor的线程也改造成线程池,让多个线程来处理客户端接入。

我们把Reactor线程模型简单介绍了一下,这是事件循环的模型。我们上面提出的问题是,某个事件循环里selector.selectedKeys()
这个操作hi返回一系列可用的channel,这些channel的处理可以使用线程池来并发执行吗?这个问题在上面的三个Reactor线程模型中还暂时得不到答案,我们在源码中看一看。
由于netty是对Java NIO的封装所以一上来我的思路是把使用java API写的NIO程序中的每一个步骤看看是在netty的哪个地方实现的,这样一圈下来就能基本搞明白netty的流程。我们重点看Reactor线程模型和事件机制,netty的事件是在一条pipeline中传播的,每个channel都会有一条pipeline,pipeline里第一个是head,最后一个是tail,中间是一系列自定义的处理器,如果一个处理器可以处理,那么事件传递到这里结束,否则继续传递给下一个handle。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); if (sslCtx != null) { p.addLast(sslCtx.newHandler(ch.alloc())); } p.addLast(new EchoServerHandler()); } }); ChannelFuture f = b.bind(PORT).sync(); f.channel().closeFuture().sync();
|
以上是netty的最基本的使用方式,首先我们创建了两个EventLoopGroup
,对应于Reactor线程模型,第一个是Acceptor用于连接的,第二个是负责读写操作的线程池。深入到EventLoopGroup
的构造器:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) { if (executor == null) { executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()); } children = new EventExecutor[nThreads]; for (int i = 0; i < nThreads; i ++) { children[i] = newChild(executor, args); } chooser = chooserFactory.newChooser(children); }
protected EventLoop newChild(Executor executor, Object... args) throws Exception { return new NioEventLoop(this, executor, (SelectorProvider) args[0], ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]); }
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) { super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler); if (selectorProvider == null) { throw new NullPointerException("selectorProvider"); } if (strategy == null) { throw new NullPointerException("selectStrategy"); } provider = selectorProvider; final SelectorTuple selectorTuple = openSelector(); selector = selectorTuple.selector; unwrappedSelector = selectorTuple.unwrappedSelector; selectStrategy = strategy; }
|
接下来看看,ServerBootstrap
的构造,利用构建器模式设置它的属性
1 2 3 4 5 6 7 8 9 10
| public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) { super.group(parentGroup); this.childGroup = childGroup; return this; }
handler(new LoggingHandler(LogLevel.INFO)) childHandler(new ChannelInitializer<SocketChannel>())
|
现在我们关注ChannelFuture f = b.bind(PORT).sync()
的bind
方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119
| private ChannelFuture doBind(final SocketAddress localAddress) { final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.cause() != null) { return regFuture; } if (regFuture.isDone()) { ChannelPromise promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise); return promise; } else { final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { Throwable cause = future.cause(); if (cause != null) { promise.setFailure(cause); } else { promise.registered(); doBind0(regFuture, channel, localAddress, promise); } } }); return promise; } }
final ChannelFuture initAndRegister() { Channel channel = null; channel = channelFactory.newChannel(); init(channel); ChannelFuture regFuture = config().group().register(channel); return regFuture; } private static ServerSocketChannel newSocket(SelectorProvider provider) { return provider.openServerSocketChannel(); } void init(Channel channel) throws Exception { final Map<ChannelOption<?>, Object> options = options0(); synchronized (options) { setChannelOptions(channel, options, logger); } final Map<AttributeKey<?>, Object> attrs = attrs0(); synchronized (attrs) { for (Entry<AttributeKey<?>, Object> e : attrs.entrySet()) { @SuppressWarnings("unchecked") AttributeKey<Object> key = (AttributeKey<Object>) e.getKey(); channel.attr(key).set(e.getValue()); } } ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup; final ChannelHandler currentChildHandler = childHandler; ChannelHandler test = new ChannelInitializer<Channel>() { @Override public void initChannel(final Channel ch) throws Exception { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); } 这里的eventLoop是channel注册时的那个eventloop(每个eventloop里都有一个selector) 这个execute 并没有立马被执行,这个Runnable被扔到了task队列里 */ ch.eventLoop().execute(new Runnable() { @Override public void run() { * ServerBootstrapAcceptor是一个acceptor * 当前的这条pipeline是serversocketchannel的,serversocketchannel主要是accept一个连接用的, * 它不处理childhandler,而且这个serversocketchannel对应的是bosseventloop,这个eventloop只处理新连接的接入 * * 在新的连接接入的时候调用这条pipeline(serversocketchannel的pipeline), * 在这条pipeline里我们完成了serversocket.accept(),建立了连接得到的socketchannel,将他注册到了workeventloop上(这个eventloop关注读写操作) * 我们知道每个channel都会有一条pipeline,在ServerBootstrapAcceptor里我们给刚连接的这个socketchannel里的pipeline添加了childhandler来处理用户自定义的读写请求 * (连接建立起来的这个channel完成真正的读写,代码中我们也是在childhandler里添加处理读写的handler,) */ pipeline.addLast(new ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } }; p.addLast(test); }
public void channelRead(ChannelHandlerContext ctx, Object msg) { final Channel child = (Channel) msg; child.pipeline().addLast(childHandler); childGroup.register(child).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { forceClose(child, future.cause()); } } }); } pipeline本身是一个双向链表,通过addLast等方法添加handle 最终serversocketchannel的pipelinede 组成是这样的:headhandler--LoggerHandler--ServerBootstrapAcceptor--tailHandler 每一个建立起来的channel中的pipelinede 组成是这样的:headhandler--childrenhandler--tailHandler
|
Channel的init好后看看register()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| private void register0(ChannelPromise promise) { boolean firstRegistration = neverRegistered; doRegister(); neverRegistered = false; registered = true; pipeline.invokeHandlerAddedIfNeeded(); safeSetSuccess(promise); pipeline.fireChannelRegistered(); if (isActive()) { if (firstRegistration) { pipeline.fireChannelActive(); } else if (config().isAutoRead()) { beginRead(); } } } protected void doRegister() throws Exception { boolean selected = false; for (;;) { try { selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); return; } catch (CancelledKeyException e) { if (!selected) { eventLoop().selectNow(); selected = true; } else { throw e; } } } }
|
netty在何处开启事件循环呢?事件循环在NioEvevtLoop的run()
方法里,沿着调用链往上找我们找到了SingleThreadEventExecutor
的execute
方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| public void execute(Runnable task) { boolean inEventLoop = inEventLoop(); if (inEventLoop) { addTask(task); } else { startThread(); addTask(task); } } private void startThread() { if (state == ST_NOT_STARTED) { if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) { doStartThread(); } } }
|
事件循环开始,我们看到run()
方法里的processSelectedKeys()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77
| private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) { if (selectedKeys.isEmpty()) { return; } Iterator<SelectionKey> i = selectedKeys.iterator(); for (; ; ) { final SelectionKey k = i.next(); final Object a = k.attachment(); i.remove(); if (a instanceof AbstractNioChannel) { processSelectedKey(k, (AbstractNioChannel) a); } else { @SuppressWarnings("unchecked") NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; processSelectedKey(k, task); } if (!i.hasNext()) { break; } if (needsToSelectAgain) { selectAgain(); selectedKeys = selector.selectedKeys();å if (selectedKeys.isEmpty()) { break; } else { i = selectedKeys.iterator(); } } } }
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); try { int readyOps = k.readyOps(); if ((readyOps & SelectionKey.OP_CONNECT) != 0) { int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); unsafe.finishConnect(); } if ((readyOps & SelectionKey.OP_WRITE) != 0) { ch.unsafe().forceFlush(); } if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { *此处监听读或者连接操作,这里的read()方法是个多态方法,有两个实现,niomessageunsafe 是 nioserversocketchannel 继承链上的,这里的read 变成了accept连接操作 * niobyteunsafe 是niosocketchannel 继承链上的,所以这里确实就是读取操作 */ unsafe.read(); } } catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); } } protected int doReadMessages(List<Object> buf) throws Exception { SocketChannel ch = SocketUtils.accept(javaChannel()); try { if (ch != null) { buf.add(new NioSocketChannel(this, ch)); return 1; } } catch (Throwable t) { logger.warn("Failed to create a new channel from an accepted socket.", t); try { ch.close(); } catch (Throwable t2) { logger.warn("Failed to close a socket.", t2); } } return 0; }
|
我们在源码里找到了在Java NIO里要做的每一步操作,在代码中也看到了创建的两个EventLoopGroup
的作用:一个接受连接,一个处理读写。最上面我提过一个想法,就是一个selector
得到多个selectedKeys
时,利用线程来并发处理,在netty源码里我们看到了,这些selectedKeys
依旧是for循环的方法串行执行的,为什么这样呢?
这个问题解释起来其实很简单,先看一个简单的例子,你只有一个核心的机器,有5个任务要跑,要求最快完成这5个任务,你是在一个线程上顺序跑这5个任务还是5个线程一起跑呢?当然是顺序跑啦,只有一个核,多线程微观上还是串行执行的嘛,而且还要引入线程切换的开销,所以说假设你的任务非常消耗CPU, 那么现在每个CPU都被占满了, 你再增加线程个数, 只能降低系统的效率, 因为线程还需要切换。所以我们在很多线程池的默认设置中看到N,N+1的线程数设置。我们的NIOEventLoopGroup
默认是线程数是CPU数*2,这些eventloop全部开启,我们的cpu其实已经在这些事件循环上跑满了,再开线程去跑具体的用户业务逻辑并不是好的做法(线程数默认是2N,这么做是因为eventloop是会阻塞的,这个时候cpu空闲了有能力处理其他的线程了,所以这里我们的线程数比核心数多,线程数=CPU核心数/(1-阻塞系数))。
这里简单的介绍了一下netty的源码,看的时间短设计的内容也有限,对于netty内部还有很多东西值得去琢磨。列一下过程中参考的资料:《netty权威指南》,《Netty5.0架构剖析和源码解读》。
以前主要的工作就是在Spring上堆业务代码,都是在http之上,netty不是因为自己要写一个rpc框架还真的就用不到,也就对网络编程知之甚少。最近看到的一些内容给我开了一扇门,线程,协程,事件驱动,异步这些都是高性能程序常常出现的名词,也会涉及很多操作系统层次的东西,程序员还是应该往下走,在上面堆业务还是进步太慢。