在调研broker的具体实现时,发现之前项目中的消息发送,是使用的Client(publish)/Server(subscribe)的方式,并没有直接使用broker,于是决定调研下这两者是什么关系。
Broker
broker是go-micro自身定义的异步Pub/Sub interface, 不同的机制(kafka、mqtt、nats…)最终只需要实现对应的接口,即可支持go-micro的异步消息发布/订阅。
| 1 | type Broker interface { | 
Event
event是go-micro基于broker的interface封装的一个基于protobuf的消息发送/订阅模块, 即最终还是依赖broker的实现(go-micro默认提供一个点对点http代理),所以只需要使用plugin的方式,修改了broker的实现, event即可应用。
- Event只定义了Publish接口 ( - micro.go)- 1 
 2
 3
 4
 5
 6
 7
 8- // Event is used to publish messages to a topic 
 type Event interface {
 // Publish publishes a message to the event topic
 Publish(ctx context.Context, msg interface{}, opts ...client.PublishOption) error
 }
 // Type alias to satisfy the deprecation
 type Publisher = Event
- Client中Publish ( - client/client.go)- 1 
 2
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13- // Client is the interface used to make requests to services. 
 // It supports Request/Response via Transport and Publishing via the Broker.
 // It also supports bidirectional streaming of requests.
 type Client interface {
 Init(...Option) error
 Options() Options
 NewMessage(topic string, msg interface{}, opts ...MessageOption) Message
 NewRequest(service, endpoint string, req interface{}, reqOpts ...RequestOption) Request
 Call(ctx context.Context, req Request, rsp interface{}, opts ...CallOption) error
 Stream(ctx context.Context, req Request, opts ...CallOption) (Stream, error)
 Publish(ctx context.Context, msg Message, opts ...PublishOption) error
 String() string
 }
- Server中Subscribe ( - server/server.go)- 1 
 2
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12- // Server is a simple micro server abstraction 
 type Server interface {
 Options() Options
 Init(...Option) error
 Handle(Handler) error
 NewHandler(interface{}, ...HandlerOption) Handler
 NewSubscriber(string, interface{}, ...SubscriberOption) Subscriber
 Subscribe(Subscriber) error
 Start() error
 Stop() error
 String() string
 }
- micro中RegisterSubscriber ( - micro.go)- 1 
 2
 3
 4- // RegisterSubscriber is syntactic sugar for registering a subscriber 
 func RegisterSubscriber(topic string, s server.Server, h interface{}, opts ...server.SubscriberOption) error {
 return s.Subscribe(s.NewSubscriber(topic, h, opts...))
 }
- grpc实现的Server中Subscribe ( - server/grpc/grpc.go)- 1 
 2
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20- func newGRPCServer(opts ...server.Option) server.Server { 
 options := newOptions(opts...)
 // create a grpc server
 srv := &grpcServer{
 opts: options,
 rpc: &rServer{
 serviceMap: make(map[string]*service),
 },
 handlers: make(map[string]server.Handler),
 subscribers: make(map[*subscriber][]broker.Subscriber),
 exit: make(chan chan error),
 wg: wait(options.Context),
 }
 // configure the grpc server
 srv.configure()
 return srv
 }
异同
基于以上的分析, 直接使用Broker实现的Publish/Subscribe和使用go-micro中封装的Event实现的Publish/Subscribe本质是相同的,但是在使用的时候还是有一点差异:
- Event的Body可以使用proto定义的message,Broker的body只能是[]byte
- Event的Header需要通过context传递到底层, Broker直接设置header
- 同一个topic使用两种方式都可以接受到,但是通过Broker直接发布的消息, Event订阅接受后,会提示序列化错误
