当前位置 主页 > 网站技术 > 代码类 >

    通过celery异步处理一个查询任务的完整代码

    栏目:代码类 时间:2019-11-19 18:04

    今天介绍通过celery实现一个异步任务。有这样一个需求,前端发起一个查询的请求,但是发起查询后,查询可能不会立即返回结果。这时候,发起查询后,后端可以把这次查询当作一个task,并立即返回一个能唯一表明该task的值,如taskID(用户后面可以通过这个taskID 随时查看结果),用户收到这个taskID后,可以转去处理其他任务,而不必一直等待查询结果。后端API调用celery来处理这个task,并将结果值保存在一个csv文件中,后面用户通过taskID 查询时返回结果。

    def application(environ,start_response):
      """部分代码省略"""
      query_string = environ['QUERY_STRING']
      serviceGroupName = ""
      for getParam in query_string.split("&"):
        params = getParam.split("=")
        resultInfo = ""
        if params[0] == "type":
          alertType = params[1]
        elif params[0] == "projectName":
          projectName = params[1]
        elif params[0] == "serviceGroupName":
          serviceGroupName = params[1]
        else:
          resultInfo = error_info(-1, "GET参数只能为type=<?>&projectName=<?>&serviceGroupName=<?>;必须指定三个参数", {})
        return [resultInfo]  
      taskId = 1
      result_file_name = '/var/www/dba_api/api/test/'+ str(taskId) + '.csv'
      contentInfo = json.dumps({"taskId":1,"opType":"continue","serviceGroupName":serviceGroupName,"dbHost":dbHost,"dbPasswd":dbPasswd,"dbUser":dbUser,"dbPort":dbPort})
      result = getServiceInfo.apply_async((contentInfo,),queue="getServiceInfo")
      taskInfo = "任务已经创建,详情请查看:http://10.4.34.254/api/task?taskId=%s"% (taskId)
      return [resultInfo]

    getServiceInfo.apply_async((contentInfo,),queue=”getServiceInfo”),重点是这一行,apply_async()方法会返回一个AsyncResult实例,通过这个实例可以跟踪任务状态轨迹。

    要使用此功能,需要提供结果后台(result backend),这样才有地方存储任务状态等信息。其中,getServiceInfo是自定义的一个task,后续会介绍到,contentInfo是传递的一个参数,queue是指定队列名称。

    上面这个函数的原型如下:

    task.apply_async(args[, kwargs[, …]])

    其中 args 和 kwargs 分别是 task 接收的参数,当然它也接受额外的参数对任务进行控制。

    在 Celery 中执行任务的方法一共有三种:

    1. delay, 用来进行最简单便捷的任务执行(delay在第3小节的测试中使用过,它可以看作是apply_async的一个快捷方式);

    2. apply_async, 对于任务的执行附加额外的参数,对任务进行控制;

    3. app.send_task, 可以执行未在 Celery 中进行注册的任务。

    celery文件配置

    在python的库存放路径中(一般是/usr/lib/python2.6/site-packages),创建一个文件夹proj,进入proj目录,创建三个文件,init,将proj声明一个python包,celepy,其内容如下:

    #_*_ coding:utf-8 _*_
    from __future__ import absolute_import
    from celery import Celery
    
    app = Celery("proj",
    broker="amqp://user:password@localhost//",
    backend="amqp",
    include=["proj.tasks"]
    )
    app.conf.update(
    CELERY_ROUTES={
    "proj.tasks.getServerInfo":{"queue":"getServerInfo"},
    }
    )
    if __name__=="__main__":
      app.start()
    

    这里我们定义了模块名称proj以及celery 路由。

    还有一个文件,task.py

    #_*_ coding:utf-8 _*_i
    from __future__ import absolute_import
    from proj.celery import app
    import random
    import simplejson as json
    import types
    import time
    import MySQLdb
    import urllib2
    import ConfigParser as cparser
    import hmac
    import hashlib
    import base64
    @app.task
    def getServiceInfo(contentInfo):
      contentInfo = json.loads(contentInfo)
      serviceGroupName = contentInfo['serviceGroupName']
    
      dbHost = contentInfo['dbHost']
      dbPort = int(contentInfo['dbPort'])
      dbUser = contentInfo['dbUser']
      dbPasswd = contentInfo['dbPasswd']
      msgLib = MessageLib.MessageLib()
      Sql = "Your SQL"
      #第三步:连接数据库,执行代码逻辑
      try:
        db_connection = MySQLdb.connect(host=dbHost, port=dbPort, passwd=dbPasswd, db="cmdb", user=dbUser, connect_timeout=2, charset="utf8")
        cursor = db_connection.cursor()
        cursor.execute(getServiceGroupHostSql)
        row = cursor.fetchall()
        result = []
        for line in row:
          ...
          result.append(tempMysqlHighInfo)
    
      resultInfo = msgLib.success_info(result)
      return resultInfo
      except Exception, e:
        raise
        errorInfo = "dbhost:%s, port:%s, error:%s" % (dbHost, dbPort, str(e))
        #return getServiceGroupHostSql,errorInfo
        return msgLib.error_info(-1, errorInfo, {})