Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

celery队列替代django-q队列 #2809

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions archery/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
# Package version as a tuple of ints; bump here for releases.
version = (1, 11, 3)
# Human-readable dotted version string, e.g. "1.11.3".
display_version = ".".join(str(i) for i in version)

# Ensure the Celery app is imported whenever Django starts, so that
# @shared_task decorators elsewhere in the project bind to this app
# (standard Celery-with-Django layout).
from .celery import app as celery_app
19 changes: 19 additions & 0 deletions archery/celery.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2024/9/3 8:56
# @Author : sky
# @File : celery.py
# @Description : Celery application bootstrap for the archery project.
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
# Point Celery at the Django settings module BEFORE creating the app,
# so config_from_object below can read the CELERY_* settings from it.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'archery.settings')
# Create the Celery application instance.
app = Celery('archery')
# Pull configuration from Django settings; only keys with the "CELERY" prefix apply.
app.config_from_object('django.conf:settings', namespace='CELERY')
# Auto-discover tasks.py modules in all installed Django apps.
app.autodiscover_tasks()

# Static beat schedule; left empty — periodic tasks appear to be managed by the
# django_celery_beat DatabaseScheduler configured in settings (TODO confirm).
app.conf.beat_schedule = {}
36 changes: 34 additions & 2 deletions archery/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))

environ.Env.read_env(os.path.join(BASE_DIR, ".env"))
# 定义 Redis 基本连接字符串
env = environ.Env(
DEBUG=(bool, False),
ALLOWED_HOSTS=(list, ["*"]),
SECRET_KEY=(str, "hfusaf2m4ot#7)fkw#di2bu6(cv0@opwmafx5n#6=3d%x^hpl6"),
DATABASE_URL=(str, "mysql://root:@127.0.0.1:3306/archery"),
CACHE_URL=(str, "redis://127.0.0.1:6379/0"),
CACHE_URL=(str, 'redis://127.0.0.1:6379/0'), # 使用 Redis 数据库 0 作为缓存
# 系统外部认证目前支持LDAP、OIDC、DINGDING三种,认证方式只能启用其中一种,如果启用多个,实际生效的只有一个,优先级LDAP > DINGDING > OIDC
ENABLE_LDAP=(bool, False),
ENABLE_OIDC=(bool, False),
Expand Down Expand Up @@ -121,6 +122,8 @@
"django.contrib.sessions",
"django.contrib.messages",
"django.contrib.staticfiles",
'django_celery_results',
'django_celery_beat',
"django_q",
"sql",
"sql_api",
Expand Down Expand Up @@ -221,7 +224,9 @@
**env.db(),
**{
"DEFAULT_CHARSET": "utf8mb4",
"CONN_MAX_AGE": 50,
# CONN_MAX_AGE设置为0,可以避免celery 报错mysql has gone away 问题
# https://github.com/celery/celery/pull/4292
"CONN_MAX_AGE": 0,
"OPTIONS": {
"init_command": "SET sql_mode='STRICT_TRANS_TABLES'",
"charset": "utf8mb4",
Expand Down Expand Up @@ -257,6 +262,33 @@
# https://docs.djangoproject.com/en/3.2/ref/settings/#std-setting-DEFAULT_AUTO_FIELD
DEFAULT_AUTO_FIELD = "django.db.models.AutoField"

# Celery Configuration
# Broker choice notes (translated from the original Chinese comment):
#   Celery supports Redis, RabbitMQ, MongoDB, Django ORM, SQLAlchemy, etc.
#   Redis: no consumer acknowledgement — if a consumer dies mid-task the
#     message body is lost and must be handled manually.
#   RabbitMQ: supports consumer acknowledgement — a failed delivery is
#     requeued automatically, and the queue can be fully persisted.
# Results MUST be stored: parts of the code inspect task state afterwards.
CELERY_RESULT_BACKEND = 'django-db'
# CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/1'
CELERY_BROKER_URL = 'redis://127.0.0.1:6379/2'
# Store periodic-task schedules in the database (django_celery_beat).
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
# NOTE(review): with app.config_from_object(..., namespace='CELERY') only keys
# starting with "CELERY_" are read; the CELERYD_* legacy names below do NOT
# match that prefix and are likely ignored — confirm and migrate to
# CELERY_WORKER_* equivalents if so.
CELERYD_CONCURRENCY = 4
# Fixed duplicate assignment: CELERY_TIMEZONE was set to 'UTC' and then
# immediately overwritten with 'Asia/Shanghai'; only the effective value kept.
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERYD_MAX_TASKS_PER_CHILD = 10
CELERYD_FORCE_EXECV = True

# Uncomment to skip storing results by default.
# CELERY_IGNORE_RESULT = True
CELERY_CREATE_MISSING_QUEUES = True
CELERY_DISABLE_RATE_LIMITS = True
CELERYD_TASK_SOFT_TIME_LIMIT = 600

# NOTE(review): modern name is CELERY_RESULT_EXPIRES — confirm this legacy
# alias is still honored by the installed Celery version.
CELERY_TASK_RESULT_EXPIRES = 600
CELERY_ENABLE_UTC = False
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'

# API Framework
REST_FRAMEWORK = {
"DEFAULT_SCHEMA_CLASS": "drf_spectacular.openapi.AutoSchema",
Expand Down
3 changes: 3 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,7 @@ httpx
OpenAI
elasticsearch==8.14.0
opensearch_py==2.6.0
celery==5.2.7
django-celery-results==2.5.1
django-celery-beat==2.7.0

45 changes: 20 additions & 25 deletions sql/archiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from sql.utils.resource_group import user_instances, user_groups
from sql.models import ArchiveConfig, ArchiveLog, Instance, ResourceGroup
from sql.utils.workflow_audit import get_auditor, AuditException, Audit
from celery import shared_task

logger = logging.getLogger("default")
__author__ = "hhyo"
Expand Down Expand Up @@ -214,11 +215,12 @@ def archive_apply(request):
if audit_handler.audit.current_status == WorkflowStatus.PASSED:
audit_handler.workflow.state = True
audit_handler.workflow.save()
async_task(
notify_for_audit,
workflow_audit=audit_handler.audit,
timeout=60,
task_name=f"archive-apply-{audit_handler.workflow.id}",
notify_for_audit.apply_async(
args=[
audit_handler.audit.audit_id,
],
time_limit=60, # 设置此次任务的超时时间为60秒
task_id=f"archive-apply-{audit_handler.workflow.id}" # 可选,自定义任务ID
)
return JsonResponse(
{
Expand Down Expand Up @@ -274,17 +276,18 @@ def archive_audit(request):
if auditor.audit.current_status == WorkflowStatus.PASSED:
auditor.workflow.state = True
auditor.workflow.save()
async_task(
notify_for_audit,
workflow_audit=auditor.audit,
workflow_audit_detail=workflow_audit_detail,
timeout=60,
task_name=f"archive-audit-{archive_id}",
notify_for_audit.apply_async(
args=[
auditor.audit.audit_id,
workflow_audit_detail.audit_detail_id,
],
time_limit=60, # 设置此次任务的超时时间为60秒
task_id=f"archive-audit-{archive_id}" # 可选,自定义任务ID
)

return HttpResponseRedirect(reverse("sql:archive_detail", args=(archive_id,)))


@shared_task
def add_archive_task(archive_ids=None):
"""
添加数据归档异步任务,仅处理有效归档任务
Expand All @@ -309,15 +312,12 @@ def add_archive_task(archive_ids=None):
# 添加task任务
for archive_info in archive_cnf_list:
archive_id = archive_info.id
async_task(
"sql.archiver.archive",
archive_id,
group=f'archive-{time.strftime("%Y-%m-%d %H:%M:%S ")}',
timeout=-1,
task_name=f"archive-{archive_id}",
archive.apply_async(
args=[archive_id],
task_id=f"archive-{archive_id}", # Celery 允许你指定自定义的任务ID
)


@shared_task
def archive(archive_id):
"""
执行数据库归档
Expand Down Expand Up @@ -528,10 +528,5 @@ def archive_switch(request):
def archive_once(request):
    """Immediately trigger a single archive run for one archive config.

    Reads ``archive_id`` from the query string and schedules the ``archive``
    Celery task asynchronously with a custom, traceable task id; returns a
    JSON acknowledgement without waiting for the task to finish.

    Fix: the merged diff left BOTH the removed django-q ``async_task(...)``
    call and the new Celery call in place; ``async_task`` is no longer
    imported, so the stale call would raise NameError. Only the Celery
    dispatch is kept.
    """
    archive_id = request.GET.get("archive_id")
    # Dispatch via Celery; the custom task_id lets the run be looked up later.
    archive.apply_async(args=[archive_id], task_id=f"archive-{archive_id}")
    return JsonResponse({"status": 0, "msg": "ok", "data": {}})
24 changes: 13 additions & 11 deletions sql/binlog.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from sql.plugins.my2sql import My2SQL
from sql.notify import notify_for_my2sql
from .models import Instance
from celery import shared_task

logger = logging.getLogger("default")

Expand Down Expand Up @@ -205,16 +206,17 @@ def my2sql(request):

# 异步保存到文件
if save_sql:
args.update({"instance": instance})
args.pop("password")
args.pop("output-toScreen")
async_task(
my2sql_file,
args=args,
user=request.user,
hook=notify_for_my2sql,
timeout=-1,
task_name=f"my2sql-{time.time()}",
save_file_time = time.time()
my2sql_file.apply_async(
args=[
args,
instance_name,
request.user.username,
],
task_id=f"my2sql-{save_file_time}", # 可选,自定义任务ID
link=notify_for_my2sql.s(task_id=f"my2sql-{save_file_time}")
)

# 返回查询结果
Expand All @@ -223,16 +225,16 @@ def my2sql(request):
content_type="application/json",
)


def my2sql_file(args, user):
@shared_task
def my2sql_file(args,instance_name, user):
"""
用于异步保存binlog解析的文件
:param args: 参数
:param user: 操作用户对象,用户消息推送
:return:
"""
my2sql = My2SQL()
instance = args.pop("instance")
instance = Instance.objects.get(instance_name=instance_name)
args.update(
{
"host": instance.host,
Expand Down
1 change: 1 addition & 0 deletions sql/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,7 @@ class SqlWorkflow(models.Model, WorkflowAuditMixin):
engineer = models.CharField("发起人", max_length=30)
engineer_display = models.CharField("发起人中文名", max_length=50, default="")
status = models.CharField(max_length=50, choices=SQL_WORKFLOW_CHOICES)
timing_task_id = models.CharField("定时task_id", max_length=100, default="")
audit_auth_groups = models.CharField("审批权限组列表", max_length=255)
run_date_start = models.DateTimeField("可执行起始时间", null=True, blank=True)
run_date_end = models.DateTimeField("可执行结束时间", null=True, blank=True)
Expand Down
22 changes: 15 additions & 7 deletions sql/notify.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@
ArchiveConfigSerializer,
InstanceSerializer,
)
from celery.result import AsyncResult
from celery import shared_task
import json

logger = logging.getLogger("default")

Expand Down Expand Up @@ -514,8 +517,9 @@ def notify_for_execute(workflow: SqlWorkflow, sys_config: SysConfig = None):
auto_notify(workflow=workflow, sys_config=sys_config, event_type=EventType.EXECUTE)


@shared_task
def notify_for_audit(
workflow_audit: WorkflowAudit, workflow_audit_detail: WorkflowAuditDetail = None
workflow_audit_id, workflow_audit_detail_id=None
):
"""
工作流消息通知适配器, 供 async_task 调用, 方便后续的 mock
Expand All @@ -524,6 +528,9 @@ def notify_for_audit(
:param workflow_audit_detail:
:return:
"""
workflow_audit = WorkflowAudit.objects.get(audit_id=workflow_audit_id)
workflow_audit_detail = WorkflowAuditDetail.objects.get(
audit_detail_id=workflow_audit_detail_id) if workflow_audit_detail_id else None
sys_config = SysConfig()
auto_notify(
workflow=None,
Expand All @@ -533,21 +540,22 @@ def notify_for_audit(
event_type=EventType.AUDIT,
)


@shared_task
def notify_for_my2sql(result, task_id):
    """Notification callback for a finished my2sql file-export task.

    Runs as a Celery ``link`` callback of ``my2sql_file``: ``result`` is the
    parent task's return value and ``task_id`` identifies the parent task.

    Fix: the merged diff left the stale old header ``def
    notify_for_my2sql(task):`` above the new definition and duplicated the
    final ``auto_notify`` line; both residues are removed.

    :param result: parent task return value; on success expected to be a
        sequence of (submitter, file_path) — TODO confirm against my2sql_file.
    :param task_id: Celery task id of the parent my2sql_file task.
    """
    task_result = AsyncResult(task_id)
    if task_result.status == 'SUCCESS':
        result = My2SqlResult(
            success=True, submitter=result[0], file_path=result[1]
        )
    else:
        # NOTE(review): a `link` callback only fires when the parent task
        # succeeds, so this branch may be unreachable via the current caller;
        # also `result` here is the raw parent value, not an error string —
        # confirm intended behavior.
        result = My2SqlResult(
            success=False, submitter=result[0], error=result
        )
    # Send the notification.
    sys_config = SysConfig()
    auto_notify(workflow=result, sys_config=sys_config, event_type=EventType.M2SQL)
3 changes: 2 additions & 1 deletion sql/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from sql.utils.tasks import add_kill_conn_schedule, del_schedule
from .models import QueryLog, Instance
from sql.engines import get_engine
from celery import shared_task

logger = logging.getLogger("default")

Expand Down Expand Up @@ -308,7 +309,7 @@ def favorite(request):
json.dumps({"status": 0, "msg": "ok"}), content_type="application/json"
)


@shared_task
def kill_query_conn(instance_id, thread_id):
"""终止查询会话,用于schedule调用"""
instance = Instance.objects.get(pk=instance_id)
Expand Down
24 changes: 13 additions & 11 deletions sql/query_privileges.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,11 +301,12 @@ def query_priv_apply(request):
audit_handler.workflow.apply_id, audit_handler.audit.current_status
)
# 消息通知
async_task(
notify_for_audit,
workflow_audit=audit_handler.audit,
timeout=60,
task_name=f"query-priv-apply-{audit_handler.workflow.apply_id}",
notify_for_audit.apply_async(
args=[
audit_handler.audit.audit_id,
],
time_limit=60, # 设置此次任务的超时时间为 300 秒
task_id=f"query-priv-apply-{audit_handler.workflow.apply_id}" # 可选,自定义任务ID
)
return HttpResponse(json.dumps(result), content_type="application/json")

Expand Down Expand Up @@ -452,12 +453,13 @@ def query_priv_audit(request):
)

# 消息通知
async_task(
notify_for_audit,
workflow_audit=auditor.audit,
workflow_audit_detail=workflow_audit_detail,
timeout=60,
task_name=f"query-priv-audit-{apply_id}",
notify_for_audit.apply_async(
args=[
auditor.audit.audit_id,
workflow_audit_detail.audit_detail_id,
],
time_limit=60, # 设置此次任务的超时时间为 300 秒
task_id=f"query-priv-audit-{apply_id}" # 可选,自定义任务ID
)

return HttpResponseRedirect(reverse("sql:queryapplydetail", args=(apply_id,)))
Expand Down
Loading
Loading