nsq源码阅读(6)-全局唯一MessageID

客户端发送消息, 进行PUB命令时, nsqd 会使用建立msgid来标识一个msg

//客户端推送消息
func (p *protocolV2) PUB(client *clientV2, params [][]byte) ([]byte, error) {
	//创建 Message 对象
	msg := NewMessage(<-p.ctx.nsqd.idChan, messageBody)

	//topic 发送消息
	err = topic.PutMessage(msg)
}

从这看出, MessageID是从一个idChan缓存队列中取出来的; 还是利用chan的思路来减少竞争; idChan的输入端, 也就是id生成器, 是在nsqd启动的时候, 单独起了一个idPump线程进行

func (n *NSQD) Main() {
	n.waitGroup.Wrap(func() {
		n.idPump()
	})
}
// 一直生产 序列化 id
func (n *NSQD) idPump() {
	factory := &guidFactory{}
	lastError := time.Unix(0, 0)
	//因为可以做成 nsqd 集群, 所以 msgid 关联 nsq id
	workerID := n.getOpts().ID
	for {
		id, err := factory.NewGUID(workerID)
		if err != nil {
			now := time.Now()
			if now.Sub(lastError) > time.Second {
				// only print the error once/second
				n.logf("ERROR: %s", err)
				lastError = now
			}
			runtime.Gosched()
			continue
		}

		//这里利用了 golang 的chan 满了阻塞的特性,
		//如果n.idChan 这个id缓存池满了, 就阻塞不会继续生产
		select {
		case n.idChan <- id.Hex():
		case <-n.exitChan:
			goto exit
		}
	}

	exit:
	n.logf("ID: closing")
}

func (f *guidFactory) NewGUID(workerID int64) (guid, error) {
	// divide by 1048576, giving pseudo-milliseconds
	ts := time.Now().UnixNano() >> 20

	if ts < f.lastTimestamp {
		return 0, ErrTimeBackwards
	}

	if f.lastTimestamp == ts {
		f.sequence = (f.sequence + 1) & sequenceMask
		if f.sequence == 0 {
			return 0, ErrSequenceExpired
		}
	} else {
		f.sequence = 0
	}

	f.lastTimestamp = ts

	// id = [ 37位ts + ?位 workerId + 12位 sequence ]
	id := guid(((ts - twepoch) << timestampShift) |
		(workerID << workerIDShift) |
		f.sequence)

	if id <= f.lastID {
		return 0, ErrIDBackwards
	}

	f.lastID = id

	return id, nil
}

workerid 的长度

光看这个代码, 就很疑惑, 既然参数 workerID 是 int64类型, 就是64位, 那岂不是在 跟 ts 拼的时候, 当workerid大于某个值的时候, 位数会”溢出”, “侵入” 到 ts 的值中? 那岂不是 特定的两个workerid会在特定的 ts 时得到相同的 guid? 比如:

ts workid guid
1110110001110011101101100011011110100 111111111111000000000000 11101100011100111011011000110111101111111111111000000000000…
1110110001110011101101100011011110111 1111111111000000000000 11101100011100111011011000110111101111111111111000000000000…

搜了下issues, #592 就有人提出来, workerid 必须 小于 1024, 也就是10位; 而这个是靠启动nsqd的时候检查配置项实现的

if opts.ID < 0 || opts.ID >= 1024 {
	n.logf("FATAL: --worker-id must be [0,1024)")
	os.Exit(1)
}

最开始觉得, 这个从代码角度讲, 很不够处女座, guid 中workID 只有10位, 而参数 workID 是 int64 类型 , 不如干脆减少squence 或者 ts的位数, 让workerID是16位使用 int16;

后来仔细想想这个逻辑, ts 是 nano » 20, 也就是 1 ts = 1048576 nano, 而 1 milliseconds = 1000000 nano; 这就是注释里说的 “giving pseudo-milliseconds” ,ts 近似等于 1毫秒

然后两次生成, 当ts 相同时, 靠 sequence 来区分, sequence 是 12位, 最大值 4096; 那也就是说, 这个生成算法, 同一个workerid, 在1毫秒内的最多能生成 4096个id;

所以, 假如减少 ts的位数或者 sequence 的位数, 让workerid的位数增加, 都会降低生成器的”性能”; 所以这三者的长度是一个权衡

这也是 NewGUID 错误处理中 做一次 runtime.Gosched() 的原因

for {
	id, err := factory.NewGUID(workerID)
	if err != nil {
		// 生成的太快了, 一个ts 中 生成了 4096个 id(sequence)
		// ts + sequence 满了, 休息一会, 让ts 发生改变
		runtime.Gosched()
		continue
	}
}