Python消息队列工具 Python-rq 中文教程

原创 tiangr  2016-09-30 19:36  阅读 506 次

翻译至python-rq官网 http://python-rq.org

十分钟入门

安装方法

pip install rq

首先,需要运行一个Redis服务,你可以使用一个已经存在的Redis,放置任务(jobs)至队列(queues),你不需要做任何其他特别的事情,仅仅需要定义你自己需要加入队列中耗时的阻塞方法:

import requests

def count_words_at_url(url):
    resp = requests.get(url)
    return len(resp.text.split())

接着创建一个RQ队列:

from redis import Redis
from rq import Queue

q = Queue(connection=Redis())

入列方法:

from my_module import count_words_at_url
result = q.enqueue(
             count_words_at_url, 'http://nvie.com')

Worker进程 在后台开始执行队列中的方法需要开启一个worker进程。

$ rq worker
*** Listening for work on default
Got count_words_at_url('http://nvie.com') from default
Job result = 818
*** Listening for work on default

文档

队列

一个"任务"(job)是一个Python对象,表示在一个"工作"(worker)进程(后台进程)中被异步调用的方法。任何Python方法都可以通过传递函数和其参数的引用值的方式被异步调用,这个过程称作"入队"(enqueueing)。

任务入队(enqueueing jobs)

将任务放置入队列,首先申明一个函数:

import requests

def count_words_at_url(url):
    resp = requests.get(url)
    return len(resp.text.split())

注意到了吗?此函数没有任何特殊的地方,任何函数都可以将其放入RQ队列。

count_words_at_url 放入队列,在后台运行。

from rq import Queue
from redis import Redis
from somewhere import count_words_at_url

# Tell RQ what Redis connection to use
redis_conn = Redis()
q = Queue(connection=redis_conn)  # no args implies the default queue

# Delay execution of count_words_at_url('http://nvie.com')
job = q.enqueue(count_words_at_url, 'http://nvie.com')
print job.result   # => None

# Now, wait a while, until the worker is finished
time.sleep(2)
print job.result   # => 889

如果你想指定任务至特定的队列,可以如此定义队列名称:

q = Queue('low', connection=redis_conn)
q.enqueue(count_words_at_url, 'http://nvie.com')

注意示例中的Queue('low'),你可以使用任意的名称替代以获取符合需求、具有扩展性的分布式队列任务模式。默认通用的队列命名方式(e.g. high, medium, low)可以区分队列任务的优先级。

此外,你可以添加一些额外的参数来控制队列任务。默认情况下,有如下键值对参数可以传递给任务方法:

  • timeout , 指定任务的最大运行时间,超时将被丢弃。
  • result_ttl , 指定保存的任务结果过期时间。
  • ttl , 指定任务排队的最大时间,超时将被取消。
  • depends_on , 指定任务对应所需执行的依赖任务(或者job id),必须完成依赖任务再执行指定任务。
  • job_id , 为任务手动添加一个job_id标识。
  • at_front , 将任务放置在队列的最前端而不是最后。
  • kwargsargs , 绕开这些自动弹出的参数 ie: 为潜在的任务方法指定一个timeout参数。

最后,建议使用更明晰的方法.enqueue_call()来取代.enqueue()

q = Queue('low', connection=redis_conn)
q.enqueue_call(func=count_words_at_url,
               args=('http://nvie.com',),
               timeout=30)

有些情况下,web进程无法进入运行在工作(worker) 中的源码(i.e. X中的代码需要调用一个Y中延迟函数),你可以通过字符串引用传递函数。

q = Queue('low', connection=redis_conn)
q.enqueue('my_package.my_module.my_func', 3, 4)

队列的使用

除了任务入列,队列(queues)还包含了一些其它有用的方法:

from rq import Queue
from redis import Redis

redis_conn = Redis()
q = Queue(connection=redis_conn) 

# Getting the number of jobs in the queue
print len(q)

# Retrieving jobs
queued_job_ids = q.job_ids # Gets a list of job IDs from the queue
queued_jobs = q.jobs # Gets a list of enqueued job instances
job = q.fetch_job('my_id') # Returns job having ID "my_id"

RQ设计原理

使用RQ的过程中,你你不需要提前创建任何的队列,你也不需要指定任何使用渠道,数据交换规则,路由规则等。你只需要将你的任务放入到任何你想要放入的队列中,一旦你将一个任务入列至一个还未存在的队列中,它将迅速被创建。

RQ没有使用任何中间人来指定消息的位置,你可能认为这是优势也可能任务这是不合理的,这取决与你想要解决的具体问题。

最后,RQ没有使用简明的协议,因为它依据的是Python自带的pickle模块来序列化工作任务。

延迟结果

当任务入列,queue.enqueue() 方法将会返回一个Job实例,这仅仅是一个用于检测实际任务运行结果的代理对象。

为此,它拥有一个方便的结果访问属性,在任务还没有完成时将返回None,或者当任务完成时返回一个非空值(假设此任务第一时间返回了值)。

@job 装饰器

如果你想使用类似Celery的代码风格,你可能需要使用@task装饰器。RQ>=3.0版本将拥有类似的装饰器:

from rq.decorators import job

@job('low', connection=my_redis_conn, timeout=5)
def add(x, y):
    return x + y

job = add.delay(3, 4)
time.sleep(1)
print job.result
绕过workers

为了方便测试,你可以入列一个任务而不需要绑定一个实际运行的工作(worker)进程(RQ >= 0.3.1 可用)。为了实现此功能,你可以在队列构造器中传递参数async=False.

>>> q = Queue('low', async=False)
>>> job = q.enqueue(fib, 8)
>>> job.result
21

以上代码将在同一进程中同步执行函数fib(8)而不需要任何一个激活的工作(worker)进程。类似于Celery中的ALWAYS_EAGER。

依赖任务

RQ 0.4.0版本的新特性,可以用来管理多任务直接的依赖关系。使用depends_on参数实现执行一个依赖于另一个任务的Job。

q = Queue('low', async=False)
report_job = q.enqueue(generate_report)
q.enqueue(send_report, depends_on=report_job)
The ability to handle job dependencies allows you to split a big job into several smaller ones. A job that is dependent on another is enqueued only when its dependency finishes successfully.
工作进程

(略)

任务(jobs)的注意事项

技术上来说,你可以放置任何Python方法到队列,但这不意味着你这样做是明智的,有些因素你在入列任务前必须考虑:

  • 首先确保worker进程可以引入任务函数的__module__。这意味着你不能入列申明在__main__模块中的任务方法。
  • 确保工作进程和进程的生成器可以共享源码。
  • 确保任务方法在上下文中没有任何依赖,尤其是全局变量。另外,任务方法依赖的状态(比如,"当前用户",或者"当前web访问请求")在工作(worker)进程中将不复存在。如果你想要为"当前"的用户执行相关任务,你应该为此用户创建一个实例对象,并将对应的对象引用当做参数传递至任务方法。
限制条件

RQ 的工作进程依赖系统的fork()方法,这意味着Windows下无法运行。

工作(worker)

Worker是一个运行在后台的Python进程,用来执行一些你不想要在web进程中执行的冗长或者阻塞任务。

开启workers进程

从项目的root目录下开启一个worker进程:

$ rq worker high normal low
*** Listening for work on high, normal, low
Got send_newsletter('me@nvie.com') from default
Job ended normally without result
*** Listening for work on high, normal, low
...

工作进程将无限循环读取给定队列(顺序非常重要)中的任务,并在所有任务方法执行完毕后继续等待新的任务。

每一个工作进程一次将只执行一个任务,在一个worker进程中,不会并行处理任务,如果你想要并行执行任务,你只需要开启更多的worker进程。

突发模式

默认情况下,工作进程将立即运行并且在运行完所有任务后一直阻塞直至新的任务产生。Worker进程同样可以使用突发模式运行,此模式将完成所有队列中的任务,并且在所有给定的队列执行完毕后退出。

$ rq worker --burst high normal low
*** Listening for work on high, normal, low
Got send_newsletter('me@nvie.com') from default
Job ended normally without result
No more work, burst finished.
Registering death.

这对批量执行需要周期性执行的任务或者大幅升高暂时性worker进程的峰值来说十分有用。

Worker进程内部原理

Worker进程生命周期

  1. 启动。加载Python环境。
  2. 注册。worker进程将自己注册进系统中。
  3. 监听。一个Job任务从任意指定的Redis队列中弹出。如果所有的队列都是空的并且worker进程运行在突发模式下,将立即退出。否则将持续等待新任务。
  4. 任务执行准备工作。Worker进程将设置状态为 busy 以告知系统:Job任务即将开始工作,并且将Job注册至StartedJobRegistry
  5. Fork子进程。一个子进程("work horse")在有故障的情况下将关闭任务。
  6. 执行任务。处理在主进程中实际执行的任务。
  7. 清除执行任务。worker进程将设置它的状态至"闲置" idle,并且依据result_ttl设置任务和任务执行结果至过期。Job任务将从 StartedJobRegistry 里移除,并在成功执行之后新增至 FinishedJobRegistry,失败后新增至 FailedQueue
  8. 循环。重复步骤3.

性能

基本上,rq worker进程脚本是一个简单 获取-Fork-执行的循环。当大量任务做了冗长的启动设置,或者他们全都依赖于同样的模块,你在执行每个任务时都将花费大量额外的时间(因为你在fork进程之后才进行import操作)。这种方式很简洁,而且RQ也因此不会出现内存泄漏的情况,但与此同时带来的负面影响是执行效率降低了。

对此,你能够采用在fork进程之前先import引入必要模块的方式来提高生产力。RQ目前没有提供可以采取这种设置的参数,但是你可以在你的worker进程进行循环之前先做import导入。

你可以自定义自己的worker脚本(替代掉原来使用的rq进程).一个简单的实例:

#!/usr/bin/env python
import sys
from rq import Connection, Worker

# Preload libraries
import library_that_you_want_preloaded

# Provide queue names to listen to as arguments to this script,
# similar to rq worker
with Connection():
    qs = sys.argv[1:] or ['default']

    w = Worker(qs)
    w.work()
进程命名

Workers 进程命名方式默认等于当前的hostname与当前PID连接。覆盖默认设置,可以在开始worker进程时指定一个 --name 选项

worker进程关闭

任何时候,worker进程收到SIGINT (via Ctrl+C) 或者 SIGTERM (via kill)信号,worker进程将会等待当前正在执行任务完成工作后,再关闭循环,将其注册入死亡进程。

如果是在关闭进程阶段,再次收到SIGINT 或者 SIGTERMworker进程将强制性使子进程中断(发送SIGKILL),但依然会尝试将其注册入死亡进程。

使用配置文件

0.3.2 版本中的新特性

如果你想要通过配置文件而不是命令行参数来配置rq进程,你可以创建一个名为settings.py的Python文件:

REDIS_URL = 'redis://localhost:6379/1'

# You can also specify the Redis DB to use
# REDIS_HOST = 'redis.example.com'
# REDIS_PORT = 6380
# REDIS_DB = 3
# REDIS_PASSWORD = 'very secret'

# Queues to listen on
QUEUES = ['high', 'normal', 'low']

# If you're using Sentry to collect your runtime exceptions, you can use this
# to configure RQ for it in a single step
SENTRY_DSN = 'http://public:secret@example.com/1'

图上示例展示了所有可用的配置选项。 注意:QUEUESREDIS_PASSWORD设置在0.3.3以后的版本才存在。 指定worker进程读取配置文件的路径使用 -c 参数:

$ rq worker -c settings

自定义worker类

版本0.4.0 的新特性

There are times when you want to customize the worker's behavior. Some of the more common requests so far are:

  • 在执行任务前优先管理数据库连接.
  • 执行任务使用不包含os.fork的模型.
  • 使用多进程模型multiprocessing 或者 gevent. 使用 -w 参数指定 worker类路径:
$ rq worker -w 'path.to.GeventWorker'

自定义任务和队列类

未来开放

You can tell the worker to use a custom class for jobs and queues using --job-class and/or --queue-class.

$ rq worker --job-class 'custom.JobClass' --queue-class 'custom.QueueClass'

Don't forget to use those same classes when enqueueing the jobs.

For example:

from rq import Queue
from rq.job import Job

class CustomJob(Job):
    pass

class CustomQueue(Queue):
    job_class = CustomJob

queue = CustomQueue('default', connection=redis_conn)
queue.enqueue(some_func)

自定义异常捕获操作

版本 0.5.5 的新特性

如果你想根据不同类型的任务来决定对应的异常操作,或者仅仅想重写异常处理,可以通过--exception-handler选项:

$ rq worker --exception-handler 'path.to.my.ErrorHandler'

# Multiple exception handlers is also supported
$ rq worker --exception-handler 'path.to.my.ErrorHandler' --exception-handler 'another.ErrorHandler'

结果

入列任务是延迟函数的调用,也就是说我们正在解决一个问题,但是需要等待一会才回来执行。

处理结果

Python 方法如果有返回值,如果任务返回一个非空的值,worker进程将返回值写入Redis所存的任务对应Hash下,TTL默认的过期时间为任务结束后的500s。

The party that enqueued the job gets back a Job instance as a result of the enqueueing itself. Such a Job object is a proxy object that is tied to the job's ID, to be able to poll for results.

返回结果 TTL

返回结果写入Redis时伴有一个有限的生存时间,可以避免Redis数据库数据无限增长。

RQ >= 0.3.1时,TTL的值可以在调用 enqueue_call()时通过使用 result_ttl关键词参数指定。它同样可以禁用过期,这时你需要手动清理数据。

q.enqueue_call(func=foo)  # result expires after 500 secs (the default)
q.enqueue_call(func=foo, result_ttl=86400)  # result expires after 1 day
q.enqueue_call(func=foo, result_ttl=0)  # result gets deleted immediately
q.enqueue_call(func=foo, result_ttl=-1)  # result never expires--you should delete jobs manually

此外,你可以使用来继续执行一些没有返回值,默认会立即删除的已完成任务,

q.enqueue_call(func=func_without_rv, result_ttl=500)  # job kept explicitly

异常处理

通常,任务可以在失败后抛出异常,RQ将用以下方式处理:

失败的任务需要关注并且失败任务的返回值不应该设置过期时间。更进一步,失败的任务需要再次运行测试。一般这些事情需要手动操作,因为RQ本身是无法字典或者可靠地自动判断任务重新执行是否安全。

当一个异常在任务中抛出,worker进程可以获取得到,并将其序列化,以键为exc_info的hash存储在任务对应的Redis下。任务的引用随即被置于失败队列中。

任务本身拥有一些十分有用的属性帮助检测:

  • 任务最初的创建时间
  • 最近一个任务入列的时间
  • 起始队列
  • 所需方法的文本描述
  • 异常信息

这些可以帮助你检测和手动追查问题,并且再次提交任务。

处理任务中断

worker进程被 Ctrl+C or kill 中断,RQ 几乎不会丢失任务。当前任务完成后,worker进程将其它未执行的任务终止。

然而,worker进程可以使用kill -9强制终止,不过,这种方式worker进程无法优雅地结束任务或者将任务放入失败队列中。因此,强制关闭一个worker进程可能引起潜在的问题。

处理任务超时

默认任务执行时间为180s,如果逾期未执行完毕,worker进程将终止主进程并且将任务放入失败队列,并标识其超时。

如果一个任务需要更多(或更少)时间来完成,默认的超时时间也将改变,你可以在调用Queue.enqueue()时通过参数指定:

q = Queue()
q.enqueue(func=mytask, args=(foo,), kwargs={'bar': qux}, timeout=600)  # 10 mins
You can also change the default timeout for jobs that are enqueued via specific queue instances at once, which can be useful for patterns like this:

# High prio jobs should end in 8 secs, while low prio
# work may take up to 10 mins
high = Queue('high', default_timeout=8)  # 8 secs
low = Queue('low', default_timeout=600)  # 10 mins

# Individual jobs can still override these defaults
low.enqueue_call(really_really_slow, timeout=3600)  # 1 hr
Individual jobs can still specify an alternative timeout, as workers will respect these.

任务(job)

在一些情况下,你可能需要进入当前的任务ID或者任务方法的实例,或者为任务存于任意数据。

进入 "当前" 任务

版本 0.3.3 的新特性

因为任务方法和Python方法本质是一样的,你可以向RQ 获取当前任务ID:

from rq import get_current_job

def add(x, y):
    job = get_current_job()
    print 'Current job: %s' % (job.id,)
    return x + y

保存数据

版本 0.3.3 的新特性

为了给任务增加/更新自定义的状态信息,需要操作任务的meta属性,

import socket

def add(x, y):
    job = get_current_job()
    job.meta['handled_by'] = socket.gethostname()
    job.save()
    return x + y

队列任务的TTL

版本 0.4.7 的新特性

一个任务拥有两个TTL,其中一个是任务所产生的结果生存时间另一个则是任务本身的,这意味着如果你的任务在等待一段特定时间后仍未能执行的话将被抛弃:

# When creating the job:
job = Job.create(func=say_hello, ttl=43)

# or when queueing a new job:
job = q.enqueue(count_words_at_url, 'http://nvie.com', ttl=43)

执行失败的任务

如果一个任务失败并引起了异常操作,worker进程将把任务放入一个失败队列中。任务对象实例的is_failed属性将变为true,为了获取所有失败任务,可以使用get_failed_queue()扫描。

from redis import StrictRedis
from rq import push_connection, get_failed_queue, Queue
from rq.job import Job


con = StrictRedis()
push_connection(con)

def div_by_zero(x):
    return x / 0

job = Job.create(func=div_by_zero, args=(1, 2, 3))
job.origin = 'fake'
job.save()
fq = get_failed_queue()
fq.quarantine(job, Exception('Some fake error'))
assert fq.count == 1

fq.requeue(job.id)

assert fq.count == 0
assert Queue('fake').count == 1

监控

最简单的方式是安装RQ dashboard,一个分离的分布式工具,拥有一个轻量级的web端界面。

image

RQ dashboard

安装:

$ pip install rq-dashboard
$ rq-dashboard --help
Usage: rq-dashboard [OPTIONS]

  Run the RQ Dashboard Flask server.

  All configuration can be set on the command line or through environment
  variables of the form RQ_DASHBOARD_*. For example RQ_DASHBOARD_USERNAME.

  A subset of the configuration (the configuration parameters used by the
  underlying flask blueprint) can also be provided in a Python module
  referenced using --config, or with a .cfg file referenced by the
  RQ_DASHBOARD_SETTINGS environment variable.

Options:
  -b, --bind TEXT               IP or hostname on which to bind HTTP server
  -p, --port INTEGER            Port on which to bind HTTP server
  --url-prefix TEXT             URL prefix e.g. for use behind a reverse proxy
  --username TEXT               HTTP Basic Auth username (not used if not set)
  --password TEXT               HTTP Basic Auth password
  -c, --config TEXT             Configuration file (Python module on search
                                path)
  -H, --redis-host TEXT         IP address or hostname of Redis server
  -P, --redis-port INTEGER      Port of Redis server
  --redis-password TEXT         Password for Redis server
  -D, --redis-database INTEGER  Database of Redis server
  -u, --redis-url TEXT          Redis URL connection (overrides other
                                individual settings)
  --interval INTEGER            Refresh interval in ms
  --help                        Show this message and exit.

rq-dashboard github地址

它可以方便地接入你的 Flask 应用.

在控制台监控

查看已经存在的队列和激活的worker进程:

$ rq info
high       |██████████████████████████ 20
low        |██████████████ 12
default    |█████████ 8
3 queues, 45 jobs total

Bricktop.19233 idle: low
Bricktop.19232 idle: high, default, low
Bricktop.18349 idle: default
3 workers, 3 queues

使用队列名称进行查询

你也可以查询一个队列的子队列,如果你正在查询特定队列可以使用:

$ rq info high default
high       |██████████████████████████ 20
default    |█████████ 8
2 queues, 28 jobs total

Bricktop.19232 idle: high, default
Bricktop.18349 idle: default
2 workers, 2 queues

通过队列组织worker进程

默认rq info打印激活的worker进程,和正在监听的队列:

$ rq info
...

Mickey.26421 idle: high, default
Bricktop.25458 busy: high, default, low
Turkish.25812 busy: high, default
3 workers, 3 queues

如果想查看相同数据但是依据队列进行分组的worker进程可以使用 -R (or --by-queue) 参数:

$ rq info -R
...

high:    Bricktop.25458 (busy), Mickey.26421 (idle), Turkish.25812 (busy)
low:     Bricktop.25458 (busy)
default: Bricktop.25458 (busy), Mickey.26421 (idle), Turkish.25812 (busy)
failed:  –
3 workers, 4 queues

轮询

默认rq info会打印状态然后退出,如果你想指定一个轮询,可以使用 --interval 参数.

$ rq info --interval 1
rq info will now update the screen every second.
You may specify a float value to indicate fractions of seconds.
Be aware that low interval values will increase the load on Redis, of course.

$ rq info --interval 0.5

连接

虽然RQ的特点是使用更为方便的use_connection()命令,但是这种用法已经被弃用,因为它污染了全局命名空间,取而代之的是更简明的Connection(...):管理连接:上下文管理或者在Redis连接中直接传递队列的引用。

简单的单连接

Note:

使用 use_connection 已经过时了. 请不要在你的脚本中使用 use_connection. 而是使用连接管理取代。

在开发者模式下,连接一个默认的本地Redis服务:

from rq import use_connection
use_connection()

在生产环境下,连接一个指定的Redis服务:

from redis import Redis
from rq import use_connection

redis = Redis('my.host.org', 6789, password='secret')
use_connection(redis)

注意使用use_connection 将污染全局命名空间,它也意味着你只能使用单个连接。

多个Redis连接

然而,单连接模式只适用于你连接单个Redis实例的情况(通过调用use_connection()替换已存在的连接),仅仅当你完全可以控制你的web栈的情况下可以使用此模式。

任何其他的情形,或者当你想要使用多连接,你应该使用上下文连接或者传递明文连接。

明文连接(精确但是冗长)

每一个RQ对象实例(队列、worker,任务)都有一个能够传递至构造器的连接关键词参数。使用这个参数,你不需要使用use_connection(),你可以向下面这种方式创建你的队列:

from rq import Queue
from redis import Redis

conn1 = Redis('localhost', 6379)
conn2 = Redis('remote.host.org', 9836)

q1 = Queue('foo', connection=conn1)
q2 = Queue('bar', connection=conn2)

每一个入列的任务或者worker线程将明确自己属于哪一个连接,这种方式十分精确,同时语义化,也因此变得语法冗长。

上下文连接 (简明精确)

如果你打算使用多连接,这种方式无疑更好。每一个RQ实例对象,在创建时,将在RQ连接栈中使用最顶部的Redis连接,此机制临时替代默认的连接:

from rq import Queue, Connection
from redis import Redis

with Connection(Redis('localhost', 6379)):
    q1 = Queue('foo')
    with Connection(Redis('remote.host.org', 9836)):
        q2 = Queue('bar')
    q3 = Queue('qux')

assert q1.connection != q2.connection
assert q2.connection != q3.connection
assert q1.connection == q3.connection

你可以这样认为,在下文连接中,每一个最新创建的RQ对象实例可以明确设置连接参数,q2入列的任务保存于第二个Redis实例,甚至在连接的上下文之外。

Pushing/popping 连接

如果你的代码无法使用with语句,比如,如果你想在一个单元测试中设置,你可以使用push_connection()pop_connection()替代上下文连接。

import unittest
from rq import Queue
from rq import push_connection, pop_connection

class MyTest(unittest.TestCase):
    def setUp(self):
        push_connection(Redis())

    def tearDown(self):
        pop_connection()

    def test_foo(self):
        """Any queues created here use local Redis."""
        q = Queue()
        ...

异常

任务在发生异常时失败,当你的RQ Worker进程在后台运行,你如何能够感知这些异常呢?

默认: 失败队列

默认方式使用失败队列查看。每一个失败的任务都将存储在此队列中,并伴随着异常信息(类型,值,异常回溯)同时也确保了没有失败的任务丢失,this is of no use to get notified pro-actively about job failure.

自定义异常解决

在版本 0.3.1 之后, RQ 支持注册自定义的异常解决.它可以替换掉默认行为(发送任务至失败队列),或者增加额外的异常处理步骤

如下定义自定义异常类:

with Connection():
    q = Queue()
    w = Worker([q])
    worker.push_exc_handler(my_handler)
    worker.work()

同时异常处理是一个FILO (先进后出)栈,大多数情况你只需要注册一个异常捕获类,因此,为了方便你可以在构造器中直接传递。

with Connection():
    w = Worker([q], exception_handlers=[my_handler, self.move_to_failed_queue])
    ...

异常处理方法传递的三个参数: job, exc_type, exc_value and traceback:

def my_handler(job, exc_type, exc_value, traceback):
    # do custom things here
    # for example, write the exception info to a DB
    ...

你也可以使用集合参数:

def my_handler(job, *exc_info):
    # do custom things here
    ...
锁定异常解决

异常处理通过返回一个Boolean值进行判断,是该处理本身进行异常处理还是传递至下一个处理中,False表示停止异常处理,True表示继续或者传递至下一个异常处理。

对系统而言这个十分重要,当异常处理没有一个明确的 返回值(因此为None),则表示返回值为 True (i.e. continue with the next handler).

替换默认行为(i.e. 将失败的任务加入失败队列),使用自定义的异常处理:

def black_hole(job, *exc_info):
    return False

测试

进程内置单元测试

许多框架(比如:Django)使用内存数据库不能很好地使用RQ默认的fork()。 因此,你必须使用SimpleWorker 取代 fork()

from redis import Redis
from rq import SimpleWorker, Queue

queue = Queue(connection=Redis())
queue.enqueue(my_long_running_job)
worker = SimpleWorker([queue], connection=queue.connection)
worker.work(burst=True)  # Runs enqueued job
# Check for result...

更多

使用supervisor进行管理RQ

Supervisor 是一个在生产环境下使用、流行开源的"长期运行进程"管理工具。可以自动重启任何崩溃的进程,你还可以为所有运行的进程增加一个单独的管理面板。

示例配置:

[program:myworker]
; Point the command to the specific rq command you want to run.
; If you use virtualenv, be sure to point it to
; /path/to/virtualenv/bin/rq
; Also, you probably want to include a settings module to configure this
; worker.  For more info on that, see http://python-rq.org/docs/workers/
command=/path/to/rq worker -c mysettings high normal low
process_name=%(program_name)s

; If you want to run more than one worker instance, increase this
numprocs=1

; This is the directory from which RQ is ran. Be sure to point this to the
; directory where your source code is importable from
directory=/path/to

; RQ requires the TERM signal to perform a warm shutdown. If RQ does not die
; within 10 seconds, supervisor will forcefully kill it
stopsignal=TERM

; These are up to you
autostart=true
autorestart=true

 

特别提示:本站资源全部免费下载,因服务器需经费维护,文中部分外链点击后会进入广告,请耐心等待5秒即可跳过广告进入目标页面。如遇页面外链打不开或下载地址失效,您可以在评论中指出错误,或扫描页面底部二维码。
本文地址:http://www.tiangr.com/python-xiao-xi-dui-lie-python-rq-zhong-wen-jiao-cheng.html
版权声明:本文为原创文章,版权归 tiangr 所有,欢迎分享本文,转载请保留出处!

发表评论


表情