最近手上有点时间,打算继续了解下go-micro的发布订阅(消息),看了micro的examples “micro examples”)后,有个疑问,go-micro在提供发布订阅的插件Broker(以及几种实现)的同时,go-micro本身还实现了Publish(Client)以及Subscribe(Server)功能,于是翻了下源码,做个记录。
Broker
Broker是go-micro定义的一个异步消息的接口,同时使用插件的形式,可随意在不同的实现(http,nats,rabbitmq)之间无缝切换。
1 | // Broker is an interface used for asynchronous messaging. |
从上面的接口可以看出,使用Broker来完成发布订阅只需要以下几步:
- 初始化一个Broker(
Init
) - 连接Broker(
Connect
) - 使用准备好的Broker发布/订阅(
Publish/Subscribe
) - 关闭Broker(
Disconnect
)
go-micro中默认的broker实现
go-micro默认有基于http的Broker实现,可以直接使用。micro有给出具体的example “broker example”),具体看下source code中的实现。
下面是go-micro中broer.go中对DefaultBroker的相关code:
1 | var ( |
可以看到都是基于NewBroker()
返回的broker实例来做的公用方法封装,我们进一步看看。
1 | // NewBroker returns a new http broker |
这里是直接返回了一个http实现的broker(和上面提到的默认是基于http实现的匹配),继续跟newHttpBroker
。
这里这列出部分code,详细的可直接参考go-micro下的http.go
1 | h := &httpBroker{ |
这里的核心是new了一个httpBroker,做为Broker接口的实现,在具体的实现就不在这里说了,下来我们看看上面提到接口的实现。
Init
1 | func (h *httpBroker) Init(opts ...Option) error { |
从上面的code中可以看到,Init的作用就是初始化各种配置,如果Option参数有提供,就是用参数提供的,如果没有就在这里设置一个,这里有2个点我们需要额外关注下:
Registry
Registry是注册中心,如果option中没有提供registry,就会使用go-micro默认实现的(msdn)
TLSConfig
TLSConfig是针对https的配置,默认是http
Connect
1 | func (h *httpBroker) Connect() error { |
Connect方法的主要作用是创建一个Htto Server用来接收Publish时发送的消息
Subscribe
1 | func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) { |
这部分代码的核心功能就是创建用于订阅的server,一个topic创建一个server并收集(注册)到httpSubscriber的svc列表中(发布消息时使用topic在subscriber的svc列表中查询到对应的server给他发送消息)。
Publish
1 | func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption) error { |
看过了上面的Subscribe
实现,这里的Publish
就比较简单
- 创建消息体并存储在inbox
- 根据topic以及broker的标签(这里是固定http)来查找订阅的server(在上面订阅模块创建的)
上面有可能会查找出多个node(订阅server),所以里面还有一个版本的机制,如果指定了版本就会给所有的匹配节点发送(默认是随机发送一个)
- 使用http post的方式(异步)把消息发送出去
Disconnect
1 | func (h *httpBroker) Disconnect() error { |
这部分功能很简单,清空缓存并发送退出的消息,同时停止服务
以上就是go-micro中默认基于http的broker实现。
go-micro中对于broker的包装
在看完broker的http默认实现后,我们对于broker有了一个大体了解,接下来我们在看下go-micro对于broker做的包装部分,应该是为了简化使用(确实只需要一步就可以)。
订阅RegisterSubscriber
:
1 | func main() { |
发布NewPublisher, Publish
:
1 | func main() { |
以上只是代码节选,具体使用方法可以参考example中的pubsub。
Subscriber
订阅对比直接用Broker只需要一步RegisterSubscriber
,我们看看里面实现
1 | //go-micro/micro.go |
上面的节选code可以看出,在默认server(rpcServer)中的router中定义了个map类型的变量subscribers
用来存储订阅的topic和对应处理的subscriber
,server在接收到消息后,只需要根据topic去map中找到subscriber,去处理即可。
subscriber中具体的处理,可以从定义中看出来,里面存储对应路由和响应的handler(server本身的功能),有兴趣可以在go-micro/server/subscriber.go看看具体代码实现。
Publisher
发布是在go-micro的默认client实现(rpc_client)里面定义了一个默认的broker(上面有分析过的http实现)
1 | //go-micro/micro.go |
这里可以看到实际上是使用传递进来的client来初始化一个event,并用来发送消息,如果传递的是空,默认创建一个client(rpcClient
)。
总结
经过以上过程的追踪,最终总结下来就几点:
- broker定义了接口,micro提供的插件的形式可无缝替换实现
- go-micro提供了一个默认的broker实现,是基于http
- go-micro的基于默认的server、client以及brkoer包装了一套更简单的pub和sub方法