当前位置 主页 > 网站技术 > 代码类 >

    异步redis队列实现 数据入库的方法

    栏目:代码类 时间:2019-10-25 15:03

    业务需求

    app客户端向服务端接口发送来json 数据 每天 发一次 清空缓存后会再次发送

    出问题之前业务逻辑:

    php 接口 首先将 json 转为数组 去重 在一张大表中插入不存在的数据

    该用户已经存在 和新增的id

    入另一种详情表

    问题所在:

    当用户因特殊情况清除缓存 导致app 发送json串 入库并发高 导致CPU 暴增到88% 并且居高不下

    优化思路:

    1、异步队列处理

    2、redis 过滤(就是只处理当天第一次请求)

    3、redis 辅助存储app名称(验证过后批量插入数据app名称表中)

    4、拼接插入的以及新增的如详细表中

    解决办法:

    1、接口修改 redis 过滤 + 如list队列 并将结果存入redis中

    首先 redis将之前的历史数据放在redis 哈希里面 中文为键名 id 为键值

    <?php
    /**
     * Created by haiyong.
     * User: jia
     * Date: 2017/9/18
     * Time: 20:06
     */
    namespace App\Http\Controllers\App;
     
     
    use App\Http\Controllers\Controller;
    use Illuminate\Http\Request;
    use Illuminate\Support\Facades\DB;
    use Illuminate\Support\Facades\Redis;
     
    class OtherAppController extends Controller{
     
     /**
      * app应用统计接口
      * @param Request $request
      * @return string
      */
     public function appTotal(Request $request)
     {
      // //历史数据入库
      //$redis = Redis::connection('web_active');
      // $app_name = DB::connection('phpLog')->table('app_set_name')->where("appName", '<>', ' ')->lists('id', 'appName');
      // $str = '';
      // foreach ($app_name as $key => $val) {
      //  $str.= "{$val} {$key} ";
      // }
      // $redis->hmset('app_name', $app_name);
      // echo $str;exit;
      $result = $request->input('res');
      $list = json_decode($result, true);
      if (empty ($list) || !is_array($list)) {
       return json_encode(['result' => 'ERROR', 'msg' => 'parameter error']);
      }
      $data['uid'] = isset($list['uid']) ? $list['uid'] : '20001' ;
      $data['time'] = date('Y-m-d');
      $redis_key = 'log_app:'.$data['time'];
      //redis 过滤
      $redis = Redis::connection('web_active');
      //redis 键值过期设置
      if (empty($redis->exists($redis_key))) {
       $redis->hset($redis_key, 1, 'start');
       $redis->EXPIREAT($redis_key, strtotime($data['time'].'+2 day'));
      }
      //值确定
      if ($redis->hexists($redis_key, $data['uid'])) {
       return json_encode(['result' => 'SUCCESS']);
      } else {
       //推入队列
       $redis->hset($redis_key, $data['uid'], $result);
       $redis->rpush('log_app_list', $data['time'] . ':' . $data['uid']);
       return json_encode(['result' => 'SUCCESS']);
      }
     }
    }

    2、php 脚本循环 监控redis 队列 执行逻辑 防止内存溢出

    mget 获取该用户的app id 不存在就会返回null

    通过判断null 运用redis 新值作为自增id指针 将null 补齐 之后批量入mysql 并跟新redis 哈希 和指针值 并入库 详情表

    <?php
     
    namespace App\Console\Commands;
     
    use Illuminate\Console\Command;
    use Illuminate\Support\Facades\Redis;
    use Illuminate\Support\Facades\DB;
    use Illuminate\Support\Facades\Storage;
     
    class AppTotal extends Command
    {
     /**
      * The name and signature of the console command.
      *
      * @var string
      */
     protected $signature = 'AppTotal:run';
     
     /**
      * The console command description.
      *
      * @var string
      */
     protected $description = 'Command description';
     
     /**
      * Create a new command instance.
      *
      * @return void
      */
     public function __construct()
     {
      parent::__construct();
     }
     
     /**
      * Execute the console command.
      *
      * @return mixed
      */
     public function handle()
     {
       //历史数据入库 
      // $redis = Redis::connection('web_active'); 
      // $app_name = DB::connection('phpLog')->table('app_set_name')->where("appName", '<>', ' ')->lists('id', 'appName'); 
      // $redis->hmset('app_name', $app_name); 
      // exit; 
       while(1) {
       $redis = Redis::connection('web_active');
       //队列名称
       $res = $redis->lpop('log_app_list'); 
       //开关按钮
       $lock = $redis->get('log_app_lock');
       if (!empty($res)) {
        list($date,$uid) = explode(':',$res);
        $result = $redis->hget('log_app:'.$date, $uid);
        if (!empty($result)) {
          $table_name = 'app_total'.date('Ym');
          $list = json_decode($result, true);
          $data['uid'] = isset($list['uid']) ? $list['uid'] : '20001' ;
          $data['sex'] = isset($list['sex']) ? $list['sex'] : '' ;
          $data['device'] = isset($list['device']) ? $list['device'] : '' ;
          $data['appList'] = isset($list['list']) ? $list['list'] : '' ;
          //数据去重 flip比unique更节约性能
          $data['appList'] = array_flip($data['appList']);
          $data['appList'] = array_flip($data['appList']);
          $data['time'] = date('Y-m-d');
          //app应用过滤
          $app_res = $redis->hmget('app_name', $data['appList']);
          //新增加app数组
          $new_app = [];
          //mysql 入库数组
          $mysql_new_app = [];
       //获取当前redis 自增指针
       $total = $redis->get('app_name_total');
       foreach ($app_res as $key =>& $val) {
        if (is_null($val)) {
         $total += 1;
         $new_app[$data['appList'][$key]] = $total; 
         $val = $total;
         array_push($mysql_new_app,['id' => $total, 'appName'=> $data['appList'][$key]]);
        }
       }
       if (count($new_app)){
        $str = "INSERT IGNORE INTO app_set_name (id,appName) values";
        foreach ($new_app as $key => $val) {
        $str.= "(".$val.",'".$key."'),";
        }
        $str = trim($str, ',');
        //$mysql_res = DB::connection('phpLog')->table('app_set_name')->insert($mysql_new_app);
        $mysql_res = DB::connection('phpLog')->statement($str);
        if ($mysql_res) {
         // 设置redis 指针
         $redis->set('app_name_total', $total);
         // redis 数据入库
         $redis->hmset('app_name', $new_app);
        }
      }
        // 详情数据入库
        $data['appList'] = implode(',', $app_res);
          //app统计入库
          DB::connection('phpLog')->statement("INSERT IGNORE INTO ".$table_name." (uid,sex,device,`time`,appList) 
      values('".$data['uid']."',".$data['sex'].",'".$data['device']."','".$data['time']."','".$data['appList']."')");
          //log 记录 当文件达到123MB的时候产生内存保错 所有这个地方可是利用日志切割 或者 不写入 日志
          Storage::disk('local')->append(DIRECTORY_SEPARATOR.'total'.DIRECTORY_SEPARATOR.'loaAppTotal.txt', date('Y-m-d H:i:s').' success '.$result."\n"); 
      } else {
       Storage::disk('local')->append(DIRECTORY_SEPARATOR.'total'.DIRECTORY_SEPARATOR.'loaAppTotal.txt', date('Y-m-d H:i:s').' error '.$result."\n");
      }
       }
       //执行间隔
       sleep(1);
       //结束按钮
       if ($lock == 2) {
        exit;
       }
       //内存检测
       if(memory_get_usage()>1000*1024*1024){
        exit('内存溢出');//大于100M内存退出程序,防止内存泄漏被系统杀死导致任务终端
       }
      }
     }
    }