ai - ARL 的重启任务的机制

访问量: 39

1. 状态修改一下。

2. celery 需要被重新提交到任务队列。

root@45205:/opt/ARL# cat run_failed_task.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# restart_waiting_tasks.py - 重新提交所有处于waiting状态的任务到Celery队列

import os
import sys
import logging
import json
from datetime import datetime

# 确保能够导入ARL模块
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from app import celerytask
from app.helpers.task import CeleryAction, TaskType, TaskStatus
from app import utils
from bson import ObjectId

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[logging.StreamHandler()]
)
logger = logging.getLogger("restart_tasks")

# 自定义JSON编码器处理ObjectId
class MongoEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, ObjectId):
            return str(obj)
        return super(MongoEncoder, self).default(obj)

def convert_objectid_to_str(obj):
    """递归地将所有ObjectId转换为字符串"""
    if isinstance(obj, ObjectId):
        return str(obj)
    elif isinstance(obj, dict):
        return {k: convert_objectid_to_str(v) for k, v in obj.items()}
    elif isinstance(obj, list):
        return [convert_objectid_to_str(item) for item in obj]
    return obj

def get_celery_action(task_type):
    """根据任务类型确定Celery动作"""
    type_map_action = {
        TaskType.DOMAIN: CeleryAction.DOMAIN_TASK,
        TaskType.IP: CeleryAction.IP_TASK,
        TaskType.RISK_CRUISING: CeleryAction.RUN_RISK_CRUISING,
        TaskType.ASSET_SITE_UPDATE: CeleryAction.ASSET_SITE_UPDATE,
        TaskType.FOFA: CeleryAction.FOFA_TASK,
        TaskType.ASSET_SITE_ADD: CeleryAction.ADD_ASSET_SITE_TASK,
        TaskType.ASSET_WIH_UPDATE: CeleryAction.ASSET_WIH_UPDATE,
    }
    return type_map_action.get(task_type)

def restart_waiting_tasks(task_ids=None):
    """
    重新提交所有处于waiting状态的任务到Celery队列

    参数:
        task_ids: 可选,指定要重启的任务ID列表。如果不提供,将重启所有waiting状态的任务

    返回:
        成功重启的任务数量
    """
    query = {"status": TaskStatus.WAITING}

    # 如果指定了特定任务ID,添加到查询条件
    if task_ids:
        object_ids = [ObjectId(task_id) for task_id in task_ids if ObjectId.is_valid(task_id)]
        if object_ids:
            query["_id"] = {"$in": object_ids}

    # 获取所有符合条件的任务
    waiting_tasks = list(utils.conn_db('task').find(query))

    logger.info(f"找到 {len(waiting_tasks)} 个等待中的任务")

    success_count = 0

    # 遍历每个任务并重新提交
    for task_data in waiting_tasks:
        task_id = str(task_data["_id"])
        task_type = task_data["type"]
        task_name = task_data.get("name", "未命名任务")
        target = task_data.get("target", "未知目标")

        # 创建任务数据的副本,并转换所有ObjectId为字符串
        task_data_copy = convert_objectid_to_str(task_data)

        # 确保task_id字段存在
        task_data_copy.pop("_id", None)  # 移除_id字段避免重复
        task_data_copy["task_id"] = task_id

        # 确定Celery动作
        celery_action = get_celery_action(task_type)

        if not celery_action:
            logger.warning(f"任务ID: {task_id}, 名称: {task_name}, 目标: {target} - 未知的任务类型: {task_type}")
            continue

        # 构建任务选项
        task_options = {
            "celery_action": celery_action,
            "data": task_data_copy
        }

        try:
            # 提交到Celery
            celery_id = celerytask.arl_task.delay(options=task_options)

            # 更新Celery ID和状态
            utils.conn_db('task').update_one(
                {"_id": ObjectId(task_id)},
                {"$set": {
                    "celery_id": str(celery_id),
                    "restart_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                }}
            )

            logger.info(f"任务ID: {task_id}, 名称: {task_name}, 目标: {target} - 已重新提交,Celery ID: {celery_id}")
            success_count += 1

        except Exception as e:
            logger.error(f"任务ID: {task_id}, 名称: {task_name}, 目标: {target} - 重新提交失败: {str(e)}")
            logger.exception(e)  # 打印完整的堆栈跟踪

    logger.info(f"成功重启 {success_count}/{len(waiting_tasks)} 个任务")
    return success_count

if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description='重新提交等待中的任务到Celery队列')
    parser.add_argument('--ids', nargs='+', help='要重启的特定任务ID列表 (空格分隔)')
    args = parser.parse_args()

    if args.ids:
        logger.info(f"指定重启 {len(args.ids)} 个任务: {', '.join(args.ids)}")
        restart_waiting_tasks(args.ids)
    else:
        restart_waiting_tasks()

订阅/RSS Feed

Subscribe