8.9 从0到1:全面理解RPC远程调用 =============================== |image0| 什么是RPC呢? 百度百科给出的解释是这样的:“RPC(Remote Procedure Call Protocol)——远程过程调用协议,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议”。这个概念听起来还是比较抽象,没关系,继续往后看,后面概念性的东西,我会讲得足够清楚,让你完全掌握 RPC 的基础内容。在后面的篇章中还会结合其在 OpenStack 中实际应用,一步一步揭开 rpc 的神秘面纱。 有的读者,可能会问,为啥我举的例子老是 OpenStack 里的东西呢? 因为每个人的业务中接触的框架都不一样(我主要接触的就是 OpenStack 框架),我无法为每个人去定制写一篇文章,但其技术原理都是一样的。即使如此,我也会尽力将文章写得通用,不会因为你没接触过 OpenStack 而成为你理解 rpc 的瓶颈。 8.9.1 既 REST,何 RPC ? ------------------------ 在 OpenStack 里的进程间通信方式主要有两种,一种是基于HTTP协议的RESTFul API方式,另一种则是RPC调用。 那么这两种方式在应用场景上有何区别呢? 有使用经验的人,就会知道: - 前者(RESTful)主要用于\ **各组件之间**\ 的通信(如nova与glance的通信),或者说用于组件对外提供调用接口 - 而后者(RPC)则用于\ **同一组件中各个不同模块之间**\ 的通信(如nova组件中nova-compute与nova-scheduler的通信)。 关于OpenStack中基于RESTful API的通信方式主要是应用了WSGI,这个知识点,我在前一篇文章中,有深入地讲解过,你可以点击查看。 对于不熟悉 OpenStack 的人,也别担心听不懂,这样吧,我给你提两个问题: 1. RPC 和 REST 区别是什么? 2. 为什么要采用RPC呢? **第一个问题:RPC 和 REST 区别是什么?** 你一定会觉得这个问题很奇怪,是的,包括我,但是你在网络上一搜,会发现类似对比的文章比比皆是,我在想可能很多初学者由于基础不牢固,才会将不相干的二者拿出来对比吧。既然是这样,那为了让你更加了解陌生的RPC,就从你熟悉得不能再熟悉的 REST 入手吧。 **01、所属类别不同** REST,是Representational State Transfer 的简写,中文描述表述性状态传递(是指某个瞬间状态的资源数据的快照,包括资源数据的内容、表述格式(XML、JSON)等信息。) REST 是一种软件架构风格。 这种风格的典型应用,就是HTTP。其因为简单、扩展性强的特点而广受开发者的青睐。 而RPC 呢,是 Remote Procedure Call Protocol 的简写,中文描述是远程过程调用,它可以实现客户端像调用本地服务(方法)一样调用服务器的服务(方法)。 RPC 是一种基于 TCP 的通信协议,按理说它和REST不是一个层面上的东西,不应该放在一起讨论,但是谁让REST这么流行呢,它是目前最流行的一套互联网应用程序的API设计标准,某种意义下,我们说 REST 可以其实就是指代 HTTP 协议。 **02、使用方式不同** **从使用上来看**\ ,HTTP 接口只关注服务提供方,对于客户端怎么调用并不关心。接口只要保证有客户端调用时,返回对应的数据就行了。而RPC则要求客户端接口保持和服务端的一致。 - REST 是服务端把方法写好,客户端并不知道具体方法。客户端只想获取资源,所以发起HTTP请求,而服务端接收到请求后根据URI经过一系列的路由才定位到方法上面去 - RPC是服务端提供好方法给客户端调用,客户端需要知道服务端的具体类,具体方法,然后像调用本地方法一样直接调用它。 **03、面向对象不同** 从设计上来看,RPC,所谓的远程过程调用 ,是面向方法的 ,REST:所谓的 Representational state transfer ,是面向资源的,除此之外,还有一种叫做 SOA,所谓的面向服务的架构,它是面向消息的,这个接触不多,就不多说了。 **04、序列化协议不同** 接口调用通常包含两个部分,序列化和通信协议。 通信协议,上面已经提及了,REST 是 基于 HTTP 协议,而 RPC 可以基于 TCP/UDP,也可以基于 HTTP 协议进行传输的。 常见的序列化协议,有:json、xml、hession、protobuf、thrift、text、bytes等,REST 通常使用的是 JSON或者XML,而 RPC 使用的是 JSON-RPC,或者 XML-RPC。 通过以上几点,我们知道了 REST 和 RPC 之间有很明显的差异。 **第二个问题:为什么要采用RPC呢?** 那到底为何要使用 RPC,单纯的依靠RESTful API不可以吗?为什么要搞这么多复杂的协议,渣渣表示真的学不过来了。 关于这一点,以下几点仅是我的个人猜想,仅供交流哈: 1. RPC 和 REST 两者的定位不同,REST 面向资源,更注重接口的规范,因为要保证通用性更强,所以对外最好通过 REST。而 RPC 面向方法,主要用于函数方法的调用,可以适合更复杂通信需求的场景。 2. RESTful API客户端与服务端之间采用的是同步机制,当发送HTTP请求时,客户端需要等待服务端的响应。当然对于这一点是可以通过一些技术来实现异步的机制的。 3. 采用RESTful API,客户端与服务端之间虽然可以独立开发,但还是存在耦合。比如,客户端在发送请求的时,必须知道服务器的地址,且必须保证服务器正常工作。而 rpc + rabbitmq中间件可以实现低耦合的分布式集群架构。 说了这么多,我们该如何选择这两者呢?我总结了如下两点,供你参考: - REST 接口更加规范,通用适配性要求高,建议对外的接口都统一成 REST。而组件内部的各个模块,可以选择 RPC,一个是不用耗费太多精力去开发和维护多套的HTTP接口,一个RPC的调用性能更高(见下条) - 从性能角度看,由于HTTP本身提供了丰富的状态功能与扩展功能,但也正由于HTTP提供的功能过多,导致在网络传输时,需要携带的信息更多,从性能角度上讲,较为低效。而RPC服务网络传输上仅传输与业务内容相关的数据,传输数据更小,性能更高。 8.9.2 实现远程调用的三种方式 ---------------------------- “远程调用”意思就是:被调用方法的具体实现不在程序运行本地,而是在别的某个地方(分布到各个服务器),调用者只想要函数运算的结果,却不需要实现函数的具体细节。 **01、基于 xml-rpc** Python实现 rpc,可以使用标准库里的 SimpleXMLRPCServer,它是基于XML-RPC 协议的。 有了这个模块,开启一个 rpc server,就变得相当简单了。执行以下代码: .. code:: python import SimpleXMLRPCServer class calculate: def add(self, x, y): return x + y def multiply(self, x, y): return x * y def subtract(self, x, y): return abs(x-y) def divide(self, x, y): return x/y obj = calculate() server = SimpleXMLRPCServer.SimpleXMLRPCServer(("localhost", 8088)) # 将实例注册给rpc server server.register_instance(obj) print "Listening on port 8088" server.serve_forever() 有了 rpc server,接下来就是 rpc client,由于我们上面使用的是 XML-RPC,所以 rpc clinet 需要使用xmlrpclib 这个库。 .. code:: python import xmlrpclib server = xmlrpclib.ServerProxy("http://localhost:8088") 然后,我们通过 server_proxy 对象就可以远程调用之前的rpc server的函数了。 .. code:: python >> server.add(2, 3) 5 >>> server.multiply(2, 3) 6 >>> server.subtract(2, 3) 1 >>> server.divide(2, 3) 0 SimpleXMLRPCServer是一个单线程的服务器。这意味着,如果几个客户端同时发出多个请求,其它的请求就必须等待第一个请求完成以后才能继续。 若非要使用 SimpleXMLRPCServer 实现多线程并发,其实也不难。只要将代码改成如下即可。 .. code:: python from SimpleXMLRPCServer import SimpleXMLRPCServer from SocketServer import ThreadingMixIn class ThreadXMLRPCServer(ThreadingMixIn, SimpleXMLRPCServer):pass class MyObject: def hello(self): return "hello xmlprc" obj = MyObject() server = ThreadXMLRPCServer(("localhost", 8088), allow_none=True) server.register_instance(obj) print "Listening on port 8088" server.serve_forever() **02、基于json-rpc** SimpleXMLRPCServer 是基于 xml-rpc 实现的远程调用,上面我们也提到 除了 xml-rpc 之外,还有 json-rpc 协议。 那 python 如何实现基于 json-rpc 协议呢? 答案是很多,很多web框架其自身都自己实现了json-rpc,但我们要独立这些框架之外,要寻求一种较为干净的解决方案,我查找到的选择有两种 第一种是 ``jsonrpclib`` .. code:: shell pip install jsonrpclib -i https://pypi.douban.com/simple 第二种是 ``python-jsonrpc`` .. code:: shell pip install python-jsonrpc -i https://pypi.douban.com/simple 先来看第一种 `jsonrpclib `__ 它与 Python 标准库的 SimpleXMLRPCServer 很类似(因为它的类名就叫做 SimpleJSONRPCServer ,不明真相的人真以为它们是亲兄弟)。或许可以说,jsonrpclib 就是仿照 SimpleXMLRPCServer 标准库来进行编写的。 它的导入与 SimpleXMLRPCServer 略有不同,因为SimpleJSONRPCServer分布在jsonrpclib库中。 服务端 .. code:: python from jsonrpclib.SimpleJSONRPCServer import SimpleJSONRPCServer server = SimpleJSONRPCServer(('localhost', 8080)) server.register_function(lambda x,y: x+y, 'add') server.serve_forever() 客户端 .. code:: python import jsonrpclib server = jsonrpclib.Server("http://localhost:8080") |image1| 再来看第二种python-jsonrpc,写起来貌似有些复杂。 服务端 .. code:: python import pyjsonrpc class RequestHandler(pyjsonrpc.HttpRequestHandler): @pyjsonrpc.rpcmethod def add(self, a, b): """Test method""" return a + b http_server = pyjsonrpc.ThreadingHttpServer( server_address=('localhost', 8080), RequestHandlerClass=RequestHandler ) print "Starting HTTP server ..." print "URL: http://localhost:8080" http_server.serve_forever() 客户端 .. code:: python import pyjsonrpc http_client = pyjsonrpc.HttpClient( url="http://localhost:8080/jsonrpc" ) |image2| 还记得上面我提到过的 zabbix API,因为我有接触过,所以也拎出来讲讲。zabbix API 也是基于 json-rpc 2.0协议实现的。 因为内容较多,这里只带大家打个,zabbix 是如何调用的:直接指明要调用 zabbix server 的哪个方法,要传给这个方法的参数有哪些。 |image3| **03、基于 zerorpc** 以上介绍的两种rpc远程调用方式,如果你足够细心,可以发现他们都是http+rpc 两种协议结合实现的。 接下来,我们要介绍的这种(\ `zerorpc `__\ ),就不再使用走 http 了。 `zerorpc `__ 这个第三方库,它是基于TCP协议、 ZeroMQ 和 MessagePack的,速度相对快,响应时间短,并发高。zerorpc 和 pyjsonrpc 一样,需要额外安装,虽然SimpleXMLRPCServer不需要额外安装,但是SimpleXMLRPCServer性能相对差一些。 .. code:: shell pip install zerorpc -i https://pypi.douban.com/simple 服务端代码 .. code:: python import zerorpc class caculate(object): def hello(self, name): return 'hello, {}'.format(name) def add(self, x, y): return x + y def multiply(self, x, y): return x * y def subtract(self, x, y): return abs(x-y) def divide(self, x, y): return x/y s = zerorpc.Server(caculate()) s.bind("tcp://0.0.0.0:4242") s.run() 客户端 .. code:: python import zerorpc c = zerorpc.Client() c.connect("tcp://127.0.0.1:4242") |image4| 客户端除了可以使用zerorpc框架实现代码调用之外,它还支持使用“命令行”的方式调用。 |image5| 客户端可以使用命令行,那服务端是不是也可以呢? 是的,通过 Github 上的文档几个 demo 可以体验到这个第三方库做真的是优秀。 比如我们可以用下面这个命令,创建一个rpc server,后面这个 ``time`` Python 标准库中的 time 模块,zerorpc 会将 time 注册绑定以供client调用。 .. code:: shell zerorpc --server --bind tcp://127.0.0.1:1234 time 在客户端,就可以用这条命令来远程调用这个 time 函数。 .. code:: shell zerorpc --client --connect tcp://127.0.0.1:1234 strftime %Y/%m/%d |image6| 8.9.3 往rpc中引入消息中间件 --------------------------- 经过了上面的学习,我们已经学会了如何使用多种方式实现rpc远程调用。 通过对比,zerorpc 可以说是脱颖而出,一支独秀。 但为何在 OpenStack 中,rpc client 不直接 rpc 调用 rpc server ,而是先把 rpc 调用请求发给 RabbitMQ ,再由订阅者(rpc server)来取消息,最终实现远程调用呢? 为此,我也做了一番思考: OpenStack 组件繁多,在一个较大的集群内部每个组件内部通过rpc通信频繁,如果都采用rpc直连调用的方式,连接数会非常地多,开销大,若有些 server 是单线程的模式,超时会非常的严重。 OpenStack 是复杂的分布式集群架构,会有多个 rpc server 同时工作,假设有 server01,server02,server03 三个server,当 rpc client 要发出rpc请求时,发给哪个好呢?这是问题一。 你可能会说轮循或者随机,这样对大家都公平。这样的话还会引出另一个问题,倘若请求刚好发到server01,而server01刚好不凑巧,可能由于机器或者其他因为导致服务没在工作,那这个rpc消息可就直接失败了呀。要知道做为一个集群,高可用是基本要求,如果出现刚刚那样的情况其实是很尴尬的。这是问题二。 集群有可能根据实际需要扩充节点数量,如果使用直接调用,耦合度太高,不利于部署和生产。这是问题三。 引入消息中间件,可以很好的解决这些问题。 **解决问题一**\ :消息只有一份,接收者由AMQP的负载算法决定,默认为在所有Receiver中均匀发送(round robin)。 **解决问题二**\ :有了消息中间件做缓冲站,client 可以任性随意的发,server 都挂掉了?没有关系,等 server 正常工作后,自己来消息中间件取就行了。 **解决问题三**\ :无论有多少节点,它们只要认识消息中间件这一个中介就足够了。 8.9.4 消息队列你应该知道什么? ------------------------------ 由于后面,我将实例讲解 OpenStack 中如何将 rpc 和 mq broker 结合使用。 而在此之前,你必须对消息队列的一些基本知识有个概念。 首先,RPC只是定义了一个通信接口,其底层的实现可以各不相同,可以是 socket,也可以是今天要讲的 AMQP。 AMQP(Advanced Message Queuing Protocol)是一种基于队列的可靠消息服务协议,作为一种通信协议,AMQP同样存在多个实现,如Apache Qpid,RabbitMQ等。 以下是 AMQP 中的几个必知的概念: - Publisher:消息发布者 - Receiver:消息接收者,在RabbitMQ中叫订阅者:Subscriber。 - Queue:用来保存消息的存储空间,消息没有被receiver前,保存在队列中。 - Exchange:用来接收Publisher发出的消息,根据Routing key 转发消息到对应的Message Queue中,至于转到哪个队列里,这个路由算法又由exchange type决定的。 exchange type:主要四种描述exchange的类型。 direct:消息路由到满足此条件的队列中(queue,可以有多个): routing key = binding key topic:消息路由到满足此条件的队列中(queue,可以有多个):routing key 匹配 binding pattern. binding pattern是类似正则表达式的字符串,可以满足复杂的路由条件。 fanout:消息路由到多有绑定到该exchange的队列中。 - binding :binding是用来描述exchange和queue之间的关系的概念,一个exchang可以绑定多个队列,这些关系由binding建立。前面说的binding key /binding pattern也是在binding中给出。 为了让你明白这几者的关系,我画了一张模型图。 |image7| 关于AMQP,有几下几点值得注意: 1. 每个receiver/subscriber 在接收消息前都需要创建binding。 2. 一个队列可以有多个receiver,队列里的一个消息只能发给一个receiver。 3. 一个消息可以被发送到一个队列中,也可以被发送到多个多列中。多队列情况下,一个消息可以被多个receiver收到并处理。Openstack RPC中这两种情况都会用到。 在 Python 中如何发送消息,并接收消息呢? 这边写个demo 首先是生产者 .. code:: python # coding:utf-8 import sys,time import pika credentials = pika.PlainCredentials('account', 'password') def productor(): connection = pika.BlockingConnection( pika.ConnectionParameters('ctrl.openstack.com',5672,'/',credentials))#建立一个最基本的socket chanel = connection.channel()#声明一个管道 chanel.queue_declare(queue='name')#给管道创建一个队列,参数是管道队列名。 chanel.basic_publish(exchange='', routing_key='name', body ='Hello World!')#要发送的消息。 connection.close() 再者是消息费 .. code:: python # coding:utf-8 import sys,time import pika credentials = pika.PlainCredentials('account', 'password') def consumer(): consumer = pika.BlockingConnection\ (pika.ConnectionParameters('ctrl.openstack.com',5672,'/',credentials))#创建socket连接 channel = consumer.channel()#创建管道 channel.queue_declare(queue='name') def backcall(ch,method,properties,body):#参数body是发送过来的消息。 print '已接收到消息!' sys.exit() #backcall 回调函数 执行结束后立即执行另外一个函数返回给发送端是否执行完毕。 #no_ack=True 不会告知服务端我是否收到消息。一般注释。 channel.basic_consume(backcall, queue='name', no_ack=True )#如果注释掉,对方没有收到消息的话不会将消息丢失,始终在队列里等待下次发送。 channel.start_consuming()#启动后进入死循环。一直等待消息。 8.9.5 OpenStack中如何使用RPC? ------------------------------ 前面铺垫了那么久,终于到了讲真实应用的场景。在生产中RPC是如何应用的呢? 其他模型我不太清楚,在 OpenStack 中的应用模型是这样的 |image8| 至于为什么要如此设计,前面我已经给出了自己的观点。 接下来,就是源码解读 OpenStack ,看看其是如何通过rpc进行远程调用的。如若你对此没有兴趣(我知道很多人对此都没有兴趣,所以不浪费大家时间),可以直接跳过这一节,进入下一节。 目前Openstack中有两种RPC实现,一种是在oslo messaging,一种是在openstack.common.rpc。 openstack.common.rpc是旧的实现,oslo messaging是对openstack.common.rpc的重构。openstack.common.rpc在每个项目中都存在一份拷贝,oslo messaging即将这些公共代码抽取出来,形成一个新的项目。oslo messaging也对RPC API 进行了重新设计,对多种 transport 做了进一步封装,底层也是用到了kombu这个AMQP库。(注:Kombu 是Python中的messaging库。Kombu旨在通过为AMQ协议提供惯用的高级接口,使Python中的消息传递尽可能简单,并为常见的消息传递问题提供经过验证和测试的解决方案。) 关于oslo_messaging库,主要提供了两种独立的API: 1. oslo.messaging.rpc(实现了客户端-服务器远程过程调用) 2. oslo.messaging.notify(实现了事件的通知机制) 因为 notify 实现是太简单了,所以这里我就不多说了,如果有人想要看这方面内容,可以收藏我的博客(http://iswbm.com) ,我会更新补充 notify 的内容。 OpenStack RPC 模块提供了 rpc.call,rpc.cast, rpc.fanout_cast 三种 RPC 调用方法,发送和接收 RPC 请求。 - rpc.call 发送 RPC **同步请求**\ 并返回请求处理结果。 - rpc.cast 发送 RPC **异步请求**\ ,与 rpc.call 不同之处在于,不需要请求处理结果的返回。 - rpc.fanout_cast 用于发送 RPC 广播信息无返回结果 rpc.call 和 rpc.cast 从实现代码上看,他们的区别很小,就是call调用时候会带有wait_for_reply=True参数,而cast不带。 要了解 rpc 的调用机制呢,首先要知道 oslo_messaging 的几个概念 - transport:RPC功能的底层实现方法,这里是rabbitmq的消息队列的访问路径 transport 就是定义你如何访连接消息中间件,比如你使用的是 Rabbitmq,那在 nova.conf中应该有一行\ ``transport_url``\ 的配置,可以很清楚地看出指定了 rabbitmq 为消息中间件,并配置了连接rabbitmq的user,passwd,主机,端口。 .. code:: python transport_url=rabbit://user:passwd@host:5672 |image9| .. code:: python def get_transport(conf, url=None, allowed_remote_exmods=None): return _get_transport(conf, url, allowed_remote_exmods, transport_cls=RPCTransport) - target:指定RPC topic交换机的匹配信息和绑定主机。 target用来表述 RPC 服务器监听topic,server名称和server监听的exchange,是否广播fanout。 .. code:: python class Target(object): def __init__(self, exchange=None, topic=None, namespace=None, version=None, server=None, fanout=None, legacy_namespaces=None): self.exchange = exchange self.topic = topic self.namespace = namespace self.version = version self.server = server self.fanout = fanout self.accepted_namespaces = [namespace] + (legacy_namespaces or []) rpc server 要获取消息,需要定义target,就像一个门牌号一样。 |image10| rpc client 要发送消息,也需要有target,说明消息要发到哪去。 |image11| - endpoints:是可供别人远程调用的对象 RPC服务器暴露出endpoint,每个 endpoint 包涵一系列的可被远程客户端通过 transport 调用的方法。直观理解,可以参考nova-conductor创建rpc server的代码,这边的endpoints就是 ``nova/manager.py:ConductorManager()``\ |image12| - dispatcher:分发器,这是 rpc server 才有的概念 |image13|\ 只有通过它 server 端才知道接收到的rpc请求,要交给谁处理,怎么处理? 在client端,是这样指定要调用哪个方法的。 |image14| 而在server端,是如何知道要执行这个方法的呢?这就是dispatcher 要干的事,它从 endpoint 里找到这个方法,然后执行,最后返回。 |image15| - Serializer:在 python 对象和message(notification) 之间数据做序列化或是反序列化的基类。 主要方法有四个: 1. deserialize_context(ctxt) :对字典变成 request contenxt. 2. deserialize_entity(ctxt, entity) :对entity做反序列化,其中ctxt是已经deserialize过的,entity是要处理的。 3. serialize_context(ctxt) :将Request context变成字典类型 4. serialize_entity(ctxt, entity) :对entity做序列化,其中ctxt是已经deserialize过的,entity是要处理的。 - executor:服务的运行方式,单线程或者多线程 每个notification listener都和一个executor绑定,来控制收到的notification如何分配。默认情况下,使用的是blocking executor(具体特性参加executor一节) .. code:: python oslo_messaging.get_notification_listener(transport, targets, endpoints, executor=’blocking’, serializer=None, allow_requeue=False, pool=None) rpc server 和rpc client 的四个重要方法 1. ``reset()``\ :Reset service. 2. ``start()``\ :该方法调用后,server开始poll,从transport中接收message,然后转发给dispatcher.该message处理过程一直进行,直到stop方法被调用。executor决定server的IO处理策略。可能会是用一个新进程、新协程来做poll操作,或是直接简单的在一个循环中注册一个回调。同样,executor也决定分配message的方式,是在一个新线程中dispatch或是….. \* 3. ``stop()``:当调用stop之后,新的message不会被处理。但是,server可能还在处理一些之前没有处理完的message,并且底层driver资源也还一直没有释放。 4. ``wait()``\ :在stop调用之后,可能还有message正在被处理,使用wait方法来阻塞当前进程,直到所有的message都处理完成。之后,底层的driver资源会释放。 8.9.6 模仿OpenStack写rpc调用 ---------------------------- 模仿是一种很高效的学习方法,我这里根据 OpenStack 的调用方式,抽取出核心内容,写成一个简单的 demo,有对 OpenStack 感兴趣的可以了解一下,\ **大部分人也可以直接跳过这章节**\ 。 以下代码不能直接运行,你还需要配置 rabbitmq 的连接方式,你可以写在配置文件中,通过 get_transport 从cfg.CONF 中读取,也可以直接将其写成url的格式做成参数,传给 get_transport 。 **简单的 rpc client** .. code:: python #coding=utf-8 import oslo_messaging from oslo_config import cfg # 创建 rpc client transport = oslo_messaging.get_transport(cfg.CONF, url="") target = oslo_messaging.Target(topic='test', version='2.0') client = oslo_messaging.RPCClient(transport, target) # rpc同步调用 client.call(ctxt, 'test', arg=arg) **简单的 rpc server** .. code:: python #coding=utf-8 from oslo_config import cfg import oslo_messaging import time # 定义endpoint类 class ServerControlEndpoint(object): target = oslo_messaging.Target(namespace='control', version='2.0') def __init__(self, server): self.server = server def stop(self, ctx): if self.server: self.server.stop() class TestEndpoint(object): def test(self, ctx, arg): return arg # 创建rpc server transport = oslo_messaging.get_transport(cfg.CONF, url="") target = oslo_messaging.Target(topic='test', server='server1') endpoints = [ ServerControlEndpoint(None), TestEndpoint(), ] server = oslo_messaging.get_rpc_server(transport, target,endpoints,executor='blocking') try: server.start() while True: time.sleep(1) except KeyboardInterrupt: print("Stopping server") server.stop() server.wait() 8.9.7 如何实现 rpc 事件通知 --------------------------- 说完了 rpc 调用,\ **再来了解它的事件通知机制**\ ,这个比较简单。 如果你不想用现成的 ``notification_event_types``\ ,而想新定义一个,可以这样做 首先在这里先定义合法的 ``notification_event_types``\ ,相当于添加白名单。 |image16| 然后在调用处,使用 ``rpc.get_notifier`` 来发送消息给ceilometer。 |image17| 继续查看 ``rpc.get_notifier`` 做了什么事?如何实现直接info 就能发送消息的。 |image18| 当你使用的event_types 不在白名单内,或者是异常信息。就会给打印warn日志 |image19| 在rabbit里查看队列,notification 是 topic |image20| 而 debug ,info 等是event priority |image21| 参考文章: - `OpenStack之RPC调用(一) `__ - `openstack oslo_messaging 译文 `__ - `模仿OpenStack写自己的RPC `__ - `python 64式: 第1式 编写rpc的call和cast `__ - `Openstack RPC 通信原理 `__ - `RPC、REST API深入理解 `__ - `分布式RPC框架性能大比拼 `__ - `ython中使用XMLRPC(入门) `__ - `(译) JSON-RPC 2.0 规范(中文版) `__ - `nova event机制分析 `__ - `RabbitMQ 三种Exchange `__ -------------- |image22| .. |image0| image:: http://image.iswbm.com/20200602135014.png .. |image1| image:: http://image.iswbm.com/20190623185008.png .. |image2| image:: http://image.iswbm.com/20190623165341.png .. |image3| image:: http://image.iswbm.com/20190623171138.png .. |image4| image:: http://image.iswbm.com/20190623155955.png .. |image5| image:: http://image.iswbm.com/20190623162725.png .. |image6| image:: http://image.iswbm.com/20190623191042.png .. |image7| image:: http://image.iswbm.com/20190630160025.png .. |image8| image:: http://image.iswbm.com/20190623201427.png .. |image9| image:: http://image.iswbm.com/20190526182125.png .. |image10| image:: http://image.iswbm.com/20190526184854.png .. |image11| image:: http://image.iswbm.com/20190526185217.png .. |image12| image:: http://image.iswbm.com/20190526221219.png .. |image13| image:: http://image.iswbm.com/20190526220809.png .. |image14| image:: http://image.iswbm.com/20190527220820.png .. |image15| image:: http://image.iswbm.com/20190527220012.png .. |image16| image:: http://image.iswbm.com/20190526172514.png .. |image17| image:: http://image.iswbm.com/20190526172725.png .. |image18| image:: http://image.iswbm.com/20190526173314.png .. |image19| image:: http://image.iswbm.com/20190526175100.png .. |image20| image:: http://image.iswbm.com/20190526180708.png .. |image21| image:: http://image.iswbm.com/20190526181433.png .. |image22| image:: http://image.iswbm.com/20200607174235.png