ai - ARL 的重启任务的机制
访问量: 39
1. 状态修改一下。
2. celery 需要被重新提交到任务队列。

root@45205:/opt/ARL# cat run_failed_task.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# restart_waiting_tasks.py - 重新提交所有处于waiting状态的任务到Celery队列
import os
import sys
import logging
import json
from datetime import datetime
# 确保能够导入ARL模块
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from app import celerytask
from app.helpers.task import CeleryAction, TaskType, TaskStatus
from app import utils
from bson import ObjectId
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[logging.StreamHandler()]
)
logger = logging.getLogger("restart_tasks")
# 自定义JSON编码器处理ObjectId
class MongoEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, ObjectId):
return str(obj)
return super(MongoEncoder, self).default(obj)
def convert_objectid_to_str(obj):
"""递归地将所有ObjectId转换为字符串"""
if isinstance(obj, ObjectId):
return str(obj)
elif isinstance(obj, dict):
return {k: convert_objectid_to_str(v) for k, v in obj.items()}
elif isinstance(obj, list):
return [convert_objectid_to_str(item) for item in obj]
return obj
def get_celery_action(task_type):
"""根据任务类型确定Celery动作"""
type_map_action = {
TaskType.DOMAIN: CeleryAction.DOMAIN_TASK,
TaskType.IP: CeleryAction.IP_TASK,
TaskType.RISK_CRUISING: CeleryAction.RUN_RISK_CRUISING,
TaskType.ASSET_SITE_UPDATE: CeleryAction.ASSET_SITE_UPDATE,
TaskType.FOFA: CeleryAction.FOFA_TASK,
TaskType.ASSET_SITE_ADD: CeleryAction.ADD_ASSET_SITE_TASK,
TaskType.ASSET_WIH_UPDATE: CeleryAction.ASSET_WIH_UPDATE,
}
return type_map_action.get(task_type)
def restart_waiting_tasks(task_ids=None):
"""
重新提交所有处于waiting状态的任务到Celery队列
参数:
task_ids: 可选,指定要重启的任务ID列表。如果不提供,将重启所有waiting状态的任务
返回:
成功重启的任务数量
"""
query = {"status": TaskStatus.WAITING}
# 如果指定了特定任务ID,添加到查询条件
if task_ids:
object_ids = [ObjectId(task_id) for task_id in task_ids if ObjectId.is_valid(task_id)]
if object_ids:
query["_id"] = {"$in": object_ids}
# 获取所有符合条件的任务
waiting_tasks = list(utils.conn_db('task').find(query))
logger.info(f"找到 {len(waiting_tasks)} 个等待中的任务")
success_count = 0
# 遍历每个任务并重新提交
for task_data in waiting_tasks:
task_id = str(task_data["_id"])
task_type = task_data["type"]
task_name = task_data.get("name", "未命名任务")
target = task_data.get("target", "未知目标")
# 创建任务数据的副本,并转换所有ObjectId为字符串
task_data_copy = convert_objectid_to_str(task_data)
# 确保task_id字段存在
task_data_copy.pop("_id", None) # 移除_id字段避免重复
task_data_copy["task_id"] = task_id
# 确定Celery动作
celery_action = get_celery_action(task_type)
if not celery_action:
logger.warning(f"任务ID: {task_id}, 名称: {task_name}, 目标: {target} - 未知的任务类型: {task_type}")
continue
# 构建任务选项
task_options = {
"celery_action": celery_action,
"data": task_data_copy
}
try:
# 提交到Celery
celery_id = celerytask.arl_task.delay(options=task_options)
# 更新Celery ID和状态
utils.conn_db('task').update_one(
{"_id": ObjectId(task_id)},
{"$set": {
"celery_id": str(celery_id),
"restart_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}}
)
logger.info(f"任务ID: {task_id}, 名称: {task_name}, 目标: {target} - 已重新提交,Celery ID: {celery_id}")
success_count += 1
except Exception as e:
logger.error(f"任务ID: {task_id}, 名称: {task_name}, 目标: {target} - 重新提交失败: {str(e)}")
logger.exception(e) # 打印完整的堆栈跟踪
logger.info(f"成功重启 {success_count}/{len(waiting_tasks)} 个任务")
return success_count
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description='重新提交等待中的任务到Celery队列')
parser.add_argument('--ids', nargs='+', help='要重启的特定任务ID列表 (空格分隔)')
args = parser.parse_args()
if args.ids:
logger.info(f"指定重启 {len(args.ids)} 个任务: {', '.join(args.ids)}")
restart_waiting_tasks(args.ids)
else:
restart_waiting_tasks()