Netty浅析

2017/11/09 - Java 系统架构

简介

Netty是一个异步网络IO框架,简单说就是帮我们专注于收发包的细节且提供高性能IO复用,在恰当的时候(如Tcp收满一个逻辑包后),回调我们的业务逻辑代码。

Netty基于NIO,整个Netty API体系都是异步的(可通过callback被动通知模式、Future主动通知模式进行后续的业务逻辑操作)。

组件介绍

Channels

代表一次具体的管道连接(文件设备或Socket),连接着两端的的读写。提供了基础的IO功能,如bind,connect,read,write等。使用Channel比Java的Socket更简单。

有如下状态,当状态变化之后,就会出发相应的事件,最终ChannelPipeline调用他的ChannelHandlers进行处理。

  • ChannelUnregistered Channel创建,但是没有被注册到EventLoop中
  • ChannelRegistered Channel被注册到EventLoop中
  • ChannelActive Channel处于Active态,可能被读写
  • ChannelInactive Channel处于非Active态

EventLoop

EventLoopGroup包含多个EventLoop。EventLoop在生命周期内被绑定到一个线程上,Channel在生命周期内与一个EventLoop绑定,多个Channel可以被绑定在同一个EventLoop上。

Handler

当特定的网络事件发生后,相应的业务逻辑Handler就会被执行。

ChannelPipeline是一个ChannelHandler链条,用来传播处理inbound和outbound事件,执行顺序与被添加到pipeline中相同(inbound)。ChannelHandler分为inbound和outbound两种情况,对应的子接口分别为ChannelInboundHandler和ChannelOutboundHandler

ChannelHandlerContext

上下文。代表的是ChannelHandler和ChannelPipeline之间的关联关系

Netty事件的流向

每个ChannelHandler都是一个业务逻辑处理单元,处理完成后,ChannelHandlerContext会将事件转发到下一个ChannelHandler,ChannelPipeline提供了一个ChnannleHandler链的容器。这就像工厂流水线和流水线上每个工种的关系。当我们实现业务的时候,只需要实现几个ChannelHandler,每个ChannelHandler负责一个业务功能,比如一个负责解码,一个负责业务处理,另一个负责异常处理等,然后用ChannelPipeline把ChannelHandler串起来就可以了。整个过程我们并不需要去了解底层的网络实现,大大加快了开发速度。而且这种实现还有一个好处,就是它把复杂的处理分解成了小的、可以复用的单元。

高性能原理

  1. 使用epoll等IO复用技术。一个IO线程可以并发处理N个客户端连接和读写操作。避免频繁IO阻塞导致的线程挂起,这从根本上解决了传统同步阻塞IO一连接一线程模型。
  2. 内存使用优化。接收和发送ByteBuffer采用DIRECT BUFFERS,使用堆外直接内存进行Socket读写。
  3. 文件传输采用transferTo(如果是linux系统则为),减少无用内存复制。
  4. 使用内存池。
  5. 操作无锁化

Demo

详情参考

阻塞IO

public class MyServer {
    public static class MyRun implements Runnable{
        private Socket socket = null;

        public MyRun(Socket socket) {
            this.socket = socket;
        }

        public void run() {
            BufferedReader bufferedReader = null;
            PrintWriter printWriter = null;
            try {
                bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                String line = bufferedReader.readLine();
                printWriter = new PrintWriter(socket.getOutputStream(), true);
                if (line.length() < 1){
                    printWriter.println("it is null");
                    return;
                }

                System.out.println(line);

                if ("get time".equals(line)){
                    printWriter.println(new Date().toString());
                }else{
                    printWriter.println("isn't ok");
                }

            } catch (IOException e) {
                e.printStackTrace();
                try {
                    if (bufferedReader != null) {
                        bufferedReader.close();
                    }

                    if (printWriter != null){
                        printWriter.close();
                    }

                    if (socket != null){
                        socket.close();
                    }

                }catch (Exception ex){
                    ex.printStackTrace();
                }
            }
        }
    }

    public static void main(String[] args) {
        int port = 8080;
        ServerSocket serverSocket = null;
        Executor executor = Executors.newFixedThreadPool(100);
        try {
            serverSocket = new ServerSocket(port);
            while (true){
                Socket accept = serverSocket.accept();
                executor.execute(new MyRun(accept));
            }
        } catch (IOException e) {
            e.printStackTrace();
        }finally {
            if (serverSocket != null){
                try {
                    serverSocket.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

非阻塞IO

public class NioServer {

    public static void main(String[] args) {
        new Thread(new MyRun()).start();
    }

    public static class MyRun implements Runnable {
        private Selector selector;
        private ServerSocketChannel serverSocketChannel;
        private volatile boolean stop = false;

        public MyRun() {
            try {
                selector = Selector.open();
                serverSocketChannel = ServerSocketChannel.open();
                serverSocketChannel.configureBlocking(false);
                serverSocketChannel.socket().bind(new InetSocketAddress(8080), 1024);
                serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        public void run() {
            while (!stop) {
                try {
                    selector.select(1000);
                    Set<SelectionKey> selectionKeys = selector.selectedKeys();
                    for (Iterator<SelectionKey> iterator = selectionKeys.iterator(); iterator.hasNext(); ) {
                        SelectionKey key = iterator.next();
                        iterator.remove();
                        try {
                            handlerKey(key);
                        } catch (Exception e) {
                            if (key != null) {
                                key.cancel();
                                if (key.channel() != null) {
                                    key.channel().close();
                                }
                            }
                        }
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }

            System.out.println("server is stop");

            if (selector != null) {
                try {
                    selector.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }

            if (serverSocketChannel != null) {
                try {
                    serverSocketChannel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }

        private void handlerKey(SelectionKey key) throws IOException {
            if (key.isValid()) {
                if (key.isAcceptable()) {
                    ServerSocketChannel channel = (ServerSocketChannel) key.channel();
                    SocketChannel accept = channel.accept();
                    accept.configureBlocking(false);
                    accept.register(selector, SelectionKey.OP_READ);
                }

                if (key.isReadable()) {
                    SocketChannel channel = (SocketChannel) key.channel();
                    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                    int read = channel.read(byteBuffer);
                    if (read > 0) {
                        byteBuffer.flip();
                        byte[] bytes = new byte[byteBuffer.remaining()];
                        byteBuffer.get(bytes);
                        String recive_msg = new String(bytes, "utf-8");
                        System.out.println(recive_msg);
                        String rep_msg = "get time".equals(recive_msg) ? new Date().toString() : "bad time";
                        dowrite(channel, rep_msg);
                    } else if (read < 0) {
                        key.cancel();
                        channel.close();
                    } else {

                    }
                }
            }
        }

        private void dowrite(SocketChannel channel, String rep) throws IOException {
            if (rep != null && rep.length() > 0) {
                byte[] bytes = rep.getBytes();
                ByteBuffer wrte = ByteBuffer.allocate(bytes.length);
                wrte.put(bytes);
                wrte.flip();
                channel.write(wrte);
            }
        }
    }
}

异步IO

public class AIOServer {

    private AsynchronousServerSocketChannel serverSocketChannel;
    private CountDownLatch countDownLatch = new CountDownLatch(1);

    public AIOServer() {
        try {
            serverSocketChannel = AsynchronousServerSocketChannel.open();
            serverSocketChannel.bind(new InetSocketAddress(8080),1024);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void go() throws IOException {
        try {
            doAccept();
            countDownLatch.await();
        }catch (Exception e){
            serverSocketChannel.close();
        }
    }

    private void doAccept() {
        serverSocketChannel.accept(this, new CompletionHandler<AsynchronousSocketChannel, AIOServer>() {
                public void completed(final AsynchronousSocketChannel  socketChannel, AIOServer aioServer) {
                    //必须加这句才能接收其他的客户端连接,形成一个循环
                    aioServer.serverSocketChannel.accept(aioServer,this);
                    final ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                    socketChannel.read(byteBuffer, byteBuffer, new CompletionHandler<Integer, ByteBuffer>() {
                        public void completed(Integer a, ByteBuffer bf) {
                            byteBuffer.flip();
                            byte[] bytes = new byte[bf.remaining()];
                            bf.get(bytes);
                            try {
                                String msg = new String(bytes, "utf-8");
                                System.out.println(msg);
                                String rep = "get time".equals(msg) ? new Date().toString() : "bad time";
                                doWrite(rep, socketChannel);
                            } catch (Exception e) {
                                try {
                                    socketChannel.close();
                                } catch (IOException ex) {
                                    e.printStackTrace();
                                }
                            }
                        }

                        private void doWrite(String rep, final AsynchronousSocketChannel result) throws IOException {
                            byte[] bytes = rep.getBytes("utf-8");
                            ByteBuffer bf = ByteBuffer.allocate(bytes.length);
                            bf.put(bytes);
                            bf.flip();

                            result.write(bf, bf, new CompletionHandler<Integer, ByteBuffer>() {
                                    public void completed(Integer a, ByteBuffer bfer) {
                                        if (bfer.hasRemaining()) {
                                            result.write(bfer, bfer, this);
                                        }
                                    }

                                    public void failed(Throwable exc, ByteBuffer attachment) {
                                        try {
                                            result.close();
                                        } catch (IOException e) {
                                            e.printStackTrace();
                                        }
                                    }
                            });

                        }

                        public void failed(Throwable exc, ByteBuffer attachment) {
                            try {
                                socketChannel.close();
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                        }
                    });
                }

                public void failed(Throwable exc, AIOServer attachment) {
                    try {
                        serverSocketChannel.close();
                        countDownLatch.countDown();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
        });
    }

    public static void main(String[] args) throws IOException {
        new AIOServer().go();
    }
}

Netty

public class NettyServer {

    private static EventLoopGroup bosses = new NioEventLoopGroup();
    private static EventLoopGroup workers = new NioEventLoopGroup();

    public static void main(String[] args) {
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(bosses, workers).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024).childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    socketChannel.pipeline().addLast(new ChannelHandlerAdapter() {
                        @Override
                        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                            ByteBuf bf = (ByteBuf) msg;
                            byte[] bs = new byte[bf.readableBytes()];
                            bf.readBytes(bs);

                            String recive_msg = new String(bs, "utf-8");
                            System.out.println(recive_msg);
                            String rep = "get time".equals(recive_msg) ? new Date().toString() : "bad time";
                            ByteBuf byteBuf = Unpooled.copiedBuffer(rep.getBytes("utf-8"));
                            ctx.writeAndFlush(byteBuf);
                        }

                        @Override
                        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                            ctx.close();
                        }
                    });
                }
        });

        try {
            ChannelFuture sync = bootstrap.bind(8080).sync();
            sync.channel().closeFuture().sync();
        } catch (Exception e) {
            bosses.shutdownGracefully();
            workers.shutdownGracefully();
        }
    }
}

如果文章对您有帮助,欢迎扫描下方二维码赞助(一分也是爱噢),谢谢

Search

    一分也是爱噢 一分也是爱

    目录