
Celery Distributed Task Queue: Quick Start


In this section:

Introduction to Celery and basic usage

Using Celery in a project

Starting multiple workers

Celery periodic tasks

Integrating with Django

Configuring Celery periodic tasks through Django

1. Introduction to Celery and basic usage

Celery is a distributed asynchronous message/task queue written in Python. It makes asynchronous task processing easy, so whenever your business scenario needs background work, Celery is worth considering. A few concrete examples:

  • You want to run a batch command on 100 machines. It may take a long time, but you do not want your program to block waiting for the result. Instead you get back a task ID right away, and some time later you use that ID to fetch the result; while the task is still running you can carry on with other work.
  • You want a scheduled job, for example checking every customer's profile once a day, and if today is a customer's birthday, sending them an SMS greeting.

  • To receive and dispatch task messages, and to store task results, Celery needs a message broker, usually RabbitMQ or Redis. More on this below.

    1.1 Celery's main advantages:

  • Simple: once you are familiar with Celery's workflow, configuration and usage are fairly straightforward.
  • Highly available: if a task fails or the connection drops mid-execution, Celery automatically retries the task.
  • Fast: a single Celery process can handle millions of tasks per minute.
  • Flexible: almost every Celery component can be extended or customized.

    Celery's basic workflow diagram:

    [Figure: Celery basic workflow diagram]

    1.2 Installing and configuring Celery

    Celery's default broker is RabbitMQ, and configuring it takes just one line:

    broker_url = 'amqp://guest:guest@localhost:5672//'

      If RabbitMQ is not installed yet, install it first; see http://docs.celeryproject.org/en/latest/getting-started/brokers/rabbitmq.html#id3
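
    As a minimal sketch, pointing a Celery app at that broker looks like this (assuming a local RabbitMQ with the default guest account):

    from celery import Celery

    # the broker URL matches the broker_url line above
    app = Celery('tasks', broker='amqp://guest:guest@localhost:5672//')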

    Redis can also be used as the broker.

    Install the Redis support package:

    pip install -U "celery[redis]"


    Configuration is easy, just configure the location of your Redis database:

    app.conf.broker_url = 'redis://localhost:6379/0'

    Where the URL is in the format of:

    redis://:password@hostname:port/db_number

    All fields after the scheme are optional, and will default to localhost on port 6379, using database 0.
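
    For example, both of the following are valid (the host, port, and password below are placeholders, not values from this article):

    app.conf.broker_url = 'redis://'                              # same as redis://localhost:6379/0
    app.conf.broker_url = 'redis://:mypassword@10.0.0.5:6380/1'   # password, custom host/port, database 1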


    If you want to retrieve the result of each task, you also need to configure where task results are stored:

    If you also want to store the state and return values of tasks in Redis, you should configure these settings:

    app.conf.result_backend = 'redis://localhost:6379/0'


    1.3 Getting started with Celery

    Install the celery package:

    $ pip install celery


    Create a Celery application to define your task list.

    Create a task file; let's call it tasks.py:

    from celery import Celery

    app = Celery('tasks',
                 broker='redis://localhost',
                 backend='redis://localhost')

    @app.task
    def add(x, y):
        print("running...", x, y)
        return x + y

      Start a Celery worker to begin listening for and executing tasks:

    $ celery -A tasks worker --loglevel=info


    Calling the task

    Open another terminal, start a Python shell, and call the task:

    >>> from tasks import add
    >>> add.delay(4, 4)

      Your worker terminal will show that it received a task. If you want to inspect the task result, assign the return value of the call to a variable:

    >>> result = add.delay(4, 4)


    The ready() method returns whether the task has finished processing or not:

    >>> result.ready()
    False

    You can wait for the result to complete, but this is rarely used since it turns the asynchronous call into a synchronous one:

    >>> result.get(timeout=1)
    8

    In case the task raised an exception, get() will re-raise the exception, but you can override this by specifying the propagate argument:

    >>> result.get(propagate=False)

    If the task raised an exception you can also gain access to the original traceback:

    >>> result.traceback
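
    Since delay() returns an AsyncResult, you can also keep just the task id and look the result up again later. A minimal sketch, assuming the tasks.py app above with its Redis result backend:

    >>> from tasks import app
    >>> res = app.AsyncResult(result.id)   # rebuild a result handle from the stored task id
    >>> res.status                          # PENDING / STARTED / SUCCESS / FAILURE
    'SUCCESS'
    >>> res.get()
    8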



    2. Using Celery in a project

    Celery can be configured as an application within your project.

    The directory layout looks like this:

    proj/__init__.py
    proj/celery.py
    proj/tasks.py

    Contents of proj/celery.py:

    from __future__ import absolute_import, unicode_literals
    from celery import Celery

    app = Celery('proj',
                 broker='amqp://',
                 backend='amqp://',
                 include=['proj.tasks'])

    # Optional configuration, see the application user guide.
    app.conf.update(
        result_expires=3600,
    )

    if __name__ == '__main__':
        app.start()

      Contents of proj/tasks.py:

    from __future__ import absolute_import, unicode_literals
    from .celery import app


    @app.task
    def add(x, y):
        return x + y


    @app.task
    def mul(x, y):
        return x * y


    @app.task
    def xsum(numbers):
        return sum(numbers)

    Start the worker:

      $ celery -A proj worker -l info

    Output:

    -------------- celery@Alexs-MacBook-Pro.local v4.0.2 (latentcall)
    ---- **** -----
    --- * *** * -- Darwin-15.6.0-x86_64-i386-64bit 2017-01-26 21:50:24
    -- * - **** ---
    - ** ---------- [config]
    - ** ---------- .> app: proj:0x103a020f0
    - ** ---------- .> transport: redis://localhost:6379//
    - ** ---------- .> results: redis://localhost/
    - *** --- * --- .> concurrency: 8 (prefork)
    -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
    --- ***** -----
    -------------- [queues]
    .> celery exchange=celery(direct) key=celery
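
    With the worker running, the tasks can be called from another Python shell, for example (this sketch assumes the result backend configured above so that get() works):

    >>> from proj.tasks import add, mul
    >>> add.delay(2, 2).get(timeout=10)
    4
    >>> mul.apply_async((4, 5), countdown=10)   # schedule the task to run 10 seconds from now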


    Starting the worker in the background

    In production you'll want to run the worker in the background; this is described in detail in the daemonization tutorial.

    The daemonization scripts use the celery multi command to start one or more workers in the background:

    $ celery multi start w1 -A proj -l info
    celery multi v4.0.0 (latentcall)
    > Starting nodes...
    > w1.halcyon.local: OK

    You can restart it too:

    $ celery multi restart w1 -A proj -l info
    celery multi v4.0.0 (latentcall)
    > Stopping nodes...
    > w1.halcyon.local: TERM -> 64024
    > Waiting for 1 node.....
    > w1.halcyon.local: OK
    > Restarting node w1.halcyon.local: OK
    celery multi v4.0.0 (latentcall)
    > Stopping nodes...
    > w1.halcyon.local: TERM -> 64052

    or stop it:

    $ celery multi stop w1 -A proj -l info

    The stop command is asynchronous, so it won't wait for the worker to shut down. You'll probably want to use the stopwait command instead; it ensures that all currently executing tasks are completed before exiting:

    $ celery multi stopwait w1 -A proj -l info



    3. Celery periodic tasks

    Celery supports scheduled tasks: set the execution time and Celery will run them for you automatically. This scheduling component is called celery beat.


    Write a script named periodic_task.py:

    from celery import Celery
    from celery.schedules import crontab

    app = Celery()

    @app.on_after_configure.connect
    def setup_periodic_tasks(sender, **kwargs):
        # Calls test('hello') every 10 seconds.
        sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')

        # Calls test('world') every 30 seconds
        sender.add_periodic_task(30.0, test.s('world'), expires=10)

        # Executes every Monday morning at 7:30 a.m.
        sender.add_periodic_task(
            crontab(hour=7, minute=30, day_of_week=1),
            test.s('Happy Mondays!'),
        )

    @app.task
    def test(arg):
        print(arg)

      add_periodic_task adds one periodic task entry.

    The example above adds periodic tasks by calling a function; you can also declare them in configuration, much like a config file. The following entry runs a task every 30 seconds:

    app.conf.beat_schedule = {
        'add-every-30-seconds': {
            'task': 'tasks.add',
            'schedule': 30.0,
            'args': (16, 16)
        },
    }
    app.conf.timezone = 'UTC'


    Once the tasks are defined, Celery needs a separate process that dispatches them on schedule. Note: this process only dispatches tasks, it does not execute them. It keeps checking your task schedule, and whenever a task is due it sends a task message for a celery worker to execute.

    Start the task scheduler, celery beat:

    $ celery -A periodic_task beat

    The output looks like this:

    celery beat v4.0.2 (latentcall) is starting.
    __ - ... __ - _
    LocalTime -> 2017-02-08 18:39:31
    Configuration ->
    . broker -> redis://localhost:6379//
    . loader -> celery.loaders.app.AppLoader
    . scheduler -> celery.beat.PersistentScheduler
    . db -> celerybeat-schedule
    . logfile -> [stderr]@%WARNING
    . maxinterval -> 5.00 minutes (300s)


    One step is still missing: you also need to start a worker to execute the tasks that celery beat dispatches.

    Start a celery worker to execute the tasks:

    $ celery -A periodic_task worker

    -------------- celery@Alexs-MacBook-Pro.local v4.0.2 (latentcall)
    ---- **** -----
    --- * *** * -- Darwin-15.6.0-x86_64-i386-64bit 2017-02-08 18:42:08
    -- * - **** ---
    - ** ---------- [config]
    - ** ---------- .> app: tasks:0x104d420b8
    - ** ---------- .> transport: redis://localhost:6379//
    - ** ---------- .> results: redis://localhost/
    - *** --- * --- .> concurrency: 8 (prefork)
    -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
    --- ***** -----
    -------------- [queues]
    .> celery exchange=celery(direct) key=celery


    Now watch the worker output: every few seconds it executes one of the periodic tasks.

    Note: Beat needs to store the last run times of the tasks in a local database file (named celerybeat-schedule by default), so it needs write access to the current directory; alternatively you can specify a custom location for this file:

    $ celery -A periodic_task beat -s /home/celery/var/run/celerybeat-schedule


    More complex schedules

    The periodic tasks above are simple: run a task every N seconds. But what if you want an email sent at 8 a.m. every Monday, Wednesday, and Friday? That is easy too: use the crontab schedule, which works just like the crontab on Linux and lets you customize execution times precisely.

    from celery.schedules import crontab

    app.conf.beat_schedule = {
        # Executes every Monday morning at 7:30 a.m.
        'add-every-monday-morning': {
            'task': 'tasks.add',
            'schedule': crontab(hour=7, minute=30, day_of_week=1),
            'args': (16, 16),
        },
    }


    The entry above means: run tasks.add at 7:30 a.m. every Monday.

    More scheduling examples:

  • crontab(): execute every minute.
  • crontab(minute=0, hour=0): execute daily at midnight.
  • crontab(minute=0, hour='*/3'): execute every three hours: midnight, 3am, 6am, 9am, noon, 3pm, 6pm, 9pm.
  • crontab(minute=0, hour='0,3,6,9,12,15,18,21'): same as the previous.
  • crontab(minute='*/15'): execute every 15 minutes.
  • crontab(day_of_week='sunday'): execute every minute (!) on Sundays.
  • crontab(minute='*', hour='*', day_of_week='sun'): same as the previous.
  • crontab(minute='*/10', hour='3,17,22', day_of_week='thu,fri'): execute every ten minutes, but only between 3-4 am, 5-6 pm, and 10-11 pm on Thursdays or Fridays.
  • crontab(minute=0, hour='*/2,*/3'): execute every even hour, and every hour divisible by three. This means: every hour except 1am, 5am, 7am, 11am, 1pm, 5pm, 7pm, 11pm.
  • crontab(minute=0, hour='*/5'): execute every hour divisible by 5. Note this triggers at 3pm, not 5pm (3pm is 15 on the 24-hour clock, which is divisible by 5).
  • crontab(minute=0, hour='*/3,8-17'): execute every hour divisible by 3, and every hour during office hours (8am-5pm).
  • crontab(0, 0, day_of_month='2'): execute on the second day of every month.
  • crontab(0, 0, day_of_month='2-30/3'): execute on every even numbered day.
  • crontab(0, 0, day_of_month='1-7,15-21'): execute on the first and third weeks of the month.
  • crontab(0, 0, day_of_month='11', month_of_year='5'): execute on the eleventh of May every year.
  • crontab(0, 0, month_of_year='*/3'): execute on the first month of every quarter.


    The options above cover the vast majority of scheduling needs; you can even schedule tasks around sunrise and sunset (solar schedules), see http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html#solar-schedules
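
    For illustration, a solar schedule entry might look like this (the task name and coordinates are placeholders, not part of the examples above):

    from celery.schedules import solar

    app.conf.beat_schedule = {
        'run-at-sunset': {
            'task': 'tasks.add',
            # event, latitude, longitude
            'schedule': solar('sunset', -37.81753, 144.96715),
            'args': (16, 16),
        },
    }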


    4. Best practice: integrating with Django

    Django can easily be combined with Celery to run asynchronous tasks; only a little configuration is needed.

    If you have a modern Django project layout like:

    - proj/
      - proj/__init__.py
      - proj/settings.py
      - proj/urls.py
      - manage.py

    then the recommended way is to create a new proj/proj/celery.py module that defines the Celery instance:

    file: proj/proj/celery.py  

    from __future__ import absolute_import, unicode_literals
    import os
    from celery import Celery

    # set the default Django settings module for the 'celery' program.
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

    app = Celery('proj')

    # Using a string here means the worker doesn't have to serialize
    # the configuration object to child processes.
    # - namespace='CELERY' means all celery-related configuration keys
    # should have a `CELERY_` prefix.
    app.config_from_object('django.conf:settings', namespace='CELERY')

    # Load task modules from all registered Django app configs.
    app.autodiscover_tasks()


    @app.task(bind=True)
    def debug_task(self):
        print('Request: {0!r}'.format(self.request))


    Then you need to import this app in your proj/proj/__init__.py module. This ensures that the app is loaded when Django starts so that the @shared_task decorator (mentioned later) will use it. proj/proj/__init__.py:

    from __future__ import absolute_import, unicode_literals

    # This will make sure the app is always imported when
    # Django starts so that shared_task will use this app.
    from .celery import app as celery_app

    __all__ = ['celery_app']


    Note that this example project layout is suitable for larger projects; for simple projects you may use a single contained module that defines both the app and tasks, like in the First Steps with Celery tutorial.

    Let's break down what happens in the first module. First we import absolute imports from the future, so that our celery.py module won't clash with the library:

    from __future__ import absolute_import

      Then we set the default DJANGO_SETTINGS_MODULE environment variable for the celery command-line program:

    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

      You don't need this line, but it saves you from always passing in the settings module to the celery program. It must always come before creating the app instance, which is what we do next:

    app = Celery('proj')

      This is our instance of the library.

    We also add the Django settings module as a configuration source for Celery. This means that you don’t have to use multiple configuration files, and instead configure Celery directly from the Django settings; but you can also separate them if wanted.

    The uppercase name-space means that all Celery configuration options must be specified in uppercase instead of lowercase, and start with CELERY_, so for example the task_always_eager setting becomes CELERY_TASK_ALWAYS_EAGER, and the broker_url setting becomes CELERY_BROKER_URL.

    You can pass the object directly here, but using a string is better since then the worker doesn’t have to serialize the object.

    app.config_from_object('django.conf:settings', namespace='CELERY')
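
      So, as an illustrative sketch, the Celery options can live directly in proj/settings.py with the CELERY_ prefix (the values below are examples, not settings required by this article):

    # proj/settings.py
    CELERY_BROKER_URL = 'redis://localhost:6379/0'
    CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
    CELERY_TASK_ALWAYS_EAGER = False   # run tasks through the broker, not in-process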

      Next, a common practice for reusable apps is to define all tasks in a separate tasks.py module, and Celery does have a way to auto-discover these modules:

    app.autodiscover_tasks()

      With the line above Celery will automatically discover tasks from all of your installed apps, following the tasks.py convention:

    - app1/
      - tasks.py
      - models.py
    - app2/
      - tasks.py
      - models.py


    Finally, the debug_task example is a task that dumps its own request information. This is using the new bind=True task option introduced in Celery 3.1 to easily refer to the current task instance.

    Then write your tasks in the tasks.py of each specific app:

    # Create your tasks here
    from __future__ import absolute_import, unicode_literals
    from celery import shared_task


    @shared_task
    def add(x, y):
        return x + y


    @shared_task
    def mul(x, y):
        return x * y


    @shared_task
    def xsum(numbers):
        return sum(numbers)


    Call the celery task in your Django views:

    from django.shortcuts import render, HttpResponse

    # Create your views here.

    from bernard import tasks

    def task_test(request):
        res = tasks.add.delay(228, 24)
        print("start running task")
        print("async task res", res.get())
        return HttpResponse('res %s' % res.get())
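
      Note that res.get() blocks the view until the worker finishes, which defeats the point of calling the task asynchronously. A common alternative is to return the task id right away and let the client poll for the result; a rough sketch (the view names below are assumptions, not part of the original project):

    from celery.result import AsyncResult
    from django.http import JsonResponse

    from bernard import tasks

    def task_start(request):
        res = tasks.add.delay(228, 24)
        return JsonResponse({'task_id': res.id})      # respond immediately with the task id

    def task_result(request, task_id):
        res = AsyncResult(task_id)                    # look the task up by id
        return JsonResponse({'status': res.status,
                             'result': res.result if res.ready() else None})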

    5. Using periodic tasks in Django

    There's the django-celery-beat extension that stores the schedule in the Django database, and presents a convenient admin interface to manage periodic tasks at runtime.

    To install and use this extension:

  • Use pip to install the package:

    $ pip install django-celery-beat

  • Add the django_celery_beat module to INSTALLED_APPS in your Django project's settings.py:

    INSTALLED_APPS = (
        ...,
        'django_celery_beat',
    )

    Note that there is no dash in the module name, only underscores.

  • Apply Django database migrations so that the necessary tables are created:

    $ python manage.py migrate

  • Start the celery beat service using the django scheduler:

    $ celery -A proj beat -l info -S django

  • Visit the Django-Admin interface to set up some periodic tasks.

  • In the admin pages there are 3 tables:

    [Screenshot: the django_celery_beat tables in the Django admin]

    After configuration it looks like this:

    [Screenshot: a configured periodic task in the Django admin]


    Now start your celery beat and worker: every 2 minutes, beat sends a task message telling the worker to execute the scp_task task.

    Note: in my testing, celery beat had to be restarted every time a task was added or modified, otherwise the new configuration was not picked up by the celery beat process.
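
    The same tables can also be filled programmatically through the django_celery_beat models instead of the admin interface. A rough sketch (the task path and the 2-minute interval are illustrative, matching the scp_task example above):

    from django_celery_beat.models import IntervalSchedule, PeriodicTask

    # reuse or create a "every 2 minutes" interval row
    schedule, _ = IntervalSchedule.objects.get_or_create(
        every=2, period=IntervalSchedule.MINUTES)

    PeriodicTask.objects.create(
        interval=schedule,                      # when to run
        name='run scp_task every 2 minutes',    # human-readable, must be unique
        task='bernard.tasks.scp_task',          # registered task name (assumed path)
    )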


