目录
celery实现异步任务: 1
celery定时任务: 2
celery组件: 5
实例化celery: 5
发送1个celery任务: 5
任务组: 6
任务链: 6
重写celery基类; 6
celery命令: 7
celery监控: 7
celery是一个简单、灵活且可靠的,处理大量消息的分布式系统,专注于实时处理的异步任务队列,同时也支持任务调度;
celery的架构由三部分组成:
消息中间件message broker,celery本身不提供消息服务,但可方便的和第三方提供的消息中间件集成包括RabbitMQ、redis等;
任务执行单元worker,worker是celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中;
任务执行结果存储task result store组成,task result store用来存储worker执行的任务的结果,celery支持以不同方式存储任务的结果,包括AMQP、redis等;
另celery还支持不同的并发和序列化的手段:
并发,prefork、eventlet、gevent、threads/singleThreaded;
序列化,pickle、json、yaml、mspack、zlib、bzip2Compression、cryptographic message signing等;
使用场景:
celery是一个强大的分布式任务队列的异步处理框架,它可以让任务的执行完全脱离主程序,甚至可以被分配到其他主机上运行,我们通常使用它来实现异步任务async task和定时任务crontab;
异步任务:将耗时操作任务提交给celery去异步执行,比如发送短信|邮件、消息推送、音视频处理等;
定时任务:定时执行某件事情,如每天数据统计;
celery具有以下优点:
simple简单:celery使用和维护都非常简单,并且不需要配置文件;
highly available高可用:worker和client会在网络连接丢失或失败时,自动进行重试,并且有的brokers也支持双主或主从实现高可用;
fast快速:单个的celery进程每分钟可处理百万级的任务,并且只需要ms级的往返延迟(使用rabbitmq|librabbitmq|优化设置时);
flexible灵活:celery几乎每个部分都可以扩展使用,自定义池实现、序列化、压缩方案、日志记录、调度器、消息者、生产者、broker传输等;
安装:
pip install -U Celery
取执行结果:
from celery.result import AsyncResult
from celery_task import cel
async_result = AsyncResult(id='477c9d77-c62b-4fe5-9035-9087f7ad018a', app=cel)
if async_result.successful():
result = async_result.get()
执行成功')
将结果删除
无论现在是什么时候,都要终止
如果任务还没有开始执行,就可以终止
elif async_result.failed():
执行失败')
elif async_result.status == 'PENDING':
任务等待中被执行')
elif async_result.status == 'RETRY':
任务出错正在重试')
elif async_result.status == 'STARTED':
任务已经开始执行')
celery_tasks/{__init__.py, celery.py, task01.py, task02.py}
celery_tasks/celery.py
from celery import Celery
cel = Celery('celery_demo', broker='redis://127.0.0.1:6379/1', backend='redis://127.0.0.1:6379/2', include=['celery_tasks.task01', 'celery_tasks.task02'])
cel.conf.timezone = 'Asia/Shanghai'
cel.conf.enable_utc = False
check_result.py
from celery.result import AsyncResult
from celery_task import cel
res = AsyncResult(id='...', app=cel)
if res.successful():
result = res.get()
print(result)
elif res.failed():
print('failed')
elif res.status == 'PENDING':
任务等待中被执行')
elif res.status == 'RETRY':
任务异常后正在重试')
elif res.status == 'STARTED':
任务已经开始被执行')
ctime = datetime.now()
utc_ctime = datetime.utcfromtimestramp(ctime.timestamp())
time_delay = timedelta(seconds=10)
task_time = utc_ctime + time_delay
result = send_email.apply_async(args=['egon'], eta=task_time) # 有eta就是定时任务
celery实现异步任务:
http://docs.celeryproject.org/en/latest/
celery通过消息进行通信,用专用的工作线程不断监视任务队列以执行新工作;
celery需要消息传输来发送和接收消息,mq和redis代理传输功能齐全,但也支持其它解决方案,这里用redis;
pip install celery eventlet redis #eventlet仅win下需要
tasks.py #用于配置任务,main.py用来执行
from celery import Celery
import time
celery = Celery("tasks",
broker="redis://:ane56pda@10.10.101.47:6378/0",
backend="redis://:ane56pda@10.10.101.47:6378/0")
@celery.task #加上此装饰器,这个函数就变成celery任务了(task)
def send_mail():
邮件开始发送....')
time.sleep(10)
邮件发送结束!')
main.py #执行后,会发现立马就结束,不会被阻塞等待10s
from tasks import send_mail
if __name__ == '__main__':
res = send_mail.delay() #这样调用,就变成异步任务了,不会被阻塞,是.apply_async()是快捷方式,.apply_async((2, 2), queue='lopri', countdown=10)可指定运行参数|运行的时间|使用的任务队列,返回结果为AsyncResult实例,可用于跟踪任务状况;res.ready()检测是否已处理完毕;res.get(timeout=1)将异步调用转为同步调用,res.get(propagate=False)如果任务出现异常,get()会再次引发异常使用此参数覆盖,res.traceback进行回溯;res.id获取任务ID;res.failed(),res.successful(),检查任务执行成功或失败;res.state,PENDING-->STARED-->SUCCESS,另RETRY;
>celery -A tasks.celery --pool=eventlet worker --loglevel=info上不指定--pool
注:
# broker(中间人):存储任务的队列
# worker:真正执行任务的工作者,单独手动运行worker,celery -A tasks.celery worker --loglevel=info
# backend:用来存储任务执行后的结果
# redis://:password@hostname:port/db_number
注:
Run a worker
If you jumped in and already executed the above code you will be disappointed to learn that .wait() will never actually return. That’s because you also need to run a Celery worker to receive and execute the task.
$ celery -A your_application.celery worker
The your_application string has to point to your application’s package or module that creates the celery object.
Now that the worker is running, wait will return the result once the task is finished.
celery定时任务:
生产配置:
redis
celery
django-celery-results
django-celery-beat
eventlet
INSTALLED_APPS = [
...
'django_celery_results',
'django_celery_beat'
]
>python manage.py makemigrations django_celery_beat
>python manage.py migrate django_celery_beat
>python manage.py makemigrations django_celery_results
>python manage.py migrate django_celery_results
CELERY_BROKER_URL = 'redis://127.0.0.1:6379/0'
# CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0'
CELERY_RESULT_BACKEND = 'django-db'
CELERY_RESULT_SERIALIZER = 'json'
# CELERY_TIMEZONE = TIME_ZONE
celery.py
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery, platforms
from celery.schedules import crontab
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'yto_monitor.settings_prd')
# app = Celery('yto_monitor', broker='redis://127.0.0.1:6379/', backend='redis://127.0.0.1:6379/')
app = Celery('yto_monitor')
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings_prd', namespace='CELERY')
# Load task modules from all registered Django app configs.
app.autodiscover_tasks()
@app.task(bind=True)
def debug_task(self):
print('Request: {0!r}'.format(self.request))
# platforms.C_FORCE_ROOT = True
app.conf.beat_schedule = {
'add-task': {
'task': 'midmonitor.tasks.add',
另crontab(minute=30, hour=0),crontab(hour=6, minute=0, day_of_month='1'),datetime.timedelta(seconds=20),
'args': (5, 6)
}
}
app.conf.timezone = 'Asia/Shanghai'
midmonitor/tasks.py
# Create your tasks here
from __future__ import absolute_import, unicode_literals
from celery import shared_task
@shared_task
def inspect_redis():
pass
>celery -A yto_monitor --pool=eventlet worker -l info #处理任务
>celery -A yto_monitor beat -l info #发送任务
> /usr/local/python368/bin/celery --workdir=/data/app/yto_monitor -A yto_monitor worker --loglevel=info
> /usr/local/python368/bin/celery --workdir=/data/app/yto_monitor -A yto_monitor beat --loglevel=info
/usr/local/python368/bin/uwsgi --ini /data/app/yto_monitor/yto_monitor/uwsgi_test.ini
celery组件:
worker (任务执行者),用来执行具体任务,可在多台服务器部署实现扩展,项目中我们使用 python 进行开发
broker (中间人),用来实现任务调度、worker 管理等功能;支持 RabbitMQ、Redis、Zookeeper 等中间件,项目中我们使用 redis
backend 用来存储任务结果,项目中我们使用 redis
application (应用),用来实例化 celery
tasks (任务),用来构建 application
实例化celery:
# 最简化构建一个 celery 应用,指定了 broker 和 backend
from celery import Celery
# 定义 broker 和 backend,分别为任务中间人和结果保存路径
BROKER = "redis://:@127.0.0.1:6379/3"
BACKEND = "redis://:@127.0.0.1:6379/4"
app = Celery("tasks",broker=BROKER,backend=BACKEND,)
# 定义一个任务,名字为 add
@app.task
def add(x, y):
c = x + y
print('计算结果为: %d ' % c)
return c
@app.task(bind=True,max_retries=3) # 最大重试 3 次
def test_retry(self):
print('执行 Celery 重试')
raise self.retry(countdown=1) # 1 秒后执行重试
@app.task(bind=True)
def test_fail(self):
print('执行 Celery 失败')
raise RuntimeError('测试 celery 失败')
发送1个celery任务:
test_sender.py
# 脚本用来发送 celery 任务
from test_celery import *
# 最简洁推送一个任务,不支持任何选项
add.delay(3,6)
# 推送一个任务,第一个参数,
# 如果任务只需要一个参数,必须添加逗号进行转换,格式 (var1,)
# countdown=10,10 秒后开始执行
add.apply_async((2,5), countdown=10)
# 参数的其他写法,
add.apply_async(kwargs={'x':4, 'y':8})
add.s(5,6).apply_async()
# 任务失败重试
test_retry.apply_async()
任务组:
test_sender.py中追加
# 任务组
from celery import group
numbers = [(2, 2), (4, 4), (8, 8), (16, 16)]
res = group(add.subtask(i) for i in numbers).apply_async()
print(res.get())
任务链:
test_sender.py中追加
# 使用 link,将任务结果作为第一个参数传递到下一个任务
add.apply_async((2, 3), link=add.s(16))
# 同样,前一个任务结果作为下一个任务的第一个参数
from celery import chain
res = chain(add.s(2, 2), add.s(4), add.s(8))()
print(res.get())
# 使用管道符
(add.s(2, 2) | add.s(4) | add.s(8))().get()
重写celery基类;
celery event,监控celery相关事件,可定制worker进程报警|任务失败报警等功能;
定时任务;
celery命令:
celery -A test_celery worker -l info #worker会一直占用终端,可用-D放至后台;worker进程中包含多个子进程,默认为cpu核数,可用-c指定启动子进程个数;-Q queue_name1,queue_name2指定队列名称,多个用逗号分隔,推送任务到指定队列add.apply_async((10,20),queue='queue_name1')
celery -A test_celery report #查看celery相关信息
celery -A test_celery inspect active_queues #查看活动队列
celery -A test_celery inspect stats #检查状态
celery -A test_celery inspect report #检查报告
celery -A tset_celery purge #清除队列中的任务
celery -A test_celery inspect ping #发送ping
celery -A test_celery control shutdown #关闭worker进程
celery -A test_celery worker --autoscale=10,2 -n au #动态加载celery pool个数
celery -A test_celery status #查看worker集群中存活的节点
]# vim celery.py
app.conf.task_routes = {'cmdb.api.v_center.get_cluster_data': {'queue': 'vcenter'}}
/usr/local/python368/bin/celery --workdir=/data/app/yto_monitor -A yto_monitor worker -Q vcenter,celery -l info # -Q指定队列名,celery为默认的
celery监控:
pip install flower #安装celery监控插件
celery multi start 3 -A test_celery -l info -c 4 --pidfile=tmp/celery_%n.pid -f logs/celery.log #启动3个worker,每个worker启动4个子进程
celery flower -A test_celery --port=8080
/usr/local/python368/bin/celery flower --workdir=/data/app/yto_monitor -A yto_monitor --port=8080