读源码-Gunicorn篇-4-worker
本节说明
上一节我们梳理了 gunicorn 是如何处理配置的,这一节我们来看 gunicorn 的工作进程是如何运行的。
分类
gunicorn 默认实现了 sync、eventlet、gevent、tornado、gthread 这几种类型 worker 的实现,含义如下:
| 名称 | 说明 | 异步 | 实现 |
|---|---|---|---|
| sync | 默认类型,每个worker同一时间只处理一个请求 | 否 | workers.sync.SyncWorker |
| eventlet | 基于 eventlet 实现,需要 monkey patch | 是 | workers.geventlet.EventletWorker |
| gevent | 基于 gevent 实现,需要 monkey patch | 是 | workers.ggevent.GeventWorker |
| tornado | 集成 Tornado 框架的事件循环机制,适合与 Tornado 应用结合使用 | 是 | workers.gtornado.TornadoWorker |
| gthread | 每个 worker 是一个进程,内部开启多个线程同时处理请求 | 否 | workers.gthread.ThreadWorker |
我们从最简单的默认的 sync 类型来看 gunicorn 是如何加载一个 worker 的。
加载
在 Arbiter.spawn_worker 方法中,有这样一行:
1 | worker = self.worker_class(self.worker_age, self.pid, self.LISTENERS, |
这里使用 Arbiter.worker_class 直接实例化了一个 worker 对象,说明 worker_class 其实是一个 Class 类,它是从哪里来的?
在 Arbiter.setup 中,有这样一行:
1 | self.worker_class = self.cfg.worker_class |
可以看出它使用的是 Config 里配置的 worker_class。但是指定使用 worker 的配置项 --worker-class 是一个字符串啊,为什么这里可以直接当作一个类来实例化对象呢?
我们看 Config 类,它里面实现了一个 worker_class 方法:
1 |
|
这个方法通过 @property 进行了装饰,这样在调用 Arbiter.cfg.worker_class 时,其实是调用了这个方法,而不是像其他属性那样最后会到 Config.__getattr__ 中查找。而 worker_class 方法中通过指定的 worker 类型,调用 util.load_class 将对应的类对象加载了,所以 Arbiter.worker_class 就是一个类对象了,可以直接拿进行实例化对象。
创建
Arbiter 在实例化 worker 对象后,执行了 fork 系统调用,我们回顾一下这部分代码:
1 | def spawn_worker(self): |
worker 进程在创建后,执行了 worker.init_process 对子进程进行了初始化。
这里我们先看一下 SyncWorker 的结构:
classDiagram
direction LR
class Worker {
- ...
- pid
- ppid
- sockets
- app
- cfg
- timeout
+ notify
+ init_process
+ init_signals
+ load_wsgi
+ handle_...
+ ...
}
class SyncWorker {
- ...
+ accept
+ wait
+ run
+ run_for_one
+ run_for_multiple
+ handle
+ handle_request
}
Worker <|-- SyncWorker
这里又是一个典型的 模板方法 设计模式。
可以看到 init_process 其实是在 Worker 类中实现的。其实 Worker 是 gunicorn 中所有 worker 的基类,最终都会交给 Worker.init_process 来初始化进程。我们看一下这个方法的实现:
1 | def init_process(self): |
在 worker 进程中也是采用与主进程一样的办法,通过创建一个 PIPE 来唤醒进程,调用 init_signals 方法注册了信号处理方法,之后加载了 wsgi 对象,其实就是我们创建的 myapp:run:
1 | Worker.load_wsgi() -> BaseApplication.wsgi() -> WSGIApplication.load() -> |
之后调用了 Worker.run 方法。Worker 中并没有对 run 方法进行实现,而是交给了子类来处理。SyncWorker.run 中设置了一下默认的超时时长,将监听的 socket 配置为非阻塞,之后就根据监听的 socket 数量进入到处理单个还是多个 socket 的方法中,这里我们从简单的入手,看一下 run_for_one 方法:
1 | def run_for_one(self, timeout): |
这里就是子进程的主循环了!
与主进程的交互
在主循环中,我们看到在每次循环的开始和最后,分别调用了 notify 和 wait 两个方法,作用如下:
notify: 更新时间,用于主进程判断子进程是否还活着wait: 当没有请求需要处理时,等待一段时间,让出cpu时间
我们先看一下 wait 方法:
1 | def wait(self, timeout): |
wait 方法很简单,与 Arbiter.sleep 方法类似,也是使用 select.select 来等待事件发生或超时,之后马上进入主循环进行处理。
notify 方法更简单,只有一行:
1 | def notify(self): |
那么这一行代码干了什么呢?
我们看一下 Worker.tmp,它是 WorkerTmp 的一个对象,它创建了一个不会进行读写的临时文件的文件描述符 fd,当执行 notify 方法时,会更新这个 fd 的时间。
在主进程的 murder_workers 方法中,会依据这个时间,来判断子进程是否超时,代码如下:
1 | for (pid, worker) in workers: |
能看到当 worker 长时间未更新时间(超时)后,主进程会通知子进程退出或杀掉子进程。
但是这个 fd 是子进程里的,主进程为什么可以访问呢?
在这里:
1 | def __init__(self, age, ppid, sockets, app, timeout, cfg, log): |
因为这个 fd 是在主进程中创建的,所以 os.fork 后子进程会继承这个 fd,之后两个进程就都可以访问这个 fd 了,实现了主进程与子进程的交互。
请求处理
回到 SyncWorker.run 方法,在主循环中,因为 listener 被设置为非阻塞,所以当没有请求进来时,会马上抛出异常,而这些异常会被忽略掉,接着会进入 wait 方法等待 0.5 秒,如果在 wait 时突然有请求进来了,wait 方法中的 select.select 马上就会返回,将线程唤醒,进入下一次循环去处理到来的请求。
在 SyncWorker.accept 方法中,我们看到在收到一个连接后,会交由 handle 方法进行处理,由于 handle 方法是同步的,新进来的连接请求会排队等待,等 handle 结束后再进行处理。
1 | parser = http.RequestParser(self.cfg, client, addr) |
可以看到在 handle 方法中创建了一个 http parser 来解析用户请求,之后就交给了 handle_request 来处理:
1 | def handle_request(self, listener, req, client, addr): |
这里就是最终处理 http 请求的地方,可以看到在请求处理前先调用了配置的 pre_request 钩子方法,之后创建了响应对象和 WSGI 环境对象,设置响应对象完成后强制关闭链接,然后调用了 WSGI 应用,将用户请求交给了我们定义的 main:app,接收 WSGI 应用返回的迭代器,将内容写入响应对象,处理完成后,调用了配置的 post_request 钩子,完成用户请求的处理。
请求已经处理完成了,之后在 handle 方法中关闭了用户连接。
处理完成,回到主循环,等待下一个用户的请求。
至此。
在这个章节中,我们学习了一个 worker 是如何加载、创建的,如何与主进程交互,以及是如何处理用户请求的,在下一个章节,我们将要看一下一个 http 请求是如何被解析、处理的。
马上来。