Skip to content
This repository has been archived by the owner on Sep 11, 2024. It is now read-only.

Commit

Permalink
Added RabbitMQ connection parameters
Browse files Browse the repository at this point in the history
  • Loading branch information
vduseev committed Jun 16, 2021
1 parent 9d70ecf commit c95e290
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 17 deletions.
29 changes: 20 additions & 9 deletions dbload/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,8 +301,16 @@ def send(actor_name, **kwargs):
ctx.infuse()

try:
from dramatiq import get_broker, actor as register_actor
broker = get_broker()
from dramatiq import set_broker, actor as register_actor
from dramatiq.brokers.rabbitmq import RabbitmqBroker

broker = RabbitmqBroker(
host=config.broker_host,
port=config.broker_port,
heartbeat=5,
blocked_connection_timeout=60
)
set_broker(broker)

# Decorate all queries and scenarios as actors
for query_name in ctx.queries:
Expand Down Expand Up @@ -339,10 +347,19 @@ def scheduler(actor_name, cron, **kwargs):
ctx.infuse()

try:
from dramatiq import get_broker, actor as register_actor
from dramatiq import set_broker, actor as register_actor
from dramatiq.brokers.rabbitmq import RabbitmqBroker
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger

broker = RabbitmqBroker(
host=config.broker_host,
port=config.broker_port,
heartbeat=5,
blocked_connection_timeout=60
)
set_broker(broker)

# Decorate all queries and scenarios as actors
for query_name in ctx.queries:
register_actor(ctx.queries[query_name].function)
Expand All @@ -351,7 +368,6 @@ def scheduler(actor_name, cron, **kwargs):

# Form a schedule if explicitly requested.
# Otherwise schedule from config will be used
broker = get_broker()
actors = broker.get_declared_actors()
if actor_name in actors:
cron = cron or "* * * * *"
Expand Down Expand Up @@ -426,12 +442,7 @@ def worker(**kwargs):
config = get_config(cli_args)

try:
from dramatiq import get_broker, actor as register_actor
from dramatiq.cli import main as dramatiq_main, CPUS
# from dramatiq.brokers.rabbitmq import RabbitmqBroker

# broker = RabbitmqBroker(host="localhost", port="5672", heartbeat=5, blocked_connection_timeout=60)
# dramatiq.set_broker(broker)

if dramatiq_args.processes is None:
dramatiq_args.processes = CPUS
Expand Down
5 changes: 4 additions & 1 deletion dbload/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,10 @@ class Config(ilexconf.Config):
# Predefined simulations
predefined_simulations=["sap-hana"],
# Schedule for APScheduler
schedule=None
schedule=None,
# RabbitMQ connection parameters
broker_host="localhost",
broker_port="5672",
)

def __init__(self, cli_args):
Expand Down
21 changes: 16 additions & 5 deletions dbload/resources/scenarios.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,13 +139,24 @@ def create_sale(con):


try:
import dramatiq
from dramatiq import set_broker, actor
from dramatiq.brokers.rabbitmq import RabbitmqBroker
from dbload import get_config

config = get_config()
broker = RabbitmqBroker(
host=config.broker_host,
port=config.broker_port,
heartbeat=5,
blocked_connection_timeout=60
)
set_broker(broker)

logger.info("Decorating scenarios")
dramatiq.actor(update_employee)
dramatiq.actor(create_client)
dramatiq.actor(update_client)
dramatiq.actor(create_sale)
actor(update_employee)
actor(create_client)
actor(update_client)
actor(create_sale)

except ImportError:
pass
2 changes: 1 addition & 1 deletion demo/sap-hana/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
1. Launch dbload worker process in a separate terminal window

```shell
dbload worker --predefined sap-hana --process 1 --thread 1
dbload worker --predefined sap-hana --processes 1 --threads 1
```

1. Launch dbload scheduler process in another terminal window.
Expand Down
4 changes: 3 additions & 1 deletion demo/sap-hana/dbload.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@
"password": "HXEHana1"
},
"predefined": "sap-hana",
"broker_host": "rabbitmq-cluster",
"broker_port": "5672",
"schedule": {
"create_sale": "* * * * *"
}
}
}

0 comments on commit c95e290

Please sign in to comment.