nettyITeye - 超凡娱乐

nettyITeye

2019-01-11 19:23:07 | 作者: 君昊 | 标签: 代码,保藏,办法 | 浏览: 1821

服务端发动的第一步必须先创立一个监听套接字ServerSocketChannel,该进程是由ChannelFuture f = b.bind(port)中的bind触发。下面详细剖析其进程:
  Bind源码如下,代码坐落ServerBootstrap的父类AbstractBootstrap
Java代码  保藏代码
//AbstractBootstrap 
public ChannelFuture bind(int inetPort) { 
  return bind(new InetSocketAddress(inetPort)); 

 
public ChannelFuture bind(SocketAddress localAddress) { 
  validate(); 
  if (localAddress == null) { 
  throw new NullPointerException("localAddress"); 
  } 
  return doBind(localAddress); 
  } 
  validate()办法的作用为:校验:bossGroup、BootstrapChannelFactory、childHandler非空。假如childGroup为空,则复用bossGroup,将bossGroup赋值给childGroup。

  接着来看doBind的逻辑:
Java代码  保藏代码
//AbstractBootstrap 
private ChannelFuture doBind(final SocketAddress localAddress) { 
  final ChannelFuture regPromise = initAndRegister(); 
  final Channel channel = regPromise.channel(); 
  final ChannelPromise promise = channel.newPromise(); 
  if (regPromise.isDone()) { 
  doBind0(regPromise, channel, localAddress, promise); 
  } else { 
  regPromise.addListener(new ChannelFutureListener() { 
  @Override 
  public void operationComplete(ChannelFuture future) throws Exception { 
  doBind0(future, channel, localAddress, promise); 
  } 
  }); 
  } 
  return promise; 
  } 
 
  要点剖析里边的initAndRegister()办法
Java代码  保藏代码
//AbstractBootstrap 
final ChannelFuture initAndRegister() { 
  final Channel channel = channelFactory().newChannel(); 
  try { 
  init(channel); 
  } catch (Throwable t) { 
  channel.unsafe().closeForcibly(); 
  return channel.newFailedFuture(t); 
  } 
 
  ChannelPromise regPromise = channel.newPromise(); 
  group().register(channel, regPromise); 
  if (regPromise.cause() != null) { 
  if (channel.isRegistered()) { 
  channel.close(); 
  } else { 
  channel.unsafe().closeForcibly(); 
  } 
  } 
  return regPromise; 
  } 
 
a)首要剖析以下代码:
Java代码  保藏代码
final Channel channel = channelFactory().newChannel() 
  channelFactory()办法回来之前创立的BootstrapChannelFactory,里边的newChannel()办法会依据反射创立一个ServerSocketChannel
Java代码  保藏代码
//BootstrapChannelFactory 
public T newChannel() { 
  try { 
  return clazz.newInstance(); 
  } catch (Throwable t) { 
  throw new ChannelException("Unable to create Channel from class " + clazz, t); 
  } 
  } 
  注:clazz是在服务端发动的这段代码(b.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class)中设置的。
  clazz.newInstance()会调用NioServerSocketChannel的默许结构函数
Java代码  保藏代码
// NioServerSocketChannel 
public NioServerSocketChannel() { 
  super(null, newSocket(), SelectionKey.OP_ACCEPT); 
  config = new DefaultServerSocketChannelConfig(this, javaChannel().socket()); 

private static ServerSocketChannel newSocket() { 
  try { 
  return ServerSocketChannel.open(); 
  } catch (IOException e) { 
  throw new ChannelException( 
  "Failed to open a server socket.", e); 
  } 
  } 
  留意newSocket中的这行代码:
Java代码  保藏代码
return ServerSocketChannel.open(); 
  此处便是服务端监听套接字ServerSocketChannel创立的当地。

  既然是运用NIO,那么设置创立的ServerSocketChannel为非堵塞是在哪个当地发作的呢?看下这行代码
Java代码  保藏代码
super(null, newSocket(), SelectionKey.OP_ACCEPT); 
  它会对NioServerSocketChannel的父类进行初始化:NioServerSocketChannel的父类是AbstractNioMessageChannel,其结构办法只是初始化其父类AbstractNioChannel,父类结构办法如下:
Java代码  保藏代码
//AbstractNioChannel 
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) { 
  super(parent); 
  this.ch = ch; 
  this.readInterestOp = readInterestOp; 
  try { 
  ch.configureBlocking(false); 
  } catch (IOException e) { 
  try { 
  ch.close(); 
  } catch (IOException e2) { 
  if (logger.isWarnEnabled()) { 
  logger.warn( 
  "Failed to close a partially initialized socket.", e2); 
  } 
  } 
  throw new ChannelException("Failed to enter non-blocking mode.", e); 
  } 
  } 
  ch.configureBlocking(false)此处就将之前创立的ServerSocketChannel设置为非堵塞形式。
 
  该办法里还有三点需求留意:
  1、super(parent)会调用AbstractNioChannel的父类AbstractChannel的结构办法
Java代码  保藏代码
// AbstractChannel.java 
protected AbstractChannel(Channel parent) { 
  this.parent = parent; 
  unsafe = newUnsafe(); 
  pipeline = new DefaultChannelPipeline(this); 
  } 
  newUnsafe()是由子类AbstractNioMessageChannel完成的,里边实例化了一个内部类NioMessageUnsafe(注:该类很重要,里边界说了read办法,会触发accept的调用,后面临其要点剖析)。
  2、this.readInterestOp = readInterestOp:设置channel的ops为SelectionKey.OP_ACCEPT(值为16)
  3、pipeline = new DefaultChannelPipeline(this),创立作用于ServerSocketChannel的管道Pipeline
Java代码  保藏代码
// DefaultChannelPipeline 
public DefaultChannelPipeline(Channel channel) { 
  if (channel == null) { 
  throw new NullPointerException("channel"); 
  } 
  this.channel = channel; 
 
  TailHandler tailHandler = new TailHandler(); 
  tail = new DefaultChannelHandlerContext(this, null, generateName(tailHandler), tailHandler); 
 
  HeadHandler headHandler = new HeadHandler(channel.unsafe()); 
  head = new DefaultChannelHandlerContext(this, null, generateName(headHandler), headHandler); 
 
  head.next = tail; 
  tail.prev = head; 
  } 
  DefaultChannelPipeline保护了一个以DefaultChannelHandlerContext为元素的双向链表结构,Head是一个Outbound处理器,而tail是一个Inbound处理器。通过此过程后,管道中的处理器链表为:Head- tail。

b)再来剖析以下代码
Java代码  保藏代码
init(channel) 
  该办法由子类ServerBootstrap完成
Java代码  保藏代码
// ServerBootstrap.java 
void init(Channel channel) throws Exception { 
  final Map ChannelOption ? , Object options = options(); 
  synchronized (options) { 
  channel.config().setOptions(options); 
  } 
 
  final Map AttributeKey ? , Object attrs = attrs(); 
  synchronized (attrs) { 
  for (Entry AttributeKey ? , Object e: attrs.entrySet()) { 
  @SuppressWarnings("unchecked") 
  AttributeKey Object key = (AttributeKey Object ) e.getKey(); 
  channel.attr(key).set(e.getValue()); 
  } 
  } 
 
  ChannelPipeline p = channel.pipeline(); 
  if (handler() != null) { 
  p.addLast(handler()); 
  } 
 
  final EventLoopGroup currentChildGroup = childGroup; 
  final ChannelHandler currentChildHandler = childHandler; 
  final Entry ChannelOption ? , Object [] currentChildOptions; 
  final Entry AttributeKey ? , Object [] currentChildAttrs; 
  synchronized (childOptions) { 
  currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size())); 
  } 
  synchronized (childAttrs) { 
  currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size())); 
  } 
 
  p.addLast(new ChannelInitializer Channel () { 
  @Override 
  public void initChannel(Channel ch) throws Exception { 
  ch.pipeline().addLast(new ServerBootstrapAcceptor( 
  currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); 
  } 
  }); 
  } 
  该办法首要做了两件事:
  1、设置NioServerSocketChannel的options和attrs,并存储之后用于SocketChannel的options和attrs。
  2、为NioServerSocketChannel对应的管道添加一个Inbound处理器ChannelInitializer。通过此过程后,管道中的处理器链表为:head(outbound)- ChannelInitializer(inbound)- tail(inbound)。留意ChannelInitializer的完成办法initChannel,里边会当channelRegistered工作发作时将ServerBootstrapAcceptor加入到管道中。

c) 最终剖析 以下代码:
Java代码  保藏代码
group().register(channel, regPromise); 
  实践是调用MultithreadEventLoopGroup的register办法
Java代码  保藏代码
//MultithreadEventLoopGroup 
public ChannelFuture register(Channel channel, ChannelPromise promise) { 
  return next().register(channel, promise); 
  } 
  next办法从bossGroup中挑选一个EventExecutor(它实践是一个SingleThreadEventLoop),然后履行register办法
Java代码  保藏代码
//SingleThreadEventLoop 
public ChannelFuture register(final Channel channel, final ChannelPromise promise) { 
  if (channel == null) { 
  throw new NullPointerException("channel"); 
  } 
  if (promise == null) { 
  throw new NullPointerException("promise"); 
  } 
  channel.unsafe().register(this, promise); 
  return promise; 
  } 
  channel.unsafe().register(this, promise)这里会调用AbstractChannel的内部类AbstractUnsafe的register办法
Java代码  保藏代码
//AbstractUnsafe 
public final void register(EventLoop eventLoop, final ChannelPromise promise) { 
  if (eventLoop == null) { 
  throw new NullPointerException("eventLoop"); 
  } 
  if (isRegistered()) { 
  promise.setFailure(new IllegalStateException("registered to an event loop already")); 
  return; 
  } 
  if (!isCompatible(eventLoop)) { 
  promise.setFailure( 
  new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName())); 
  return; 
  } 
  AbstractChannel.this.eventLoop = eventLoop; 
  if (eventLoop.inEventLoop()) { 
  register0(promise); 
  } else { 
  try { 
  eventLoop.execute(new Runnable() { 
  @Override 
  public void run() { 
  register0(promise); 
  } 
  }); 
  } catch (Throwable t) { 
  closeForcibly(); 
  promise.setFailure(t); 
  } 
  } 
  } 
  此处敞开了eventloop中的线程(即发动了boss线程),并将register0使命加入到boss线程的行列中。通过此过程后,boss线程的使命行列仅含有一个使命,即register0使命,且正在被履行。
  接着剖析register0使命详细干了什么工作
Java代码  保藏代码
//AbstractUnsafe 
private void register0(ChannelPromise promise) { 
  try { 
  // check if the channel is still open as it could be closed in the mean time when the register 
  // call was outside of the eventLoop 
  if (!ensureOpen(promise)) { 
  return; 
  } 
  Runnable postRegisterTask = doRegister(); 
  registered = true; 
  promise.setSuccess(); 
  pipeline.fireChannelRegistered(); 
  if (postRegisterTask != null) { 
  postRegisterTask.run(); 
  } 
  if (isActive()) { 
  pipeline.fireChannelActive(); 
  } 
  } catch (Throwable t) { 
  // Close the channel directly to avoid FD leak. 
  closeForcibly(); 
  if (!promise.tryFailure(t)) { 
  logger.warn( 
  "Tried to fail the registration promise, but it is complete already. " + 
  "Swallowing the cause of the registration failure:", t); 
  } 
  closeFuture.setClosed(); 
  } 
  } 
 
protected Runnable doRegister() throws Exception { 
  boolean selected = false; 
  for (;;) { 
  try { 
  selectionKey = javaChannel().register(eventLoop().selector, 0, this); 
  return null; 
  } catch (CancelledKeyException e) { 
  if (!selected) { 
  eventLoop().selectNow(); 
  selected = true; 
  } else { 
  throw e; 
  } 
  } 
  } 
  } 
  看一下doRegister代码,看到这行代码了吧
Java代码  保藏代码
selectionKey = javaChannel().register(eventLoop().selector, 0, this); 
  它会调用java.nio.channels.spi. AbstractSelectableChannel的register办法, 将ServerSocketChannel、0、以及this注册到selector中并得到对应的selectionkey。
  接着再看register0办法中的promise.setSuccess(),将promise设置为success,就会触发异步回调,回调之前main函数地点的线程中为ChannelPromise添加的listner,即AbstractBootstrap的以下代码:
Java代码  保藏代码
//AbstractBootstrap.java 
private ChannelFuture doBind(final SocketAddress localAddress) { 
  final ChannelFuture regPromise = initAndRegister(); 
  final Channel channel = regPromise.channel(); 
  final ChannelPromise promise = channel.newPromise(); 
  if (regPromise.isDone()) { 
  doBind0(regPromise, channel, localAddress, promise); 
  } else { 
  regPromise.addListener(new ChannelFutureListener() { 
  @Override 
  public void operationComplete(ChannelFuture future) throws Exception { 
  doBind0(future, channel, localAddress, promise); 
  } 
  }); 
  } 
  return promise; 

 
private static void doBind0( 
  final ChannelFuture regFuture, final Channel channel, 
  final SocketAddress localAddress, final ChannelPromise promise) { 
 
  // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up 
  // the pipeline in its channelRegistered() implementation. 
 
  channel.eventLoop().execute(new Runnable() { 
  @Override 
  public void run() { 
  if (regFuture.isSuccess()) { 
  channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); 
  } else { 
  promise.setFailure(regFuture.cause()); 
  } 
  } 
  }); 
  } 
  通过此过程后,boss线程的使命行列数量由本来的1个添加到了2个,即正在履行的register0使命以及本次新增的bind使命。Bind使命鄙人一篇文章中进行剖析。

  再接着剖析register0使命中的此行代码pipeline.fireChannelRegistered()
Java代码  保藏代码
//DefaultChannelPipeline 
public ChannelPipeline fireChannelRegistered() { 
  head.fireChannelRegistered(); 
  return this; 
  } 

Java代码  保藏代码
//DefaultChannelHandlerContext 
public ChannelHandlerContext fireChannelRegistered() { 
  final DefaultChannelHandlerContext next = findContextInbound(); 
  EventExecutor executor = next.executor(); 
  if (executor.inEventLoop()) { 
  next.invokeChannelRegistered(); 
  } else { 
  executor.execute(new Runnable() { 
  @Override 
  public void run() { 
  next.invokeChannelRegistered(); 
  } 
  }); 
  } 
  return this; 

 
private void invokeChannelRegistered() { 
  try { 
  ((ChannelInboundHandler) handler).channelRegistered(this); 
  } catch (Throwable t) { 
  notifyHandlerException(t); 
  } 
  } 
  ChannelRegistered是一个Inbound工作,因此会依照head- tail的次序履行一切的inbound处理器,现在有三个处理器:head- ChannelInitializer - tail- ,ChannelInitializer和tail都是inbound处理器,所以看一下ChannelInitializer的invokeChannelRegistered办法.
Java代码  保藏代码
//ChannelInitializer 
public final void channelRegistered(ChannelHandlerContext ctx) 
  throws Exception { 
  boolean removed = false; 
  boolean success = false; 
  try { 
  initChannel((C) ctx.channel()); 
  ctx.pipeline().remove(this); 
  removed = true; 
  ctx.fireChannelRegistered(); 
  success = true; 
  } catch (Throwable t) { 
  logger.warn("Failed to initialize a channel. Closing: " + ctx.channel(), t); 
  } finally { 
  if (!removed) { 
  ctx.pipeline().remove(this); 
  } 
  if (!success) { 
  ctx.close(); 
  } 
  } 
  } 

  该办法首要做了以下几件事:
  1、initChannel办法是在ServerBootstrap中履行Init办法时,实例化内部类ChannelInitializer完成的
Java代码  保藏代码
p.addLast(new ChannelInitializer Channel () { 
  @Override 
  public void initChannel(Channel ch) throws Exception { 
  ch.pipeline().addLast(new ServerBootstrapAcceptor( 
  currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); 
  } 
  }); 
  其功用便是将ServerBootstrapAcceptor作为一个inbound处理器加入到管道中,此刻的处理器链表为: Head- ChannelInitializer- ServerBootstrapAcceptor- tail
  2、 然后ChannelInitializer将自己从管道中删去,此刻的处理器链表变为:Head- ServerBootstrapAcceptor- tail
  3、 接着调用ServerBootstrapAcceptor和tail的channelRegistered办法,都没有做啥实质性的工作,最终以tail的空完成完毕。
 
  再剖析register0使命中的以下代码
Java代码  保藏代码
//AbstractUnsafe 
if (isActive()) 

  pipeline.fireChannelActive(); 

 
// NioServerSocketChannel 
public boolean isActive() { 
  return javaChannel().socket().isBound(); 
  } 
  因为关于监听套接字还没有履行bind操作,所以isActive回来false,不会履行pipeline.fireChannelActive()该行代码。履行完此代码后,register0使命就履行完了,boss线程中的使命行列中仅剩余bind使命。


  总结:initAndRegister()办法首要做了以下几件工作:
  1、 创立服务端监听套接字ServerSocketChannel
  2、 设置监听套接字为非堵塞
  3、 设置channel当时感兴趣的工作为SelectionKey.OP_ACCEPT(值为16)
  4、 创立作用于ServerSocketChannel的管道Pipeline,该管道中此刻的处理器链表为:Head(outbound)- tail(inbound)。
  5、 设置NioServerSocketChannel的options和attrs,并存储之后用于SocketChannel的options和attrs
  6、 为NioServerSocketChannel对应的管道添加一个Inbound处理器ChannelInitializer。通过此过程后,管道中的处理器链表为:head(outbound)- ChannelInitializer(inbound)- tail(inbound)。留意ChannelInitializer的完成办法initChannel,里边会当channelRegisgered工作发作时将ServerBootstrapAcceptor加入到管道中。
  7、 发动了boss线程,并将register0使命加入到boss线程的行列中。而register0做的工作为:将ServerSocketChannel、0、注册到selector中并得到对应的selectionkey。然后触发绑定端口的操作,将bind使命加入到boss线程的使命行列中,该内容鄙人一篇文章中剖析。
  8、通过channelRegistered工作,将ServerBootstrapAcceptor加入到管道中,并移除ChannelInitializer,通过此过程后,管道中的处理器链表为:head(outbound)- ServerBootstrapAcceptor (inbound)- tail(inbound)。
  管道从创立到现在这段时间内,处理器链表的改变前史为:
  head- tail
  head- ChannelInitializer(inbound)- tail
  head- ServerBootstrapAcceptor(inbound)- tail
版权声明
本文来源于网络,版权归原作者所有,其内容与观点不代表超凡娱乐立场。转载文章仅为传播更有价值的信息,如采编人员采编有误或者版权原因,请与我们联系,我们核实后立即修改或删除。

猜您喜欢的文章