案例研究:Transport 的迁移

最后更新于:2022-04-01 02:34:17

为了让你想象 Transport 如何工作,我会从一个简单的应用程序开始,这个应用程序什么都不做,只是接受客户端连接并发送“Hi!”字符串消息到客户端,发送完了就断开连接。 ### [](https://github.com/waylau/essential-netty-in-action/blob/master/CORE%20FUNCTIONS/Case%20study%20transport%20migration.md#没有用-netty-实现-io-和-nio)没有用 Netty 实现 I/O 和 NIO 我们将不用 Netty 实现只用 JDK API 来实现 I/O 和 NIO。下面这个例子,是使用阻塞 IO 实现的例子: Listing 4.1 Blocking networking without Netty ~~~ public class PlainOioServer { public void serve(int port) throws IOException { final ServerSocket socket = new ServerSocket(port); //1 try { for (;;) { final Socket clientSocket = socket.accept(); //2 System.out.println("Accepted connection from " + clientSocket); new Thread(new Runnable() { //3 @Override public void run() { OutputStream out; try { out = clientSocket.getOutputStream(); out.write("Hi!\r\n".getBytes(Charset.forName("UTF-8"))); //4 out.flush(); clientSocket.close(); //5 } catch (IOException e) { e.printStackTrace(); try { clientSocket.close(); } catch (IOException ex) { // ignore on close } } } }).start(); //6 } } catch (IOException e) { e.printStackTrace(); } } } ~~~ 1.绑定服务器到指定的端口。 2.接受一个连接。 3.创建一个新的线程来处理连接。 4.将消息发送到连接的客户端。 5.一旦消息被写入和刷新时就 关闭连接。 6.启动线程。 上面的方式可以工作正常,但是这种阻塞模式在大连接数的情况就会有很严重的问题,如客户端连接超时,服务器响应严重延迟,性能无法扩展。为了解决这种情况,我们可以使用异步网络处理所有的并发连接,但问题在于 NIO 和 OIO 的 API 是完全不同的,所以一个用OIO开发的网络应用程序想要使用NIO重构代码几乎是重新开发。 下面代码是使用 NIO 实现的例子: Listing 4.2 Asynchronous networking without Netty ~~~ public class PlainNioServer { public void serve(int port) throws IOException { ServerSocketChannel serverChannel = ServerSocketChannel.open(); serverChannel.configureBlocking(false); ServerSocket ss = serverChannel.socket(); InetSocketAddress address = new InetSocketAddress(port); ss.bind(address); //1 Selector selector = Selector.open(); //2 serverChannel.register(selector, SelectionKey.OP_ACCEPT); //3 final ByteBuffer msg = ByteBuffer.wrap("Hi!\r\n".getBytes()); for (;;) { try { selector.select(); //4 } catch (IOException ex) { ex.printStackTrace(); // handle exception break; } Set<SelectionKey> readyKeys = selector.selectedKeys(); //5 Iterator<SelectionKey> iterator = readyKeys.iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); iterator.remove(); try { if (key.isAcceptable()) { //6 ServerSocketChannel server = (ServerSocketChannel)key.channel(); SocketChannel client = server.accept(); client.configureBlocking(false); client.register(selector, SelectionKey.OP_WRITE | SelectionKey.OP_READ, msg.duplicate()); //7 System.out.println( "Accepted connection from " + client); } if (key.isWritable()) { //8 SocketChannel client = (SocketChannel)key.channel(); ByteBuffer buffer = (ByteBuffer)key.attachment(); while (buffer.hasRemaining()) { if (client.write(buffer) == 0) { //9 break; } } client.close(); //10 } } catch (IOException ex) { key.cancel(); try { key.channel().close(); } catch (IOException cex) { // 在关闭时忽略 } } } } } } ~~~ 1.绑定服务器到制定端口 2.打开 selector 处理 channel 3.注册 ServerSocket 到 ServerSocket ,并指定这是专门意接受 连接。 4.等待新的事件来处理。这将阻塞,直到一个事件是传入。 5.从收到的所有事件中 获取 SelectionKey 实例。 6.检查该事件是一个新的连接准备好接受。 7.接受客户端,并用 selector 进行注册。 8.检查 socket 是否准备好写数据。 9.将数据写入到所连接的客户端。如果网络饱和,连接是可写的,那么这个循环将写入数据,直到该缓冲区是空的。 10.关闭连接。 如你所见,即使它们实现的功能是一样,但是代码完全不同。下面我们将用Netty 来实现相同的功能。 ### [](https://github.com/waylau/essential-netty-in-action/blob/master/CORE%20FUNCTIONS/Case%20study%20transport%20migration.md#采用-netty-实现-io-和-nio)采用 Netty 实现 I/O 和 NIO 下面代码是使用Netty作为网络框架编写的一个阻塞 IO 例子: Listing 4.3 Blocking networking with Netty ~~~ public class NettyOioServer { public void server(int port) throws Exception { final ByteBuf buf = Unpooled.unreleasableBuffer( Unpooled.copiedBuffer("Hi!\r\n", Charset.forName("UTF-8"))); EventLoopGroup group = new OioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); //1 b.group(group) //2 .channel(OioServerSocketChannel.class) .localAddress(new InetSocketAddress(port)) .childHandler(new ChannelInitializer<SocketChannel>() {//3 @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { //4 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush(buf.duplicate()).addListener(ChannelFutureListener.CLOSE);//5 } }); } }); ChannelFuture f = b.bind().sync(); //6 f.channel().closeFuture().sync(); } finally { group.shutdownGracefully().sync(); //7 } } } ~~~ 1.创建一个 ServerBootstrap 2.使用 OioEventLoopGroup 允许阻塞模式(Old-IO) 3.指定 ChannelInitializer 将给每个接受的连接调用 4.添加的 ChannelHandler 拦截事件,并允许他们作出反应 5.写信息到客户端,并添加 ChannelFutureListener 当一旦消息写入就关闭连接 6.绑定服务器来接受连接 7.释放所有资源 下面代码是使用 Netty NIO 实现。 ### [](https://github.com/waylau/essential-netty-in-action/blob/master/CORE%20FUNCTIONS/Case%20study%20transport%20migration.md#netty-nio-版本)Netty NIO 版本 下面是 Netty NIO 的代码,只是改变了一行代码,就从 OIO 传输 切换到了 NIO。 Listing 4.4 Asynchronous networking with Netty ~~~ public class NettyNioServer { public void server(int port) throws Exception { final ByteBuf buf = Unpooled.unreleasableBuffer( Unpooled.copiedBuffer("Hi!\r\n", Charset.forName("UTF-8"))); NioEventLoopGroup group = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); //1 b.group(new NioEventLoopGroup(), new NioEventLoopGroup()) .channel(NioServerSocketChannel.class) .localAddress(new InetSocketAddress(port)) .childHandler(new ChannelInitializer<SocketChannel>() { //3 @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { //4 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush(buf.duplicate()) //5 .addListener(ChannelFutureListener.CLOSE); } }); } }); ChannelFuture f = b.bind().sync(); //6 f.channel().closeFuture().sync(); } finally { group.shutdownGracefully().sync(); //7 } } } ~~~ 1.创建一个 ServerBootstrap 2.使用 OioEventLoopGroup 允许阻塞模式(Old-IO) 3.指定 ChannelInitializer 将给每个接受的连接调用 4.添加的 ChannelInboundHandlerAdapter() 接收事件并进行处理 5.写信息到客户端,并添加 ChannelFutureListener 当一旦消息写入就关闭连接 6.绑定服务器来接受连接 7.释放所有资源 因为 Netty 使用相同的 API 来实现每个传输,它并不关心你使用什么来实现。Netty 通过操作接口Channel 、ChannelPipeline 和 ChannelHandler来实现。 现在你了解到了用 基于 Netty 传输的好处。下面就来看下传输的 API.
';