April 2, 2020

消息推送架构-Based-GOIM

GOIM 是Go实现的消息推送的分布式服务,易于扩容伸缩,使用了bilibili/discovery来支持服务发现。

本文的重点,主要梳理了GOIM的架构,消息流转和消息处理。本文没有提到Comet的具体逻辑,套接字编程和RingBuffer等,但是Comet的复杂度远高于其他两个网元,因此强烈建议阅读Comet源码,应该会对Go网络编程有更多认识。

GOIM 是Go实现的消息推送的分布式服务,易于扩容伸缩,使用了bilibili/discovery来支持服务发现。相较于我之前用Socket.IO做的信令服务,优点在于更优雅的扩容,将连接层和逻辑层分离,职责更清晰。当然缺点也有(没有和具体实现解耦,如MQ的选型,导致不够灵活;客户端非全双工通信,TCP利用率偏低,这点并不全是缺点,好处是:消息流转清晰,职责非常明确),这部分可以自己做定制(最后的参考文献2中讲很多)。

架构图

总的来说,整个应用的架构如下

这里省略了其中用于服务发现的“bilibili/discovery”。在整个GOIM中用到服务发现的主要是gRPC和消息推送关系查找。

如上图:

  • Comet负责建立和维持客户端的长连接;
  • Job负责消息的分发;
  • Logic提供三种纬度的消息(全局,ROOM,用户)投递,还包括业务逻辑,Session管理。

消息流转

从上述的架构图中可以知道,消息是通过HTTP调用Logic产生的,然后用MQ来中转(存储,削峰);每个Job成员都从给队列中消费消息,投递给一个或者多个Comet,由Comet将消息发送给客户端。

生成消息

目前在Github上的GOIM版本,消息(除鉴权/心跳等基础数据包外)生成都是由Logic完成第一手处理,Logic提供了HTTP接口以支持消息发送能力,主要有三个纬度:用户,房间,全应用广播,如下:

curl -d 'mid message' http://api.goim.io:3111/goim/push/mids?operation=1000&mids=123
curl -d 'room message' http://api.goim.io:3111/goim/push/room?operation=1000&type=live&room=1000
curl -d 'broadcast message' http://api.goim.io:3111/goim/push/all?operation=1000

在Logic服务中会通过处理,将消息处理成附#消息格式#任务队列消息的格式,然后投递到MQ中。其中三种纬度的消息处理稍有不同:

用户

// goim/internal/logic/push.go
// mid => []PushMsg{op, server, keys, msg}
func (l *Logic) PushMids(c context.Context, op int32, mids []int64, msg []byte) (err error) {
	// 根据用户ID获取所有的 key:server 对应关系;在redis中是一个hash
	keyServers, _, err := l.dao.KeysByMids(c, mids) 
	// ...
	keys := make(map[string][]string)
	for key, server := range keyServers {
		// ...
		keys[server] = append(keys[server], key)
	}
	for server, keys := range keys {
		// 通过DAO组装PushMsg投递给MQ
		if err = l.dao.PushMsg(c, op, server, keys, msg); err != nil {
			return
		}
	}
	return
}

房间 没什么特别的处理

// goim/internal/logic/push.go
func (l *Logic) PushRoom(c context.Context, op int32, typ, room string, msg []byte) (err error) {
	return l.dao.BroadcastRoomMsg(c, op, model.EncodeRoomKey(typ, room), msg)
}

// // goim/internal/logic/dao
func (d *Dao) BroadcastRoomMsg(c context.Context, op int32, room string, msg []byte) (err error) {
	pushMsg := &pb.PushMsg{
		Type:      pb.PushMsg_ROOM,
		Operation: op,
		Room:      room,
		Msg:       msg,
	}
	b, err := proto.Marshal(pushMsg)
	// ...

	if err := d.nsqProducer.Publish(d.c.Nsq.Topic, b); err != nil {
		log.Errorf("PushMsg.send(push pushMsg:%v) error(%v)", pushMsg, err)
	}
	return
}

广播 没什么特别的处理

// goim/internal/logic/push.go
func (l *Logic) PushAll(c context.Context, op, speed int32, msg []byte) (err error) {
	return l.dao.BroadcastMsg(c, op, speed, msg)
}

// goim/internal/logic/dao
func (d *Dao) BroadcastMsg(c context.Context, op, speed int32, msg []byte) (err error) {
	pushMsg := &pb.PushMsg{
		Type:      pb.PushMsg_BROADCAST,
		Operation: op,
		Speed:     speed, // 这里需要去到Job才知道speed的具体功效
		Msg:       msg,
	}
	b, err := proto.Marshal(pushMsg)
	if err != nil {
		return
	}

	if err := d.nsqProducer.Publish(d.c.Nsq.Topic, b); err != nil {
		log.Errorf("PushMsg.send(push pushMsg:%v) error(%v)", pushMsg, err)
	}
	return
}

小结:

  • 针对用户单发时,会获取到具体的sever和keys组装到PushMsg
  • 房间消息,没有server和keys, 但是多一个room是通过typ和roomID组装而成的 “live://1000”
  • 广播消息,除了消息体之外,另外有一个speed字段

传输消息

由Logic处理好的消息会放在MQ中,Job任务会自动消费消息,然后通过gRPC调用Comet单元。相比其他两个网元,Job就简单多了。从MQ中消费到消息后会调用c.job.push(ctx, pushMsg)

// job 发送消息(普通消息,房间消息,广播)
func (j *Job) push(ctx context.Context, pushMsg *pb.PushMsg) (err error) {
	switch pushMsg.Type {
	case pb.PushMsg_PUSH:
		err = j.pushKeys(pushMsg.Operation, pushMsg.Server, pushMsg.Keys, pushMsg.Msg)
	case pb.PushMsg_ROOM:
		// 获取一个job中的Room缓存,用于房间内“定时,定量”发送消息,减少请求次数
		// 这里调用的Push并不会立即发送,而是放在Room.proto这个channel中
		// 实际放松是由Room.pushproc来定时
		err = j.getRoom(pushMsg.Room).Push(pushMsg.Operation, pushMsg.Msg)
	case pb.PushMsg_BROADCAST:
		err = j.broadcast(pushMsg.Operation, pushMsg.Msg, pushMsg.Speed)
	default:
		err = fmt.Errorf("no match push type: %s", pushMsg.Type)
	}
	return
}

// 根据serverID发送给特定的Comet服务,避免广播
// cometServers 是由discovery服务发现维护的comet列表。
func (j *Job) pushKeys(operation int32, serverID string, subKeys []string, body []byte) (err error) {
	buf := bytes.NewWriterSize(len(body) + 64)
	p := &comet.Proto{
		Ver:  1,
		Op:   operation,
		Body: body,
	}
	p.WriteTo(buf)
	p.Body = buf.Buffer()
	p.Op = comet.OpRaw
	var args = comet.PushMsgReq{
		Keys:    subKeys,
		ProtoOp: operation,
		Proto:   p,
	}
	if c, ok := j.cometServers[serverID]; ok {
		if err = c.Push(&args); err != nil {
			log.Errorf("c.Push(%v) serverID:%s error(%v)", args, serverID, err)
		}
		log.Infof("pushKey:%s comets:%d", serverID, len(j.cometServers))
	}
	return
}

// 处理成一个BroadcastReq,并广播给所有的Comet
func (j *Job) broadcast(operation int32, body []byte, speed int32) (err error) {
	// ... 与pushKeys一致,生成一个p
	comets := j.cometServers
	// 如 speed = 64, len(comets) = 2, speed = 32
	speed /= int32(len(comets)) 
	var args = comet.BroadcastReq{
		ProtoOp: operation,
		Proto:   p,
		Speed:   speed, // 是被传递给Comet处理,继续跟踪
	}
	for serverID, c := range comets {
		if err = c.Broadcast(&args); err != nil {
			log.Errorf("c.Broadcast(%v) serverID:%s error(%v)", args, serverID, err)
		}
	}
	log.Infof("broadcast comets:%d", len(comets))
	return
}

房间消息处理

getRoom(roomID) -> room.Push() -> p -> room.proto
	|
	|---> NewRoom(batch, duration)
			|
			|---> go room.pushproc() -> p <- room.proto
// goim/internal/job/room.go
type Room struct {
	c     *conf.Room // 关于房间的配置
	job   *Job       // 绑定job,为了追溯Room所属的Job
	id    string     // 房间ID
	proto chan *comet.Proto // 有缓冲channel
}

// pushproc merge proto and push msgs in batch.
// 默认batch = 20, sigTime = 1s
func (r *Room) pushproc(batch int, sigTime time.Duration) {
	var (
		n    int
		last time.Time
		p    *comet.Proto
		buf  = bytes.NewWriterSize(int(comet.MaxBodySize)) // 4096B = 4KB
	)
	
	// 设置了一个定时器,在一定时间后往room.proto放送一个roomReadyProto信号。
	td := time.AfterFunc(sigTime, func() {
		select {
		case r.proto <- roomReadyProto:
		default:
		}
	})
	defer td.Stop()

	for {
		if p = <-r.proto; p == nil {
			// 如果创建了room,但是读到空包
			break // exit
		} else if p != roomReadyProto {
			// 读取room.proto 如果是正常的数据包,则合并到buf中去,如果满了怎么办?
			p.WriteTo(buf)
			// 如果是第一个数据包,则重置定时器,并继续读取后续数据包
			if n++; n == 1 {
				last = time.Now()
				td.Reset(sigTime)
				continue
			} else if n < batch {
				// 后续的数据包,不会重置定时器,但是如果时间仍在第一个数据包的 sigTime 时间间隔内
				// 简单说,定时器还没到时间
				if sigTime > time.Since(last) {
					continue
				}
			}
			// 累计的数据包数量已经超过了batch, 执行发送动作
		} else {
			// 定时器到读到了roomReadyProto 
			// 如果buf已经被重置了,则跳出循环执行清理动作;否则执行发送消息		
			if n == 0 {
				break
			}
		}

		// 发送房间内的消息
		_ = r.job.broadcastRoomRawBytes(r.id, buf.Buffer())		
		buf = bytes.NewWriterSize(buf.Size())
		n = 0

		// 如果配置了房间最大闲置时间,则重新设定定时器
		// 也就是说,如果房间被创建后,处理完了该房间的消息,并不是直接跳出循环清理房间
		// 而是,会阻塞等待下一次的消息再来,如果在 “1m / r.c.Idle” 时间内没有来,则会跳出循环清理掉该房间
		// 如果在 “1m / r.c.Idle” 内有消息,则会重新设定定时器为sigTime,并为proto计数
		if r.c.Idle != 0 {
			td.Reset(time.Duration(r.c.Idle)) // 默认15分钟
		} else {
			td.Reset(time.Minute)
		}
	}

	// 清理动作
	r.job.delRoom(r.id)
}

总结如下图:

小结:

  • 针对用户单发时,会直接发送到对应的comet服务,根据key再去给特定的channel发送消息
  • 房间消息,这个会特殊一些,Job持有一个特殊的Room结构,用于合并发送到该房间的消息,定时定量发送房间消息(好处是,减少了gRPC调用次数降低系统负载,缺点增加时消息延迟)
  • 广播消息,将消息封装到BroadcastPushReq中,然后直接发送给所有的Comet

投递消息

Comet接收到Job单元的gRPC调用之后,会将消息通过Websocket套接字按照GOIM约定的协议格式发送给指定客户端。从Job那边传输过来的消息,依旧是分为用户消息,房间消息,全局消息。这里得先说明下Comet是如何管理用户端的长连接,如下图:

Bucket是在一个管理Channel和Room的数据结构,它的作用在于使用了hash来将Channel做分片管理,相较于集中管理,这样channel分布在不同的bucket中而不是一个map,可以降低冲突,减小锁的粒度。

有了上述结构,那么消息发送在忽略传输层的情况下:

针对用户单发

调用链路为:comet.Bucket(key).Channel(key).Push(proto),这里Push也只是将proto放在了channel的队列中(10缓冲),消息的下发在goim/internal/comet/server_websocket.go#dispatchWebsocket

房间消息

在Comet内部遍历Buckets并调用Bucket.BroadcastRoom(),但是这里也只是把消息放到了“某处”,并没有直接发送。实际发送代码在goim/internal/comet/bucket.go#roomproc

// BroadcastRoom broadcast a message to specified room
func (b *Bucket) BroadcastRoom(arg *grpc.BroadcastRoomReq) {
	// 这里取模选中一个goroutine来执行任务
	num := atomic.AddUint64(&b.routinesNum, 1) % b.c.RoutineAmount
	// b.routines 是 b.c.RoutineAmount 数量的 有 b.c.RoutineSize 缓冲大小的 chan *grpc.BroadcastRoomReq
	b.routines[num] <- arg
}

// 在创建Bucket时,便开启了goroutine来处理
func (b *Bucket) roomproc(c chan *grpc.BroadcastRoomReq) {
	for {
		arg := <-c
		if room := b.Room(arg.RoomID); room != nil {
			room.Push(arg.Proto)
		}
	}
}

// 遍历房间内的channel的链表,将消息放到channel的发送队列中,又回到了channel消息单发的逻辑。
func (r *Room) Push(p *grpc.Proto) {
	r.rLock.RLock()
	for ch := r.next; ch != nil; ch = ch.Next {
		_ = ch.Push(p)
	}
	r.rLock.RUnlock()
}

广播消息

在Comet内部遍历Buckets并向Bucket中的所有Channel发送消息。这里终于用到了speed,上文提到过,如果设定speed = 64, len(comets) = 2, 那么这里用到的 speed = 32。

// Broadcast broadcast msg to all user.
func (s *server) Broadcast(ctx context.Context, req *pb.BroadcastReq) (*pb.BroadcastReply, error) {
	if req.Proto == nil {
		return nil, errors.ErrBroadCastArg
	}

	go func() {
		for _, bucket := range s.srv.Buckets() {
			bucket.Broadcast(req.GetProto(), req.ProtoOp)
			if req.Speed > 0 {
				//该bucket
				// 有0个channel时,t = 0 / 32 = 0
				// 有2个channel时, t = 2 / 32 = 0.0625
				// 有32个channel时, t = 32 / 32 = 1
				// 有64个channel时,t = 64 / 32 = 2
				// 由此可得,(comet)speed 的含义是 每个bucket每秒最多发送的消息数量
				t := bucket.ChannelCount() / int(req.Speed)
				time.Sleep(time.Duration(t) * time.Second)
			}
		}
	}()
	return &pb.BroadcastReply{}, nil
}


// 广播,直接从bucket.chs中遍历
func (b *Bucket) Broadcast(p *grpc.Proto, op int32) {
	var ch *Channel
	b.cLock.RLock()
	for _, ch = range b.chs {
		// 如果不在该channel的监听队列中,那么该消息不会发给该客户端
		// 这个监听队列,是在建立连接是发送的 “accepts” 字段中取得的
		// 譬如accpets = [1000, 1001, 1002], 但是op = 1003, 那么就不会发送
		// 
		// 值得注意的是,这个op是从BroadcastReq.ProtoOp取得,BroadcastReq.ProtoOp又是从pushMsg.Operation取得
		// 也就是说 op = grpc.BroadcastReq.ProtoOp = proto.Op = PushMsg.Operation = 从发送消息接口产生。
		// 
		if !ch.NeedPush(op) {
			continue
		}
		_ = ch.Push(p)
	}
	b.cLock.RUnlock()
}

小结:

  • 针对用户单发时,直接利用key定位到Bucket和Channel,将消息放到队列中。
  • 房间消息,将消息分配到房间协程之一的队列中,在该协程中会持续不断的消费消费消息并处理,处理动作是将消息分发到Channel的消息队列(buffered chan)上。
  • 广播消息,直接使用了bucket的chs遍历,来为每一个Channel推送一条消息到消息队列上。

这里会展示GOIM中必要的数据结构,帮助理解GOIM中的数据流转过程。 这里会出现几个名词:

  • server: comet服务的hostname (string)
  • mid: 用户在业务中的ID (int64)
  • key: 用户在GOIM中的唯一ID (string)

Session结构

Session由Redis管理,维持了客户端MID,Server,Key的关系,这部分是在Logic中gRPC服务的Connect方法中设置。如下图所示:

// goim/internal/logic/conn.go
func (l *Logic) Connect(c context.Context, server, cookie string, token []byte) (mid int64, key, roomID string, accepts []int32, hb int64, err error) {
	var params struct {
		Mid      int64   `json:"mid"`     // 用户ID
		Key      string  `json:"key"`     // 客户端标识别,如果为空则自动生成UUID
		RoomID   string  `json:"room_id"` // 客户端加入房间 
		Platform string  `json:"platform"`// 客户端所在平台
		Accepts  []int32 `json:"accepts"` // 监听房间
	}
	if err = json.Unmarshal(token, &params); err != nil {
		log.Errorf("json.Unmarshal(%s) error(%v)", token, err)
		return
	}
	mid = params.Mid
	roomID = params.RoomID
	accepts = params.Accepts
	hb = int64(l.c.Node.Heartbeat) * int64(l.c.Node.HeartbeatMax)
	if key = params.Key; key == "" {
		key = uuid.New().String()
	}
	if err = l.dao.AddMapping(c, mid, key, server); err != nil {
		log.Errorf("l.dao.AddMapping(%d,%s,%s) error(%v)", mid, key, server, err)
	}
	log.Infof("conn connected key:%s server:%s mid:%d token:%s", key, server, mid, token)
	return
}


// goim/internal/logic/dao/redis.go
// 
func (d *Dao) AddMapping(c context.Context, mid int64, key, server string) (err error) {
	// ...
	if mid > 0 {
		if err = conn.Send("HSET", keyMidServer(mid), key, server); err != nil {
			log.Errorf("conn.Send(HSET %d,%s,%s) error(%v)", mid, server, key, err)
			return
		}
		if err = conn.Send("EXPIRE", keyMidServer(mid), d.redisExpire); err != nil {
			log.Errorf("conn.Send(EXPIRE %d,%s,%s) error(%v)", mid, key, server, err)
			return
		}
		// ...
	}
	if err = conn.Send("SET", keyKeyServer(key), server); err != nil {
		log.Errorf("conn.Send(HSET %d,%s,%s) error(%v)", mid, server, key, err)
		return
	}
	if err = conn.Send("EXPIRE", keyKeyServer(key), d.redisExpire); err != nil {
		log.Errorf("conn.Send(EXPIRE %d,%s,%s) error(%v)", mid, key, server, err)
		return
	}
	// ...
}

AddMapping方法中,总结下得到:

如果(mid=1, key=69dafe8b58066478aea48f3d0f384820,server=comet.001)
mid_1 = {69dafe8b58066478aea48f3d0f384820: comet.001}
key_69dafe8b58066478aea48f3d0f384820 = "comet.001"

也就是说,同一个用户可以在多个地方同时连入系统;同时也能看出来,Session管理并不包括用户所在的房间,用户需要接受哪些房间的消息,这部分是在是在Logic.Connect处理好了之后通过gRPC响应,交给Comet处理的。

// goim/internal/comet/server_websocket.go
func (s *Server) ServeWebsocket(conn net.Conn, rp, wp *bytes.Pool, tr *xtime.Timer) {
	// ...
	if p, err = ch.CliProto.Set(); err == nil {
		if ch.Mid, ch.Key, rid, accepts, hb, err = s.authWebsocket(ctx, ws, p, req.Header.Get("Cookie")); err == nil {
			ch.Watch(accepts...) // 监听房间列表
			b = s.Bucket(ch.Key) // 根据用户key选择一个bucket (对key做cityhash再取模)
			err = b.Put(rid, ch) // 将用户ID和连接Channel维护到Bucket中
			if conf.Conf.Debug {
				log.Infof("websocket connnected key:%s mid:%d proto:%+v", ch.Key, ch.Mid, p)
			}
		}
	}
	// ...
}

// auth for goim handshake with client, use rsa & aes.
func (s *Server) authWebsocket(ctx context.Context, ws *websocket.Conn, p *grpc.Proto, cookie string) (mid int64, key, rid string, accepts []int32, hb time.Duration, err error) {
	for {
		if err = p.ReadWebsocket(ws); err != nil {
			return
		}
		if p.Op == grpc.OpAuth {
			break
		} else {
			log.Errorf("ws request operation(%d) not auth", p.Op)
		}
	}
	// rid roomID
	if mid, key, rid, accepts, hb, err = s.Connect(ctx, p, cookie); err != nil {
		return
	}
	p.Op = grpc.OpAuthReply
	p.Body = nil
	if err = p.WriteWebsocket(ws); err != nil {
		return
	}
	err = ws.Flush()
	return
}

消息格式

1 - 任务队列消息

不管是个人消息,还是房间消息和广播消息,都是用的如下结构;其中Op和Type可以帮助Job单元可以针对消息上做差异化的处理。

type PushMsg struct {
	Type                 PushMsg_Type // 消息类型,个人,房间广播,广播
	Operation            int32        // 指令 goim/api/comet/grpc/operation.go
	Speed                int32        // 广播时用 TODO:
	Server               string       // Comet的Hostname, 个人消息时指定
	Room                 string       // 房间号
	Keys                 []string     // bucket key
	Msg                  []byte       // 消息体
}

2 - GOIM消息协议

区别于任务队列消息,这个条消息是客户端实际收到的消息(对比可以发现,其中只有Op和Body是从Logic单元传递过来的,其他字段很大一部分用于分流(定位Comet/Bucket/Room/Channel),或者系统字段用于差异化处理消息):

type Proto struct {
	Ver                  int32    // 版本号
	Op                   int32    // 消息类型,如Ping,Pong, Text
	Seq                  int32    // 序列号 TODO:
	Body                 []byte   // 消息体 等于 PushMsg.Msg
}

服务发现

服务发现可以帮助整个应用发现Comet单元和Logic单元,而Job单元并不需要注册自己(不需要被发现)。当然可以没有服务发现功能,直接在代码和配置中配置好(Comet/Logic)服务地址,但是也就失去了动态扩容的能力。另外,如果是K8S部署,这里的服务发现功能就有点冗余了,因此需要做一些调整再用K8S部署,调整包括(服务注册和发现抽象即于discovery结耦,可选开启;对于Comet的gRPC调用,在针对用户单发消息时,需要从定向单播变成广播)。

总结

  • GOIM将整个应用职责,分配给三个单个独立网元来承担相应的工作,让流程更清晰,应用也易于扩展(动态)。
  • 在用户和长连接的映射上,使用了Key来区别应用用户和业务用户,得以支持单个用户同时登陆(多平台)的场景;另外key也作为了Comet网元定位用户的唯一标识;利用了bucket + cityhash来降低竞争,加速用户定位并发送消息。
  • 在房间消息的处理上,出于消息频繁和业务场景的考虑:在Job上为房间消息增加了数据包合并机制;在Comet层为每一个Bucket都创建了一定数量的goroutine来持续处理房间消息。这两个动作,都能提高整个应用对于房间消息处理能力,提升吞吐量。
  • 从Job端将消息分发到Comet时,除了单个用户的消息能够指定Comet以外,其他的消息都只能广播给所有的Comet处理。

水平有限,如有错误,欢迎勘误指正🙏。

参考文档

comments powered by Disqus