
3.11.0 Using redis as jobstore will cause a large number of abnormal processes and eventually cause memory overflow #996

caiwenju opened this issue Nov 29, 2024 · 24 comments

@caiwenju

Things to check first

  • I have checked that my issue does not already have a solution in the FAQ

  • I have searched the existing issues and didn't find my bug already reported there

  • I have checked that my bug is still present in the latest release

Version

3.11.0

What happened?

I start a Flask service with gunicorn, using APScheduler 3.11.0 with Redis as the jobstore. After add_job, a large number of abnormal processes appear, eventually causing the server to run out of memory.

How can we reproduce the bug?

from datetime import datetime
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.redis import RedisJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.triggers.cron import CronTrigger
from flask_apscheduler import APScheduler

APS_TZ = "Asia/Shanghai"
APS_JDE_COALESCE = True
APS_JDE_MAX_INSTANCES = 3
APS_JDE_MFG_TIME = 5
APS_JST_RDS_DB = 12
APS_JST_RDS_HOST = "192.168.0.17"
APS_JST_RDS_PORT = 46379
APS_JST_RDS_PWD = "Redis#2022@Cms"
APS_JDE_JITTER = 15

# Executor configuration
executors = {
    'default': ThreadPoolExecutor(max_workers=10),
    'processpool': ProcessPoolExecutor(max_workers=5)
}

# Job defaults
job_defaults = {
    'coalesce': APS_JDE_COALESCE,
    'max_instances': APS_JDE_MAX_INSTANCES,
    "misfire_grace_time": APS_JDE_MFG_TIME
}

# Redis jobstore configuration
redis_jobstore = RedisJobStore(db=APS_JST_RDS_DB,
                               host=APS_JST_RDS_HOST,
                               port=APS_JST_RDS_PORT,
                               password=APS_JST_RDS_PWD)

# jobstores dict configuration
jobstores = {'redis': redis_jobstore}

# BackgroundScheduler configuration
bg_scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults)

# APScheduler instance from Flask-APScheduler
scheduler = APScheduler(scheduler=bg_scheduler)

# Scheduled task function
def task_1():
    print("Task 1 is running")

# Add job
scheduler.add_job(
    id="1",  # scheduled job ID
    func=task_1,
    replace_existing=True,
    trigger=CronTrigger.from_crontab("* * * * *", timezone=APS_TZ),
    jobstore='redis',  # use the redis jobstore
    executor="processpool",  # use the processpool executor
    misfire_grace_time=APS_JDE_MFG_TIME,
    jitter=APS_JDE_JITTER
)

# Add job
scheduler.add_job(
    id="14",  # scheduled job ID
    func=task_1,
    replace_existing=True,
    trigger=CronTrigger.from_crontab("* * * * *", timezone=APS_TZ),
    jobstore='redis',  # use the redis jobstore
    executor="processpool",  # use the processpool executor
    misfire_grace_time=APS_JDE_MFG_TIME,
    jitter=APS_JDE_JITTER
)

# Add job
scheduler.add_job(
    id="111111111",  # scheduled job ID
    func=task_1,
    replace_existing=True,
    trigger=CronTrigger.from_crontab("* * * * *", timezone=APS_TZ),
    jobstore='redis',  # use the redis jobstore
    executor="processpool",  # use the processpool executor
    misfire_grace_time=APS_JDE_MFG_TIME,
    jitter=APS_JDE_JITTER
)

# Add job
scheduler.add_job(
    id="1111",  # scheduled job ID
    func=task_1,
    replace_existing=True,
    trigger=CronTrigger.from_crontab("* * * * *", timezone=APS_TZ),
    jobstore='redis',  # use the redis jobstore
    executor="processpool",  # use the processpool executor
    misfire_grace_time=APS_JDE_MFG_TIME,
    jitter=APS_JDE_JITTER
)

# Add job
scheduler.add_job(
    id="11",  # scheduled job ID
    func=task_1,
    replace_existing=True,
    trigger=CronTrigger.from_crontab("* * * * *", timezone=APS_TZ),
    jobstore='redis',  # use the redis jobstore
    executor="processpool",  # use the processpool executor
    misfire_grace_time=APS_JDE_MFG_TIME,
    jitter=APS_JDE_JITTER
)

# Add job
scheduler.add_job(
    id="12",  # scheduled job ID
    func=task_1,
    replace_existing=True,
    trigger=CronTrigger.from_crontab("* * * * *", timezone=APS_TZ),
    jobstore='redis',  # use the redis jobstore
    executor="processpool",  # use the processpool executor
    misfire_grace_time=APS_JDE_MFG_TIME,
    jitter=APS_JDE_JITTER
)

# Start the scheduler
scheduler.start()

@caiwenju caiwenju added the bug label Nov 29, 2024
@agronholm
Owner

Try again in English please.

@caiwenju
Author

root 9434 8777 9 13:21 pts/0 00:00:03 /usr/local/bin/python -c from multiprocessing.spawn import spawn_main; spawn_main(tracker_fd=6, pipe_handle=28) --multiprocessing-fork

@agronholm
Owner

Still not English. Unless you rewrite your issue in English, I will close it. I cannot read Chinese.

@caiwenju caiwenju changed the title 3.11.0 使用redis作为jobstore会出现大量的异常进程,最终导致内存溢出 3.11.0 Using redis as jobstore will cause a large number of abnormal processes and eventually cause memory overflow Nov 29, 2024
@caiwenju
Author

I start a Flask service with gunicorn, using APScheduler 3.11.0 with Redis as the jobstore. After add_job, a large number of abnormal processes appear, eventually causing the server to run out of memory.

@agronholm
Owner

By "abnormal" processes, do you mean the processes from ProcessPoolExecutor? Abnormal how?

@caiwenju
Author

This is what the abnormal process looks like: root 9434 8777 9 13:21 pts/0 00:00:03 /usr/local/bin/python -c from multiprocessing.spawn import spawn_main; spawn_main(tracker_fd=6, pipe_handle=28) --multiprocessing-fork
There are a large number of such processes, and they keep increasing indefinitely until the server's memory is exhausted.

@caiwenju
Author

I'll give you a screenshot of my usage

@agronholm
Owner

There should only be a maximum of 5 subprocesses, as defined by the configuration of ProcessPoolExecutor. Can you create a minimal working example that reproduces the issue?

@caiwenju
Author

[screenshots of the process list]

@caiwenju
Author

I'll try to reproduce it with a minimal example and send it to you later.

@caiwenju
Author

from datetime import datetime
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.redis import RedisJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.triggers.cron import CronTrigger
from flask_apscheduler import APScheduler
from flask import Flask, jsonify

app = Flask(__name__)

APS_TZ = "Asia/Shanghai"
APS_JDE_COALESCE = True
APS_JDE_MAX_INSTANCES = 3
APS_JDE_MFG_TIME = 5
APS_JST_RDS_DB = 12
APS_JST_RDS_HOST = "192.168.0.17"
APS_JST_RDS_PORT = 46379
APS_JST_RDS_PWD = "Redis#2022@Cms"
APS_JDE_JITTER = 15

# Executor configuration
executors = {
    'default': ThreadPoolExecutor(max_workers=10),
    'processpool': ProcessPoolExecutor(max_workers=5)
}

# Job defaults
job_defaults = {
    'coalesce': APS_JDE_COALESCE,
    'max_instances': APS_JDE_MAX_INSTANCES,
    "misfire_grace_time": APS_JDE_MFG_TIME
}

# Redis jobstore configuration
redis_jobstore = RedisJobStore(db=APS_JST_RDS_DB,
                               host=APS_JST_RDS_HOST,
                               port=APS_JST_RDS_PORT,
                               password=APS_JST_RDS_PWD)

# jobstores dict configuration
jobstores = {'redis': redis_jobstore}
bg_scheduler = BackgroundScheduler()
app.config.update({
    "SCHEDULER_JOBSTORES": jobstores,
    "SCHEDULER_EXECUTORS": executors,
    "SCHEDULER_JOB_DEFAULTS": job_defaults,
    "SCHEDULER_TIMEZONE": APS_TZ
})
scheduler = APScheduler(scheduler=bg_scheduler, app=app)

def heart_beat():
    print(f"now: {datetime.now()}")

if not scheduler.get_job(id=f"{heart_beat.__name__}"):
    scheduler.add_job(
        id=f"{heart_beat.__name__}",
        replace_existing=True,
        func=heart_beat,
        trigger=CronTrigger.from_crontab("*/1 * * * *", timezone=APS_TZ),
        jobstore='redis',
    )
else:
    print(scheduler.get_job(id=f"{heart_beat.__name__}"))

scheduler.start()

def task1():
    pass

@app.route('/', methods=['GET'])
def hello():
    import time
    scheduler.add_job(
        id=str(int(time.time())),  # use the timestamp as the job ID
        func=task1,
        replace_existing=True,
        trigger=CronTrigger.from_crontab("* * * * *", timezone=APS_TZ),
        start_date="2024-11-28 00:00:00",
        end_date="2024-11-29 00:00:00",
        jobstore='redis',
        executor="processpool",
        misfire_grace_time=APS_JDE_MFG_TIME,
        jitter=APS_JDE_JITTER)
    return jsonify(message="Hello, World!")

if __name__ == "__main__":
    app.run(host='0.0.0.0')

  1. Write the above code to app.py and replace the Redis settings with your own.

  2. Write the following code into gunicorn_conf.py, in the same directory as app.py, and start with gunicorn app:app -c gunicorn_conf.py
    workers = 10
    threads = 5
    backlog = 512
    x_forwarded_for_header = 'X-FORWARDED-FOR'
    reload = True

  3. Make the request: curl http://0.0.0.0:8000/

  4. If you run ps -ef to view the processes, a large number of /usr/local/bin/python -c from multiprocessing.spawn import spawn_main; spawn_main(tracker_fd=13, pipe_handle=28) --multiprocessing-fork processes appear.

@agronholm
Owner

Is the problem only reproducible with Flask and all those environment variables?

@caiwenju
Author

I'm not sure, because running scheduler.start() on its own ends immediately; I just start it in Flask and execute the add_job in the request handler.

@caiwenju
Author

[screenshot]

@agronholm
Owner

I'm not sure, because running scheduler.start() on its own ends immediately; I just start it in Flask and execute the add_job in the request handler.

That's because you're using BackgroundScheduler. If you used BlockingScheduler instead, it would keep running.

@caiwenju
Author

caiwenju commented Nov 29, 2024

I have verified that it has nothing to do with Flask. I will send you the code.

  1. Write the following code to p.py
from datetime import datetime
from apscheduler.schedulers.background import BackgroundScheduler, BlockingScheduler
from apscheduler.jobstores.redis import RedisJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.triggers.cron import CronTrigger
from flask_apscheduler import APScheduler
from flask import Flask, jsonify

app = Flask(__name__)

APS_TZ = "Asia/Shanghai"
APS_JDE_COALESCE = True
APS_JDE_MAX_INSTANCES = 3
APS_JDE_MFG_TIME = 5
APS_JST_RDS_DB = 12
APS_JST_RDS_HOST = "192.168.0.17"
APS_JST_RDS_PORT = 46379
APS_JST_RDS_PWD = "Redis#2022@Cms"
APS_JDE_JITTER = 15


executors = {
    'default': ThreadPoolExecutor(max_workers=10),
    'processpool': ProcessPoolExecutor(max_workers=5)
}


job_defaults = {
    'coalesce': APS_JDE_COALESCE,
    'max_instances': APS_JDE_MAX_INSTANCES,
    "misfire_grace_time": APS_JDE_MFG_TIME
}


redis_jobstore = RedisJobStore(db=APS_JST_RDS_DB,
                               host=APS_JST_RDS_HOST,
                               port=APS_JST_RDS_PORT,
                               password=APS_JST_RDS_PWD)


jobstores = {'redis': redis_jobstore}
bg_scheduler = BlockingScheduler()
app.config.update({
    "SCHEDULER_JOBSTORES": jobstores,
    "SCHEDULER_EXECUTORS": executors,
    "SCHEDULER_JOB_DEFAULTS": job_defaults,
    "SCHEDULER_TIMEZONE": APS_TZ
})
scheduler = APScheduler(scheduler=bg_scheduler, app=app)


def heart_beat():
    print(f"now: {datetime.now()}")

if not scheduler.get_job(id=f"{heart_beat.__name__}"):
    scheduler.add_job(
        id=f"{heart_beat.__name__}",
        replace_existing=True,
        func=heart_beat,
        trigger=CronTrigger.from_crontab("*/1 * * * *", timezone=APS_TZ),
        jobstore='redis',
    )
else:
    print(scheduler.get_job(id=f"{heart_beat.__name__}"))

if __name__ == "__main__":
    scheduler.start()
  2. Write the following code to pp.py
from p import scheduler
from apscheduler.triggers.cron import CronTrigger
APS_TZ = "Asia/Shanghai"
APS_JDE_COALESCE = True
APS_JDE_MAX_INSTANCES = 3
APS_JDE_MFG_TIME = 5
APS_JST_RDS_DB = 12
APS_JST_RDS_HOST = "192.168.0.17"
APS_JST_RDS_PORT = 46379
APS_JST_RDS_PWD = "Redis#2022@Cms"
APS_JDE_JITTER = 15
def task1():
    pass

import time
scheduler.add_job(
    id=str(int(time.time())),  # use the timestamp as the job ID
    func=task1,
    replace_existing=True,
    trigger=CronTrigger.from_crontab("* * * * *", timezone=APS_TZ),
    start_date="2024-11-28 00:00:00",
    end_date="2024-11-29 00:00:00",
    jobstore='redis',
    executor="processpool",
    misfire_grace_time=APS_JDE_MFG_TIME,
    jitter=APS_JDE_JITTER)
  3. Run the python p.py command
  4. Run the python pp.py command
  5. Run ps -ef to see a large number of processes

@agronholm
Owner

Have you tested with only the default memory job store? Your issue title says that using specifically the redis job store causes this problem.

@caiwenju
Author

Version 3.10.4 is OK.

@caiwenju
Author

There is no problem without using Redis, and version 3.10.4 does not have this problem with Redis either.

@HK-Mattew
Contributor

1. Write the above code to app.py and replace the Redis settings with your own.

  2. Write the following code into gunicorn_conf.py, in the same directory as app.py, and start with gunicorn app:app -c gunicorn_conf.py
    workers = 10
    threads = 5
    backlog = 512
    x_forwarded_for_header = 'X-FORWARDED-FOR'
    reload = True

It seems to me that the problem is the way you are starting your Flask application.

Notice that in the gunicorn settings you have set gunicorn to use 10 workers. This means that 10 copies of your application will be started, along with 10 schedulers.

Since max_workers is set to 5 in your ProcessPoolExecutor, starting 10 workers of your Flask application means 5 × 10 = 50 scheduler subprocesses in total.
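One way to avoid that multiplication (a common pattern, not something proposed in this thread; the lock path is a made-up example) is to let only the gunicorn worker that wins an exclusive file lock start the scheduler:

```python
# Each gunicorn worker imports the app module, so module-level scheduler setup
# runs once per worker. An exclusive non-blocking file lock ensures only one
# worker actually starts a scheduler; the lock is released when that process
# exits, so a stale lock file is harmless.
import fcntl


def try_acquire_scheduler_lock(path="/tmp/aps_scheduler.lock"):
    """Return an open handle if this process won the lock, else None."""
    fh = open(path, "w")
    try:
        # LOCK_NB makes the attempt non-blocking; losers get BlockingIOError
        # instead of waiting for the lock holder to exit.
        fcntl.flock(fh, fcntl.LOCK_EX | fcntl.LOCK_NB)
        return fh  # keep the handle open for the lifetime of the process
    except BlockingIOError:
        fh.close()
        return None


lock = try_acquire_scheduler_lock()
if lock is not None:
    print("this worker would start the scheduler")
else:
    print("another worker already holds the lock")
```

Separate file descriptors are treated independently by flock, so even a second attempt from the same process is denied while the first handle stays open.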

@agronholm
Owner

The example still relies on Flask. Flask-apscheduler is not supported here. The example isn't exactly minimal either.
I made my own example, and I'm not seeing more than 2 subprocesses even if I let the script run for a while.

@caiwenju
Author

caiwenju commented Dec 2, 2024

The example still relies on Flask. Flask-apscheduler is not supported here. The example isn't exactly minimal either. I made my own example, and I'm not seeing more than 2 subprocesses even if I let the script run for a while.

OK, maybe the problem is with Flask. For now I have gone back to version 3.10.4, which does not have this problem. Also, using the strace command I could see that APScheduler's connection to Redis was blocked and then waiting for a timeout.

@caiwenju
Author

caiwenju commented Dec 2, 2024

1. Write the above code to app.py and replace the Redis settings with your own.

  2. Write the following code into gunicorn_conf.py, in the same directory as app.py, and start with gunicorn app:app -c gunicorn_conf.py
    workers = 10
    threads = 5
    backlog = 512
    x_forwarded_for_header = 'X-FORWARDED-FOR'
    reload = True

It seems to me that the problem is the way you are starting your Flask application.

Notice that in the gunicorn settings you have set gunicorn to use 10 workers. This means that 10 copies of your application will be started, along with 10 schedulers.

Since max_workers is set to 5 in your ProcessPoolExecutor, starting 10 workers of your Flask application means 5 × 10 = 50 scheduler subprocesses in total.

No, it keeps adding processes indefinitely, far more than 50.

@agronholm
Owner

agronholm commented Dec 2, 2024

Why is my example not reproducing your problem then? I asked to test without Flask (and flask-apscheduler) to eliminate them as a source of problems. APScheduler 3.11 switched to using spawn instead of fork as the subprocess spawning method, but that should not cause problems unless you're creating more subprocesses as import side effects.
