nsq源码阅读(5)-至少被成功投递一次的实现

保证只被成功投递一次?

上次阅读了 Message 投递的整个数据流程(正常情况下的流程); 阅读中, 心中的一个疑问还是没有解开, 就是 “消息如何保证 被 且 只被 成功投递一次?”; 并且粗略阅读过程中, 发现消息被发送给Client之前, 会被塞到另外一个 InFlight 队列中, 更加疑惑, 既然消息已经做过了发送, 为什么还要塞到另外一个队列里?

func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
	for {
		select {
		case msg := <-memoryMsgChan:
			//TODO 这个是什么作用? 下面不是已经发送了? 怎么又塞到一个队列里?
			subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
			client.SendingMessage()

			err = p.SendMessage(client, msg, &buf)
		}
	}
}

查看nsq 的文档, 在 FEATURES & GUARANTEES 一章, 列出了 4个 保证:

好嘛… nsq 并没有保证 “仅一次”, 只保证了 “至少一次”

至少一次的实现

这个函数叫做channle.StartInFlightTimeout(), 根据字面意思理解, InFlight, 开始”投递了”, InFlightTimeout投递超时, StartInFlightTimeout开始一个(防止)投递超时的(动作)

看下函数实现

func (c *Channel) StartInFlightTimeout(msg *Message, clientID int64, timeout time.Duration) error {
	now := time.Now()
	msg.clientID = clientID
	msg.deliveryTS = now
	// msg的 优先级是 超时的时间戳
	msg.pri = now.Add(timeout).UnixNano()
	//放到缓存, 按msg.ID, 记录一下, 估计方便之后查找: c.inFlightMessages[msg.ID]
	err := c.pushInFlightMessage(msg)
	if err != nil {
		return err
	}
	//放到一个叫 Inflight的队列里
	c.addToInFlightPQ(msg)
	return nil
}

函数里, “至少一次” 如何实现, 并没有很清楚, 只不过又塞到另一个InFlight队列里;这里插入队列, 所以还是查找这个队列在哪里被”读”, 找到在 channel.processInFlightQueue()

这个函数的功能是 把 InFlightQueue 里, 优先级 小于 参数 t 的, 全部重新发送

//把 InFlightQueue 里, 优先级 小于 参数 t 的, 全部重新发送
func (c *Channel) processInFlightQueue(t int64) bool {
	// 先 检查是否已经 退出
	c.exitMutex.RLock()
	defer c.exitMutex.RUnlock()

	if c.Exiting() {
		return false
	}

	dirty := false
	for {
		c.inFlightMutex.Lock()
		// 如果 栈顶元素的优先级 小于参数
		// 弹出 栈顶元素并返回
		msg, _ := c.inFlightPQ.PeekAndShift(t)
		c.inFlightMutex.Unlock()

		//如果 栈顶 元素的优先级 大于参数, 返回 nil
		if msg == nil {
			// 没有大于 指定参数优先级的元素, 什么也不做, 返回 deirty = false
			goto exit
		}
		//标记是 "脏" 的
		dirty = true

		//把之前存起来的 msg 取出来
		//TODO: inFlightPQ 里存的不就是msg 么, 怎么又存一次?
		_, err := c.popInFlightMessage(msg.clientID, msg.ID)
		if err != nil {
			goto exit
		}
		atomic.AddUint64(&c.timeoutCount, 1)

		c.RLock()
		client, ok := c.clients[msg.clientID]
		c.RUnlock()
		if ok {
			//找出这个msg 原来是发给哪个client 发的
			//通知它这个msg timeout了, nsqd 要重发了
			client.TimedOutMessage()
		}
		// 重发, 重新塞入队列: channel.memoryMsgChan <- m:
		c.doRequeue(msg)
	}	//循环直到队列里 没有满足条件的

exit:
	return dirty
}

看来, 这个消息 “至少被投递一次” 的保证实现的有点粗暴? 直接起个延时, 超时了就重新发送?

再网上追追, processInFlightQueue 的调用在哪里:

// 这里应该是 "生产者/消费者" 模式
// 这个函数是 队列扫描 "消费者", 消费的是 扫描 channel InFlightQueue 和 DeferredQueue 的任务
func (n *NSQD) queueScanWorker(workCh chan *Channel, responseCh chan bool, closeCh chan int) {
	for {
		select {
		case c := <-workCh:

			// InFlightQueue 是 container/head 包实现的一个优先级队列, 队列的顶部的优先级最小
			// head 是一个 堆数据结构, 优先级队列 是一个 小根堆
			// TODO: 堆算法 https://idisfkj.github.io/2016/06/19/%E7%AE%97%E6%B3%95-%E4%B8%83-%E5%A0%86%E6%8E%92%E5%BA%8F/
			// 然后将这个 优先级队列做了一个转换, 当做 "超时队列" 来用,
			// 具体办法是将 超时时间 作为优先级
			// 那么, 队列的顶端的任务的 "优先级最小",  也就是它 应该"最早超时"
			now := time.Now().UnixNano()
			dirty := false
			if c.processInFlightQueue(now) {
				dirty = true
			}

			// DeferredQueue 也是一个优先级队列
			// 然后同样将这个 优先级队列 转换为 延时队列 来使用
			// 将 任务"触发(发动)时间" 当做优先级, 放到队列里
			if c.processDeferredQueue(now) {
				dirty = true
			}
			responseCh <- dirty
		//注意这个地方, 跟 之前的close(exitChan) 用法不同
		//这里是启动多个worker, 然后当判断worker太多了, 需要关闭一个多余的worker时
		//给 closeCh <- 1 发个消息, 利用golang chan 随机分发的特性
		//这样就会随机的关闭掉一个 worker, 也就是随机退出一个 queueScanWorker 的 循环
		case <-closeCh:
			return
		}
	}
}

把 InFlightQueue 的 写入和读取结合起来看:

//写入
msg.pri = now.Add(timeout).UnixNano() // now + timetou 作为 优先级
c.addToInFlightPQ(msg)

//读取
now := time.Now().UnixNano()  //跟 当前实现比较, 也就是比较是否超时
dirty := false
if c.processInFlightQueue(now) {
	dirty = true
}

现在重新看一下 processInFlightQueue(), 就明白 这个函数的 整个作用是: 把 InFlightQueue 里, 超时的 msg 全部重新发送出去

//把 InFlightQueue 里, 超时的 msg 全部重新发送出去
func (c *Channel) processInFlightQueue(t int64) bool {

	dirty := false
	for {
		// t 是当前时间 now, 假如 inFlightPQ 顶端的优先级(也就是超时时间) 小于 now
		// 那返回的msg 就是 "超时" 了
		msg, _ := c.inFlightPQ.PeekAndShift(t)

		//标记是 "脏" 的
		dirty = true

		// 重发, 重新塞入队列: channel.memoryMsgChan <- m:
		c.doRequeue(msg)
	}
}

queueScanWorker

现在已经知道了, “至少投递一次” 这个保证是由 queueScanWorker 不断的扫描 InFlightQueue 实现的, 之前猜测 queueScanWorker 用了 “生产者/消费者模式”, 所以再来看下 queueScanWorker 的启动

nsqd 启动的时候就 起了一个 queueScanLoop 线程, 间隔性的派发 scan 任务, 并适时调整 worker 的数量


func (n *NSQD) Main() {
	n.waitGroup.Wrap(func() {
		n.queueScanLoop()
	})
}

func (n *NSQD) queueScanLoop() {
	//任务派发 队列
	workCh := make(chan *Channel, n.getOpts().QueueScanSelectionCount)

	//任务结果 队列
	responseCh := make(chan bool, n.getOpts().QueueScanSelectionCount)

	// 用来优雅关闭
	closeCh := make(chan int)

	workTicker := time.NewTicker(n.getOpts().QueueScanInterval)
	refreshTicker := time.NewTicker(n.getOpts().QueueScanRefreshInterval)

	channels := n.channels()
	// 确切一点, 这里应该叫 resizeWorkerPool
	// 就是调整 queue scan 任务的 worker 的数量
	// 当然, 一开始就是启动一些 worker
	n.resizePool(len(channels), workCh, responseCh, closeCh)

	for {
		select {
		case <-workTicker.C: // 开始一次任务的派发
			if len(channels) == 0 {
				continue
			}
		case <-refreshTicker.C:  // 重新调整 worker 数量
			channels = n.channels()
			n.resizePool(len(channels), workCh, responseCh, closeCh)
			continue
		case <-n.exitChan:	// 退出
			goto exit
		}

		num := n.getOpts().QueueScanSelectionCount
		if num > len(channels) {
			num = len(channels)
		}

		loop:
		// 随机取出几个 channel, 派发给 worker 进行 扫描
		for _, i := range util.UniqRands(num, len(channels)) {
			workCh <- channels[i]
		}

		// 接收 扫描结果, 统一 有多少 channel 是 "脏" 的
		numDirty := 0
		for i := 0; i < num; i++ {
			if <-responseCh {
				numDirty++
			}
		}

		// 假如 "脏" 的 "比例" 大于阀值, 则不等待 workTicker
		// 马上进行下一轮 扫描
		if float64(numDirty) / float64(num) > n.getOpts().QueueScanDirtyPercent {
			goto loop
		}
	}

	exit:
	n.logf("QUEUESCAN: closing")
	close(closeCh)
	workTicker.Stop()
	refreshTicker.Stop()
}

func (n *NSQD) resizePool(num int, workCh chan *Channel, responseCh chan bool, closeCh chan int) {
	// 设置 queueScanWorker 的 数量 为 当前 nsqd 所有channel 个数的 1/4
	idealPoolSize := int(float64(num) * 0.25)
	if idealPoolSize < 1 {
		idealPoolSize = 1
	} else if idealPoolSize > n.getOpts().QueueScanWorkerPoolMax {
		idealPoolSize = n.getOpts().QueueScanWorkerPoolMax
	}
	for {
		//queueScanWorker 多了就减少一个, 少了就增加一个
		//一直循环, 直到 queueScanWorker 的数量满足要求
		if idealPoolSize == n.poolSize {
			break
		} else if idealPoolSize < n.poolSize {
			// queueScanWorker 多了, 减少一个
			// 利用 chan 的特性, 向closeCh 推一个消息, 这样 所有的 worCh 就会随机有一个收到这个消息, 然后关闭
			// 细节: 这里跟 exitCh 的用法不同, exitCh 是要告知 "所有的" looper 退出, 所以使用的是 close(exitCh) 的用法
			// 而如果想 让其中 一个 退出, 则使用 exitCh <- 1 的用法
			closeCh <- 1
			n.poolSize--
		} else {
			// queueScanWorker 少了, 增加一个
			n.waitGroup.Wrap(func() {
				n.queueScanWorker(workCh, responseCh, closeCh)
			})
			n.poolSize++
		}
	}
}

Message 的”完成”

func (p *protocolV2) Exec(client *clientV2, params [][]byte) ([]byte, error) {
	switch {
	// 当一个客户端成功接收到msg 并处理完成, 按照协议会向nsqd 发送一个 "FIN" 命令通知nsqd, 这时候nsqd 会将这条msg 从InFlight队列中删除
	case bytes.Equal(params[0], []byte("FIN")):
		return p.FIN(client, params)
}

func (p *protocolV2) FIN(client *clientV2, params [][]byte) ([]byte, error) {

	id, err := getMessageID(params[1])
	err = client.Channel.FinishMessage(client.ID, *id)

	// 做些 client 计数
	client.FinishedMessage()

	return nil, nil
}

func (c *Channel) FinishMessage(clientID int64, id MessageID) error {
	//删除 inFlightMessages 缓存
	msg, err := c.popInFlightMessage(clientID, id)
	if err != nil {
		return err
	}
	// 从超时队列中删除
	c.removeFromInFlightPQ(msg)
	if c.e2eProcessingLatencyStream != nil {
		c.e2eProcessingLatencyStream.Insert(msg.Timestamp)
	}
	return nil
}

nsqd 收到客户端发来的”FIN” 只会, 就会从超时队列中删除这条msg; 这个时候, 一条msg 在nsqd 中的流转完成

细节1: 减少类型推断, 提高性能

InFlightQueue 和 DeferredQueue 为什么要实现两次? DeferredQueue 用 head 包实现, InFlightQueue 自己又实现了一次heap, 其实跟 DeferredQueue 不是一样的么?

之前两个就真是是一样的, 后来有一个提交,里面的注释是: this eliminates the use of container/heap and the associated cost of boxing and interface type assertions.

https://github.com/nsqio/nsq/commit/74bfde101934700cb0cd980d01b6dfe2fe5a6a53

意思就是说, 这些 队列里 存的是 Message 这个类型, 如果使用 heap, 需要存到 heap.Item 的 Value 里,而这个value 是一个 interface{} , 赋值 和 取值 都需要做类型推断 和 包装,那么作为 InFlightQueue 这个 “高负荷” 的队列, 减少这种 “类型推断和包装” , 有利于提高性能

写个test试一下

type Item struct {
	d1 int
	d2 int
}

func BenchmarkT1(b *testing.B) {
	q := make([]*Item, 0)	// 不需要类型推断的 slice
	for i := 0; i < b.N; i++ {
		q = append(q, &Item{i, i})
	}
	for _, hero := range q {
		hero.d1++
	}
}

func BenchmarkT2(b *testing.B) {
	q := make([]interface{}, 0)
	for i := 0; i < b.N; i++ {
		q = append(q, &Item{i, i})
	}
	for _, hero := range q {
		hero.(*Item).d1++	// 需要做类型推断
	}
}

结果还是很明显, 做类型推断的更耗时, 并且有50%的多余负载:

BenchmarkT1-8           10000000               241 ns/op
BenchmarkT2-8            5000000               332 ns/op