nsq源码阅读(3)-与Client的交互处理IOLoop

具体 每个’命令’处理的封装

前面看到, listener.Accept() 之后, 解析了版本号, 当前由 protocol_v2 具体实现了 IOLoop

func (p *protocolV2) IOLoop(conn net.Conn) error {
	var err error
	var line []byte
	var zeroTime time.Time

	// nsqd 范围的 clientID 序列
	clientID := atomic.AddInt64(&p.ctx.nsqd.clientIDSequence, 1)
	client := newClientV2(clientID, conn, p.ctx)

	// 这个messagePump 名字很形象, 把要发送给client 的messgae 从 缓存池子里Pump 抽出来 做具体的发送
	messagePumpStartedChan := make(chan bool)
	go p.messagePump(client, messagePumpStartedChan)
	<-messagePumpStartedChan  //TODO messagePump 里要做初始化, 具体是什么? 为什么需要这样阻塞一下IOLoop ?

	// 底下这个地方是 处理 client 的请求的
	for {
		// 如果 client 设置需要做心跳检查, 则设置 读超时为 两倍 心跳检查间隔
		// 也就是说, 在这个时间里, 正常情况下, client肯定会在这个间隔内发一个心跳包过来
		// read 不会因为client 本来就没消息而超时,
		// 但是如果还是超时了, 那就肯定是 网络连接除了问题
		if client.HeartbeatInterval > 0 {
			client.SetReadDeadline(time.Now().Add(client.HeartbeatInterval * 2))
		} else {
			// 加入client 不设置做心跳则不设置读超时
			client.SetReadDeadline(zeroTime)
		}

		//TODO 没太懂这个什么意思.
		// ReadSlice does not allocate new space for the data each request
		// ie. the returned slice is only valid until the next call to it
		line, err = client.Reader.ReadSlice('\n')
		if err != nil {
			if err == io.EOF {
				err = nil
			} else {
				err = fmt.Errorf("failed to read command - %s", err)
			}
			break
		}

		// 这个V2 版本的协议, 一行是一个命令
		line = line[:len(line)-1]
		// optionally trim the '\r' , 处理一下\r, 有可能win版本的回车是 \r\n
		if len(line) > 0 && line[len(line)-1] == '\r' {
			line = line[:len(line)-1]
		}
		// 每个命令, 用 " "空格来划分 参数
		params := bytes.Split(line, separatorBytes)

		if p.ctx.nsqd.getOpts().Verbose {
			p.ctx.nsqd.logf("PROTOCOL(V2): [%s] %s", client, params)
		}

		// 执行命令
		var response []byte
		response, err = p.Exec(client, params)
		if err != nil {
			ctx := ""
			if parentErr := err.(protocol.ChildErr).Parent(); parentErr != nil {
				ctx = " - " + parentErr.Error()
			}
			p.ctx.nsqd.logf("ERROR: [%s] - %s%s", client, err, ctx)

			sendErr := p.Send(client, frameTypeError, []byte(err.Error()))
			if sendErr != nil {
				p.ctx.nsqd.logf("ERROR: [%s] - %s%s", client, sendErr, ctx)
				break
			}

			// errors of type FatalClientErr should forceably close the connection
			if _, ok := err.(*protocol.FatalClientErr); ok {
				break
			}
			continue
		}

		// 加入命令是有 '响应' 的, 发送响应
		if response != nil {
			err = p.Send(client, frameTypeResponse, response)
			if err != nil {
				err = fmt.Errorf("failed to send response - %s", err)
				break
			}
		}
	}

	p.ctx.nsqd.logf("PROTOCOL(V2): [%s] exiting ioloop", client)
	conn.Close()

	// 通知 messagePump 退出
	close(client.ExitChan)
	if client.Channel != nil {
		client.Channel.RemoveClient(client.ID)
	}

	return err
}

func (p *protocolV2) Send(client *clientV2, frameType int32, data []byte) error {
	// 因为client 的处理 还有一个 messagePump 线程, 所以 发送要锁
	client.writeLock.Lock()

	var zeroTime time.Time
	if client.HeartbeatInterval > 0 {
		client.SetWriteDeadline(time.Now().Add(client.HeartbeatInterval))
	} else {
		client.SetWriteDeadline(zeroTime)
	}

	//V2 协议版本 发送给client, 是 使用 [(4byte)消息长度 , (4byte)消息类型, (载体)] 的 帧格式
	//但是为什么这个 格式的封装不是 写在 protocal_v2.go 而是在 protocaol 定义上?
	//个人觉得, 具体封包格式的 '具体实现' 应该在 '协议的具体实现'里,  也就是 protocal_v2, 而不是 '协议的定义' 里
	_, err := protocol.SendFramedResponse(client.Writer, frameType, data)
	if err != nil {
		client.writeLock.Unlock()
		return err
	}

	if frameType != frameTypeMessage {
		err = client.Flush()
	}

	client.writeLock.Unlock()

	return err
}

func SendFramedResponse(w io.Writer, frameType int32, data []byte) (int, error) {
	beBuf := make([]byte, 4)
	size := uint32(len(data)) + 4

	binary.BigEndian.PutUint32(beBuf, size)
	n, err := w.Write(beBuf)
	if err != nil {
		return n, err
	}

	binary.BigEndian.PutUint32(beBuf, uint32(frameType))
	n, err = w.Write(beBuf)
	if err != nil {
		return n + 4, err
	}

	n, err = w.Write(data)
	return n + 8, err
}

读取并解析请求之后, 传给 p.Exec(client, params) 执行具体的请求

func (p *protocolV2) Exec(client *clientV2, params [][]byte) ([]byte, error) {
	//这个命令不需要做认证
	if bytes.Equal(params[0], []byte("IDENTIFY")) {
		return p.IDENTIFY(client, params)
	}
	// client 是否做了认证
	err := enforceTLSPolicy(client, p, params[0])
	if err != nil {
		return nil, err
	}
	//一行一个请求, 一个命令中用 " " 空格分割参数, 第一个参数是 命令标志
	switch {
	case bytes.Equal(params[0], []byte("FIN")):
		return p.FIN(client, params)
	case bytes.Equal(params[0], []byte("RDY")):
		return p.RDY(client, params)
	case bytes.Equal(params[0], []byte("REQ")):
		return p.REQ(client, params)
	case bytes.Equal(params[0], []byte("PUB")):
		return p.PUB(client, params)
	case bytes.Equal(params[0], []byte("MPUB")):
		return p.MPUB(client, params)
	case bytes.Equal(params[0], []byte("DPUB")):
		return p.DPUB(client, params)
	case bytes.Equal(params[0], []byte("NOP")):
		return p.NOP(client, params)
	case bytes.Equal(params[0], []byte("TOUCH")):
		return p.TOUCH(client, params)
	case bytes.Equal(params[0], []byte("SUB")):
		return p.SUB(client, params)
	case bytes.Equal(params[0], []byte("CLS")):
		return p.CLS(client, params)
	case bytes.Equal(params[0], []byte("AUTH")):
		return p.AUTH(client, params)
	}
	return nil, protocol.NewFatalClientErr(nil, "E_INVALID", fmt.Sprintf("invalid command %s", params[0]))
}

精简总流程:

同样, 代码精简一下, IOLoop 里主要就是启动了两个线程, 一个处理订阅的消息的发送, 一个处理client 发过来的命令请求

func (p *protocolV2) IOLoop(conn net.Conn) error {

	messagePumpStartedChan := make(chan bool)
	go p.messagePump(client, messagePumpStartedChan)
	<-messagePumpStartedChan

	for {
		line, err = client.Reader.ReadSlice('\n')   // 读取
		params := bytes.Split(line, separatorBytes) // 解析
		response, err = p.Exec(client, params)		// 执行

		err = p.Send(client, frameTypeResponse, response) //发送结果
	}
}