Netty-04-Netty优化和源码 1. 优化 1.1 扩展序列化算法 序列化,反序列化主要用在消息正文的转换上
序列化时,需要将 Java 对象变为要传输的数据(可以是 byte[],或 json 等,最终都需要变成 byte[]) 反序列化时,需要将传入的正文数据还原成 Java 对象,便于处理 目前的代码仅支持 Java 自带的序列化,反序列化机制,核心代码如下
1 2 3 4 5 6 7 8 9 10 11 byte [] body = new  byte [bodyLength];ObjectInputStream  in  =  new  ObjectInputStream (new  ByteArrayInputStream (body));Message  message  =  (Message) in.readObject();ByteArrayOutputStream  out  =  new  ByteArrayOutputStream ();new  ObjectOutputStream (out).writeObject(message);byte [] bytes = out.toByteArray();
为了支持更多序列化算法,抽象一个 Serializer 接口
1 2 3 4 5 6 7 8 9 public  interface  Serializer  {deserialize (Class<T> clazz, byte [] bytes) ;byte [] serialize(T object);
提供两个实现,我这里直接将实现加入了枚举类 Serializer.Algorithm 中
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 enum  SerializerAlgorithm  implements  Serializer  {@Override public  <T> T deserialize (Class<T> clazz, byte [] bytes)  {try  {ObjectInputStream  in  =  new  ObjectInputStream (new  ByteArrayInputStream (bytes));Object  object  =  in.readObject();return  (T) object;catch  (IOException | ClassNotFoundException e) {throw  new  RuntimeException ("SerializerAlgorithm.Java 反序列化错误" , e);@Override public  <T> byte [] serialize(T object) {try  {ByteArrayOutputStream  out  =  new  ByteArrayOutputStream ();new  ObjectOutputStream (out).writeObject(object);return  out.toByteArray();catch  (IOException e) {throw  new  RuntimeException ("SerializerAlgorithm.Java 序列化错误" , e);@Override public  <T> T deserialize (Class<T> clazz, byte [] bytes)  {return  new  Gson ().fromJson(new  String (bytes, StandardCharsets.UTF_8), clazz);@Override public  <T> byte [] serialize(T object) {return  new  Gson ().toJson(object).getBytes(StandardCharsets.UTF_8);public  static  SerializerAlgorithm getByInt (int  type)  {if  (type < 0  || type > array.length - 1 ) {throw  new  IllegalArgumentException ("超过 SerializerAlgorithm 范围" );return  array[type];
增加配置类和配置文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 public  abstract  class  Config  {static  Properties properties;static  {try  (InputStream  in  =  Config.class.getResourceAsStream("/application.properties" )) {new  Properties ();catch  (IOException e) {throw  new  ExceptionInInitializerError (e);public  static  int  getServerPort ()  {String  value  =  properties.getProperty("server.port" );if (value == null ) {return  8080 ;else  {return  Integer.parseInt(value);public  static  Serializer.Algorithm getSerializerAlgorithm ()  {String  value  =  properties.getProperty("serializer.algorithm" );if (value == null ) {return  Serializer.Algorithm.Java;else  {return  Serializer.Algorithm.valueOf(value);
配置文件
1 serializer.algorithm =Json 
修改编解码器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 public  class  MessageCodecSharable  extends  MessageToMessageCodec <ByteBuf, Message> {@Override public  void  encode (ChannelHandlerContext ctx, Message msg, List<Object> outList)  throws  Exception {ByteBuf  out  =  ctx.alloc().buffer();new  byte []{1 , 2 , 3 , 4 });1 );0xff );byte [] bytes = Config.getSerializerAlgorithm().serialize(msg);@Override protected  void  decode (ChannelHandlerContext ctx, ByteBuf in, List<Object> out)  throws  Exception {int  magicNum  =  in.readInt();byte  version  =  in.readByte();byte  serializerAlgorithm  =  in.readByte(); byte  messageType  =  in.readByte(); int  sequenceId  =  in.readInt();int  length  =  in.readInt();byte [] bytes = new  byte [length];0 , length);Algorithm  algorithm  =  Serializer.Algorithm.values()[serializerAlgorithm];extends  Message > messageClass = Message.getMessageClass(messageType);Message  message  =  algorithm.deserialize(messageClass, bytes);
其中确定具体消息类型,可以根据 消息类型字节 获取到对应的 消息 class
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 @Data public  abstract  class  Message  implements  Serializable  {public  static  Class<? extends  Message > getMessageClass(int  messageType) {return  messageClasses.get(messageType);private  int  sequenceId;private  int  messageType;public  abstract  int  getMessageType () ;public  static  final  int  LoginRequestMessage  =  0 ;public  static  final  int  LoginResponseMessage  =  1 ;public  static  final  int  ChatRequestMessage  =  2 ;public  static  final  int  ChatResponseMessage  =  3 ;public  static  final  int  GroupCreateRequestMessage  =  4 ;public  static  final  int  GroupCreateResponseMessage  =  5 ;public  static  final  int  GroupJoinRequestMessage  =  6 ;public  static  final  int  GroupJoinResponseMessage  =  7 ;public  static  final  int  GroupQuitRequestMessage  =  8 ;public  static  final  int  GroupQuitResponseMessage  =  9 ;public  static  final  int  GroupChatRequestMessage  =  10 ;public  static  final  int  GroupChatResponseMessage  =  11 ;public  static  final  int  GroupMembersRequestMessage  =  12 ;public  static  final  int  GroupMembersResponseMessage  =  13 ;public  static  final  int  PingMessage  =  14 ;public  static  final  int  PongMessage  =  15 ;private  static  final  Map<Integer, Class<? extends  Message >> messageClasses = new  HashMap <>();static  {
1.2 参数调优 1)CONNECT_TIMEOUT_MILLIS 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 @Slf4j public  class  TestConnectionTimeout  {public  static  void  main (String[] args)  {NioEventLoopGroup  group  =  new  NioEventLoopGroup ();try  {Bootstrap  bootstrap  =  new  Bootstrap ()300 )new  LoggingHandler ());ChannelFuture  future  =  bootstrap.connect("127.0.0.1" , 8080 );catch  (Exception e) {"timeout" );finally  {
另外源码部分 io.netty.channel.nio.AbstractNioChannel.AbstractNioUnsafe#connect
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 @Override public  final  void  connect (         final  SocketAddress remoteAddress, final  SocketAddress localAddress, final  ChannelPromise promise)  {int  connectTimeoutMillis  =  config().getConnectTimeoutMillis();if  (connectTimeoutMillis > 0 ) {new  Runnable () {@Override public  void  run ()  {                ChannelPromise  connectPromise  =  AbstractNioChannel.this .connectPromise;ConnectTimeoutException  cause  = new  ConnectTimeoutException ("connection timed out: "  + remoteAddress); if  (connectPromise != null  && connectPromise.tryFailure(cause)) {
2)SO_BACKLOG 属于 ServerSocketChannal 参数 
第一次握手,client 发送 SYN 到 server,状态修改为 SYN_SEND,server 收到,状态改变为 SYN_REVD,并将该请求放入 sync queue 队列 第二次握手,server 回复 SYN + ACK 给 client,client 收到,状态改变为 ESTABLISHED,并发送 ACK 给 server 第三次握手,server 收到 ACK,状态改变为 ESTABLISHED,将该请求从 sync queue 放入 accept queue 其中
netty 中
可以通过 option(ChannelOption.SO_BACKLOG, 值) 来设置大小
可以通过下面源码查看默认大小
1 2 3 4 5 6 public  class  DefaultServerSocketChannelConfig  extends  DefaultChannelConfig implements  ServerSocketChannelConfig  {private  volatile  int  backlog  =  NetUtil.SOMAXCONN;
课堂调试关键断点为:io.netty.channel.nio.NioEventLoop#processSelectedKey
oio 中更容易说明,不用 debug 模式
1 2 3 4 5 6 7 8 public  class  Server  {public  static  void  main (String[] args)  throws  IOException {ServerSocket  ss  =  new  ServerSocket (8888 , 2 );Socket  accept  =  ss.accept();
客户端启动 4 个
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public  class  Client  {public  static  void  main (String[] args)  throws  IOException {try  {Socket  s  =  new  Socket ();new  Date ()+" connecting..." );new  InetSocketAddress ("localhost" , 8888 ),1000 );new  Date ()+" connected..." );1 );catch  (IOException e) {new  Date ()+" connecting timeout..." );
第 1,2,3 个客户端都打印,但除了第一个处于 accpet 外,其它两个都处于 accept queue 中
1 2 Tue Apr 21  20 :30 :28  CST 2020  connecting...21  20 :30 :28  CST 2020  connected...
第 4 个客户端连接时
1 2 3 Tue Apr 21 20:53:58 CST 2020 connecting...
3)ulimit -n 4)TCP_NODELAY 5)SO_SNDBUF & SO_RCVBUF SO_SNDBUF 属于 SocketChannal 参数 SO_RCVBUF 既可用于 SocketChannal 参数,也可以用于 ServerSocketChannal 参数(建议设置到 ServerSocketChannal 上) 6)ALLOCATOR 属于 SocketChannal 参数 用来分配 ByteBuf, ctx.alloc() 7)RCVBUF_ALLOCATOR 属于 SocketChannal 参数 控制 netty 接收缓冲区大小 负责入站数据的分配,决定入站缓冲区的大小(并可动态调整),统一采用 direct 直接内存,具体池化还是非池化由 allocator 决定 1.3 RPC 框架 1)准备工作 这些代码可以认为是现成的,无需从头编写练习
为了简化起见,在原来聊天项目的基础上新增 Rpc 请求和响应消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @Data public  abstract  class  Message  implements  Serializable  {public  static  final  int  RPC_MESSAGE_TYPE_REQUEST  =  101 ;public  static  final  int   RPC_MESSAGE_TYPE_RESPONSE  =  102 ;static  {
请求消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 @Getter @ToString(callSuper = true) public  class  RpcRequestMessage  extends  Message  {private  String interfaceName;private  String methodName;private  Class<?> returnType;private  Class[] parameterTypes;private  Object[] parameterValue;public  RpcRequestMessage (int  sequenceId, String interfaceName, String methodName, Class<?> returnType, Class[] parameterTypes, Object[] parameterValue)  {super .setSequenceId(sequenceId);this .interfaceName = interfaceName;this .methodName = methodName;this .returnType = returnType;this .parameterTypes = parameterTypes;this .parameterValue = parameterValue;@Override public  int  getMessageType ()  {return  RPC_MESSAGE_TYPE_REQUEST;
响应消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 @Data @ToString(callSuper = true) public  class  RpcResponseMessage  extends  Message  {private  Object returnValue;private  Exception exceptionValue;@Override public  int  getMessageType ()  {return  RPC_MESSAGE_TYPE_RESPONSE;
服务器架子
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 @Slf4j public  class  RpcServer  {public  static  void  main (String[] args)  {NioEventLoopGroup  boss  =  new  NioEventLoopGroup ();NioEventLoopGroup  worker  =  new  NioEventLoopGroup ();LoggingHandler  LOGGING_HANDLER  =  new  LoggingHandler (LogLevel.DEBUG);MessageCodecSharable  MESSAGE_CODEC  =  new  MessageCodecSharable ();RpcRequestMessageHandler  RPC_HANDLER  =  new  RpcRequestMessageHandler ();try  {ServerBootstrap  serverBootstrap  =  new  ServerBootstrap ();new  ChannelInitializer <SocketChannel>() {@Override protected  void  initChannel (SocketChannel ch)  throws  Exception {new  ProcotolFrameDecoder ());Channel  channel  =  serverBootstrap.bind(8080 ).sync().channel();catch  (InterruptedException e) {"server error" , e);finally  {
客户端架子
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public  class  RpcClient  {public  static  void  main (String[] args)  {NioEventLoopGroup  group  =  new  NioEventLoopGroup ();LoggingHandler  LOGGING_HANDLER  =  new  LoggingHandler (LogLevel.DEBUG);MessageCodecSharable  MESSAGE_CODEC  =  new  MessageCodecSharable ();RpcResponseMessageHandler  RPC_HANDLER  =  new  RpcResponseMessageHandler ();try  {Bootstrap  bootstrap  =  new  Bootstrap ();new  ChannelInitializer <SocketChannel>() {@Override protected  void  initChannel (SocketChannel ch)  throws  Exception {new  ProcotolFrameDecoder ());Channel  channel  =  bootstrap.connect("localhost" , 8080 ).sync().channel();catch  (Exception e) {"client error" , e);finally  {
服务器端的 service 获取
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public  class  ServicesFactory  {static  Properties properties;static  Map<Class<?>, Object> map = new  ConcurrentHashMap <>();static  {try  (InputStream  in  =  Config.class.getResourceAsStream("/application.properties" )) {new  Properties ();for  (String name : names) {if  (name.endsWith("Service" )) {catch  (IOException | ClassNotFoundException | InstantiationException | IllegalAccessException e) {throw  new  ExceptionInInitializerError (e);public  static  <T> T getService (Class<T> interfaceClass)  {return  (T) map.get(interfaceClass);
相关配置 application.properties
1 2 serializer.algorithm=Json
2)服务器 handler 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 @Slf4j @ChannelHandler .Sharablepublic  class  RpcRequestMessageHandler  extends  SimpleChannelInboundHandler <RpcRequestMessage> {@Override protected  void  channelRead0 (ChannelHandlerContext ctx, RpcRequestMessage message)  {RpcResponseMessage  response  =  new  RpcResponseMessage ();try  {HelloService  service  =  (HelloService)Method  method  =  service.getClass().getMethod(message.getMethodName(), message.getParameterTypes());Object  invoke  =  method.invoke(service, message.getParameterValue());catch  (Exception e) {
3)客户端代码第一版 只发消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 @Slf4j public  class  RpcClient  {public  static  void  main (String[] args)  {NioEventLoopGroup  group  =  new  NioEventLoopGroup ();LoggingHandler  LOGGING_HANDLER  =  new  LoggingHandler (LogLevel.DEBUG);MessageCodecSharable  MESSAGE_CODEC  =  new  MessageCodecSharable ();RpcResponseMessageHandler  RPC_HANDLER  =  new  RpcResponseMessageHandler ();try  {Bootstrap  bootstrap  =  new  Bootstrap ();new  ChannelInitializer <SocketChannel>() {@Override protected  void  initChannel (SocketChannel ch)  throws  Exception {new  ProcotolFrameDecoder ());Channel  channel  =  bootstrap.connect("localhost" , 8080 ).sync().channel();ChannelFuture  future  =  channel.writeAndFlush(new  RpcRequestMessage (1 ,"cn.itcast.server.service.HelloService" ,"sayHello" ,new  Class []{String.class},new  Object []{"张三" }if  (!promise.isSuccess()) {Throwable  cause  =  promise.cause();"error" , cause);catch  (Exception e) {"client error" , e);finally  {
4)客户端 handler 第一版 1 2 3 4 5 6 7 8 @Slf4j @ChannelHandler .Sharablepublic  class  RpcResponseMessageHandler  extends  SimpleChannelInboundHandler <RpcResponseMessage> {@Override protected  void  channelRead0 (ChannelHandlerContext ctx, RpcResponseMessage msg)  throws  Exception {"{}" , msg);
5)客户端代码 第二版 包括 channel 管理,代理,接收结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 @Slf4j public  class  RpcClientManager  {public  static  void  main (String[] args)  {HelloService  service  =  getProxyService(HelloService.class);"zhangsan" ));public  static  <T> T getProxyService (Class<T> serviceClass)  {ClassLoader  loader  =  serviceClass.getClassLoader();new  Class []{serviceClass};Object  o  =  Proxy.newProxyInstance(loader, interfaces, (proxy, method, args) -> {int  sequenceId  =  SequenceIdGenerator.nextId();RpcRequestMessage  msg  =  new  RpcRequestMessage (new  DefaultPromise <>(getChannel().eventLoop());if (promise.isSuccess()) {return  promise.getNow();else  {throw  new  RuntimeException (promise.cause());return  (T) o;private  static  Channel  channel  =  null ;private  static  final  Object  LOCK  =  new  Object ();public  static  Channel getChannel ()  {if  (channel != null ) {return  channel;synchronized  (LOCK) { if  (channel != null ) { return  channel;return  channel;private  static  void  initChannel ()  {NioEventLoopGroup  group  =  new  NioEventLoopGroup ();LoggingHandler  LOGGING_HANDLER  =  new  LoggingHandler (LogLevel.DEBUG);MessageCodecSharable  MESSAGE_CODEC  =  new  MessageCodecSharable ();RpcResponseMessageHandler  RPC_HANDLER  =  new  RpcResponseMessageHandler ();Bootstrap  bootstrap  =  new  Bootstrap ();new  ChannelInitializer <SocketChannel>() {@Override protected  void  initChannel (SocketChannel ch)  throws  Exception {new  ProcotolFrameDecoder ());try  {"localhost" , 8080 ).sync().channel();catch  (Exception e) {"client error" , e);
6)客户端 handler 第二版 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 @Slf4j @ChannelHandler .Sharablepublic  class  RpcResponseMessageHandler  extends  SimpleChannelInboundHandler <RpcResponseMessage> {public  static  final  Map<Integer, Promise<Object>> PROMISES = new  ConcurrentHashMap <>();@Override protected  void  channelRead0 (ChannelHandlerContext ctx, RpcResponseMessage msg)  throws  Exception {"{}" , msg);if  (promise != null ) {Object  returnValue  =  msg.getReturnValue();Exception  exceptionValue  =  msg.getExceptionValue();if (exceptionValue != null ) {else  {
2. 源码分析 2.1 启动剖析 我们就来看看 netty 中对下面的代码是怎样进行处理的
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 Selector  selector  =  Selector.open(); NioServerSocketChannel  attachment  =  new  NioServerSocketChannel ();ServerSocketChannel  serverSocketChannel  =  ServerSocketChannel.open(); false );SelectionKey  selectionKey  =  serverSocketChannel.register(selector, 0 , attachment);new  InetSocketAddress (8080 ));
入口 io.netty.bootstrap.ServerBootstrap#bind
关键代码 io.netty.bootstrap.AbstractBootstrap#doBind
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 private  ChannelFuture doBind (final  SocketAddress localAddress)  {final  ChannelFuture  regFuture  =  initAndRegister();final  Channel  channel  =  regFuture.channel();if  (regFuture.cause() != null ) {return  regFuture;if  (regFuture.isDone()) {ChannelPromise  promise  =  channel.newPromise();return  promise;else  {final  PendingRegistrationPromise  promise  =  new  PendingRegistrationPromise (channel);new  ChannelFutureListener () {@Override public  void  operationComplete (ChannelFuture future)  throws  Exception {Throwable  cause  =  future.cause();if  (cause != null ) {else  {return  promise;
关键代码 io.netty.bootstrap.AbstractBootstrap#initAndRegister
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 final  ChannelFuture initAndRegister ()  {Channel  channel  =  null ;try  {catch  (Throwable t) {return  new  DefaultChannelPromise (new  FailedChannel (), GlobalEventExecutor.INSTANCE).setFailure(t);ChannelFuture  regFuture  =  config().group().register(channel);if  (regFuture.cause() != null ) {return  regFuture;
关键代码 io.netty.bootstrap.ServerBootstrap#init
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 void  init (Channel channel)  throws  Exception {final  Map<ChannelOption<?>, Object> options = options0();synchronized  (options) {final  Map<AttributeKey<?>, Object> attrs = attrs0();synchronized  (attrs) {for  (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {@SuppressWarnings("unchecked") ChannelPipeline  p  =  channel.pipeline();final  EventLoopGroup  currentChildGroup  =  childGroup;final  ChannelHandler  currentChildHandler  =  childHandler;final  Entry<ChannelOption<?>, Object>[] currentChildOptions;final  Entry<AttributeKey<?>, Object>[] currentChildAttrs;synchronized  (childOptions) {0 ));synchronized  (childAttrs) {0 ));new  ChannelInitializer <Channel>() {@Override public  void  initChannel (final  Channel ch)  throws  Exception {final  ChannelPipeline  pipeline  =  ch.pipeline();ChannelHandler  handler  =  config.handler();if  (handler != null ) {new  Runnable () {@Override public  void  run ()  {new  ServerBootstrapAcceptor (
关键代码 io.netty.channel.AbstractChannel.AbstractUnsafe#register
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public  final  void  register (EventLoop eventLoop, final  ChannelPromise promise)  {this .eventLoop = eventLoop;if  (eventLoop.inEventLoop()) {else  {try  {new  Runnable () {@Override public  void  run ()  {catch  (Throwable t) {
io.netty.channel.AbstractChannel.AbstractUnsafe#register0
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 private  void  register0 (ChannelPromise promise)  {try  {if  (!promise.setUncancellable() || !ensureOpen(promise)) {return ;boolean  firstRegistration  =  neverRegistered;false ;true ;if  (isActive()) {if  (firstRegistration) {else  if  (config().isAutoRead()) {catch  (Throwable t) {
关键代码 io.netty.channel.ChannelInitializer#initChannel
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 private  boolean  initChannel (ChannelHandlerContext ctx)  throws  Exception {if  (initMap.add(ctx)) { try  {catch  (Throwable cause) {finally  {ChannelPipeline  pipeline  =  ctx.pipeline();if  (pipeline.context(this ) != null ) {this );return  true ;return  false ;
关键代码 io.netty.bootstrap.AbstractBootstrap#doBind0
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 private  static  void  doBind0 (         final  ChannelFuture regFuture, final  Channel channel,         final  SocketAddress localAddress, final  ChannelPromise promise)  {new  Runnable () {@Override public  void  run ()  {if  (regFuture.isSuccess()) {else  {
关键代码 io.netty.channel.AbstractChannel.AbstractUnsafe#bind
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 public  final  void  bind (final  SocketAddress localAddress, final  ChannelPromise promise)  {if  (!promise.setUncancellable() || !ensureOpen(promise)) {return ;if  (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&instanceof  InetSocketAddress &&boolean  wasActive  =  isActive();try  {catch  (Throwable t) {return ;if  (!wasActive && isActive()) {new  Runnable () {@Override public  void  run ()  {
3.3 关键代码 io.netty.channel.socket.nio.NioServerSocketChannel#doBind
1 2 3 4 5 6 7 protected  void  doBind (SocketAddress localAddress)  throws  Exception {if  (PlatformDependent.javaVersion() >= 7 ) {else  {
3.4 关键代码 io.netty.channel.DefaultChannelPipeline.HeadContext#channelActive
1 2 3 4 5 public  void  channelActive (ChannelHandlerContext ctx)  {
关键代码 io.netty.channel.nio.AbstractNioChannel#doBeginRead
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 protected  void  doBeginRead ()  throws  Exception {final  SelectionKey  selectionKey  =  this .selectionKey;if  (!selectionKey.isValid()) {return ;true ;final  int  interestOps  =  selectionKey.interestOps();if  ((interestOps & readInterestOp) == 0 ) {
2.2 NioEventLoop 剖析 NioEventLoop 线程不仅要处理 IO 事件,还要处理 Task(包括普通任务和定时任务),
提交任务代码 io.netty.util.concurrent.SingleThreadEventExecutor#execute
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public  void  execute (Runnable task)  {if  (task == null ) {throw  new  NullPointerException ("task" );boolean  inEventLoop  =  inEventLoop();if  (!inEventLoop) {if  (isShutdown()) {if  (!addTaskWakesUp && wakesUpForTask(task)) {
唤醒 select 阻塞线程io.netty.channel.nio.NioEventLoop#wakeup
1 2 3 4 5 6 @Override protected  void  wakeup (boolean  inEventLoop)  {if  (!inEventLoop && wakenUp.compareAndSet(false , true )) {
启动 EventLoop 主循环 io.netty.util.concurrent.SingleThreadEventExecutor#doStartThread
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 private  void  doStartThread ()  {assert  thread == null ;new  Runnable () {@Override public  void  run ()  {if  (interrupted) {boolean  success  =  false ;try  {this .run();true ;catch  (Throwable t) {"Unexpected exception from an event executor: " , t);finally  {
io.netty.channel.nio.NioEventLoop#run 主要任务是执行死循环,不断看有没有新任务,有没有 IO 事件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 protected  void  run ()  {for  (;;) {try  {try  {switch  (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {case  SelectStrategy.CONTINUE:continue ;case  SelectStrategy.BUSY_WAIT:case  SelectStrategy.SELECT:boolean  oldWakenUp  =  wakenUp.getAndSet(false );if  (wakenUp.get()) {default :catch  (IOException e) {continue ;0 ;false ;final  int  ioRatio  =  this .ioRatio;if  (ioRatio == 100 ) {try  {finally  {else  {                final  long  ioStartTime  =  System.nanoTime();try  {finally  {final  long  ioTime  =  System.nanoTime() - ioStartTime;100  - ioRatio) / ioRatio);catch  (Throwable t) {try  {if  (isShuttingDown()) {if  (confirmShutdown()) {return ;catch  (Throwable t) {
⚠️ 注意 这里有个费解的地方就是 wakeup,它既可以由提交任务的线程来调用(比较好理解),也可以由 EventLoop 线程来调用(比较费解),这里要知道 wakeup 方法的效果:
由非 EventLoop 线程调用,会唤醒当前在执行 select 阻塞的 EventLoop 线程 由 EventLoop 自己调用,会本次的 wakeup 会取消下一次的 select 操作 参考下图
io.netty.channel.nio.NioEventLoop#select
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 private  void  select (boolean  oldWakenUp)  throws  IOException {Selector  selector  =  this .selector;try  {int  selectCnt  =  0 ;long  currentTimeNanos  =  System.nanoTime();long  selectDeadLineNanos  =  currentTimeNanos + delayNanos(currentTimeNanos);for  (;;) {long  timeoutMillis  =  (selectDeadLineNanos - currentTimeNanos + 500000L ) / 1000000L ;if  (timeoutMillis <= 0 ) {if  (selectCnt == 0 ) {1 ;break ;if  (hasTasks() && wakenUp.compareAndSet(false , true )) {1 ;break ;int  selectedKeys  =  selector.select(timeoutMillis);if  (selectedKeys != 0  || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {break ;if  (Thread.interrupted()) {1 ;break ;long  time  =  System.nanoTime();if  (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {1 ;else  if  (SELECTOR_AUTO_REBUILD_THRESHOLD > 0  &&1 ;break ;if  (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {catch  (CancelledKeyException e) {
处理 keys io.netty.channel.nio.NioEventLoop#processSelectedKeys
1 2 3 4 5 6 7 8 9 private  void  processSelectedKeys ()  {if  (selectedKeys != null ) {else  {
io.netty.channel.nio.NioEventLoop#processSelectedKey
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 private  void  processSelectedKey (SelectionKey k, AbstractNioChannel ch)  {final  AbstractNioChannel.NioUnsafe  unsafe  =  ch.unsafe();if  (!k.isValid()) {return ;try  {int  readyOps  =  k.readyOps();if  ((readyOps & SelectionKey.OP_CONNECT) != 0 ) {int  ops  =  k.interestOps();if  ((readyOps & SelectionKey.OP_WRITE) != 0 ) {if  ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0  || readyOps == 0 ) {catch  (CancelledKeyException ignored) {
2.3 accept 剖析 nio 中如下代码,在 netty 中的流程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 while  (iter.hasNext()) {    SelectionKey  key  =  iter.next();if  (key.isAcceptable()) {SocketChannel  channel  =  serverSocketChannel.accept();false );
先来看可接入事件处理(accept)
io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe#read
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 public  void  read ()  {assert  eventLoop () .inEventLoop();final  ChannelConfig  config  =  config();final  ChannelPipeline  pipeline  =  pipeline();    final  RecvByteBufAllocator.Handle  allocHandle  =  unsafe().recvBufAllocHandle();boolean  closed  =  false ;Throwable  exception  =  null ;try  {try  {do  {int  localRead  =  doReadMessages(readBuf);if  (localRead == 0 ) {break ;if  (localRead < 0 ) {true ;break ;while  (allocHandle.continueReading());catch  (Throwable t) {int  size  =  readBuf.size();for  (int  i  =  0 ; i < size; i ++) {false ;if  (exception != null ) {if  (closed) {true ;if  (isOpen()) {finally  {if  (!readPending && !config.isAutoRead()) {
关键代码 io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor#channelRead
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public  void  channelRead (ChannelHandlerContext ctx, Object msg)  {final  Channel  child  =  (Channel) msg;for  (Entry<AttributeKey<?>, Object> e: childAttrs) {try  {new  ChannelFutureListener () {@Override public  void  operationComplete (ChannelFuture future)  throws  Exception {if  (!future.isSuccess()) {catch  (Throwable t) {
又回到了熟悉的 io.netty.channel.AbstractChannel.AbstractUnsafe#register 方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public  final  void  register (EventLoop eventLoop, final  ChannelPromise promise)  {this .eventLoop = eventLoop;if  (eventLoop.inEventLoop()) {else  {try  {new  Runnable () {@Override public  void  run ()  {catch  (Throwable t) {
io.netty.channel.AbstractChannel.AbstractUnsafe#register0
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 private  void  register0 (ChannelPromise promise)  {try  {if  (!promise.setUncancellable() || !ensureOpen(promise)) {return ;boolean  firstRegistration  =  neverRegistered;false ;true ;if  (isActive()) {if  (firstRegistration) {else  if  (config().isAutoRead()) {catch  (Throwable t) {
回到了熟悉的代码 io.netty.channel.DefaultChannelPipeline.HeadContext#channelActive
1 2 3 4 5 public  void  channelActive (ChannelHandlerContext ctx)  {
io.netty.channel.nio.AbstractNioChannel#doBeginRead
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 protected  void  doBeginRead ()  throws  Exception {final  SelectionKey  selectionKey  =  this .selectionKey;if  (!selectionKey.isValid()) {return ;true ;final  int  interestOps  =  selectionKey.interestOps();if  ((interestOps & readInterestOp) == 0 ) {
2.4 read 剖析 再来看可读事件 io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read,注意发送的数据未必能够一次读完,因此会触发多次 nio read 事件,一次事件内会触发多次 pipeline read,一次事件会触发一次 pipeline read complete
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 public  final  void  read ()  {final  ChannelConfig  config  =  config();if  (shouldBreakReadReady(config)) {return ;final  ChannelPipeline  pipeline  =  pipeline();final  ByteBufAllocator  allocator  =  config.getAllocator();final  RecvByteBufAllocator.Handle  allocHandle  =  recvBufAllocHandle();ByteBuf  byteBuf  =  null ;boolean  close  =  false ;try  {do  {if  (allocHandle.lastBytesRead() <= 0 ) {null ;0 ;if  (close) {false ;break ;1 );false ;null ;while  (allocHandle.continueReading());if  (close) {catch  (Throwable t) {finally  {if  (!readPending && !config.isAutoRead()) {
io.netty.channel.DefaultMaxMessagesRecvByteBufAllocator.MaxMessageHandle#continueReading(io.netty.util.UncheckedBooleanSupplier)
1 2 3 4 5 6 7 8 9 10 11 12 public  boolean  continueReading (UncheckedBooleanSupplier maybeMoreDataSupplier)  {return  0 ;