简介
Netty是一个异步网络IO框架,简单说就是帮我们处理收发包的细节并提供高性能的IO复用,让我们可以专注于业务逻辑;在恰当的时候(如TCP收满一个逻辑包后),回调我们的业务逻辑代码。
Netty基于NIO,整个Netty API体系都是异步的(可通过callback被动通知模式、Future主动通知模式进行后续的业务逻辑操作)。
组件介绍
Channels
代表一次具体的管道连接(文件设备或Socket),连接着两端的读写。提供了基础的IO功能,如bind,connect,read,write等。使用Channel比Java的Socket更简单。
Channel有如下状态,当状态变化之后,就会触发相应的事件,最终由ChannelPipeline调用它的ChannelHandlers进行处理。
- ChannelUnregistered Channel创建,但是没有被注册到EventLoop中
- ChannelRegistered Channel被注册到EventLoop中
- ChannelActive Channel处于Active态,可能被读写
- ChannelInactive Channel处于非Active态
EventLoop
EventLoopGroup包含多个EventLoop。EventLoop在生命周期内被绑定到一个线程上,Channel在生命周期内与一个EventLoop绑定,多个Channel可以被绑定在同一个EventLoop上。
Handler
当特定的网络事件发生后,相应的业务逻辑Handler就会被执行。
ChannelPipeline是一个ChannelHandler链条,用来传播处理inbound和outbound事件,执行顺序与被添加到pipeline中相同(inbound)。ChannelHandler分为inbound和outbound两种情况,对应的子接口分别为ChannelInboundHandler和ChannelOutboundHandler
ChannelHandlerContext
上下文。代表的是ChannelHandler和ChannelPipeline之间的关联关系
Netty事件的流向
每个ChannelHandler都是一个业务逻辑处理单元,处理完成后,ChannelHandlerContext会将事件转发到下一个ChannelHandler,ChannelPipeline提供了一个ChannelHandler链的容器。这就像工厂流水线和流水线上每个工种的关系。当我们实现业务的时候,只需要实现几个ChannelHandler,每个ChannelHandler负责一个业务功能,比如一个负责解码,一个负责业务处理,另一个负责异常处理等,然后用ChannelPipeline把ChannelHandler串起来就可以了。整个过程我们并不需要去了解底层的网络实现,大大加快了开发速度。而且这种实现还有一个好处,就是它把复杂的处理分解成了小的、可以复用的单元。
高性能原理
- 使用epoll等IO复用技术。一个IO线程可以并发处理N个客户端连接和读写操作。避免频繁IO阻塞导致的线程挂起,这从根本上解决了传统同步阻塞IO一连接一线程模型的性能和扩展性问题。
- 内存使用优化。接收和发送ByteBuffer采用DIRECT BUFFERS,使用堆外直接内存进行Socket读写。
- 文件传输采用transferTo(在Linux系统上底层对应sendfile系统调用),减少无用内存复制。
- 使用内存池。
- 操作无锁化
Demo
阻塞IO
/**
 * Blocking-IO time server demo.
 *
 * <p>The main thread accepts connections on port 8080 and hands each one to a
 * fixed-size thread pool; every connection is served by exactly one worker.
 */
public class MyServer {

    /**
     * Serves a single client connection: reads one line and answers with the
     * current time if the line is {@code "get time"}, otherwise with an error
     * message. The connection is always closed when {@link #run()} returns.
     */
    public static class MyRun implements Runnable {
        private final Socket socket;

        /**
         * @param socket the accepted client connection; ownership passes to this
         *               task, which closes it when done
         */
        public MyRun(Socket socket) {
            this.socket = socket;
        }

        public void run() {
            // try-with-resources guarantees that the writer, the reader and the
            // socket are closed on every path (normal completion, early return
            // and exceptions), not only when an IOException occurs.
            try (Socket client = socket;
                 BufferedReader bufferedReader =
                         new BufferedReader(new InputStreamReader(client.getInputStream()));
                 PrintWriter printWriter = new PrintWriter(client.getOutputStream(), true)) {
                String line = bufferedReader.readLine();
                // readLine() returns null when the peer closed without sending data.
                if (line == null || line.length() < 1) {
                    printWriter.println("it is null");
                    return;
                }
                System.out.println(line);
                if ("get time".equals(line)) {
                    printWriter.println(new Date().toString());
                } else {
                    printWriter.println("isn't ok");
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Accepts connections on port 8080 until accepting fails, dispatching each
     * connection to the worker pool.
     */
    public static void main(String[] args) {
        int port = 8080;
        java.util.concurrent.ExecutorService executor = Executors.newFixedThreadPool(100);
        try (ServerSocket serverSocket = new ServerSocket(port)) {
            while (true) {
                Socket accept = serverSocket.accept();
                executor.execute(new MyRun(accept));
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            // Let in-flight requests finish, then allow the JVM to exit.
            executor.shutdown();
        }
    }
}
非阻塞IO
/**
 * Non-blocking IO (java.nio) time server demo.
 *
 * <p>A single thread multiplexes the listening socket and all client
 * connections through one {@link Selector}.
 */
public class NioServer {
    public static void main(String[] args) {
        new Thread(new MyRun()).start();
    }

    /**
     * Selector loop: accepts connections on port 8080 and answers each received
     * message with the current time ("get time") or "bad time".
     */
    public static class MyRun implements Runnable {
        private Selector selector;
        private ServerSocketChannel serverSocketChannel;
        private volatile boolean stop = false;

        /**
         * Opens the selector and the listening channel. If setup fails (for
         * example because the port is already in use) the error is printed and
         * the task is marked as stopped, so {@link #run()} only releases
         * resources instead of spinning forever on a selector with no keys.
         */
        public MyRun() {
            try {
                selector = Selector.open();
                serverSocketChannel = ServerSocketChannel.open();
                serverSocketChannel.configureBlocking(false);
                serverSocketChannel.socket().bind(new InetSocketAddress(8080), 1024);
                serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            } catch (IOException e) {
                e.printStackTrace();
                stop = true;
            }
        }

        public void run() {
            while (!stop) {
                try {
                    selector.select(1000);
                    Set<SelectionKey> selectionKeys = selector.selectedKeys();
                    for (Iterator<SelectionKey> iterator = selectionKeys.iterator(); iterator.hasNext(); ) {
                        SelectionKey key = iterator.next();
                        // Selected keys must be removed manually or they are reported again.
                        iterator.remove();
                        try {
                            handlerKey(key);
                        } catch (Exception e) {
                            e.printStackTrace();
                            closeKey(key);
                        }
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            System.out.println("server is stop");
            if (selector != null) {
                try {
                    selector.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            if (serverSocketChannel != null) {
                try {
                    serverSocketChannel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }

        /** Cancels the key and closes its channel; a failure to close does not abort the select loop. */
        private void closeKey(SelectionKey key) {
            if (key == null) {
                return;
            }
            key.cancel();
            if (key.channel() != null) {
                try {
                    key.channel().close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }

        /**
         * Dispatches one ready key: accepts a new connection, or reads a request
         * from an existing connection and writes the reply.
         *
         * @throws IOException if accepting, reading or writing fails
         */
        private void handlerKey(SelectionKey key) throws IOException {
            if (!key.isValid()) {
                return;
            }
            if (key.isAcceptable()) {
                ServerSocketChannel channel = (ServerSocketChannel) key.channel();
                SocketChannel accept = channel.accept();
                // In non-blocking mode accept() returns null when no connection is pending.
                if (accept != null) {
                    accept.configureBlocking(false);
                    accept.register(selector, SelectionKey.OP_READ);
                }
            }
            if (key.isReadable()) {
                SocketChannel channel = (SocketChannel) key.channel();
                ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                int read = channel.read(byteBuffer);
                if (read > 0) {
                    byteBuffer.flip();
                    byte[] bytes = new byte[byteBuffer.remaining()];
                    byteBuffer.get(bytes);
                    String recive_msg = new String(bytes, "utf-8");
                    System.out.println(recive_msg);
                    String rep_msg = "get time".equals(recive_msg) ? new Date().toString() : "bad time";
                    dowrite(channel, rep_msg);
                } else if (read < 0) {
                    // End of stream: the peer closed the connection.
                    key.cancel();
                    channel.close();
                }
                // read == 0: nothing available right now, wait for the next event.
            }
        }

        /**
         * Writes the reply encoded as UTF-8 (the same charset used for decoding requests).
         *
         * @throws IOException if the write fails
         */
        private void dowrite(SocketChannel channel, String rep) throws IOException {
            if (rep != null && rep.length() > 0) {
                byte[] bytes = rep.getBytes("utf-8");
                ByteBuffer wrte = ByteBuffer.wrap(bytes);
                // A non-blocking write may send only part of the buffer. The replies
                // here are tiny, so simply retry until everything is written; a
                // production server would register OP_WRITE instead of looping.
                while (wrte.hasRemaining()) {
                    channel.write(wrte);
                }
            }
        }
    }
}
异步IO
/**
 * Asynchronous IO (NIO.2) time server demo.
 *
 * <p>Accepts connections on port 8080 and answers each received message with
 * the current time ("get time") or "bad time". All IO is driven by
 * {@link CompletionHandler} callbacks.
 */
public class AIOServer {
    private AsynchronousServerSocketChannel serverSocketChannel;
    private CountDownLatch countDownLatch = new CountDownLatch(1);

    /** Opens and binds the listening channel; failures are printed and surface later in {@link #go()}. */
    public AIOServer() {
        try {
            serverSocketChannel = AsynchronousServerSocketChannel.open();
            serverSocketChannel.bind(new InetSocketAddress(8080), 1024);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Starts accepting connections and blocks the calling thread until accepting fails.
     *
     * @throws IOException if the listening channel could not be opened, or if
     *                     closing it after a startup failure fails
     */
    public void go() throws IOException {
        if (serverSocketChannel == null) {
            throw new IOException("server channel was not opened");
        }
        try {
            doAccept();
            countDownLatch.await();
        } catch (InterruptedException e) {
            // Preserve the interrupt status for the caller.
            Thread.currentThread().interrupt();
            serverSocketChannel.close();
        } catch (Exception e) {
            serverSocketChannel.close();
        }
    }

    /** Closes a client channel, printing (not propagating) any failure. */
    private static void closeQuietly(AsynchronousSocketChannel channel) {
        try {
            channel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Issues the first asynchronous accept; the handler re-arms itself for every new connection. */
    private void doAccept() {
        serverSocketChannel.accept(this, new CompletionHandler<AsynchronousSocketChannel, AIOServer>() {
            public void completed(final AsynchronousSocketChannel socketChannel, AIOServer aioServer) {
                // Must re-issue accept here so that further clients can connect; this forms the accept loop.
                aioServer.serverSocketChannel.accept(aioServer, this);
                final ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                socketChannel.read(byteBuffer, byteBuffer, new CompletionHandler<Integer, ByteBuffer>() {
                    public void completed(Integer a, ByteBuffer bf) {
                        // A negative result means the peer closed the connection.
                        if (a < 0) {
                            closeQuietly(socketChannel);
                            return;
                        }
                        bf.flip();
                        byte[] bytes = new byte[bf.remaining()];
                        bf.get(bytes);
                        try {
                            String msg = new String(bytes, "utf-8");
                            System.out.println(msg);
                            String rep = "get time".equals(msg) ? new Date().toString() : "bad time";
                            doWrite(rep, socketChannel, bf);
                        } catch (Exception e) {
                            e.printStackTrace();
                            closeQuietly(socketChannel);
                        }
                    }

                    /**
                     * Writes the reply; once it is fully written, re-arms the read on
                     * the same connection so that further requests (and the eventual
                     * end of stream) are handled instead of leaking the channel.
                     */
                    private void doWrite(String rep, final AsynchronousSocketChannel result,
                                         final ByteBuffer readBuffer) throws IOException {
                        final CompletionHandler<Integer, ByteBuffer> readHandler = this;
                        byte[] bytes = rep.getBytes("utf-8");
                        ByteBuffer bf = ByteBuffer.wrap(bytes);
                        result.write(bf, bf, new CompletionHandler<Integer, ByteBuffer>() {
                            public void completed(Integer a, ByteBuffer bfer) {
                                if (bfer.hasRemaining()) {
                                    // Partial write: continue with the remaining bytes.
                                    result.write(bfer, bfer, this);
                                } else {
                                    readBuffer.clear();
                                    result.read(readBuffer, readBuffer, readHandler);
                                }
                            }

                            public void failed(Throwable exc, ByteBuffer attachment) {
                                closeQuietly(result);
                            }
                        });
                    }

                    public void failed(Throwable exc, ByteBuffer attachment) {
                        closeQuietly(socketChannel);
                    }
                });
            }

            public void failed(Throwable exc, AIOServer attachment) {
                try {
                    serverSocketChannel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } finally {
                    // Always release go(), even if closing the channel failed.
                    countDownLatch.countDown();
                }
            }
        });
    }

    public static void main(String[] args) throws IOException {
        new AIOServer().go();
    }
}
Netty
/**
 * Netty time server demo.
 *
 * <p>Listens on port 8080 and answers each received message with the current
 * time ("get time") or "bad time".
 */
public class NettyServer {
    private static EventLoopGroup bosses = new NioEventLoopGroup();
    private static EventLoopGroup workers = new NioEventLoopGroup();

    /** Configures the server, binds port 8080 and blocks until the server channel is closed. */
    public static void main(String[] args) {
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(bosses, workers)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(new ChannelHandlerAdapter() {
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                ByteBuf bf = (ByteBuf) msg;
                                try {
                                    byte[] bs = new byte[bf.readableBytes()];
                                    bf.readBytes(bs);
                                    String recive_msg = new String(bs, "utf-8");
                                    System.out.println(recive_msg);
                                    String rep = "get time".equals(recive_msg) ? new Date().toString() : "bad time";
                                    ByteBuf byteBuf = Unpooled.copiedBuffer(rep.getBytes("utf-8"));
                                    ctx.writeAndFlush(byteBuf);
                                } finally {
                                    // This handler consumes the inbound buffer and does not pass it
                                    // on, so it must release it; otherwise the buffer leaks.
                                    bf.release();
                                }
                            }

                            @Override
                            public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                                cause.printStackTrace();
                                ctx.close();
                            }
                        });
                    }
                });
        try {
            ChannelFuture sync = bootstrap.bind(8080).sync();
            sync.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Shut the event loops down on every exit path, not only on failure,
            // so their threads do not keep the JVM alive.
            bosses.shutdownGracefully();
            workers.shutdownGracefully();
        }
    }
}