nsq源码阅读(1)-启动和优雅退出
启动, 利用svc框架
nsq使用了svc框架来启动一个service, Run 时, 分别调用prg 实现的 Init 和 Start 方法 启动’program’,然后监听 后两个参数的信号量, 当信号量到达, 调用 prg 实现的 Stop 方法来退出
func main() {
prg := &program{}
// svc 框架的Run 方法启动一个service
if err := svc.Run(prg, syscall.SIGINT, syscall.SIGTERM); err != nil {
log.Fatal(err)
}
}
// svc 框架, 启动 service, 也可以理解为 daemon
//
// 方法会一直阻塞, 直到 指定的信号量到来
// 如果指定信号为空, 默认是 syscall.SIGINT and syscall.SIGTERM
func Run(service Service, sig ...os.Signal) error {
env := environment{}
//先调用Init 初始化
if err := service.Init(env); err != nil {
return err
}
//再调用Start
//其实有时候根本分不太清楚哪里是init哪里是start
//作为一个框架, 当然要做的优雅一点
if err := service.Start(); err != nil {
return err
}
//准备信号量处理
if len(sig) == 0 {
sig = []os.Signal{syscall.SIGINT, syscall.SIGTERM}
}
signalChan := make(chan os.Signal, 1)
signalNotify(signalChan, sig...)
//整个程序会阻塞在这里, 等待系统信号量
<-signalChan
// 当信号来到, 调用stop 方法停掉
return service.Stop()
}
svc 生命周期三个方法的实现:
func (p *program) Init(env svc.Environment) error {
//svc 封装了不同操作系统的Deamon服务进程相关的东西
//TODO window deamon的不同点
if env.IsWindowsService() {
dir := filepath.Dir(os.Args[0])
return os.Chdir(dir)
}
return nil
}
func (p *program) Start() error {
// 配置参数解析
flagSet.Parse(os.Args[1:])
if *showVersion {
fmt.Println(version.String("nsqlookupd"))
os.Exit(0)
}
var cfg map[string]interface{}
if *config != "" {
_, err := toml.DecodeFile(*config, &cfg)
if err != nil {
log.Fatalf("ERROR: failed to load config file %s - %s", *config, err.Error())
}
}
opts := nsqlookupd.NewOptions()
options.Resolve(opts, flagSet, cfg)
daemon := nsqlookupd.New(opts)
// 这里是主要的启动方法
daemon.Main()
p.nsqlookupd = daemon
return nil
}
func (p *program) Stop() error {
if p.nsqlookupd != nil {
p.nsqlookupd.Exit()
}
return nil
}
去掉svc框架, 把启动流程整个精简拼凑:
func main() {
p.nsqlookupd.Main()
<-signalChan
p.nsqlookupd.Exit()
}
优雅退出
NSQLookupd 的真正的启动函数是Main
lookup服务主要有通过两种方式提供服务, tcp 和 http
func (l *NSQLookupd) Main() {
ctx := &Context{l}
tcpListener, err := net.Listen("tcp", l.opts.TCPAddress)
if err != nil {
l.logf("FATAL: listen (%s) failed - %s", l.opts.TCPAddress, err)
os.Exit(1)
}
// 这里为什么要加锁?
// 当前场景不好理解, 换个场景:
//if l.playerList.count() > 0 {
// 如果这里l 不 lock的话, 多线程 执行了 l.playerList = anotherPlayerList
// 然后再 执行delete 就不对了
// l.playerList.delete(n)
//}
l.Lock()
l.tcpListener = tcpListener
l.Unlock()
tcpServer := &tcpServer{ctx: ctx} // 这个就是handler
//起一个tcp处理进程, 主要处理 client 和nsq server的连接
l.waitGroup.Wrap(func() {
protocol.TCPServer(tcpListener, tcpServer, l.opts.Logger)
})
//nsqlookup 还支持http操作, 所以再起一个线程
httpListener, err := net.Listen("tcp", l.opts.HTTPAddress)
if err != nil {
l.logf("FATAL: listen (%s) failed - %s", l.opts.HTTPAddress, err)
os.Exit(1)
}
l.Lock()
l.httpListener = httpListener
l.Unlock()
httpServer := newHTTPServer(ctx)
l.waitGroup.Wrap(func() {
http_api.Serve(httpListener, httpServer, "HTTP", l.opts.Logger)
})
}
现在我们就有了3个线程:
- Main线程, 启动两个服务进程后阻塞等待系统信号量
- tcp 服务线程, 循环调用 listener.Accept() 接收连接请求
- http 服务线程, 其实跟tcp一样, 经过http封装后, 循环处理http request 请求
假如Main线程在接收到系统信号量之后直接退出, 那么其他两个线程也会跟着销毁. 于是这里就会出现一个问题, tcp服务线程 和 http服务线程, 是业务逻辑实现的主要线程, 里面可能正在”干活”, 此时需要退出, 是否有数据需要持久化了? 事务是否正好执行到一半?
所以退出时需要做优雅退出处理. Main所在线程退出之前, 需要 ‘通知’ 并且 ‘等待’ 两个服务线程退出后才能退出
为了实现这一点, nsq 利用了 sync包中的 waitgroup 组件
nsq 为了方便使用, 做了一个封装:
type WaitGroupWrapper struct {
sync.WaitGroup
}
func (w *WaitGroupWrapper) Wrap(cb func()) {
w.Add(1) // +1
// 起一个线程, 就是go 的 goroutine
go func() {
cb() //执行任务
w.Done() // -1
}()
}
那么比如nsq 启动 tcp 服务线程 的代码, 就相当于:
w.Add(1)
go func() {
protocol.TCPServer(tcpListener, tcpServer, l.opts.Logger)
w.Done()
}()
w.Wait() //Main 线程就会阻塞在这里; 等待TCPServer 结束时调用w.Done()后才退出
通知 goroutine 退出
好了, 现在Main 会等待TCPServer 退出之后再退出了, 那, Main要退出的时候, 如何通知 TCPServer 退出?
protocol.TCPServer 就是循环的监听 tcp listener的 accept 连接, 交给 handler 处理; 然后这个for 循环是一个死循环, 依靠 accept 返回 error 才退出
// ==================== 注意, 这里的 TCPServer 运行在一个新的 goroutine 中
// 这个TCPServer 也是一个可以复用的比较不错的 tcp 程序 accept 模式
func TCPServer(listener net.Listener, handler TCPHandler, l app.Logger) {
l.Output(2, fmt.Sprintf("TCP: listening on %s", listener.Addr()))
for {
clientConn, err := listener.Accept()
if err != nil {
if nerr, ok := err.(net.Error); ok && nerr.Temporary() {
l.Output(2, fmt.Sprintf("NOTICE: temporary Accept() failure - %s", err))
runtime.Gosched() // 是临时的错误, 暂停一下继续
continue
}
// theres no direct way to detect this error because it is not exposed
if !strings.Contains(err.Error(), "use of closed network connection") {
l.Output(2, fmt.Sprintf("ERROR: listener.Accept() - %s", err))
}
break
}
go handler.Handle(clientConn)
}
l.Output(2, fmt.Sprintf("TCP: closing %s", listener.Addr()))
}
再看Exit函数:
// ==================== 注意, 这里 是 Main 所在线程
func (l *NSQLookupd) Exit() {
if l.tcpListener != nil {
//Close 之后, listener.Accept() 就会返回error,
//从而退出循环, TCPServer 线程退出
l.tcpListener.Close()
}
if l.httpListener != nil {
l.httpListener.Close()
}
l.waitGroup.Wait()
}
把整个流程代码的主干拼凑在一起, 就比较明了了:
func (l *NSQLookupd) Main() {
w.Add(1)
go func() {
//=== TCPServer 线程 start
for {
clientConn, err := listener.Accept()
if err != nil {
break
}
go handler.Handle(clientConn)
}
w.Done()
//=== TCPServer 线程 end
}()
}
func (l *NSQLookupd) Exit() {
l.tcpListener.Close()
l.waitGroup.Wait()
}
如果把 svc 的也套进来, 整个程序的 启动流程就是:
func main() {
// 告诉wg 准备要启动一个goroutine
waitGroup.Add(1)
go func() {
for {
clientConn, err := listener.Accept()
if err != nil {
break
}
go handler.Handle(clientConn)
}
waitGroup.Done()
}()
// 当退出信号到达
<-signalChan
// 关闭listener
tcpListener.Close()
//检查并等待 所有的 goroutine 都结束, 而不是强制退出
waitGroup.Wait()
}
这里信号量到达之后, 做了一个 tcpListener.Close(), 其实这个动作的目的是通知 tcpListener 服务所在的线程退出
这里直接利用了 listener 被 close 之后再 accept 调用会返回error的特性, 既退出了线程, 又关闭了 listener, 一举两得
nsq 里还有各种退出线程的方式
精简总流程:
这里的大框架是app 启动和优雅退出的总流程
func main() {
// 告诉wg 准备要启动一个goroutine
waitGroup.Add(1)
go func() {
for {
clientConn, err := listener.Accept()
if err != nil {
break
}
go handler.Handle(clientConn)
}
waitGroup.Done()
}()
// 当退出信号到达
<-signalChan
// 关闭listener
// tcp 服务线程也会因为accept 返回error 而退出
tcpListener.Close()
//检查并等待 所有的 goroutine 都结束, 而不是强制退出
waitGroup.Wait()
}