-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcelery_app.py
61 lines (52 loc) · 1.69 KB
/
celery_app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
import importlib
from celery import Celery
from celery.schedules import crontab
import mongoengine
from container import Container
from config import get_config_by_env
TASKS_MODULE = [
"tasks.securities_tasks",
"tasks.stock_tasks",
]
def make_celery():
config = get_config_by_env()
celery = Celery(
backend=config.CELERY_RESULT_BACKEND,
broker=config.CELERY_BROKER_URL,
include=TASKS_MODULE
)
celery.conf.update(
beat_schedule={
'sync_stocks': {
'task': 'tasks.stock_tasks.sync_stocks',
'schedule': crontab(hour='18')
},
'sync_account': {
'task': 'tasks.securities_tasks.sync_account',
'schedule': crontab(hour='18')
},
'sync_holding_summary': {
'task': 'tasks.securities_tasks.sync_holding_summary',
'schedule': crontab(hour='18')
},
'sync_all_daily_summaries': {
'task': 'tasks.securities_tasks.sync_all_daily_summaries',
'schedule': crontab(hour='18')
}
}
)
mongoengine.connect(
db=config.MONGODB_SETTINGS['db'],
host=config.MONGODB_SETTINGS['host'],
port=config.MONGODB_SETTINGS['port'])
container = Container()
class ContextTask(celery.Task):
def __call__(self, *args, **kwargs):
modules = [importlib.import_module(
full_module_name) for full_module_name in TASKS_MODULE]
container.wire(modules=modules)
return self.run(*args, **kwargs)
celery.Task = ContextTask
return celery
app = make_celery()
app.autodiscover_tasks()