博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Celery的基本使用
阅读量:4627 次
发布时间:2019-06-09

本文共 3739 字,大约阅读时间需要 12 分钟。

Celery

1、什么是Celery

  • Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统,专注于实时处理的异步任务队列,同时也支持任务调度。
  • 用Python写的执行 定时任务和异步任务的框架

执行异步任务:

  • 创建任务:tasks.py
  • 把任务添加到队列中:add_task.py
  • 开启work,执行任务
    • 用命令:celery -A tasks worker -l info
    • 在 Windows下:celery -A tasks worker -l info -P eventlet
  • 查看任务结果:task_resut.py

多任务结构:

  • 重点:执行work的时候:celery -A tasks worker -l info -P eventlet

2、Celery架构

1445850-20190214205259340-414073700.png

Celery的架构由三部分组成,消息中间件,任务执行单元和任务执行结果存储(task result store )组成。

消息中间件

Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis等等

任务执行单元

Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。

任务结果存储

Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis等

3、使用场景

异步任务:将耗时操作任务提交给Celery取异步执行 ,比如发送 短信、邮件、消息推送、音频/视频处理等等

定时任务:定时执行某件事情,比如每天数据统计

4、Celery的安装配置

pip install celery

app=Celery('任务名', backend='xxx',broker='xxx')

5、Celery执行异步任务

基本使用

创建项目:celerytest

创建py文件:task.py

`from celery import Celeryimport time# broker:消息中间人用redisbroker = 'redis://127.0.0.1:6397/1'# 结果存储在redis中backend = 'redis://127.0.0.1:6379/2'# 第一个参数是别名,可以随便写app = Celery('test', broker=broker, backend=backend)@app.taskdef add(x, y):    time.sleep(3)    return x + y

创建py文件:add_task.py,添加任务

import taskif __name__ == '__main__':    # 之前这样写,直接就执行 函数    task.add()    # 现在把函数添加到执行队列中,参数写在delay中    # result不是函数的执行结果,他是个对象    result = task.add.delay(2, 3)    # 这个任务唯一的id    print(result.id)

创建py文件:result.py,查看任务执行结果

from celery.result import AsyncResultfrom task import appasync = AsyncResult(id='ac2a7e52-ef66-4caa-bffd-81414d869f85', app=app)if async.successful():    # 任务执行的结果,也就是返回值    result = async.get()    print(result)    # result.forget() #将结果删除elif async.failed():    print('执行失败')elif async.status == 'PENDING':    print('任务等待中被执行')elif async.status == 'RETRY':    print('任务异常后正在重试')elif async.status == 'STARTED':    print('任务已经开始被执行')

执行add_task.py,添加任务,并获取任务ID

执行命令:celery worker -A celery_app_task -l info -P eventlet

执行result.py检查任务状态并获取结果

6、Celery执行定时任务

设定时间让celery执行一个任务

add_task.py

from celery_app_task import addfrom datetime import datetime# 方式一# v1 = datetime(2019, 2, 13, 18, 19, 56)# print(v1)# v2 = datetime.utcfromtimestamp(v1.timestamp())# print(v2)# result = add.apply_async(args=[1, 3], eta=v2)# print(result.id)# 方式二ctime = datetime.now()# 默认用utc时间utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())from datetime import timedeltatime_delay = timedelta(seconds=10)task_time = utc_ctime + time_delay# 使用apply_async并设定时间result = add.apply_async(args=[4, 3], eta=task_time)print(result.id)

启动一个beat: celery beat -A celery_task -l info

启动work执行:celery worker -A celery_task -l info -P eventlet

7、Django中使用Celery

在项目目录下创建celeryconfig.py

import djcelerydjcelery.setup_loader()CELERY_IMPORTS=(    'app01.tasks',)#有些情况可以防止死锁CELERYD_FORCE_EXECV=True# 设置并发worker数量CELERYD_CONCURRENCY=4#允许重试CELERY_ACKS_LATE=True# 每个worker最多执行100个任务被销毁,可以防止内存泄漏CELERYD_MAX_TASKS_PER_CHILD=100# 超时时间CELERYD_TASK_TIME_LIMIT=12*30

在app01目录下创建tasks.py

from celery import task@taskdef add(a,b):    with open('a.text', 'a',encoding='utf-8') as f:        f.write('a')    print(a+b)

视图函数views.py

from django.shortcuts import render,HttpResponsefrom app01.tasks import addfrom datetime import datetimedef test(request):    # result=add.delay(2,3)    ctime = datetime.now()    # 默认用utc时间    utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())    from datetime import timedelta    time_delay = timedelta(seconds=5)    task_time = utc_ctime + time_delay    result = add.apply_async(args=[4, 3], eta=task_time)    print(result.id)    return HttpResponse('ok')

settings.py

INSTALLED_APPS = [    ...    'djcelery',    'app01']...from django_celery import celeryconfigBROKER_BACKEND='redis'BOOKER_URL='redis://127.0.0.1:6379/1'CELERY_RESULT_BACKEND='redis://127.0.0.1:6379/2'

转载于:https://www.cnblogs.com/xuecaichang/p/10380646.html

你可能感兴趣的文章
虚拟目录
查看>>
面向对象的3大特性
查看>>
Express4.x API (四):Router (译)
查看>>
device.cpp
查看>>
django学习笔记--数据库中的多表操作
查看>>
LESS 的 operation 是 特性
查看>>
[VBScript] 自动删除2小时以前生成的文件
查看>>
通过BeanShell获取UUID并将参数传递给Jmeter
查看>>
[03] 处理注解:反射
查看>>
示例-添加删除附件
查看>>
textarea输入框限制字数(JS)
查看>>
2.1 mac下多版本jdk的安装和管理
查看>>
调手表
查看>>
Wannafly挑战赛14
查看>>
开发微信小程序入门前
查看>>
Word英文字符间距太大 中英文输入切换都不行
查看>>
java后端判断用户是否关注公众号
查看>>
[LeetCode] Count Different Palindromic Subsequences 计数不同的回文子序列的个数
查看>>
Javascript使用三大家族和事件来DIY动画效果相关笔记(一)
查看>>
投影纹理映射(Projective Texture Mapping)
查看>>