用例2:Facebook 和 Twitter

最后更新于:2022-04-01 02:37:39

';

用例1:Droplr Firebase 和 Urban Airship

最后更新于:2022-04-01 02:37:37

';

总结

最后更新于:2022-04-01 02:37:35

在这一章里,你知道 Netty 使用哪个线程模型。你学会了使用线程模型的优缺点以及当使用 Netty 它们如何简化你的生活。 除了学习的内部运作,您获得了洞察力,知道如何可以执行自己的任务在 EventLoop(I/O Thread) 和 Netty 一样。你学会了如何在一大堆任务中安排任务。您还了解了如何验证一个任务是否执行以及如何取消它。 你现在知道 Netty 使用的各个先前版本的线程模型,你获得了更多的背景信息知道为什么新线程模型是更强大的。 你对 Netty 的线程模型有了深入了解,从而帮助您最大限度地提高您的应用程序性能,同时最小化所需的代码。关于线程池和并发访问的更多信息,请参阅 Java Concurrency in Practice (Brian Goetz)。他的书将会给你一个更深层次的了解,即使是最复杂的应用程序必须处理多线程的用例场景。
';

I/O EventLoop/Thread 分配细节

最后更新于:2022-04-01 02:37:32

Netty 的使用一个包含 EventLoop 的 EventLoopGroup 为 Channel 的 I/O 和事件服务。EventLoop 创建并分配方式不同基于传输的实现。异步实现使用只有少数 EventLoop(和 Threads)共享于 Channel 之间 。这允许最小线程数服务多个 Channel,不需要为他们每个人都有一个专门的 Thread。 图15.7显示了如何使用 EventLoopGroup。 [![](https://github.com/waylau/essential-netty-in-action/raw/master/images/Figure%2015.7%20Thread%20allocation%20for%20nonblocking%20transports%20(such%20as%20NIO%20and%20AIO).jpg)](https://github.com/waylau/essential-netty-in-action/blob/master/images/Figure%2015.7%20Thread%20allocation%20for%20nonblocking%20transports%20(such%20as%20NIO%20and%20AIO).jpg) 1. 所有的 EventLoop 由 EventLoopGroup 分配。这里它将使用三个EventLoop 实例 2. 这个 EventLoop 处理所有分配给它管道的事件和任务。每个EventLoop 绑定到一个 Thread 3. 管道绑定到 EventLoop,所以所有操作总是被同一个线程在 Channel 的生命周期执行。一个管道属于一个连接 Figure 15.7 Thread allocation for nonblocking transports (such as NIO and AIO) 如图所述,使用有 3个 EventLoop (每个都有一个 Thread ) EventLoopGroup 。EventLoop (同时也是 Thread )直接当 EventLoopGroup 创建时分配。这样保证资源是可以使用的 这三个 EventLoop 实例将会分配给每个新创建的 Channel。这是通过EventLoopGroup 实现,管理 EventLoop 实例。实际实现会照顾所有EventLoop 实例上均匀的创建 Channel (同样是不同的 Thread)。 一旦 Channel 是分配给一个 EventLoop,它将使用这个 EventLoop 在它的生命周期里和同样的线程。你可以,也应该,依靠这个,因为它可以确保你不需要担心同步(包括线程安全、可见性和同步)在你 ChannelHandler实现。 但是这也会影响使用 ThreadLocal,例如,经常使用的应用程序。因为一个EventLoop 通常影响多个 Channel,ThreadLocal 将相同的 Channel 分配给 EventLoop。因此,它适合状态跟踪等等。它仍然可以用于共享重或昂贵的对象之间的 Channel ,不再需要保持状态,因此它可以用于每个事件,而不需要依赖于先前 ThreadLocal 的状态。 *EventLoop 和 Channel* *我们应该注意,在 Netty 4 , Channel 可能从 EventLoop 注销稍后又从不同 EventLoop 注册。这个功能是不赞成,因为它在实践中没有很好的工作* 语义跟其他传输略有不同,如 OIO(Old Blocking I/O)运输,可以看到如图14.8所示。 [![](https://github.com/waylau/essential-netty-in-action/raw/master/images/Figure%2015.8%20Thread%20allocation%20of%20blocking%20transports%20(such%20as%20OIO).jpg)](https://github.com/waylau/essential-netty-in-action/blob/master/images/Figure%2015.8%20Thread%20allocation%20of%20blocking%20transports%20(such%20as%20OIO).jpg) 1. 所有 EventLoop 从 EventLoopGroup 分配。每个新的 channel 将会获得新的 EventLoop 2. EventLoop 分配给 channel 用于执行所有事件和任务 3. Channel 绑定到 EventLoop。一个 channel 属于一个连接 Figure 15.8 Thread allocation of blocking transports (such as OIO) 你可能会注意到这里,一个 EventLoop (也是一个 Thread)创建每个 Channel。你可能被用来从开发网络应用程序是基于常规阻塞I/O在使用java.io.* 包。但即使语义变化在这种情况下,有一件事仍然是相同的:每个 I/O 通道将由一次只有一个线程来处理,这是一个线程增强 Channel 的 EventLoop。可以依靠这个硬性的规则,使 Netty 的框架很容易与其他网络框架进行比较。
';

EventLoop

最后更新于:2022-04-01 02:37:30

每隔一段时间需要调度任务执行,也许你想注册一个任务在客户端完成连接5分钟后执行,一个常见的用例是发送一个消息“你还活着?”到远端通,如果远端没有反应,则可以关闭通道(连接)和释放资源。 本节介绍使用强大的 EventLoop 实现任务调度,还会简单介绍 Java API的任务调度,以方便和 Netty 比较加深理解。 ### [](https://github.com/waylau/essential-netty-in-action/blob/master/ADVANCED%20TOPICS/Scheduling%20tasks%20for%20later%20execution.md#使用普通的-java-api-调度任务)使用普通的 Java API 调度任务 在 Java 中使用 JDK 提供的 ScheduledExecutorService 实现任务调度。使用 Executors 提供的静态方法创建 ScheduledExecutorService,有如下方法 Table 15.1 java.util.concurrent.Executors-Static methods to create a ScheduledExecutorService | 方法 | 描述 | | --- | --- | | newScheduledThreadPool(int corePoolSize) newScheduledThreadPool(int corePoolSize,ThreadFactorythreadFactory) | | ScheduledThreadExecutorService 用于调度命令来延迟或者周期性的执行。 corePoolSize 用于计算线程的数量 newSingleThreadScheduledExecutor() newSingleThreadScheduledExecutor(ThreadFact orythreadFactory) | 新建一个 ScheduledThreadExecutorService 可以用于调度命令来延迟或者周期性的执行。它将使用一个线程来执行调度的任务 下面的 ScheduledExecutorService 调度任务 60 执行一次 Listing 15.4 Schedule task with a ScheduledExecutorService ~~~ ScheduledExecutorService executor = Executors .newScheduledThreadPool(10); //1 ScheduledFuture<?> future = executor.schedule( new Runnable() { //2 @Override public void run() { System.out.println("Now it is 60 seconds later"); //3 } }, 60, TimeUnit.SECONDS); //4 // do something // executor.shutdown(); //5 ~~~ 1. 新建 ScheduledExecutorService 使用10个线程 2. 新建 runnable 调度执行 3. 稍后运行 4. 调度任务60秒后执行 5. 关闭 ScheduledExecutorService 来释放任务完成的资源 ### [](https://github.com/waylau/essential-netty-in-action/blob/master/ADVANCED%20TOPICS/Scheduling%20tasks%20for%20later%20execution.md#使用-eventloop-调度任务)使用 EventLoop 调度任务 使用 ScheduledExecutorService 工作的很好,但是有局限性,比如在一个额外的线程中执行任务。如果需要执行很多任务,资源使用就会很严重;对于像 Netty 这样的高性能的网络框架来说,严重的资源使用是不能接受的。Netty 对这个问题提供了很好的方法。 Netty 允许使用 EventLoop 调度任务分配到通道,如下面代码: Listing 15.5 Schedule task with EventLoop ~~~ Channel ch = null; // Get reference to channel ScheduledFuture<?> future = ch.eventLoop().schedule( new Runnable() { @Override public void run() { System.out.println("Now its 60 seconds later"); } }, 60, TimeUnit.SECONDS); ~~~ 1. 新建 runnable 用于执行调度 2. 稍后执行 3. 调度任务60秒后运行 如果想任务每隔多少秒执行一次,看下面代码: Listing 15.6 Schedule a fixed task with the EventLoop ~~~ Channel ch = null; // Get reference to channel ScheduledFuture<?> future = ch.eventLoop().scheduleAtFixedRate( new Runnable() { @Override public void run() { System.out.println("Run every 60 seconds"); } }, 60, 60, TimeUnit.SECONDS); ~~~ 1. 新建 runnable 用于执行调度 2. 将运行直到 ScheduledFuture 被取消 3. 调度任务60秒运行 取消操作,可以使用 ScheduledFuture 返回每个异步操作。 ScheduledFuture 提供一个方法用于取消一个调度了的任务或者检查它的状态。一个简单的取消操作如下: ~~~ ScheduledFuture<?> future = ch.eventLoop() .scheduleAtFixedRate(..); //1 // Some other code that runs... future.cancel(false); //2 ~~~ 1. 调度任务并获取返回的 ScheduledFuture 2. 取消任务,阻止它再次运行 ### [](https://github.com/waylau/essential-netty-in-action/blob/master/ADVANCED%20TOPICS/Scheduling%20tasks%20for%20later%20execution.md#调度的内部实现)调度的内部实现 Netty 内部实现其实是基于George Varghese 提出的 “Hashed and hierarchical timing wheels: Data structures to efficiently implement timer facility(散列和分层定时轮:数据结构有效实现定时器)”。这种实现只保证一个近似执行,也就是说任务的执行可能不是100%准确;在实践中,这已经被证明是一个可容忍的限制,不影响多数应用程序。所以,定时执行任务不可能100%准确的按时执行。 为了更好的理解它是如何工作,我们可以这样认为: * 在指定的延迟时间后调度任务; * 任务被插入到 EventLoop 的 Schedule-Task-Queue(调度任务队列); * 如果任务需要马上执行,EventLoop 检查每个运行; * 如果有一个任务要执行,EventLoop 将立刻执行它,并从队列中删除; * EventLoop 等待下一次运行,从第4步开始一遍又一遍的重复。 因为这样的实现计划执行不可能100%正确,对于多数用例不可能100%准备的执行计划任务;在 Netty 中,这样的工作几乎没有资源开销。 但是如果需要更准确的执行呢?很容易,你需要使用ScheduledExecutorService 的另一个实现,这不是 Netty 的内容。记住,如果不遵循 Netty 的线程模型协议,你将需要自己同步并发访问。
';

EventLoop

最后更新于:2022-04-01 02:37:28

事件循环所做的正如它的名字所说的。它运行在一个循环里,直到它的终止。这符合网络框架的设计,因为他们需要在一个循环为一个特定的连接运行事件。这不是 Netty 发明新的东西;其他框架和实现已经这样做了。 下面的清单显示了典型的 EventLoop 逻辑。请注意这是为了更好的说明这个想法而不是单单展示 Netty 实现本身。 Listing 14.1 Execute task in EventLoop ~~~ while (!terminated) { List<Runnable> readyEvents = blockUntilEventsReady(); //1 for (Runnable ev: readyEvents) { ev.run(); //2 } } ~~~ 1. 阻塞直到事件可以运行 2. 循环所有事件,并运行他们 在 Netty 中使用 EventLoop 接口代表事件循环,EventLoop 是从EventExecutor 和 ScheduledExecutorService 扩展而来,所以可以将任务直接交给 EventLoop 执行。类关系图如下: [![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2015-08-19_55d47c070c386.jpg)](https://github.com/waylau/essential-netty-in-action/blob/master/images/Figure%2015.2%20EventLoop%20class%20hierarchy.jpg) Figure 15.2 EventLoop class hierarchy EventLoop 是完全由一个 Thread,从未改变。为了更合理利用资源,根据配置和可用的内核, Netty 可以使用多个 EventLoop。 *事件/任务执行顺序* *一个重要的细节关于事件和任务的执行顺序是,事件/任务执行顺序按照FIFO(先进先出)。这是必要的,因为否则事件不能按顺序处理,所处理的字节将不能保证正确的顺序。这将导致问题,所以这个不是所允许的设计。* ### [](https://github.com/waylau/essential-netty-in-action/blob/master/ADVANCED%20TOPICS/The%20EventLoop.md#netty-4-中的-io-和事件处理)Netty 4 中的 I/O 和事件处理 Netty 使用 I/O 事件,b被各种 I/O 操作运输本身所触发。 这些 I/O 操作,例如网络 API 的一部分,由Java 和底层操作系统提供。 一个区别在于,一些操作(或者事件)是由 Netty 的本身的传输实现触发的,一些是由用户自己。例如读事件通常是由传输本身在读取一些数据时触发。相比之下,写事件通常是由用户本身,例如,当调用 Channel.write(…)。 究竟需要做一次处理一个事件取决于事件的性质。经常会读网络栈的数据转移到您的应用程序。有时它会在另一个方向做同样的事情,例如,把数据从应用程序到网络堆栈(内核)发送到它的远端。但不限于这种类型的事务;重要的是,所使用的逻辑是通用的,灵活地处理各种各样的用例。 I/O 和事件处理的一个重要的事情在 Netty 4,是每一个 I/O 操作和事件总是由 EventLoop 本身处理,以及分配给 EventLoop 的 Thread。 我们应该注意,Netty 不总是使用我们描述的线程模型(通过 EventLoop 抽象)。在下一节中,你会了解 Netty 3 中使用的线程模型。这将帮助你理解为什么现在用新的线程模型以及为什么使用取代了 Netty 3 中仍然使用的旧模式。 ### [](https://github.com/waylau/essential-netty-in-action/blob/master/ADVANCED%20TOPICS/The%20EventLoop.md#netty-3-中的-io-操作)Netty 3 中的 I/O 操作 在以前的版本中,线程模型是不同的。Netty 保证只将入站(以前称为 upstream)事件在执行 I/O Thread 执行 (I/O Thread 现在在 Netty 4 叫 EventLoop )。所有的出站(以前称为 downstream)事件被调用Thread 处理,这可能是 I/O Thread 也可以能是其他 Thread。 这听起来像一个好主意,但原来是容易出错,因为处理 ChannelHandler需要小心的出站事件同步,因为它没有保证只有一个线程运行在同一时间。这可能会发生如果你触发 downstream 事件同时在一个管道时;例如,您 调用 Channel.write(..) 在不同的线程。 除了需要负担同步 ChannelHandler,这个线程模型的另一个问题是你可能需要去掉一个入站事件作为一个出站事件的结果,例如 Channel.write(..) 操作导致异常。在这种情况下,exceptionCaught 必须生成并抛出去。乍看之下这不像是一个问题,但我们知道, exceptionCaught 由入站事件涉及,会让你知道问题出在哪里。问题是,事实上,你现在的情况是在调用 Thread 上执行,但 exceptionCaught 事件必须交给工作线程来执行,这样上下文切换是必须的。 相比之下,Netty 4 新线程模型根本没有这些问题,因为一切都在同一个EventLoop 在同一 Thread 中 执行。这消除了需要同步ChannelHandler ,并且使它更容易为用户理解执行。 现在你知道 EventLoop 如何执行任务,它的时间来快速浏览下 Netty 的各种内部功能。 ### [](https://github.com/waylau/essential-netty-in-action/blob/master/ADVANCED%20TOPICS/The%20EventLoop.md#netty-线程模型的内部)Netty 线程模型的内部 Netty 的内部实现使其线程模型表现优异,它会检查正在执行的 Thread 是否是已分配给实际 Channel (和 EventLoop),在 Channel 的生命周期内,EventLoop 负责处理所有的事件。 如果 Thread 是相同的 EventLoop 中的一个,讨论的代码块被执行;如果线程不同,它安排一个任务并在一个内部队列后执行。通常是通过EventLoop 的 Channel 只执行一次下一个事件,这允许直接从任何线程与通道交互,同时还确保所有的 ChannelHandler 是线程安全,不需要担心并发访问问题。 下图显示在 EventLoop 中调度任务执行逻辑,这适合 Netty 的线程模型: [![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2015-08-19_55d47c0b7e807.jpg)](https://github.com/waylau/essential-netty-in-action/blob/master/images/15.5%20EventLoop%20execution%20logic%20flow.jpg) 1. 应在 EventLoop 中执行的任务 2. 任务传递到执行方法后,执行检查来检测调用线程是否是与分配给 EventLoop 是一样的 3. 线程是一样的,说明你在 EventLoop 里,这意味着可以直接执行的任务 4. 线程与 EventLoop 分配的不一样。当 EventLoop 事件执行时,队列的任务再次执行一次 15.5 EventLoop execution logic/flow 设计是非常重要的,以确保不要把任何长时间运行的任务放在执行队列中,因为长时间运行的任务会阻止其他在相同线程上执行的任务。这多少会影响整个系统依赖于 EventLoop 实现用于特殊传输的实现。 传输之间的切换在你的代码库中可能没有任何改变,重要的是:切勿阻塞 I/O 线程。如果你必须做阻塞调用(或执行需要长时间才能完成的任务),使用 EventExecutor。 下一节将讲解一个在应用程序中经常使用的功能,就是调度执行任务(定期执行)。Java对这个需求提供了解决方案,但 Netty 提供了几个更好的方案
';

线程模型的总览

最后更新于:2022-04-01 02:37:25

本节将简单介绍一般的线程模型,Netty 中如何使用指定的线程模型,以及Netty 过去不同的版本中使用的线程模型。你会更好的理解不同的线程模型的所有利弊。 一个线程模型指定代码执行,给开发人员如何执行他们代码的信息。这很重要,因为它允许开发人员事先知道如何保护他们的代码免受并发执行的副作用。若没有这个知识背景,即使是最好的开发人员都只能是碰运气,希望到最后都能这么幸运,但这几乎是不可能的。进入更多的细节之前,提供一个更好的理解主题的回顾这些天大多数应用程序做什么。 大多数现代应用程序使用多个线程调度工作,因此让应用程序使用所有可用的系统资源以有效的方式。这使得很多有意义,因为大部分硬件有不止一个甚至多个CPU核心。如果一切都只有一个 Thread 执行,不可能完全使用所提供的资源。为了解决这个问题,许多应用程序执行多个 Thread 的运行代码。在早期的 Java,这样做是通过简单地按需创建新 Thread 时,并行工作需要做。 但很快就发现,这不是完美的,因为创建 Thread 和回收会给他们带来的开销。在 Java 5 中,我们终于有了所谓的线程池,经常缓存 Thread,用来消除创建和回收 Thread 的开销。这些池由 Executor 接口提供。Java 5 提供了许多有用的实现,在其内部发生显著的变化,但思想都一脉相承的。创建 Thread 和重用他们提交一个任务时执行。这可以帮助创建和回收线程的开销降到最低。 下图显示使用一个线程池执行一个任务,提交一个任务后会使用线程池中空闲的线程来执行,完成任务后释放线程并将线程重新放回线程池: [![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2015-08-19_55d47beff1dfe.jpg)](https://github.com/waylau/essential-netty-in-action/blob/master/images/Figure%2015.1%20Executor%20execution%20logic.jpg) 1. Runnable 表示要执行的任务。这可能是任何东西,从一个数据库调用文件系统清理。 2. 之前 runnable 移交到线程池。 3. 闲置的线程被用来执行任务。当一个线程运行结束之后,它将回到闲置线程的列表新任务需要运行时被重用。 4. 线程执行任务 Figure 15.1 Executor execution logic 这个修复 Thread 创建和回收的开销,不需要每个新任务创建和销毁新的 Thread 。 但使用多个 Thread 提供了资源和管理成本,作为一个副作用,引入了太多的上下文切换。这种会随着运行的线程的数量和任务执行的数量的增加而恶化。尽管使用多个线程在开始时似乎不是一个问题,但一旦你把真正工作负载放在系统上,可以会遭受到重击。 除了这些技术的限制和问题,其他问题可能发生在相关的维护应用程序/框架在未来或在项目的生命周期里。有效地说,增加应用程序的复杂性取决于对比。当状态简单时,写一个多线程应用程序是一个辛苦的工作!你能解决这个问题吗?在实际的场景中需要多个 Thread 规模;这是一个事实。让我们看看 Netty 是解决这个问题。
';

EventLoop 和线程模型

最后更新于:2022-04-01 02:37:23

本章介绍 * 线程模型的总览 * EventLoop * 并发 * 任务执行 * 任务调度 线程模型定义了应用或者框架如何执行你的代码,所以选择线程模型极其重要。Netty 提供了一个简单强大的线程模型来帮助我们简化代码。所有 ChannelHandler,包括业务逻辑,都保证由一个 Thread 同时执行特定的 Channel。这并不意味着Netty不能使用多线程,只是 Netty 限制每个Channel 都由一个 Thread 处理,这种设计适用于非阻塞 IO 操作。 读完本章就会深刻理解 Netty 的线程模型以及 Nett y团队为什么会选择这样的线程模型,这些信息可以让我们在使用 Netty 时让程序由最好的性能。此外,Netty 提供的线程模型还可以让我们编写整洁简单的代码,以保持代码的整洁性;我们还会学习 Netty 团队的经验,过去使用其他的线程模型,现在我们将使用 Netty 提供的更容易更强大的线程模型来开发。 本章假设如下: * 你明白线程是什么以及如何使用,并有使用线程的工作经验。若不是这样,就请花些时间来了解清楚这些知识。推荐一本书:《Java Concurrency in Practice(Java 并发编程实战)》(Brian Goetz)。 * 你了解多线程应用程序及其设计,也包括如何保证线程安全和获取最佳性能。
';

测试编解码器

最后更新于:2022-04-01 02:37:21

编码器和解码器完成,但仍有一些缺失:测试。 没有测试你只看到如果编解码器工作对一些真正的服务器运行时,这并不是你应该是依靠什么。第十章所示,为一个自定义编写测试 ChannelHandler通常是通过 EmbeddedChannel。 所以这正是现在做测试我们定制的编解码器,其中包括一个编码器和解码器。让重新开始编码器。后面的清单显示了简单的编写单元测试。 Listing 14.5 MemcachedRequestEncoderTest class public class MemcachedRequestEncoderTest { ~~~ @Test public void testMemcachedRequestEncoder() { MemcachedRequest request = new MemcachedRequest(Opcode.SET, "key1", "value1"); //1 EmbeddedChannel channel = new EmbeddedChannel(new MemcachedRequestEncoder()); //2 channel.writeOutbound(request); //3 ByteBuf encoded = (ByteBuf) channel.readOutbound(); Assert.assertNotNull(encoded); //4 Assert.assertEquals(request.magic(), encoded.readUnsignedByte()); //5 Assert.assertEquals(request.opCode(), encoded.readByte()); //6 Assert.assertEquals(4, encoded.readShort());//7 Assert.assertEquals((byte) 0x08, encoded.readByte()); //8 Assert.assertEquals((byte) 0, encoded.readByte());//9 Assert.assertEquals(0, encoded.readShort());//10 Assert.assertEquals(4 + 6 + 8, encoded.readInt());//11 Assert.assertEquals(request.id(), encoded.readInt());//12 Assert.assertEquals(request.cas(), encoded.readLong());//13 Assert.assertEquals(request.flags(), encoded.readInt()); //14 Assert.assertEquals(request.expires(), encoded.readInt()); //15 byte[] data = new byte[encoded.readableBytes()]; //16 encoded.readBytes(data); Assert.assertArrayEquals((request.key() + request.body()).getBytes(CharsetUtil.UTF_8), data); Assert.assertFalse(encoded.isReadable()); //17 Assert.assertFalse(channel.finish()); Assert.assertNull(channel.readInbound()); } ~~~ } 1. 新建 MemcachedRequest 用于编码为 ByteBuf 2. 新建 EmbeddedChannel 用于保持 MemcachedRequestEncoder 到测试 3. 写请求到 channel 并且判断是否产生了编码的消息 4. 检查 ByteBuf 是否 null 5. 判断 magic 是否正确写入 ByteBuf 6. 判断 opCode (SET) 是否写入正确 7. 检查 key 是否写入长度正确 8. 检查写入的请求是否额外包含 9. 检查数据类型是否写 10. 检查是否保留数据插入 11. 检查 body 的整体大小 计算方式是 key.length + body.length + extras 12. 检查是否正确写入 id 13. 检查是否正确写入 Compare and Swap (CAS) 14. 检查是否正确的 flag 15. 检查是否正确设置到期时间的 16. 检查 key 和 body 是否正确 17. 检查是否可读 Listing 14.6 MemcachedResponseDecoderTest class ~~~ public class MemcachedResponseDecoderTest { @Test public void testMemcachedResponseDecoder() { EmbeddedChannel channel = new EmbeddedChannel(new MemcachedResponseDecoder()); //1 byte magic = 1; byte opCode = Opcode.SET; byte[] key = "Key1".getBytes(CharsetUtil.US_ASCII); byte[] body = "Value".getBytes(CharsetUtil.US_ASCII); int id = (int) System.currentTimeMillis(); long cas = System.currentTimeMillis(); ByteBuf buffer = Unpooled.buffer(); //2 buffer.writeByte(magic); buffer.writeByte(opCode); buffer.writeShort(key.length); buffer.writeByte(0); buffer.writeByte(0); buffer.writeShort(Status.KEY_EXISTS); buffer.writeInt(body.length + key.length); buffer.writeInt(id); buffer.writeLong(cas); buffer.writeBytes(key); buffer.writeBytes(body); Assert.assertTrue(channel.writeInbound(buffer)); //3 MemcachedResponse response = (MemcachedResponse) channel.readInbound(); assertResponse(response, magic, opCode, Status.KEY_EXISTS, 0, 0, id, cas, key, body);//4 } @Test public void testMemcachedResponseDecoderFragments() { EmbeddedChannel channel = new EmbeddedChannel(new MemcachedResponseDecoder()); //5 byte magic = 1; byte opCode = Opcode.SET; byte[] key = "Key1".getBytes(CharsetUtil.US_ASCII); byte[] body = "Value".getBytes(CharsetUtil.US_ASCII); int id = (int) System.currentTimeMillis(); long cas = System.currentTimeMillis(); ByteBuf buffer = Unpooled.buffer(); //6 buffer.writeByte(magic); buffer.writeByte(opCode); buffer.writeShort(key.length); buffer.writeByte(0); buffer.writeByte(0); buffer.writeShort(Status.KEY_EXISTS); buffer.writeInt(body.length + key.length); buffer.writeInt(id); buffer.writeLong(cas); buffer.writeBytes(key); buffer.writeBytes(body); ByteBuf fragment1 = buffer.readBytes(8); //7 ByteBuf fragment2 = buffer.readBytes(24); ByteBuf fragment3 = buffer; Assert.assertFalse(channel.writeInbound(fragment1)); //8 Assert.assertFalse(channel.writeInbound(fragment2)); //9 Assert.assertTrue(channel.writeInbound(fragment3)); //10 MemcachedResponse response = (MemcachedResponse) channel.readInbound(); assertResponse(response, magic, opCode, Status.KEY_EXISTS, 0, 0, id, cas, key, body);//11 } private static void assertResponse(MemcachedResponse response, byte magic, byte opCode, short status, int expires, int flags, int id, long cas, byte[] key, byte[] body) { Assert.assertEquals(magic, response.magic()); Assert.assertArrayEquals(key, response.key().getBytes(CharsetUtil.US_ASCII)); Assert.assertEquals(opCode, response.opCode()); Assert.assertEquals(status, response.status()); Assert.assertEquals(cas, response.cas()); Assert.assertEquals(expires, response.expires()); Assert.assertEquals(flags, response.flags()); Assert.assertArrayEquals(body, response.data().getBytes(CharsetUtil.US_ASCII)); Assert.assertEquals(id, response.id()); } } ~~~ 1. 新建 EmbeddedChannel ,持有 MemcachedResponseDecoder 到测试 2. 创建一个新的 Buffer 并写入数据,与二进制协议的结构相匹配 3. 写缓冲区到 EmbeddedChannel 和检查是否一个新的MemcachedResponse 创建由声明返回值 4. 判断 MemcachedResponse 和预期的值 5. 创建一个新的 EmbeddedChannel 持有 MemcachedResponseDecoder 到测试 6. 创建一个新的 Buffer 和写入数据的二进制协议的结构相匹配 7. 缓冲分割成三个片段 8. 写的第一个片段 EmbeddedChannel 并检查,没有新的MemcachedResponse 创建,因为并不是所有的数据都是准备好了 9. 写第二个片段 EmbeddedChannel 和检查,没有新的MemcachedResponse 创建,因为并不是所有的数据都是准备好了 10. 写最后一段到 EmbeddedChannel 和检查新的 MemcachedResponse 是否创建,因为我们终于收到所有数据 11. 判断 MemcachedResponse 与预期的值
';

Netty 编码器和解码器

最后更新于:2022-04-01 02:37:18

Netty 的是一个复杂和先进的框架,但它不虚幻。当我们请求设置一些 key 为给定值,我们现在知道,Request 类的一个实例被创建来代表这个请求。但 Netty 并不知道 Request 对象是如何转成 Memcached 所期望的。Memcached 所期望的是字节序列;忽略使用的协议,数据在网络上传输永远是字节序列。 将 Request 对象转为 Memcached 所需的字节序列,Netty 需要用 MemcachedRequest 来编码成另外一种格式。 Netty 提供了一个抽象类称为MessageToByteEncoder。它提供了一个抽象方法,将一条消息(在本例中我们 MemcachedRequest 对象)转为字节。你显示什么信息实现通过使用 Java 泛型可以处理;例如 , MessageToByteEncoder 说这个编码器要编码的对象类型是 MemcachedRequest *MessageToByteEncoder 和 Java 泛型* *使用 MessageToByteEncoder 可以绑定特定的参数类型。如果你有多个不同的消息类型,在相同的编码器里,也可以使用MessageToByteEncoder,注意检查消息示例的类型即可* * 这也适用于解码器,除了解码器将一系列字节转换回一个对象。 这个 Netty 的提供了 ByteToMessageDecoder 类,而不是提供一个编码方法用来实现解码。在接下来的两个部分你看看如何实现一个 Memcached 解码器和编码器。在你做之前,然而,它的重要的意识到在使用 Netty 时,你不总是需要提供自己的编码器和解码器。只是现在因为没有 Netty 这样对 Memcached 的内置支持。 *编码器和解码器* *记住,编码器处理出站和译码器处理入站。这基本上意味着编码器将编码数据,写入远端。译码器将从远端读取处理数据。重要的是要记住,出站和入站是两个不同的方向。* 请注意,我们的编码器和译码器不检查任何值最大大小保持实现简单。在实际实现中你最有可能想放入一些验证检查,使用 EncoderException 或DecoderException(或一个子类)如果检测到违反协议。 ### [](https://github.com/waylau/essential-netty-in-action/blob/master/ADVANCED%20TOPICS/Netty%20encoders%20and%20decoders.md#实现-memcached-编码器)实现 Memcached 编码器 本节我们将简要介绍编码器的实现。正如我们提到的,编码器负责编码消息为一系列字节。这些字节可以通过网络发送到远端。为了发送请求,我们首先创建 MemcachedRequest 类,稍后编码器实现会编码为一系列字节。下面的清单显示了我们的 MemcachedRequest 类 Listing 14.1 Implementation of a Memcached request ~~~ public class MemcachedRequest { //1 private static final Random rand = new Random(); private final int magic = 0x80;//fixed so hard coded private final byte opCode; //the operation e.g. set or get private final String key; //the key to delete, get or set private final int flags = 0xdeadbeef; //random private final int expires; //0 = item never expires private final String body; //if opCode is set, the value private final int id = rand.nextInt(); //Opaque private final long cas = 0; //data version check...not used private final boolean hasExtras; //not all ops have extras public MemcachedRequest(byte opcode, String key, String value) { this.opCode = opcode; this.key = key; this.body = value == null ? "" : value; this.expires = 0; //only set command has extras in our example hasExtras = opcode == Opcode.SET; } public MemcachedRequest(byte opCode, String key) { this(opCode, key, null); } public int magic() { //2 return magic; } public int opCode() { //3 return opCode; } public String key() { //4 return key; } public int flags() { //5 return flags; } public int expires() { //6 return expires; } public String body() { //7 return body; } public int id() { //8 return id; } public long cas() { //9 return cas; } public boolean hasExtras() { //10 return hasExtras; } } ~~~ 1. 这个类将会发送请求到 Memcached server 2. 幻数,它可以用来标记文件或者协议的格式 3. opCode,反应了响应的操作已经创建了 4. 执行操作的 key 5. 使用的额外的 flag 6. 表明到期时间 7. body 8. 请求的 id。这个id将在响应中回显。 9. compare-and-check 的值 10. 如果有额外的使用,将返回 true 你如果想实现 Memcached 的其余部分协议,你只需要将 client.op*(op * 任何新的操作添加)转换为其中一个方法请求。我们需要两个更多的支持类,在下一个清单所示 Listing 14.2 Possible Memcached operation codes and response statuses ~~~ public class Status { public static final short NO_ERROR = 0x0000; public static final short KEY_NOT_FOUND = 0x0001; public static final short KEY_EXISTS = 0x0002; public static final short VALUE_TOO_LARGE = 0x0003; public static final short INVALID_ARGUMENTS = 0x0004; public static final short ITEM_NOT_STORED = 0x0005; public static final short INC_DEC_NON_NUM_VAL = 0x0006; } public class Opcode { public static final byte GET = 0x00; public static final byte SET = 0x01; public static final byte DELETE = 0x04; } ~~~ 一个 Opcode 告诉 Memcached 要执行哪些操作。每个操作都由一个字节表示。同样的,当 Memcached 响应一个请求,响应头中包含两个字节代表响应状态。状态和 Opcode 类表示这些 Memcached 的构造。这些操作码可以使用当你构建一个新的 MemcachedRequest 指定哪个行动应该由它引发的。 但现在可以集中精力在编码器上: Listing 14.3 MemcachedRequestEncoder implementation ~~~ public class MemcachedRequestEncoder extends MessageToByteEncoder<MemcachedRequest> { //1 @Override protected void encode(ChannelHandlerContext ctx, MemcachedRequest msg, ByteBuf out) throws Exception { //2 byte[] key = msg.key().getBytes(CharsetUtil.UTF_8); byte[] body = msg.body().getBytes(CharsetUtil.UTF_8); //total size of the body = key size + content size + extras size //3 int bodySize = key.length + body.length + (msg.hasExtras() ? 8 : 0); //write magic byte //4 out.writeByte(msg.magic()); //write opcode byte //5 out.writeByte(msg.opCode()); //write key length (2 byte) //6 out.writeShort(key.length); //key length is max 2 bytes i.e. a Java short //7 //write extras length (1 byte) int extraSize = msg.hasExtras() ? 0x08 : 0x0; out.writeByte(extraSize); //byte is the data type, not currently implemented in Memcached but required //8 out.writeByte(0); //next two bytes are reserved, not currently implemented but are required //9 out.writeShort(0); //write total body length ( 4 bytes - 32 bit int) //10 out.writeInt(bodySize); //write opaque ( 4 bytes) - a 32 bit int that is returned in the response //11 out.writeInt(msg.id()); //write CAS ( 8 bytes) out.writeLong(msg.cas()); //24 byte header finishes with the CAS //12 if (msg.hasExtras()) { //write extras (flags and expiry, 4 bytes each) - 8 bytes total //13 out.writeInt(msg.flags()); out.writeInt(msg.expires()); } //write key //14 out.writeBytes(key); //write value //15 out.writeBytes(body); } } ~~~ 1. 该类是负责编码 MemachedRequest 为一系列字节 2. 转换的 key 和实际请求的 body 到字节数组 3. 计算 body 大小 4. 写幻数到 ByteBuf 字节 5. 写 opCode 作为字节 6. 写 key 长度z作为 short 7. 编写额外的长度作为字节 8. 写数据类型,这总是0,因为目前不是在 Memcached,但可用于使用 后来的版本 9. 为保留字节写为 short ,后面的 Memcached 版本可能使用 10. 写 body 的大小作为 long 11. 写 opaque 作为 int 12. 写 cas 作为 long。这个是头文件的最后部分,在 body 的开始 13. 编写额外的 flag 和到期时间为 int 14. 写 key 15. 这个请求完成后 写 body。 总结,编码器 使用 Netty 的 ByteBuf 处理请求,编码 MemcachedRequest 成一套正确排序的字节。详细步骤为: * 写幻数字节。 * 写 opcode 字节。 * 写 key 长度(2字节)。 * 写额外的长度(1字节)。 * 写数据类型(1字节)。 * 为保留字节写 null 字节(2字节)。 * 写 body 长度(4字节- 32位整数)。 * 写 opaque(4个字节,一个32位整数在响应中返回)。 * 写 CAS(8个字节)。 * 写 额外的(flag 和 到期,4字节)= 8个字节 * 写 key * 写 值 无论你放入什么到输出缓冲区( 调用 ByteBuf) Netty 的将向服务器发送被写入请求。下一节将展示如何进行反向通过解码器工作。 ### [](https://github.com/waylau/essential-netty-in-action/blob/master/ADVANCED%20TOPICS/Netty%20encoders%20and%20decoders.md#实现-memcached-编码器-1)实现 Memcached 编码器 将 MemcachedRequest 对象转为 字节序列,Memcached 仅需将字节转到响应对象返回即可。 先见一个 POJO: Listing 14.7 Implementation of a MemcachedResponse ~~~ public class MemcachedResponse { //1 private final byte magic; private final byte opCode; private byte dataType; private final short status; private final int id; private final long cas; private final int flags; private final int expires; private final String key; private final String data; public MemcachedResponse(byte magic, byte opCode, byte dataType, short status, int id, long cas, int flags, int expires, String key, String data) { this.magic = magic; this.opCode = opCode; this.dataType = dataType; this.status = status; this.id = id; this.cas = cas; this.flags = flags; this.expires = expires; this.key = key; this.data = data; } public byte magic() { //2 return magic; } public byte opCode() { //3 return opCode; } public byte dataType() { //4 return dataType; } public short status() { //5 return status; } public int id() { //6 return id; } public long cas() { //7 return cas; } public int flags() { //8 return flags; } public int expires() { //9 return expires; } public String key() { //10 return key; } public String data() { //11 return data; } } ~~~ 1. 该类,代表从 Memcached 服务器返回的响应 2. 幻数 3. opCode,这反映了创建操作的响应 4. 数据类型,这表明这个是基于二进制还是文本 5. 响应的状态,这表明如果请求是成功的 6. 惟一的 id 7. compare-and-set 值 8. 使用额外的 flag 9. 表示该值存储的一个有效期 10. 响应创建的 key 11. 实际数据 下面为 MemcachedResponseDecoder, 使用了 ByteToMessageDecoder 基类,用于将 字节序列转为 MemcachedResponse Listing 14.4 MemcachedResponseDecoder class ~~~ public class MemcachedResponseDecoder extends ByteToMessageDecoder { //1 private enum State { //2 Header, Body } private State state = State.Header; private int totalBodySize; private byte magic; private byte opCode; private short keyLength; private byte extraLength; private short status; private int id; private long cas; @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { switch (state) { //3 case Header: if (in.readableBytes() < 24) { return;//response header is 24 bytes //4 } magic = in.readByte(); //5 opCode = in.readByte(); keyLength = in.readShort(); extraLength = in.readByte(); in.skipBytes(1); status = in.readShort(); totalBodySize = in.readInt(); id = in.readInt(); //referred to in the protocol spec as opaque cas = in.readLong(); state = State.Body; case Body: if (in.readableBytes() < totalBodySize) { return; //until we have the entire payload return //6 } int flags = 0, expires = 0; int actualBodySize = totalBodySize; if (extraLength > 0) { //7 flags = in.readInt(); actualBodySize -= 4; } if (extraLength > 4) { //8 expires = in.readInt(); actualBodySize -= 4; } String key = ""; if (keyLength > 0) { //9 ByteBuf keyBytes = in.readBytes(keyLength); key = keyBytes.toString(CharsetUtil.UTF_8); actualBodySize -= keyLength; } ByteBuf body = in.readBytes(actualBodySize); //10 String data = body.toString(CharsetUtil.UTF_8); out.add(new MemcachedResponse( //1 magic, opCode, status, id, cas, flags, expires, key, data )); state = State.Header; } } } ~~~ 1. 类负责创建的 MemcachedResponse 读取字节 2. 代表当前解析状态,这意味着我们需要解析的头或 body 3. 根据解析状态切换 4. 如果不是至少24个字节是可读的,它不可能读整个头部,所以返回这里,等待再通知一次数据准备阅读 5. 阅读所有头的字段 6. 检查是否足够的数据是可读用来读取完整的响应的 body。长度是从头读取 7. 检查如果有任何额外的 flag 用于读,如果是这样做 8. 检查如果响应包含一个 expire 字段,有就读它 9. 检查响应是否包含一个 key ,有就读它 10. 读实际的 body 的 payload 11. 从前面读取字段和数据构造一个新的 MemachedResponse 所以在实现发生了什么事?我们知道一个 Memcached 响应有24位头;我们不知道是否所有数据,响应将被包含在输入 ByteBuf ,当解码方法调用时。这是因为底层网络堆栈可能将数据分解成块。所以确保我们只解码当我们有足够的数据,这段代码检查是否可用可读的字节的数量至少是24。一旦我们有24个字节,我们可以确定整个消息有多大,因为这个信息包含在24位头。 当我们解码整个消息,我们创建一个 MemcachedResponse 并将其添加到输出列表。任何对象添加到该列表将被转发到下一个ChannelInboundHandler 在 ChannelPipeline,因此允许处理。 *
';

了解 Memcached 二进制协议

最后更新于:2022-04-01 02:37:16

我们说,要实现 Memcached 的 GET, SET, 和 DELETE 操作。我们仅仅关注这些,但 memcached 协议有一个通用的结构,只有少数参数改变为了改变一个请求或响应的意义。这意味着您可以轻松地扩展实现添加其他命令。一般协议有 24 字节头用于请求和响应。这个头可以分解如下表14.1中。 Table 14.1 Sample Memcached header byte structure | Field | Byte offset | Value | | --- | --- | --- | | Magic | 0 | 0x80 用于请求 0x81 用于响应 | | OpCode | 1 | 0x01...0x1A | | Key length | 2 和 3 | 1...32,767 | | Extra length | 4 | 0x00, x04, 或 0x08 | | Data type | 5 | 0x00 | | Reserved | 6 和 7 | 0x00 | | 所有 body 的长度 | 8-11 | 所有 body 的长度 | | Opaque | 12-15 | 任何带带符号的 32-bit 整数; 这个已将包含在响应中,因此更容易将请求映射到响应。 | | CAS | 16-23 | 数据版本检查 | 注意有多少字节用于每个部分。这告诉你以后你应该用什么数据类型。例如,如果一个字节抵消只使用 byte 0,然后使用一个 Java byte来表示它;如果它使用6和7(2字节),你使用一个Java short;如果它使用 12-15(4字节),你使用一个Java int,等等。 [![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2015-08-19_55d47b2492145.jpg)](https://github.com/waylau/essential-netty-in-action/blob/master/images/Figure%2014.2%20Real-world%20Memcached%20request%20and%20response%20headers.jpg) 1. 请求(只有头显示) 2. 响应 Figure 14.2 Real-world Memcached request and response headers 在图14.2中,高亮显示的第一部分代表请求打到 Memcached (只显示请求头),在这种情况下是告诉 Memcached 来 SET 键是“a”而值是“abc”。第部分是响应。 突出显示的部分中的每一行代表4个字节;因为有6行,这意味着请求头是由24个字节,正如我们之前说的。回顾表14.1中,您可以头在一个真正的请求中看到头文件中的信息。现在,这是所有你需要知道的关于 Memcached 二进制协议。在下一节中,我们需要看看多么我们可以开始制作 Netty 这些请求。
';

实现 Memcached 编解码器

最后更新于:2022-04-01 02:37:14

当想要实现一个给定协议的编解码器,我们应该花一些事件来了解它的运作原理。通常情况下,协议本身都有一些详细的记录。在这里你会发现多少细节?幸运的是 Memcached 的二进制协议可以很好的扩展。 在 RFC 中有相应的规范,可以在 [https://code.google.com/p/Memcached/wiki/MemcacheBinaryProtocol](https://code.google.com/p/Memcached/wiki/MemcacheBinaryProtocol) 找到 。 我们不会实现 Memcached 的所有命令,只会实现三种操作:SET,GET 和 DELETE。这样做事为了让事情变得简单。
';

编解码器的范围

最后更新于:2022-04-01 02:37:12

我们将只实现 Memcached 协议的一个子集,这足够我们进行添加、检索、删除对象;在 Memcached 中是通过执行 SET,GET,DELETE 命令来实现的。Memcached 支持很多其他的命令,但我们只使用其中三个命令,简单的东西,我们才会理解的更清楚。 Memcached 有一个二进制和纯文本协议,它们都可以用来与 Memcached 服务器通信,使用什么类型的协议取决于服务器支持哪些协议。本章主要关注实现二进制协议,因为二进制在网络编程中最常用。
';

实现自定义的编解码器

最后更新于:2022-04-01 02:37:09

本章介绍: * Decoder * Encoder * 单元测试 本章讲述 Netty 中如何轻松实现定制的编解码器,由于 Netty 架构的灵活性,这些编解码器易于重用和测试。为了更容易实现,使用 Memcached 作为协议例子是因为它更方便我们实现。 Memcached 是来自 Memcached.org 的免费开源、高性能、分布式的内存对象缓存系统,其目的是加速动态 Web 应用程序的响应,减轻数据库负载;Memcache 实际上是一个以 key-value 存储任意数据的内存小块。可能有人会问“为什么使用 Memcached?”,因为 Memcached 协议非常简单,便于讲解。
';

高级主题

最后更新于:2022-04-01 02:37:07

';

总结

最后更新于:2022-04-01 02:37:05

本章提供了一个无连接的传输协议,如UDP的介绍。我们看到,在 Netty的您可以从 TCP 切换到 UDP 的同时使用相同的 API。您还了解了如何通过专门的 ChannelHandler 来组织处理逻辑。我们通过独立的解码器的逻辑来处理消息对象。 在下一章中我们将探讨用 Netty 实现可重用的编解码器。
';

运行 LogEventBroadcaster 和 LogEventMonitor

最后更新于:2022-04-01 02:37:02

如上所述,我们将使用 Maven 来运行应用程序。这一次你需要打开两个控制台窗口给每个项目。用 Ctrl-C 可以停止它。 首先我们将启动 LogEventBroadcaster 如清单13.4所示,除了已经构建项目以下命令即可(使用默认值): ~~~ $ mvn exec:exec -Pchapter13-LogEventBroadcaster ~~~ 和之前一样,这将通过 UDP 广播日志消息。 现在,在一个新窗口,构建和启动 LogEventMonitor 接收和显示广播消息。 Listing 13.9 Compile and start the LogEventBroadcaster ~~~ $ mvn clean package exec:exec -Pchapter13-LogEventMonitor [INFO] Scanning for projects... [INFO] [INFO] -------------------------------------------------------------------- [INFO] Building netty-in-action 0.1-SNAPSHOT [INFO] -------------------------------------------------------------------- ... [INFO] [INFO] --- maven-jar-plugin:2.4:jar (default-jar) @ netty-in-action --- [INFO] Building jar: /Users/norman/Documents/workspace-intellij/netty-in-actionprivate/ target/netty-in-action-0.1-SNAPSHOT.jar [INFO] [INFO] --- exec-maven-plugin:1.2.1:exec (default-cli) @ netty-in-action --- LogEventMonitor running ~~~ 当看到 “LogEventMonitor running” 说明程序运行成功了。 控制台将显示任何事件被添加到日志文件中,如下所示。消息的格式是由LogEventHandler 创建。 Listing 13.10 LogEventMonitor output ~~~ 1364217299382 [/192.168.0.38:63182] [/var/log/messages] : Mar 25 13:55:08 dev-linux dhclient: DHCPREQUEST of 192.168.0.50 on eth2 to 192.168.0.254 port 67 1364217299382 [/192.168.0.38:63182] [/var/log/messages] : Mar 25 13:55:08 dev-linux dhclient: DHCPACK of 192.168.0.50 from 192.168.0.254 1364217299382 [/192.168.0.38:63182] [/var/log/messages] : Mar 25 13:55:08 dev-linux dhclient: bound to 192.168.0.50 -- renewal in 270 seconds. 1364217299382 [/192.168.0.38:63182] [[/var/log/messages] : Mar 25 13:59:38 dev-linux dhclient: DHCPREQUEST of 192.168.0.50 on eth2 to 192.168.0.254 port 67 1364217299382 [/192.168.0.38:63182] [/[/var/log/messages] : Mar 25 13:59:38 dev-linux dhclient: DHCPACK of 192.168.0.50 from 192.168.0.254 1364217299382 [/192.168.0.38:63182] [/var/log/messages] : Mar 25 13:59:38 dev-linux dhclient: bound to 192.168.0.50 -- renewal in 259 seconds. 1364217299383 [/192.168.0.38:63182] [/var/log/messages] : Mar 25 14:03:57 dev-linux dhclient: DHCPREQUEST of 192.168.0.50 on eth2 to 192.168.0.254 port 67 1364217299383 [/192.168.0.38:63182] [/var/log/messages] : Mar 25 14:03:57 dev-linux dhclient: DHCPACK of 192.168.0.50 from 192.168.0.254 1364217299383 [/192.168.0.38:63182] [/var/log/messages] : Mar 25 14:03:57 dev-linux dhclient: bound to 192.168.0.50 -- renewal in 285 seconds. ~~~ 若你没有访问 UNIX syslog 的权限,可以创建 自定义的文件,手动填入内容。下面是 UNIX 命令用 touch 创建一个空文件 ~~~ $ touch ~/mylog.log ~~~ 再次启动 LogEventBroadcaster,设置系统属性 ~~~ $ mvn exec:exec -Pchapter13-LogEventBroadcaster -Dlogfile=~/mylog.log ~~~ 当 LogEventBroadcaster 运行时,你可以手动的添加消息到文件来查看广播到 LogEventMonitor 控制台的内容。使用 echo 和输出的文件 ~~~ $ echo ’Test log entry’ >> ~/mylog.log ~~~ 你可以启动任意个监视器实例,他们都会收到相同的消息。
';

写监视器

最后更新于:2022-04-01 02:37:00

这一节我们编写一个监视器:EventLogMonitor ,也就是用来接收事件的程序,用来代替 netcat 。EventLogMonitor 做下面事情: * 接收 LogEventBroadcaster 广播的 UDP DatagramPacket * 解码 LogEvent 消息 * 输出 LogEvent 消息 和之前一样,将实现自定义 ChannelHandler 的逻辑。图13.4描述了LogEventMonitor 的 ChannelPipeline 并表明了 LogEvent 的流经情况。 [![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2015-08-19_55d47aa7cd6d5.jpg)](https://github.com/waylau/essential-netty-in-action/blob/master/images/Figure%2013.4%20LogEventMonitor.jpg) Figure 13.4 LogEventMonitor 图中显示我们的两个自定义 ChannelHandlers,LogEventDecoder 和 LogEventHandler。首先是负责将网络上接收到的 DatagramPacket 解码到 LogEvent 消息。清单13.6显示了实现。 Listing 13.6 LogEventDecoder ~~~ public class LogEventDecoder extends MessageToMessageDecoder<DatagramPacket> { @Override protected void decode(ChannelHandlerContext ctx, DatagramPacket datagramPacket, List<Object> out) throws Exception { ByteBuf data = datagramPacket.content(); //1 int i = data.indexOf(0, data.readableBytes(), LogEvent.SEPARATOR); //2 String filename = data.slice(0, i).toString(CharsetUtil.UTF_8); //3 String logMsg = data.slice(i + 1, data.readableBytes()).toString(CharsetUtil.UTF_8); //4 LogEvent event = new LogEvent(datagramPacket.recipient(), System.currentTimeMillis(), filename,logMsg); //5 out.add(event); } } ~~~ 1. 获取 DatagramPacket 中数据的引用 2. 获取 SEPARATOR 的索引 3. 从数据中读取文件名 4. 读取数据中的日志消息 5. 构造新的 LogEvent 对象并将其添加到列表中 第二个 ChannelHandler 将执行一些首先创建的 LogEvent 消息。在这种情况下,我们只会写入 system.out。在真实的应用程序可能用到一个单独的日志文件或放到数据库。 下面的清单显示了 LogEventHandler。 Listing 13.7 LogEventHandler ~~~ public class LogEventHandler extends SimpleChannelInboundHandler<LogEvent> { //1 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); //2 ctx.close(); } @Override public void channelRead0(ChannelHandlerContext channelHandlerContext, LogEvent event) throws Exception { StringBuilder builder = new StringBuilder(); //3 builder.append(event.getReceivedTimestamp()); builder.append(" ["); builder.append(event.getSource().toString()); builder.append("] ["); builder.append(event.getLogfile()); builder.append("] : "); builder.append(event.getMsg()); System.out.println(builder.toString()); //4 } } ~~~ 1. 继承 SimpleChannelInboundHandler 用于处理 LogEvent 消息 2. 在异常时,输出消息并关闭 channel 3. 建立一个 StringBuilder 并构建输出 4. 打印出 LogEvent 的数据 LogEventHandler 打印出 LogEvent 的一个易读的格式,包括以下: * 收到时间戳以毫秒为单位 * 发送方的 InetSocketAddress,包括IP地址和端口 * LogEvent 生成绝对文件名 * 实际的日志消息,代表在日志文件中一行 现在我们需要安装处理程序到 ChannelPipeline ,如图13.4所示。下一个清单显示了这是如何实现 LogEventMonitor 类的一部分。 Listing 13.8 LogEventMonitor ~~~ public class LogEventMonitor { private final Bootstrap bootstrap; private final EventLoopGroup group; public LogEventMonitor(InetSocketAddress address) { group = new NioEventLoopGroup(); bootstrap = new Bootstrap(); bootstrap.group(group) //1 .channel(NioDatagramChannel.class) .option(ChannelOption.SO_BROADCAST, true) .handler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel channel) throws Exception { ChannelPipeline pipeline = channel.pipeline(); pipeline.addLast(new LogEventDecoder()); //2 pipeline.addLast(new LogEventHandler()); } }).localAddress(address); } public Channel bind() { return bootstrap.bind().syncUninterruptibly().channel(); //3 } public void stop() { group.shutdownGracefully(); } public static void main(String[] args) throws Exception { if (args.length != 1) { throw new IllegalArgumentException("Usage: LogEventMonitor <port>"); } LogEventMonitor monitor = new LogEventMonitor(new InetSocketAddress(Integer.parseInt(args[0]))); //4 try { Channel channel = monitor.bind(); System.out.println("LogEventMonitor running"); channel.closeFuture().await(); } finally { monitor.stop(); } } } ~~~ 1. 引导 NioDatagramChannel。设置 SO_BROADCAST socket 选项。 2. 添加 ChannelHandler 到 ChannelPipeline 3. 绑定的通道。注意,在使用 DatagramChannel 是没有连接,因为这些 无连接 4. 构建一个新的 LogEventMonitor
';

写广播器

最后更新于:2022-04-01 02:36:58

本节,我们将写一个广播器。下图展示了广播一个 DatagramPacket 在每个日志实体里面的方法 [![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2015-08-19_55d47a7dcf580.jpg)](https://github.com/waylau/essential-netty-in-action/blob/master/images/Figure%2013.2%20Log%20entries%20sent%20with%20DatagramPackets.jpg) 1. 日志文件 2. 日志文件中的日志实体 3. 一个 DatagramPacket 保持一个单独的日志实体 Figure 13.2 Log entries sent with DatagramPackets 图13.3表示一个 LogEventBroadcaster 的 ChannelPipeline 的高级视图,说明了 LogEvent 是如何流转的。 [![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2015-08-19_55d47a8329b13.jpg)](https://github.com/waylau/essential-netty-in-action/blob/master/images/Figure%2013.3%20LogEventBroadcaster-ChannelPipeline%20and%20LogEvent%20flow.jpg) Figure 13.3 LogEventBroadcaster: ChannelPipeline and LogEvent flow 正如我们所看到的,所有的数据传输都封装在 LogEvent 消息里。LogEventBroadcaster 写这些通过在本地端的管道,发送它们通过ChannelPipeline 转换(编码)为一个定制的 ChannelHandler 的DatagramPacket 信息。最后,他们通过 UDP 广播并被远程接收。 *编码器和解码器* *编码器和解码器将消息从一种格式转换为另一种,深度探讨在第7章中进行。我们探索 Netty 提供的基础类来简化和实现自定义 ChannelHandler 如 LogEventEncoder 在这个应用程序中。* 下面展示了 编码器的实现 Listing 13.2 LogEventEncoder ~~~ public class LogEventEncoder extends MessageToMessageEncoder<LogEvent> { private final InetSocketAddress remoteAddress; public LogEventEncoder(InetSocketAddress remoteAddress) { //1 this.remoteAddress = remoteAddress; } @Override protected void encode(ChannelHandlerContext channelHandlerContext, LogEvent logEvent, List<Object> out) throws Exception { byte[] file = logEvent.getLogfile().getBytes(CharsetUtil.UTF_8); //2 byte[] msg = logEvent.getMsg().getBytes(CharsetUtil.UTF_8); ByteBuf buf = channelHandlerContext.alloc().buffer(file.length + msg.length + 1); buf.writeBytes(file); buf.writeByte(LogEvent.SEPARATOR); //3 buf.writeBytes(msg); //4 out.add(new DatagramPacket(buf, remoteAddress)); //5 } } ~~~ 1. LogEventEncoder 创建了 DatagramPacket 消息类发送到指定的 InetSocketAddress 2. 写文件名到 ByteBuf 3. 添加一个 SEPARATOR 4. 写一个日志消息到 ByteBuf 5. 添加新的 DatagramPacket 到出站消息 *为什么使用 MessageToMessageEncoder?* *当然我们可以编写自己的自定义 ChannelOutboundHandler 来转换 LogEvent 对象到 DatagramPackets。但是继承自MessageToMessageEncoder 为我们简化和做了大部分的工作。* 为了实现 LogEventEncoder,我们只需要定义服务器的运行时配置,我们称之为“bootstrapping(引导)”。这包括设置各种 ChannelOption 并安装需要的 ChannelHandler 到 ChannelPipeline 中。完成的 LogEventBroadcaster 类,如清单13.3所示。 Listing 13.3 LogEventBroadcaster ~~~ public class LogEventBroadcaster { private final Bootstrap bootstrap; private final File file; private final EventLoopGroup group; public LogEventBroadcaster(InetSocketAddress address, File file) { group = new NioEventLoopGroup(); bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioDatagramChannel.class) .option(ChannelOption.SO_BROADCAST, true) .handler(new LogEventEncoder(address)); //1 this.file = file; } public void run() throws IOException { Channel ch = bootstrap.bind(0).syncUninterruptibly().channel(); //2 System.out.println("LogEventBroadcaster running"); long pointer = 0; for (;;) { long len = file.length(); if (len < pointer) { // file was reset pointer = len; //3 } else if (len > pointer) { // Content was added RandomAccessFile raf = new RandomAccessFile(file, "r"); raf.seek(pointer); //4 String line; while ((line = raf.readLine()) != null) { ch.writeAndFlush(new LogEvent(null, -1, file.getAbsolutePath(), line)); //5 } pointer = raf.getFilePointer(); //6 raf.close(); } try { Thread.sleep(1000); //7 } catch (InterruptedException e) { Thread.interrupted(); break; } } } public void stop() { group.shutdownGracefully(); } public static void main(String[] args) throws Exception { if (args.length != 2) { throw new IllegalArgumentException(); } LogEventBroadcaster broadcaster = new LogEventBroadcaster(new InetSocketAddress("255.255.255.255", Integer.parseInt(args[0])), new File(args[1])); //8 try { broadcaster.run(); } finally { broadcaster.stop(); } } } ~~~ 1. 引导 NioDatagramChannel 。为了使用广播,我们设置 SO_BROADCAST 的 socket 选项 2. 绑定管道。注意当使用 Datagram Channel 时,是没有连接的 3. 如果需要,可以设置文件的指针指向文件的最后字节 4. 设置当前文件的指针,这样不会把旧的发出去 5. 写一个 LogEvent 到管道用于保存文件名和文件实体。(我们期望每个日志实体是一行长度) 6. 存储当前文件的位置,这样,我们可以稍后继续 7. 睡 1 秒。如果其他中断退出循环就重新启动它。 8. 构造一个新的实例 LogEventBroadcaster 并启动它 这就是程序的完整的第一部分。可以使用 "netcat" 程序查看程序的结果。在 UNIX/Linux 系统,可以使用 "nc", 在 Windows 环境下,可以在 [http://nmap.org/ncat](http://nmap.org/ncat)找到 Netcat 是完美的第一个测试我们的应用程序;它只是监听指定的端口上接收并打印所有数据到标准输出。将其设置为在端口 9999 上监听 UDP 数据如下: ~~~ $ nc -l -u 9999 ~~~ 现在我们需要启动 LogEventBroadcaster。清单13.4显示了如何使用 mvn 编译和运行广播器。pom的配置。pom.xml 配置指向一个文件`/var/log/syslog`(假设是UNIX / Linux环境)和端口设置为 9999。文件中的条目将通过 UDP 广播到端口,在你开始 netcat 后打印到控制台。 Listing 13.4 Compile and start the LogEventBroadcaster ~~~ $ mvn clean package exec:exec -Pchapter13-LogEventBroadcaster [INFO] Scanning for projects... [INFO] [INFO] -------------------------------------------------------------------- [INFO] Building netty-in-action 0.1-SNAPSHOT [INFO] -------------------------------------------------------------------- ... ... [INFO] [INFO] --- maven-jar-plugin:2.4:jar (default-jar) @ netty-in-action --- [INFO] Building jar: /Users/norman/Documents/workspace-intellij/netty-in-actionprivate/ target/netty-in-action-0.1-SNAPSHOT.jar [INFO] [INFO] --- exec-maven-plugin:1.2.1:exec (default-cli) @ netty-in-action - LogEventBroadcaster running ~~~ 当调用 mvn 时,在系统属性中改变文件和端口值,指定你想要的。清单13.5 设置日志文件 到 `/var/log/mail.log` 和端口 8888。 Listing 13.5 Compile and start the LogEventBroadcaster ~~~ $ mvn clean package exec:exec -Pchapter13-LogEventBroadcaster / -Dlogfile=/var/log/mail.log -Dport=8888 -.... .... [INFO] [INFO] --- exec-maven-plugin:1.2.1:exec (default-cli) @ netty-in-action - LogEventBroadcaster running ~~~ 当看到 “LogEventBroadcaster running” 说明程序运行成功了。 netcat 只用于测试,但不适合生产环境中使用。
';

EventLog 的 POJO

最后更新于:2022-04-01 02:36:56

在消息应用里面,数据一般以 POJO 形式呈现。这可能保存配置或处理信息除了实际的消息数据。在这个应用程序里,消息的单元是一个“事件”。由于数据来自一个日志文件,我们将称之为 LogEvent。 清单13.1显示了这个简单的POJO的细节。 Listing 13.1 LogEvent message ~~~ public final class LogEvent { public static final byte SEPARATOR = (byte) ':'; private final InetSocketAddress source; private final String logfile; private final String msg; private final long received; public LogEvent(String logfile, String msg) { //1 this(null, -1, logfile, msg); } public LogEvent(InetSocketAddress source, long received, String logfile, String msg) { //2 this.source = source; this.logfile = logfile; this.msg = msg; this.received = received; } public InetSocketAddress getSource() { //3 return source; } public String getLogfile() { //4 return logfile; } public String getMsg() { //5 return msg; } public long getReceivedTimestamp() { //6 return received; } } ~~~ 1. 构造器用于出站消息 2. 构造器用于入站消息 3. 返回发送 LogEvent 的 InetSocketAddress 的资源 4. 返回用于发送 LogEvent 的日志文件的名称 5. 返回消息的内容 6. 返回 LogEvent 接收到的时间
';