第十四章 Spring Security
最后更新于:2022-03-31 23:46:25
# SpringSecurity个性化用户认证流程(一)
第十三章 虚拟容器部署 Docker
最后更新于:2022-03-31 23:46:23
# 什么是Docker
开源的应用容器引擎,基于go语言开发
Docker安装采坑记
相关命令:`systemctl status docker.service`查看服务状态
Linux的安装后步骤 预计阅读时间: 15分钟 本节包含配置Linux主机以使用Docker更好地工作的可选过程。
以非root用户身份管理Docker Docker守护程序绑定到Unix套接字而不是TCP端口。默认情况下,Unix套接字由用户拥有,root而其他用户只能使用它sudo。Docker守护程序始终以root用户身份运行。
如果您不想在docker命令前加上sudo,请创建一个名为的Unix组docker并向其添加用户。当Docker守护程序启动时,它会创建一个可由该docker组成员访问的Unix套接字。
警告
该docker组授予与root 用户等效的权限。有关如何影响系统安全性的详细信息,请参阅 Docker Daemon Attack Surface。
要创建docker组并添加您的用户:
创建docker组。
~~~
$ sudo groupadd docker
~~~
将您的用户添加到该docker组。
~~~
$ sudo usermod -aG docker $USER
~~~
注销并重新登录,以便重新评估您的组成员身份。
如果在虚拟机上进行测试,则可能需要重新启动虚拟机才能使更改生效。
在桌面Linux环境(如X Windows)上,完全注销会话,然后重新登录。
验证您是否可以运行docker命令sudo。
~~~
$ docker run hello-world
~~~
此命令下载测试映像并在容器中运行它。当容器运行时,它会打印一条信息性消息并退出。
如果sudo在将用户添加到docker组之前最初使用Docker CLI命令,则可能会看到以下错误,表示~/.docker/由于sudo命令而创建的目录的权限不正确。
~~~
WARNING: Error loading config file: /home/user/.docker/config.json -
stat /home/user/.docker/config.json: permission denied
~~~
要解决此问题,请删除~/.docker/目录(它会自动重新创建,但任何自定义设置都将丢失),或使用以下命令更改其所有权和权限:
~~~
$ sudo chown "$USER":"$USER" /home/"$USER"/.docker -R
$ sudo chmod g+rwx "$HOME/.docker" -R
~~~
配置Docker以在启动时启动 大多数当前的Linux发行版(RHEL,CentOS,Fedora,Ubuntu 16.04及更高版本)用于systemd管理系统启动时启动的服务。Ubuntu 14.10及以下使用upstart。
~~~
systemd
$ sudo systemctl enable docker
~~~
要禁用此行为,请disable改用。
~~~
$ sudo systemctl disable docker
~~~
如果需要添加HTTP代理,为Docker运行时文件设置不同的目录或分区,或进行其他自定义,请参阅 自定义systemd Docker守护程序选项。
upstart Docker自动配置为在启动时启动 upstart。要禁用此行为,请使用以下命令:
~~~
$ echo manual | sudo tee /etc/init/docker.override
chkconfig
$ sudo chkconfig docker on
~~~
使用其他存储引擎 有关不同存储引擎的信息,请参阅 存储驱动程序。默认存储引擎和支持的存储引擎列表取决于主机的Linux发行版和可用的内核驱动程序。
配置Docker守护程序侦听连接的位置 默认情况下,Docker守护程序侦听UNIX套接字上的连接以接受来自本地客户端的请求。通过将Docker配置为侦听IP地址和端口以及UNIX套接字,可以允许Docker接受来自远程主机的请求。有关此配置选项的更多详细信息,请参阅Docker CLI参考文章中的“将Docker绑定到另一个主机/端口或unix套接字”部分。
Docker EE客户
Docker EE客户可以使用UCP客户端捆绑包获得对UCP的远程CLI访问。UCP客户端捆绑包由UCP生成,并由相互TLS保护。有关更多信息,请参阅有关UCP的CLI访问的文档 。
保护您的连接
在配置Docker以接受来自远程主机的连接之前,了解打开docker到网络的安全隐患至关重要。如果不采取措施来保护连接,则远程非root用户可以在主机上获得root访问权限。有关如何使用TLS证书保护此连接的更多信息,请查看有关 如何保护Docker守护程序套接字的文章。
配置Docker以接受远程连接可以docker.service使用systemd的Linux发行版的systemd单元文件来完成,例如RedHat,CentOS,Ubuntu和SLES的最新版本,或者daemon.json推荐用于不使用systemd的Linux发行版的文件。
systemd vs daemon.json
配置Docker以使用systemd单元文件和daemon.json 文件来侦听连接会导致冲突,从而阻止Docker启动。
使用systemd单元文件配置远程访问 使用该命令在文本编辑器中sudo systemctl edit docker.service打开覆盖文件docker.service。
添加或修改以下行,替换您自己的值。
\[Service\] ExecStart= ExecStart=/usr/bin/dockerd -H fd:// -H tcp://127.0.0.1:2375 保存文件。
重新加载systemctl配置。
$ sudo systemctl daemon-reload 重启Docker。
$ sudo systemctl restart docker.service 通过查看netstat确认的输出是否dockerd正在侦听已配置的端口来检查更改是否得到遵守。
$ sudo netstat -lntp | grep dockerd tcp 0 0 127.0.0.1:2375 0.0.0.0:\* LISTEN 3758/dockerd 使用配置远程访问 daemon.json 将hosts数组设置/etc/docker/daemon.json为连接到UNIX套接字和IP地址,如下所示:
{ "hosts": \["unix:///var/run/docker.sock", "tcp://127.0.0.1:2375"\] } 重启Docker。
通过查看netstat确认的输出是否dockerd正在侦听已配置的端口来检查更改是否得到遵守。
$ sudo netstat -lntp | grep dockerd tcp 0 0 127.0.0.1:2375 0.0.0.0:\* LISTEN 3758/dockerd 在Docker守护程序上启用IPv6 要在Docker守护程序上启用IPv6,请参阅 启用IPv6支持。
故障排除 内核兼容性 如果您的内核早于3.10版本或者缺少某些模块,则Docker无法正常运行。要检查内核兼容性,可以下载并运行该check-config.sh 脚本。
$ curl https://raw.githubusercontent.com/docker/docker/master/contrib/check-config.sh > check-config.sh
$ bash ./check-config.sh 该脚本仅适用于Linux,而不适用于macOS。
Cannot connect to the Docker daemon 如果您看到如下所示的错误,则可能将Docker客户端配置为连接到其他主机上的Docker守护程序,并且该主机可能无法访问。
Cannot connect to the Docker daemon. Is 'docker daemon' running on this host? 要查看客户端配置连接到哪个主机,请检查DOCKER\_HOST环境中变量的值。
$ env | grep DOCKER\_HOST 如果此命令返回值,则Docker客户端将设置为连接到在该主机上运行的Docker守护程序。如果未设置,则Docker客户端将设置为连接到本地主机上运行的Docker守护程序。如果设置错误,请使用以下命令取消设置:
$ unset DOCKER\_HOST 您可能需要在文件中编辑环境,~/.bashrc或者 ~/.profile防止DOCKER\_HOST错误地设置变量。
如果DOCKER\_HOST按预期设置,请验证Docker守护程序是否在远程主机上运行,以及防火墙或网络中断是否阻止您进行连接。
IP转发问题 如果使用手动配置你的网络systemd-network有systemd 219或更高版本,Docker容器可能无法访问您的网络。从systemd版本220 开始,给定网络(net.ipv4.conf..forwarding)的转发设置默认为关闭。此设置可防止IP转发。它还与Docker net.ipv4.conf.all.forwarding在容器中启用设置的行为相冲突。
要在RHEL,CentOS或Fedora上解决此问题,请.network 在/usr/lib/systemd/network/Docker主机上编辑该文件(例如:)/usr/lib/systemd/network/80-container-host0.network并在该\[Network\]部分中添加以下块。
\[Network\] ... IPForward=kernel
# OR
IPForward=true ... 此配置允许按预期从容器进行IP转发。
DNS resolver found in resolv.conf and containers can't use it 使用GUI的Linux系统通常运行网络管理器,该网络管理器使用dnsmasq在环回地址上运行的 实例,例如127.0.0.1或 127.0.1.1缓存DNS请求,并将此条目添加到 /etc/resolv.conf。该dnsmasq服务可加速DNS查询并提供DHCP服务。此配置不拥有自己的网络命名空间的码头工人容器内工作,因为多克尔容器解决回环地址,如127.0.0.1对 自身,这是很不可能的运行在自己的回送地址的DNS服务器。
如果Docker检测到没有引用的DNS服务器/etc/resolv.conf是功能齐全的DNS服务器,则会出现以下警告,并且Docker使用Google提供的公共DNS服务器8.8.8.8并8.8.4.4进行DNS解析。
WARNING: Local (127.0.0.1) DNS resolver found in resolv.conf and containers can't use it. Using default external servers : \[8.8.8.8 8.8.4.4\] 如果您看到此警告,请先检查您是否使用dnsmasq:
$ ps aux |grep dnsmasq 如果您的容器需要解析网络内部的主机,则公共名称服务器不够用。你有两个选择:
您可以为Docker指定要使用的DNS服务器,或 您可以dnsmasq在NetworkManager中禁用。如果您这样做,NetworkManager会添加您真正的DNS名称服务器/etc/resolv.conf,但您将失去可能的好处dnsmasq。 您只需要使用这些方法之一。
为Docker指定DNS服务器 配置文件的默认位置是/etc/docker/daemon.json。您可以使用--config-file 守护程序标志更改配置文件的位置。以下文档假定配置文件位于/etc/docker/daemon.json。
创建或编辑Docker守护程序配置文件,该/etc/docker/daemon.json文件默认为 file,它控制Docker守护程序配置。
$ sudo nano /etc/docker/daemon.json 添加dns一个或多个IP地址作为值的密钥。如果文件包含现有内容,则只需添加或编辑该dns行。
{ "dns": \["8.8.8.8", "8.8.4.4"\] } 如果您的内部DNS服务器无法解析公共IP地址,请至少包含一个DNS服务器,以便您可以连接到Docker Hub,以便您的容器可以解析Internet域名。
保存并关闭文件。
重新启动Docker守护程序。
$ sudo service docker restart 通过尝试提取图像来验证Docker是否可以解析外部IP地址:
$ docker pull hello-world 如有必要,请验证Docker容器是否可以通过ping它来解析内部主机名。
$ docker run --rm -it alpine ping -c4
PING google.com (192.168.1.2): 56 data bytes 64 bytes from 192.168.1.2: seq=0 ttl=41 time=7.597 ms 64 bytes from 192.168.1.2: seq=1 ttl=41 time=7.635 ms 64 bytes from 192.168.1.2: seq=2 ttl=41 time=7.660 ms 64 bytes from 192.168.1.2: seq=3 ttl=41 time=7.677 ms 禁用 DNSMASQ Ubuntu的 如果您不想更改Docker守护程序的配置以使用特定的IP地址,请按照以下说明dnsmasq在NetworkManager中禁用。
编辑/etc/NetworkManager/NetworkManager.conf文件。
通过dns=dnsmasq在行#的开头添加一个字符来注释掉该行。
# dns=dnsmasq
保存并关闭文件。
重新启动NetworkManager和Docker。作为替代方案,您可以重新启动系统。
$ sudo restart network-manager $ sudo restart docker RHEL,CentOS或Fedora 要dnsmasq在RHEL,CentOS或Fedora上禁用:
禁用该dnsmasq服务:
$ sudo service dnsmasq stop
$ sudo systemctl disable dnsmasq 使用Red Hat文档手动配置DNS服务器 。
允许通过防火墙访问远程API 如果您在运行Docker的同一主机上运行防火墙并且想要从另一台主机访问Docker Remote API并启用远程访问,则需要配置防火墙以允许Docker端口上的传入连接,默认为2376if启用TLS加密传输或2375 以其他方式启用。
两个常见的防火墙守护程序是 UFW(简单防火墙)(通常用于Ubuntu系统)和firewalld(通常用于基于RPM的系统)。请参阅操作系统和防火墙的文档,但以下信息可能有助于您入门。这些选项相当宽松,您可能希望使用不同的配置来更好地锁定系统。
UFW:DEFAULT\_FORWARD\_POLICY="ACCEPT"在您的配置中设置。
firewalld:在策略中添加与以下类似的规则(一个用于传入请求,另一个用于传出请求)。确保接口名称和链名称正确。
\[\-i zt0 -j ACCEPT\] \[\-o zt0 -j ACCEPT\]Your kernel does not support cgroup swap limit capabilities 在Ubuntu或Debian主机上,使用图像时,您可能会看到类似于以下内容的消息。
WARNING: Your kernel does not support swap limit capabilities. Limitation discarded. 在基于RPM的系统上不会发生此警告,这些系统默认启用这些功能。
如果您不需要这些功能,则可以忽略该警告。您可以按照这些说明在Ubuntu或Debian上启用这些功能。即使Docker未运行,内存和交换计费也会占总可用内存的1%左右,总体性能降低10%。
以具有sudo权限的用户身份登录Ubuntu或Debian主机。
编辑/etc/default/grub文件。添加或编辑该GRUB\_CMDLINE\_LINUX行以添加以下两个键值对:
GRUB\_CMDLINE\_LINUX="cgroup\_enable=memory swapaccount=1" 保存并关闭文件。
更新GRUB。
$ sudo update-grub 如果GRUB配置文件的语法不正确,则会发生错误。在这种情况下,请重复步骤3和4。
重新启动系统后,更改将生效。
第十二章 服务链路追踪
最后更新于:2022-03-31 23:46:20
第十一章 服务容错 Hystrix
最后更新于:2022-03-31 23:46:18
## 什么是Hystrix
在分布式系统中,服务与服务之间依赖错综复杂,一种不可避免的情况就是某些服务将会出现失败。Hystrix是一个库,它提供了服务与服务之间的容错功能,主要体现在延迟容错和容错,从而做到控制分布式系统中的联动故障。Hystrix通过隔离服务的访问点,阻止联动故障,并提供故障的解决方案,从而提高了这个分布式系统的弹性。
## Hystrix解决了什么问题
在复杂的分布式系统中,可能有成百上千个依赖服务,这些服务由于某种故障,比如机房的不可靠性、网络服务商的不可靠性等因素,导致某个服务不可用,如果系统不隔离该不可用的服务,可能会导致整个系统不可用。
例如,对于依赖30个服务的应用程序,每个服务的正常运行时间为99.99%,这是您期望的
99.9930 = 99.7%的正常运行时间 10亿次请求中有0.3%= 3,000,000次失败 2小时停机时间/月,即使所有的依赖都有很好的正常运行时间。
实际情况可能比这更糟糕。
如果不设计整个系统的韧性,即使所有依赖关系表现良好,即使0.01%的停机时间对数十个服务中的每一个服务的总体影响等同于每个月停机的潜在时间。
当所以的服务都出UP状态,即Ok状态,一个请求流程可能是这样:
![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/36441aaaf32ce6633ef7049f45414b80_615x548.png)
当某一个服务出现了延迟,可能会阻止整个该请求:
![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/82af0542d171acf4a7a8fa9781ed1cd5_607x562.png)
在高并发的情况下,单个服务的延迟,可能导致所有的请求都处于延迟状态,可能在几秒钟就使服务处于负载饱和的状态。
服务的单个点的请求故障,会导致整个服务出现故障,更为糟糕的是该故障服务,会导致其他的服务出现负载饱和,资源耗尽,直到不可用,从而导致这个分布式系统都不可用。这就是“雪崩”。
![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/68fee363009ea7913fad540ca9d0c084_590x545.png)
当通过第三方客户端执行网络访问时,这些问题会加剧。第三方客户就是一个“黑匣子”,其中实施细节被隐藏,并且可以随时更改,网络或资源配置对于每个客户端库都是不同的,通常难以监视和 更改。
通过的故障包括:
网络连接失败或降级。 服务和服务器失败或变慢。 新的库或服务部署会改变行为或性能特征。 客户端库有错误。
所有这些都代表需要隔离和管理的故障和延迟,以便单个故障依赖关系不能导致整个应用程序或系统的故障。
## Hystrix的设计原则
原则如下:
防止单个服务的故障,耗尽整个系统服务的容器(比如tomcat)的线程资源。
减少负载并快速失败,而不是排队。
在可行的情况下提供回退以保护用户免受故障。
使用隔离技术(如隔板,泳道和断路器模式)来限制任何一个依赖的影响。
通过近乎实时的指标,监控和警报来优化发现故障的时间。
通过配置更改的低延迟传播优化恢复时间,并支持Hystrix大多数方面的动态属性更改,从而允许您使用低延迟反馈循环进行实时操作修改。
保护整个依赖客户端执行中的故障,而不仅仅是在网络流量上进行保护降级、限流。
## Hystrix 是怎么实现它的设计目标的?
通过HystrixCommand 或者HystrixObservableCommand 将所有的外部系统(或者称为依赖)包装起来,整个包装对象是单独运行在一个线程之中(这是典型的命令模式)。
超时请求应该超过你定义的阈值
为每个依赖关系维护一个小的线程池(或信号量); 如果它变满了,那么依赖关系的请求将立即被拒绝,而不是排队等待。
统计成功,失败(由客户端抛出的异常),超时和线程拒绝。
打开断路器可以在一段时间内停止对特定服务的所有请求,如果服务的错误百分比通过阈值,手动或自动的关闭断路器。
当请求被拒绝、连接超时或者断路器打开,直接执行fallback逻辑。
近乎实时监控指标和配置变化。
当您使用Hystrix包装每个底层依赖项时,上图所示的体系结构如下图所示。 每个依赖关系彼此隔离,在延迟发生时可以饱和的资源受到限制,迅速执行fallback的逻辑,该逻辑决定了在依赖关系中发生任何类型的故障时会做出什么响应:
:-:![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/9a09b31ae934e1c01df9f45dd4120b01_601x829.png)
## Hystrix是怎么工作的?
架构图
下图显示通过Hystrix向服务依赖关系发出请求时会发生什么:
:-:![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/af2e2c3e61c26b8cc045894bc183b803_626x308.png)
具体将从以下几个方面进行描述:
1.构建一个HystrixCommand或者HystrixObservableCommand 对象。
第一步是构建一个HystrixCommand或HystrixObservableCommand对象来表示你对依赖关系的请求。 其中构造函数需要和请求时的参数一致。
构造HystrixCommand对象,如果依赖关系预期返回单个响应。 可以这样写:
1 HystrixCommand command = new HystrixCommand(arg1, arg2);
同理,可以构建HystrixObservableCommand :
12 HystrixObservableCommand command = new HystrixObservableCommand(arg1, arg2);
2.执行Command
通过使用Hystrix命令对象的以下四种方法之一,可以执行该命令有四种方法(前两种方法仅适用于简单的HystrixCommand对象,并不适用于HystrixObservableCommand):
execute()–阻塞,,然后返回从依赖关系接收到的单个响应(或者在发生错误时抛出异常)
queue()–返回一个可以从依赖关系获得单个响应的future 对象
observe()–订阅Observable代表依赖关系的响应,并返回一个Observable,该Observable会复制该来源Observable
toObservable() –返回一个Observable,当您订阅它时,将执行Hystrix命令并发出其响应
1234 K value = command.execute();FuturefValue = command.queue();ObservableohValue = command.observe(); ObservableocValue = command.toObservable();
同步调用execute()调用queue().get(). queue()依次调用toObservable().toBlocking().toFuture()。 这就是说,最终每个HystrixCommand都由一个Observable实现支持,甚至是那些旨在返回单个简单值的命令。
3.响应是否有缓存?
如果为该命令启用请求缓存,并且如果缓存中对该请求的响应可用,则此缓存响应将立即以“可观察”的形式返回。
4.断路器是否打开?
当您执行该命令时,Hystrix将检查断路器以查看电路是否打开。
如果电路打开(或“跳闸”),则Hystrix将不会执行该命令,但会将流程路由到(8)获取回退。
如果电路关闭,则流程进行到(5)以检查是否有可用于运行命令的容量。
5.线程池/队列/信号量是否已经满负载?
如果与命令相关联的线程池和队列(或信号量,如果不在线程中运行)已满,则Hystrix将不会执行该命令,但将立即将流程路由到(8)获取回退。
6.HystrixObservableCommand.construct() 或者 HystrixCommand.run()
在这里,Hystrix通过您为此目的编写的方法调用对依赖关系的请求,其中之一是:
HystrixCommand.run() - 返回单个响应或者引发异常
HystrixObservableCommand.construct() - 返回一个发出响应的Observable或者发送一个onError通知
如果run()或construct()方法超出了命令的超时值,则该线程将抛出一个TimeoutException(或者如果命令本身没有在自己的线程中运行,则会产生单独的计时器线程)。 在这种情况下,Hystrix将响应通过8进行路由。获取Fallback,如果该方法不取消/中断,它会丢弃最终返回值run()或construct()方法。
请注意,没有办法强制潜在线程停止工作 - 最好的Hystrix可以在JVM上执行它来抛出一个InterruptedException。 如果由Hystrix包装的工作不处理InterruptedExceptions,Hystrix线程池中的线程将继续工作,尽管客户端已经收到了TimeoutException。 这种行为可能使Hystrix线程池饱和,尽管负载“正确地流失”。 大多数Java HTTP客户端库不会解释InterruptedExceptions。 因此,请确保在HTTP客户端上正确配置连接和读/写超时。
如果该命令没有引发任何异常并返回响应,则Hystrix在执行某些日志记录和度量报告后返回此响应。 在run()的情况下,Hystrix返回一个Observable,发出单个响应,然后进行一个onCompleted通知; 在construct()的情况下,Hystrix返回由construct()返回的相同的Observable。
7.计算Circuit 的健康
Hystrix向断路器报告成功,失败,拒绝和超时,该断路器维护了一系列的计算统计数据组。
它使用这些统计信息来确定电路何时“跳闸”,此时短路任何后续请求直到恢复时间过去,在首次检查某些健康检查之后,它再次关闭电路。
8.获取Fallback
当命令执行失败时,Hystrix试图恢复到你的回退:当construct()或run()(6.)抛出异常时,当命令由于电路断开而短路时(4.),当 命令的线程池和队列或信号量处于容量(5.),或者当命令超过其超时长度时。
编写Fallback ,它不一依赖于任何的网络依赖,从内存中获取获取通过其他的静态逻辑。如果你非要通过网络去获取Fallback,你可能需要些在获取服务的接口的逻辑上写一个HystrixCommand。
9.返回成功的响应
如果 Hystrix command成功,如果Hystrix命令成功,它将以Observable的形式返回对呼叫者的响应或响应。 根据您在上述步骤2中调用命令的方式,此Observable可能会在返回给您之前进行转换:
:-:![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/7bd56dc5d05e0b9ecb41766457d484d9_635x209.png)
execute() - 以与.queue()相同的方式获取Future,然后在此Future上调用get()来获取Observable发出的单个值
queue() - 将Observable转换为BlockingObservable,以便将其转换为Future,然后返回此未来
observe() - 立即订阅Observable并启动执行命令的流程; 返回一个Observable,当您订阅它时,重播排放和通知
toObservable() - 返回Observable不变; 您必须订阅它才能实际开始导致命令执行的流程
## 断路器(Circuit Breaker)
下图显示HystrixCommand或HystrixObservableCommand如何与HystrixCircuitBreaker及其逻辑和决策流程进行交互,包括计数器在断路器中的行为。
:-:![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/362d5492a1f5e75bd9aec629ada9adce_671x504.png)
发生电路开闭的过程如下:
1.假设电路上的音量达到一定阈值(HystrixCommandProperties.circuitBreakerRequestVolumeThreshold())…
2.并假设错误百分比超过阈值错误百分比(HystrixCommandProperties.circuitBreakerErrorThresholdPercentage())…
3.然后断路器从CLOSED转换到OPEN。
4.虽然它是开放的,它使所有针对该断路器的请求短路。
5.经过一段时间(HystrixCommandProperties.circuitBreakerSleepWindowInMilliseconds()),下一个单个请求是通过(这是HALF-OPEN状态)。 如果请求失败,断路器将在睡眠窗口持续时间内返回到OPEN状态。 如果请求成功,断路器将转换到CLOSED,逻辑1.重新接管。
## 隔离(Isolation)
Hystrix采用隔板模式来隔离彼此的依赖关系,并限制对其中任何一个的并发访问。
:-:![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/3802ba5538d6f68fd70525233e33b2f4_641x518.png)
# 依赖和部署
![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/a54df87c41d8c37d28e950ea4c8cc501_667x237.png)
* 服务降级
* 服务熔断
* 依赖隔离
* 监控
## 服务降级
1. 引入依赖
~~~
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-hystrix</artifactId>
</dependency>
~~~
2. 启动类加上注解
~~~
@EnableCircuitBreaker
@EnableDiscoveryClient
@EnableFeignClients(basePackages = "com.eclab.product.iclient")
public class OrderApplication {
public static void main(String[] args) {
SpringApplication.run(OrderApplication.class, args);
}
}
~~~
随着业务的不断增加,注解也越来越多,此时有些注解可以使用另外的注解代替:
~~~
/*@SpringBootApplication
@EnableDiscoveryClient
@EnableCircuitBreaker*/
@SpringCloudApplication
@EnableFeignClients(basePackages = "com.eclab.product.iclient")
@EnableHystrixDashboard
public class OrderApplication {
public static void main(String[] args) {
SpringApplication.run(OrderApplication.class, args);
}
}
~~~
服务降级实例:
~~~
@HystrixCommand(fallbackMethod = "fallback")
@GetMapping("/getProductInfoList")
public String getProductInfoList(@RequestParam("i") Integer i){
if (i % 2 == 0){
return "Sucess";
}
RestTemplate restTemplate = new RestTemplate();
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
return restTemplate.postForObject("http://localhost:8081/product/listForOrder",
Arrays.asList("157875152258154"),
String.class);
}
private String fallback(){
return "太拥挤了,请稍后再试~";
}
~~~
在服务降级中,除了调用的目标服务不可用导致的错误引起降级之外,自身的异常也可以进行降级
降级的方法除了自定外,还可以有个全局的通用方法,将注解加在类上:
~~~
@RestController
@DefaultProperties(defaultFallback = "defaultFallback")
public class HystrixController {
//超时配置
/* @HystrixCommand(fallbackMethod = "fallback",
commandProperties = {
@HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds" , value = "3000")
}
)*/
@HystrixCommand(commandProperties = {
@HystrixProperty(name = "circuitBreaker.enabled" , value = "true"), //设置熔断
@HystrixProperty(name = "circuitBreaker.requestVolumeThreshold" , value = "10"),
@HystrixProperty(name = "circuitBreaker.sleepWindowInMilliseconds" , value = "1000"),
@HystrixProperty(name = "circuitBreaker.errorThresholdPercentage" , value = "60")
})
@GetMapping("/getProductInfoList")
public String getProductInfoList(@RequestParam("i") Integer i){
if (i % 2 == 0){
return "Sucess";
}
RestTemplate restTemplate = new RestTemplate();
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
return restTemplate.postForObject("http://localhost:8081/product/listForOrder",
Arrays.asList("157875152258154"),
String.class);
}
private String fallback(){
return "太拥挤了,请稍后再试~";
}
private String defaultFallback(){
return "默认提示:太拥挤了,请稍后再试~";
}
}
~~~
上面的代码中的超时设置,可以用来配置是否降级
# 依赖隔离
* 线程池隔离
* Hystrix自动实现依赖隔离
熔断的配置:
在需要进行熔断配置的方法上加上Hystrix的注解
~~~
@HystrixCommand(commandProperties = {
@HystrixProperty(name = "circuitBreaker.enabled" , value = "true"), //设置熔断
@HystrixProperty(name = "circuitBreaker.requestVolumeThreshold" , value = "10"),
@HystrixProperty(name = "circuitBreaker.sleepWindowInMilliseconds" , value = "1000"),
@HystrixProperty(name = "circuitBreaker.errorThresholdPercentage" , value = "60")
})
~~~
* Circuit Breaker : 断路器
[断路器详解,马丁](https://martinfowler.com/bliki/CircuitBreaker.html)
断路器是将受保护的对象封装在可以监控故障的断路对象里面,当故障达到一定的值,将会引发跳闸,断路器对象返回错误
* 图解(断路器模式状态机):
:-:![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/189e0a0608fecb68ab2bef9a39bdd601_469x410.png)
* circuitBreaker.sleepWindowInMilliseconds:时间窗口
当断路器打开,对主逻辑进行熔断之后,Hystrix会开启一个休眠时间窗口,将降级逻辑临时提升为主逻辑,当休眠时间到期,断路器将进入半开状态,释放一次请求到原来的主逻辑上,如果此次请求正常返回,那么断路器将继续闭合,主逻辑恢复,如果此次请求依然失败,断路器进入打开状态,休眠时间窗继续计时。
* circuitBreaker.requestVolumeThreshold : 设置在滚动窗口中断路器的最小请求数
* circuitBreaker.errorThresholdPercentage : 断路器打开的错误百分比条件
在配置文件中统一配置:
~~~
hystrix:
command:
default:
execution:
isolation:
thread:
timeoutInMilliseconds: 3000
getProductInfoList:
execution:
isolation:
thread:
timeoutInMilliseconds: 800
~~~
# Hystrix Dashboard
Hystrix-dashboard是一款针对Hystrix进行实时监控的工具,通过Hystrix Dashboard我们可以在直观地看到各Hystrix Command的请求响应时间, 请求成功率等数据。
* Hystrix Dashboard
1、添加依赖
~~~
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-hystrix</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-hystrix-dashboard</artifactId>
</dependency>
<dependency>
<groupId>com.netflix.hystrix</groupId>
<artifactId>hystrix-javanica</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
~~~
2、在启动类上加上注解
~~~
@EnableHystrixDashboard
public class OrderApplication {
public static void main(String[] args) {
SpringApplication.run(OrderApplication.class, args);
}
}
~~~
3、在浏览器中进行访问
![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/3e3b52e6bbe1e0d39c7c411f67c32c2f_1137x645.png)
在输入框中填入相关信息:
:-:![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/a76c1e440e50047d3594ac96ca0eb9fc_971x605.png)
选择单机版:[http://hystrix-app](http://hystrix-app/):port/actuator/hystrix.stream
填写对应的端口和ip,时间和应用名,即可进入主界面
:-:![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/815fe6f48a98359ced0c48b8690ec674_1134x482.png)
访问下熔断的接口:
:-:![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/283eb172422c8c3a3aa7def70e8adc3f_1144x595.png)
使用postman压力测试:
:-:![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/573ddc965d1736b76ec585a4ded5a58f_1251x739.png)
结果:
:-:![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/03579f0a19e26834304127cb5a808439_1133x710.png)
错误次数累计,开始熔断打开:
:-:![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/0befccff3651337caf2be0789fa92909_1136x653.png)
第十章 路由网关 Zuul
最后更新于:2022-03-31 23:46:16
第九章 消息和异步Stream
最后更新于:2022-03-31 23:46:13
# 消息驱动
微服务的目的: 松耦合
事件驱动的优势:高度解耦
Spring Cloud Stream 的几个概念 Spring Cloud Stream is a framework for building message-driven microservice applications.
官方定义 Spring Cloud Stream 是一个构建消息驱动微服务的框架。
Spring Cloud Stream Application 应用程序通过 inputs 或者 outputs 来与 Spring Cloud Stream 中binder 交互,通过我们配置来 binding ,而 Spring Cloud Stream 的 binder 负责与中间件交互。所以,我们只需要搞清楚如何与 Spring Cloud Stream 交互就可以方便使用消息驱动的方式
Binder Binder 是 Spring Cloud Stream 的一个抽象概念,是应用与消息中间件之间的粘合剂。目前 Spring Cloud Stream 实现了 Kafka 和 Rabbit MQ 的binder。
通过 binder ,可以很方便的连接中间件,可以动态的改变消息的 destinations(对应于 Kafka 的topic,Rabbit MQ 的 exchanges),这些都可以通过外部配置项来做到。
甚至可以任意的改变中间件的类型而不需要修改一行代码。
Publish-Subscribe 消息的发布(Publish)和订阅(Subscribe)是事件驱动的经典模式。Spring Cloud Stream 的数据交互也是基于这个思想。生产者把消息通过某个 topic 广播出去(Spring Cloud Stream 中的 destinations)。其他的微服务,通过订阅特定 topic 来获取广播出来的消息来触发业务的进行。
这种模式,极大的降低了生产者与消费者之间的耦合。即使有新的应用的引入,也不需要破坏当前系统的整体结构。
Consumer Groups “Group”,如果使用过 Kafka 的童鞋并不会陌生。Spring Cloud Stream 的这个分组概念的意思基本和 Kafka 一致。
微服务中动态的缩放同一个应用的数量以此来达到更高的处理能力是非常必须的。对于这种情况,同一个事件防止被重复消费,只要把这些应用放置于同一个 “group” 中,就能够保证消息只会被其中一个应用消费一次。
Durability 消息事件的持久化是必不可少的。Spring Cloud Stream 可以动态的选择一个消息队列是持久化,还是 present。
Bindings bindings 是我们通过配置把应用和spring cloud stream 的 binder 绑定在一起,之后我们只需要修改 binding 的配置来达到动态修改topic、exchange、type等一系列信息而不需要修改一行代码。
基于 RabbitMQ 使用 以下内容源码: spring cloud demo
消息接收 Spring Cloud Stream 基本用法,需要定义一个接口,如下是内置的一个接口。
~~~
public interface Sink {
String INPUT = "input";
@Input("input")
SubscribableChannel input();
}
~~~
注释\_\_ @Input\_\_ 对应的方法,需要返回 \_\_ SubscribableChannel \_\_ ,并且参入一个参数值。
这就接口声明了一个\_\_ binding \_\_命名为 “input” 。
其他内容通过配置指定:
~~~
spring:
cloud:
stream:
bindings:
input:
destination: mqTestDefault
~~~
destination:指定了消息获取的目的地,对应于MQ就是 exchange,这里的exchange就是 mqTestDefault
~~~
@SpringBootApplication
@EnableBinding(Sink.class)
public class Application {
// 监听 binding 为 Sink.INPUT 的消息
@StreamListener(Sink.INPUT)
public void input(Message<String> message) {
System.out.println("一般监听收到:" + message.getPayload());
}
public static void main(String[] args) {
SpringApplication.run(Application.class);
}
}
~~~
定义一个 class (这里直接在启动类),并且添加注解@EnableBinding(Sink.class) ,其中 Sink 就是上述的接口。同时定义一个方法(此处是 input)标明注解为 \_\_ @StreamListener(Processor.INPUT) \_\_,方法参数为 Message 。
启动后,默认是会创建一个临时队列,临时队列绑定的exchange为 “mqTestDefault”,routing key为 “#”。
所有发送 exchange 为“mqTestDefault” 的MQ消息都会被投递到这个临时队列,并且触发上述的方法。
以上代码就完成了最基本的消费者部分。
消息发送 消息的发送同消息的接受,都需要定义一个接口,不同的是接口方法的返回对象是 MessageChannel,下面是 Spring Cloud Stream 内置的接口:
~~~
public interface Source {
String OUTPUT = "output";
@Output("output")
MessageChannel output();
}
~~~
这就接口声明了一个 binding 命名为 “output” ,不同于上述的 “input”,这个binding 声明了一个消息输出流,也就是消息的生产者。
~~~
spring:
cloud:
stream:
bindings:
output:
destination: mqTestDefault
contentType: text/plain
~~~
contentType:用于指定消息的类型。具体可以参考 spring cloud stream docs
destination:指定了消息发送的目的地,对应 RabbitMQ,会发送到 exchange 是 mqTestDefault 的所有消息队列中。
代码中调用:
~~~
@SpringBootApplication
@EnableBinding(Source.class)
public class Application implements CommandLineRunner {
@Autowired
@Qualifier("output")
MessageChannel output;
@Override
public void run(String... strings) throws Exception {
// 字符串类型发送MQ
System.out.println("字符串信息发送");
output.send(MessageBuilder.withPayload("大家好").build());
}
public static void main(String[] args) {
SpringApplication.run(Application.class);
}
}
~~~
通过注入MessageChannel的方式,发送消息。
通过注入Source 接口的方式,发送消息。 具体可以查看样例
以上代码就完成了最基本的生产者部分。
自定义消息发送接收 自定义接口 Spring Cloud Stream 内置了两种接口,分别定义了 binding 为 “input” 的输入流,和 “output” 的输出流,而在我们实际使用中,往往是需要定义各种输入输出流。使用方法也很简单。
~~~
interface OrderProcessor {
String INPUT_ORDER = "inputOrder";
String OUTPUT_ORDER = "outputOrder";
@Input(INPUT_ORDER)
SubscribableChannel inputOrder();
@Output(OUTPUT_ORDER)
MessageChannel outputOrder();
}
~~~
一个接口中,可以定义无数个输入输出流,可以根据实际业务情况划分。上述的接口,定义了一个订单输入,和订单输出两个 binding。
使用时,需要在 @EnableBinding 注解中,添加自定义的接口。 使用 @StreamListener 做监听的时候,需要指定 OrderProcessor.INPUT\_ORDER
~~~
spring:
cloud:
stream:
defaultBinder: defaultRabbit
bindings:
inputOrder:
destination: mqTestOrder
outputOrder:
destination: mqTestOrder
~~~
如上配置,指定了 destination 为 mqTestOrder 的输入输出流。
分组与持久化 上述自定义的接口配置中,Spring Cloud Stream 会在 RabbitMQ 中创建一个临时的队列,程序关闭,对应的连接关闭的时候,该队列也会消失。而在实际使用中,我们需要一个持久化的队列,并且指定一个分组,用于保证应用服务的缩放。
只需要在消费者端的 binding 添加配置项 spring.cloud.stream.bindings.\[channelName\].group = XXX 。对应的队列就是持久化,并且名称为:mqTestOrder.XXX。
rabbitMQ routing key 绑定 用惯了 rabbitMQ 的童鞋,在使用的时候,发现 Spring Cloud Stream 的消息投递,默认是根据 destination + group 进行区分,所有的消息都投递到 routing key 为 “#‘’ 的消息队列里。
如果我们需要进一步根据 routing key 来进行区分消息投递的目的地,或者消息接受,需要进一步配,Spring Cloud Stream 也提供了相关配置:
~~~
spring:
cloud:
stream:
bindings:
inputProductAdd:
destination: mqTestProduct
group: addProductHandler # 拥有 group 默认会持久化队列
outputProductAdd:
destination: mqTestProduct
rabbit:
bindings:
inputProductAdd:
consumer:
bindingRoutingKey: addProduct.* # 用来绑定消费者的 routing key
outputProductAdd:
producer:
routing-key-expression: '''addProduct.*''' # 需要用这个来指定 RoutingKey
~~~
spring.cloud.stream.rabbit.bindings.\[channelName\].consumer.bindingRoutingKey 指定了生成的消息队列的routing key
spring.cloud.stream.rabbit.bindings.\[channelName\].producer.routing-key-expression 指定了生产者消息投递的routing key
DLX 队列 DLX 作用 DLX:Dead-Letter-Exchange(死信队列)。利用DLX, 当消息在一个队列中变成死信(dead message)之后,它能被重新publish到另一个Exchange,这个Exchange就是DLX。消息变成死信一向有一下几种情况:
消息被拒绝(basic.reject/ basic.nack)并且requeue=false 消息TTL过期(参考:RabbitMQ之TTL(Time-To-Live 过期时间)) 队列达到最大长度
DLX也是一个正常的Exchange,和一般的Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性,当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange上去,进而被路由到另一个队列,可以监听这个队列中消息做相应的处理。
Spring Cloud Stream 中使用
~~~
spring.cloud.stream.rabbit.bindings.[channelName].consumer.autoBindDlq=true
spring.cloud.stream.rabbit.bindings.[channelName].consumer.republishToDlq=true
~~~
配置说明,可以参考 spring cloud stream rabbitmq consumer properties
结论 Spring Cloud Stream 最大的方便之处,莫过于抽象了事件驱动的一些概念,对于消息中间件的进一步封装,可以做到代码层面对中间件的无感知,甚至于动态的切换中间件,切换topic。使得微服务开发的高度解耦,服务可以关注更多自己的业务流程。
# [RabbitMq使用概况](http://localhost:8028/article/rabbitmq)
什么叫消息队列
消息(Message)是指在应用间传送的数据。消息可以非常简单,比如只包含文本字符串,也可以更复杂,可能包含嵌入对象。
\*\*消息队列(Message Queue)\*\*是一种应用间的通信方式,消息发送后可以立即返回,由消息系统来确保消息的可靠传递。消息发布者只管把消息发布到 MQ 中而不用管谁来取,消息使用者只管从 MQ 中取消息而不管是谁发布的。这样发布者和使用者都不用知道对方的存在。
为何用消息队列 从上面的描述中可以看出消息队列是一种应用间的异步协作机制,那什么时候需要使用 MQ 呢?
以常见的订单系统为例,用户点击【下单】按钮之后的业务逻辑可能包括:扣减库存、生成相应单据、发红包、发短信通知。在业务发展初期这些逻辑可能放在一起同步执行,随着业务的发展订单量增长,需要提升系统服务的性能,这时可以将一些不需要立即生效的操作拆分出来异步执行,比如发放红包、发短信通知等。这种场景下就可以用 MQ ,在下单的主流程(比如扣减库存、生成相应单据)完成之后发送一条消息到 MQ 让主流程快速完结,而由另外的单独线程拉取MQ的消息(或者由 MQ 推送消息),当发现 MQ 中有发红包或发短信之类的消息时,执行相应的业务逻辑。
以上是用于业务解耦的情况,其它常见场景包括最终一致性、广播、错峰流控等等。
RabbitMQ 特点 RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现。
AMQP :Advanced Message Queue,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。
RabbitMQ 最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。具体特点包括:
可靠性(Reliability) RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认。
灵活的路由(Flexible Routing) 在消息进入队列之前,通过 Exchange 来路由消息的。对于典型的路由功能,RabbitMQ 已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,可以将多个 Exchange 绑定在一起,也通过插件机制实现自己的 Exchange 。
消息集群(Clustering) 多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker 。
高可用(Highly Available Queues) 队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。
多种协议(Multi-protocol) RabbitMQ 支持多种消息队列协议,比如 STOMP、MQTT 等等。
多语言客户端(Many Clients) RabbitMQ 几乎支持所有常用语言,比如 Java、.NET、Ruby 等等。
管理界面(Management UI) RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker 的许多方面。
跟踪机制(Tracing) 如果消息异常,RabbitMQ 提供了消息跟踪机制,使用者可以找出发生了什么。
插件机制(Plugin System) RabbitMQ 提供了许多插件,来从多方面进行扩展,也可以编写自己的插件。
RabbitMQ 中的概念模型 消息模型 所有 MQ 产品从模型抽象上来说都是一样的过程: 消费者(consumer)订阅某个队列。生产者(producer)创建消息,然后发布到队列(queue)中,最后将消息发送到监听的消费者。
消息流 RabbitMQ 基本概念 上面只是最简单抽象的描述,具体到 RabbitMQ 则有更详细的概念需要解释。上面介绍过 RabbitMQ 是 AMQP 协议的一个开源实现,所以其内部实际上也是 AMQP 中的基本概念:
RabbitMQ 内部结构 Message 消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。 Publisher 消息的生产者,也是一个向交换器发布消息的客户端应用程序。 Exchange 交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。 Binding 绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。 Queue 消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。 Connection 网络连接,比如一个TCP连接。 Channel 信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。 Consumer 消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。 Virtual Host 虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。 Broker 表示消息队列服务器实体。 AMQP 中的消息路由 AMQP 中消息的路由过程和 Java 开发者熟悉的 JMS 存在一些差别,AMQP 中增加了 Exchange 和 Binding 的角色。生产者把消息发布到 Exchange 上,消息最终到达队列并被消费者接收,而 Binding 决定交换器的消息应该发送到那个队列。![请输入图片描述](https://upload-images.jianshu.io/upload_images/5015984-7fd73af768f28704.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/484/format/webp)
AMQP 的消息路由过程 Exchange 类型 Exchange分发消息时根据类型的不同分发策略有区别,目前共四种类型:direct、fanout、topic、headers 。headers 匹配 AMQP 消息的 header 而不是路由键,此外 headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了,所以直接看另外三种类型:
direct
![请输入图片描述](https://upload-images.jianshu.io/upload_images/5015984-13db639d2c22f2aa.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/385/format/webp)direct 交换器
消息中的路由键(routing key)如果和 Binding 中的 binding key 一致, 交换器就将消息发到对应的队列中。路由键与队列名完全匹配,如果一个队列绑定到交换机要求路由键为“dog”,则只转发 routing key 标记为“dog”的消息,不会转发“dog.puppy”,也不会转发“dog.guard”等等。它是完全匹配、单播的模式。
fanout
![请输入图片描述](https://upload-images.jianshu.io/upload_images/5015984-2f509b7f34c47170.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/463/format/webp)fanout 交换器
每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去。fanout 交换器不处理路由键,只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。fanout 类型转发消息是最快的。
topic![请输入图片描述](https://upload-images.jianshu.io/upload_images/5015984-275ea009bdf806a0.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/558/format/webp)topic 交换器
topic 交换器通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它将路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。它同样也会识别两个通配符:符号“#”和符号“”。#匹配0个或多个单词,匹配不多不少一个单词。 RabbitMQ 安装 一般来说安装 RabbitMQ 之前要安装 Erlang ,可以去Erlang官网下载。接着去RabbitMQ官网下载安装包,之后解压缩即可。根据操作系统不同官网提供了相应的安装说明:Windows、Debian / Ubuntu、RPM-based Linux、Mac
如果是Mac 用户,使用 HomeBrew 来安装,安装前要先更新 brew:
brew update 接着安装 rabbitmq 服务器:
brew install rabbitmq 这样 RabbitMQ 就安装好了,安装过程中会自动其所依赖的 Erlang 。
docker下启动安装rabbitmq:
~~~
docker run -d --hostname my-rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3.7.3-management
~~~
RabbitMQ 运行和管理 启动 启动很简单,找到安装后的 RabbitMQ 所在目录下的 sbin 目录,可以看到该目录下有6个以 rabbitmq 开头的可执行文件,直接执行 rabbitmq-server 即可,下面将 RabbitMQ 的安装位置以 . 代替,启动命令就是: ./sbin/rabbitmq-server 启动正常的话会看到一些启动过程信息和最后的 completed with 7 plugins,这也说明启动的时候默认加载了7个插件。
正常启动 后台启动 如果想让 RabbitMQ 以守护程序的方式在后台运行,可以在启动的时候加上 -detached 参数:
~~~
./sbin/rabbitmq-server -detached
~~~
查询服务器状态 sbin 目录下有个特别重要的文件叫 rabbitmqctl ,它提供了 RabbitMQ 管理需要的几乎一站式解决方案,绝大部分的运维命令它都可以提供。 查询 RabbitMQ 服务器的状态信息可以用参数 status :
~~~
./sbin/rabbitmqctl status
~~~
该命令将输出服务器的很多信息,比如 RabbitMQ 和 Erlang 的版本、OS 名称、内存等等
关闭 RabbitMQ 节点 我们知道 RabbitMQ 是用 Erlang 语言写的,在Erlang 中有两个概念:节点和应用程序。节点就是 Erlang 虚拟机的每个实例,而多个 Erlang 应用程序可以运行在同一个节点之上。节点之间可以进行本地通信(不管他们是不是运行在同一台服务器之上)。比如一个运行在节点A上的应用程序可以调用节点B上应用程序的方法,就好像调用本地函数一样。如果应用程序由于某些原因奔溃,Erlang 节点会自动尝试重启应用程序。 如果要关闭整个 RabbitMQ 节点可以用参数 stop :
~~~
./sbin/rabbitmqctl stop
~~~
它会和本地节点通信并指示其干净的关闭,也可以指定关闭不同的节点,包括远程节点,只需要传入参数 -n :
~~~
./sbin/rabbitmqctl -n rabbit@server.example.com stop
~~~
\-n node 默认 node 名称是 rabbit@server ,如果你的主机名是 server.example.com ,那么 node 名称就是 rabbit@server.example.com 。
关闭 RabbitMQ 应用程序 如果只想关闭应用程序,同时保持 Erlang 节点运行则可以用 stop\_app:
~~~
./sbin/rabbitmqctl stop_app
~~~
这个命令在后面要讲的集群模式中将会很有用。
启动 RabbitMQ 应用程序
~~~
./sbin/rabbitmqctl start_app
~~~
重置 RabbitMQ 节点
~~~
./sbin/rabbitmqctl reset
~~~
该命令将清除所有的队列。
查看已声明的队列
~~~
./sbin/rabbitmqctl list_queues
~~~
查看交换器
~~~
./sbin/rabbitmqctl list_exchanges
~~~
该命令还可以附加参数,比如列出交换器的名称、类型、是否持久化、是否自动删除:
~~~
./sbin/rabbitmqctl list_exchanges name type durable auto_delete
~~~
查看绑定
~~~
./sbin/rabbitmqctl list_bindings
~~~
Java 客户端访问 RabbitMQ 支持多种语言访问,以 Java 为例看下一般使用 RabbitMQ 的步骤。
maven工程的pom文件中添加依赖
~~~
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.1.0</version>
</dependency>
消息生产者
package org.study.rabbitmq;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("guest");
factory.setPassword("guest");
//设置 RabbitMQ 地址
factory.setHost("localhost");
//建立到代理服务器到连接
Connection conn = factory.newConnection();
//获得信道
Channel channel = conn.createChannel();
//声明交换器
String exchangeName = "hello-exchange";
channel.exchangeDeclare(exchangeName, "direct", true);
String routingKey = "hola";
//发布消息
byte[] messageBodyBytes = "quit".getBytes();
channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);
channel.close();
conn.close();
}
}
~~~
消息消费者
~~~
package org.study.rabbitmq;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("guest");
factory.setPassword("guest");
factory.setHost("localhost");
//建立到代理服务器到连接
Connection conn = factory.newConnection();
//获得信道
final Channel channel = conn.createChannel();
//声明交换器
String exchangeName = "hello-exchange";
channel.exchangeDeclare(exchangeName, "direct", true);
//声明队列
String queueName = channel.queueDeclare().getQueue();
String routingKey = "hola";
//绑定队列,通过键 hola 将队列和交换器绑定起来
channel.queueBind(queueName, exchangeName, routingKey);
while(true) {
//消费消息
boolean autoAck = false;
String consumerTag = "";
channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
String routingKey = envelope.getRoutingKey();
String contentType = properties.getContentType();
System.out.println("消费的路由键:" + routingKey);
System.out.println("消费的内容类型:" + contentType);
long deliveryTag = envelope.getDeliveryTag();
//确认消息
channel.basicAck(deliveryTag, false);
System.out.println("消费的消息体内容:");
String bodyStr = new String(body, "UTF-8");
System.out.println(bodyStr);
}
});
}
}
}
~~~
启动 RabbitMQ 服务器
~~~
./sbin/rabbitmq-server
~~~
运行 Consumer 先运行 Consumer ,这样当生产者发送消息的时候能在消费者后端看到消息记录。 运行 Producer 接着运行 Producer ,发布一条消息,在 Consumer 的控制台能看到接收的消息:
Consumer 控制台 RabbitMQ 集群 RabbitMQ 最优秀的功能之一就是内建集群,这个功能设计的目的是允许消费者和生产者在节点崩溃的情况下继续运行,以及通过添加更多的节点来线性扩展消息通信吞吐量。RabbitMQ 内部利用 Erlang 提供的分布式通信框架 OTP 来满足上述需求,使客户端在失去一个 RabbitMQ 节点连接的情况下,还是能够重新连接到集群中的任何其他节点继续生产、消费消息。
RabbitMQ 集群中的一些概念 RabbitMQ 会始终记录以下四种类型的内部元数据:
队列元数据 包括队列名称和它们的属性,比如是否可持久化,是否自动删除 交换器元数据 交换器名称、类型、属性 绑定元数据 内部是一张表格记录如何将消息路由到队列 vhost 元数据 为 vhost 内部的队列、交换器、绑定提供命名空间和安全属性 在单一节点中,RabbitMQ 会将所有这些信息存储在内存中,同时将标记为可持久化的队列、交换器、绑定存储到硬盘上。存到硬盘上可以确保队列和交换器在节点重启后能够重建。而在集群模式下同样也提供两种选择:存到硬盘上(独立节点的默认设置),存在内存中。
如果在集群中创建队列,集群只会在单个节点而不是所有节点上创建完整的队列信息(元数据、状态、内容)。结果是只有队列的所有者节点知道有关队列的所有信息,因此当集群节点崩溃时,该节点的队列和绑定就消失了,并且任何匹配该队列的绑定的新消息也丢失了。还好RabbitMQ 2.6.0之后提供了镜像队列以避免集群节点故障导致的队列内容不可用。
RabbitMQ 集群中可以共享 user、vhost、exchange等,所有的数据和状态都是必须在所有节点上复制的,例外就是上面所说的消息队列。RabbitMQ 节点可以动态的加入到集群中。
当在集群中声明队列、交换器、绑定的时候,这些操作会直到所有集群节点都成功提交元数据变更后才返回。集群中有内存节点和磁盘节点两种类型,内存节点虽然不写入磁盘,但是它的执行比磁盘节点要好。内存节点可以提供出色的性能,磁盘节点能保障配置信息在节点重启后仍然可用,那集群中如何平衡这两者呢?
RabbitMQ 只要求集群中至少有一个磁盘节点,所有其他节点可以是内存节点,当节点加入火离开集群时,它们必须要将该变更通知到至少一个磁盘节点。如果只有一个磁盘节点,刚好又是该节点崩溃了,那么集群可以继续路由消息,但不能创建队列、创建交换器、创建绑定、添加用户、更改权限、添加或删除集群节点。换句话说集群中的唯一磁盘节点崩溃的话,集群仍然可以运行,但知道该节点恢复,否则无法更改任何东西。
RabbitMQ 集群配置和启动 如果是在一台机器上同时启动多个 RabbitMQ 节点来组建集群的话,只用上面介绍的方式启动第二、第三个节点将会因为节点名称和端口冲突导致启动失败。所以在每次调用 rabbitmq-server 命令前,设置环境变量 RABBITMQ\_NODENAME 和 RABBITMQ\_NODE\_PORT 来明确指定唯一的节点名称和端口。下面的例子端口号从5672开始,每个新启动的节点都加1,节点也分别命名为test\_rabbit\_1、test\_rabbit\_2、test\_rabbit\_3。
启动第1个节点:
~~~
RABBITMQ_NODENAME=test_rabbit_1 RABBITMQ_NODE_PORT=5672 ./sbin/rabbitmq-server -detached
~~~
启动第2个节点:
~~~
RABBITMQ_NODENAME=test_rabbit_2 RABBITMQ_NODE_PORT=5673 ./sbin/rabbitmq-server -detached
~~~
启动第2个节点前建议将 RabbitMQ 默认激活的插件关掉,否则会存在使用了某个插件的端口号冲突,导致节点启动不成功。
现在第2个节点和第1个节点都是独立节点,它们并不知道其他节点的存在。集群中除第一个节点外后加入的节点需要获取集群中的元数据,所以要先停止 Erlang 节点上运行的 RabbitMQ 应用程序,并重置该节点元数据,再加入并且获取集群的元数据,最后重新启动 RabbitMQ 应用程序。
停止第2个节点的应用程序:
~~~
./sbin/rabbitmqctl -n test_rabbit_2 stop_app
~~~
重置第2个节点元数据:
~~~
./sbin/rabbitmqctl -n test_rabbit_2 reset
~~~
第2节点加入第1个节点组成的集群:
~~~
./sbin/rabbitmqctl -n test_rabbit_2 join_cluster test_rabbit_1@localhost
~~~
启动第2个节点的应用程序
~~~
./sbin/rabbitmqctl -n test_rabbit_2 start_app
~~~
第3个节点的配置过程和第2个节点类似:
~~~
RABBITMQ_NODENAME=test_rabbit_3 RABBITMQ_NODE_PORT=5674 ./sbin/rabbitmq-server -detached
./sbin/rabbitmqctl -n test_rabbit_3 stop_app
./sbin/rabbitmqctl -n test_rabbit_3 reset
./sbin/rabbitmqctl -n test_rabbit_3 join_cluster test_rabbit_1@localhost
./sbin/rabbitmqctl -n test_rabbit_3 start_app
~~~
RabbitMQ 集群运维 停止某个指定的节点,比如停止第2个节点:
~~~
RABBITMQ_NODENAME=test_rabbit_2 ./sbin/rabbitmqctl stop
~~~
查看节点3的集群状态:
~~~
./sbin/rabbitmqctl -n test_rabbit_3 cluster_status
~~~
第八章 统一配置中心Config
最后更新于:2022-03-31 23:46:11
第七章 服务间通信Feigh
最后更新于:2022-03-31 23:46:09
第六章 负载均衡Ribbon
最后更新于:2022-03-31 23:46:07
第五章 服务注册和发现Eureka
最后更新于:2022-03-31 23:46:04
# 服务提供者和消费者
概念:
* 服务提供者:服务的被调用方,即:为其他服务提供服务的服务
* 服务消费者:服务的调用方,即:依赖其他服务的服务
服务发现组件:Eureka
# Eureka 介绍
## 什么是 Eureka
![请输入图片描述](http://172.17.66.44:8028/upload/2018/11/uoqnp1ce7uhs3oj1i1meb1gq5c.png)
[Eureka Wiki](https://github.com/Netflix/eureka/wiki/Eureka-at-a-glance)
官方的介绍在这里Eureka Wiki。 Eureka 是 Netflix 开源的一个 Restful 服务,主要用于服务的注册发现。Eureka 由两个组件组成: Eureka 服务器和 Eureka 客户端。 Eureka 服务器用作服务注册服务器。 Eureka 客户端是一个 Java 客户端,用来简化与服务器的交互、作为轮询负载均衡器,并提供服务的故障切换。 Netflix 在其生产环境中使用的是另外的客户端,它提供基于流量、资源利用率以及出错状态的加权负载均衡。
* 开源: 大家可以对实现一探究竟,甚至修改源代码。
* 可靠: 经过 Netflix 多年的**生产环境**考验,使用应该比较靠谱处心。
* 功能齐全: 不但提供了完整的注册发现服务,还有 Ribbon 等可以配合使用服务。
* 基于 Java: 对于 Java 程序员来说,使用起来,心里比较有底。
* Spring Cloud**可以使用 Spring Cloud,与 Eureka 进行了很好的集成**,使用起来非常方便。
## Eureka 架构:
Netflix 主要是在 AWS 中使用 Eureka 的,虽然同时也支持本地环境,但是了解 AWS 的一些基础概念对于理解 Eureka 的设计非常有帮助。
## 区域与可用区
首先,我们先熟悉两个概念:
* 区域(Region): AWS 云服务在全球不同的地方都有数据中心,比如北美、南美、欧洲和亚洲等。与此对应,根据地理位置我们把某个地区的基础设施服务集合称为一个区域。通过 AWS 的区域,一方面可以使得 AWS云服务在地理位置上更加靠近我们的用户,另一方面使得用户可以选择不同的区域存储他们的数据以满足法规遵循方面的要求。美东(北佛吉尼亚)、美西(俄勒冈)、美西(北加利佛尼亚)、欧洲(爱尔兰)、亚太(新加坡)、亚太(东京)等。每个区域都有自己对应的编码,如: 区域 编码
亚太(东京) ap-northeast-1 亚太(新加坡) ap-southeast-1 亚太(悉尼) ap-southeast-2 欧洲(爱尔兰) eu-west-1 南美(圣保罗) sa-east-1 美东(北佛杰尼亚) us-east-1 美西(北加利佛尼亚) us-west-1 美西(俄勒冈) us-west-2
* 可用区(Zone): AWS 的每个区域一般由多个可用区(AZ)组成,而一个可用区一般是由多个数据中心组成。AWS引入可用区设计主要是为了提升用户应用程序的高可用性。因为可用区与可用区之间在设计上是相互独立的,也就是说它们会有独立的供电、独立的网络等,这样假如一个可用区出现问题时也不会影响另外的可用区。在一个区域内,可用区与可用区之间是通过高速网络连接,从而保证有很低的延时。 每次当用户需要使用 EC2 相关资源的时候,他需要首先选择目标区域,如美东(北佛杰尼亚)us-east-1。然后在创建 EC2 产例的时候,用户可以选择实例所在的可用区,比如可以是 us-east-1a 或 us-east-1b 等。可用区的编码就是区域后面添加不同的英文字母。
## Eureka 架构说明
下图是 Eureka Wiki 中提供的架构图:![请输入图片描述](https://upload-images.jianshu.io/upload_images/4673-e73e12b7d2033cc8.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/720/format/webp)从上面的架构图可以看出,主要有三种角色:
Eureka Server \*\*\* 通过 Register, Get,Renew 等 接口提供注册和发现
Application Service (Service Provider): \*\*\* 服务提供方 \*\*\* 把自身服务实例注册到 Eureka Server
Application Client (Service Consumer): \*\*\* 服务调用方 \*\*\* 通过 Eureka Server 获取服务实例,并调用 Application Service
他们主要进行的活动如下:
每个 Region 有一个 Eureka Cluster, Region 中的每个 Zone 都至少有一个 Eureka Server。 Service 作为一个 Eureka Client,通过 register 注册到 Eureka Server,并且通过发送心跳的方式更新租约(renew leases)。如果 Eureka Client 到期没有更新租约,那么过一段时间后,Eureka Server 就会移除该 Service 实例。 当一个 Eureka Server 的数据改变以后,会把自己的数据同步到其他 Eureka Server。 Application Client 也作为一个 Eureka Client 通过 Get 接口从 Eureka Server 中获取 Service 实例信息,然后直接调用 Service 实例。 Application Client 调用 Service 实例时,可以跨可用区调用。
Eureka还提供了客户端缓存机制,即使所有的Eureka Server都挂掉了,客户端依然可以利用缓存中的信息消费其他服务的API。
简单来说,Eureka通过心跳检查,健康检查,客户端缓存机制,确保了系统的高可用性,灵活性和伸缩性。
## Eureka Demo
实际工作中,我们很少会直接使用 Eureka,因为 Spring Cloud 已经把 Eureka 与 Spring Boot 进行了集成,使用起来更为简单,所以我们使用 Spring Cloud 作为示例。
这里是官方提供的一个示例:spring-cloud-eureka-example
* 启动 Eureka Server
Eureka Server 非常简单,只需要三个步骤:
在 pom.xml 中添加依赖:
~~~
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-eureka-server</artifactId>
</dependency>
~~~
实现 Application,添加 annotation。 @EnableEurekaServer、@EnableDiscoveryClient 执行 main 方法启动 Eureka Server。
~~~
@SpringBootApplication
@EnableEurekaServer
@EnableDiscoveryClient
public class Application {
public static void main(String[] args) throws Exception {
SpringApplication.run(Application.class, args);
}
}
~~~
运行 Application 即可启动 Server,启动 Server 后打开 http://localhost:8761/,可以看到信息页面。![请输入图片描述](http://172.17.66.44:8028/upload/2018/11/4f4qqmbh6ajukqsbft8av5f5v1.png)
* 注册服务
把一个服务注册在 server 中需要以下几个步骤:
添加 eureka 依赖
org.springframework.cloudspring-cloud-starter-eureka
添加 @EnableEurekaClient 注解
~~~
@EnableEurekaClient
public class Application
~~~
3. 在 application.yml 或者 application.properties 中添加配置
~~~
eureka:
client:
service-url:
defaultZone: http://172.17.66.44:8761/eureka/
spring:
application:
name: client
~~~
配置中有两项需要额外注意:
1. eureka.client.serviceUrl.defaultZone:指定 Eureka 服务端的地址,当客户端没有专门进行配置时,就会使用这个默认地址。
2. spring.application.name:服务注册所使用的名称,同时其他服务查找该服务时也使用该名称。我们启动该服务后,可以在管理页面中查看到该服务已经在注册中心中注册成功了。
第四章 服务框架Spring Boot
最后更新于:2022-03-31 23:46:02
# 4.1 Spring Boot简介
[TOC]
Spring Boot是由Pivotal团队提供的全新框架,其设计目的是用来简化新Spring应用的初始搭建以及开发过程。该框架使用了特定的方式来进行配置,从而使开发人员不再需要定义样板化的配置。通过这种方式,Spring Boot致力于在蓬勃发展的快速应用开发领域(rapid application development)成为领导者。
第三章 环境搭建
最后更新于:2022-03-31 23:46:00
第二章 Spring Cloud简介
最后更新于:2022-03-31 23:45:58
第一章 微服务简介
最后更新于:2022-03-31 23:45:55
# 1.1单体架构及其存在的不足
# 1.2.微服务