在调研broker的具体实现时,发现之前项目中的消息发送,是使用的Client(publish)/Server(subscribe)的方式,并没有直接使用broker,于是决定调研下这两者是什么关系。
Broker
broker是go-micro自身定义的异步Pub/Sub interface, 不同的机制(kafka、mqtt、nats…)最终只需要实现对应的接口,即可支持go-micro的异步消息发布/订阅。
1 | type Broker interface { |
Event
event是go-micro基于broker的interface封装的一个基于protobuf的消息发送/订阅模块, 即最终还是依赖broker的实现(go-micro默认提供一个点对点http代理),所以只需要使用plugin的方式,修改了broker的实现, event即可应用。
Event只定义了Publish接口 (
micro.go
)1
2
3
4
5
6
7
8// Event is used to publish messages to a topic
type Event interface {
// Publish publishes a message to the event topic
Publish(ctx context.Context, msg interface{}, opts ...client.PublishOption) error
}
// Type alias to satisfy the deprecation
type Publisher = EventClient中Publish (
client/client.go
)1
2
3
4
5
6
7
8
9
10
11
12
13// Client is the interface used to make requests to services.
// It supports Request/Response via Transport and Publishing via the Broker.
// It also supports bidirectional streaming of requests.
type Client interface {
Init(...Option) error
Options() Options
NewMessage(topic string, msg interface{}, opts ...MessageOption) Message
NewRequest(service, endpoint string, req interface{}, reqOpts ...RequestOption) Request
Call(ctx context.Context, req Request, rsp interface{}, opts ...CallOption) error
Stream(ctx context.Context, req Request, opts ...CallOption) (Stream, error)
Publish(ctx context.Context, msg Message, opts ...PublishOption) error
String() string
}Server中Subscribe (
server/server.go
)1
2
3
4
5
6
7
8
9
10
11
12// Server is a simple micro server abstraction
type Server interface {
Options() Options
Init(...Option) error
Handle(Handler) error
NewHandler(interface{}, ...HandlerOption) Handler
NewSubscriber(string, interface{}, ...SubscriberOption) Subscriber
Subscribe(Subscriber) error
Start() error
Stop() error
String() string
}micro中RegisterSubscriber (
micro.go
)1
2
3
4// RegisterSubscriber is syntactic sugar for registering a subscriber
func RegisterSubscriber(topic string, s server.Server, h interface{}, opts ...server.SubscriberOption) error {
return s.Subscribe(s.NewSubscriber(topic, h, opts...))
}grpc实现的Server中Subscribe (
server/grpc/grpc.go
)1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20func newGRPCServer(opts ...server.Option) server.Server {
options := newOptions(opts...)
// create a grpc server
srv := &grpcServer{
opts: options,
rpc: &rServer{
serviceMap: make(map[string]*service),
},
handlers: make(map[string]server.Handler),
subscribers: make(map[*subscriber][]broker.Subscriber),
exit: make(chan chan error),
wg: wait(options.Context),
}
// configure the grpc server
srv.configure()
return srv
}
异同
基于以上的分析, 直接使用Broker实现的Publish/Subscribe
和使用go-micro中封装的Event实现的Publish/Subscribe
本质是相同的,但是在使用的时候还是有一点差异:
- Event的Body可以使用proto定义的message,Broker的body只能是[]byte
- Event的Header需要通过context传递到底层, Broker直接设置header
- 同一个topic使用两种方式都可以接受到,但是通过Broker直接发布的消息, Event订阅接受后,会提示序列化错误