Commit

set raise_task_lock_exception_on_collision as False
mski-iksm committed Jan 10, 2024
1 parent 684f3af commit 1ee1aba
Showing 2 changed files with 3 additions and 73 deletions.
66 changes: 0 additions & 66 deletions docs/using_task_cache_collision_lock.rst
@@ -49,69 +49,3 @@ How to use
3. Done

With the above configuration, every task that inherits from gokart.TaskOnKart asks the Redis server whether another node is accessing the same cache file whenever it accesses that file with dump or load.


Advanced: Using efficient task cache collision lock
---------------------------------------------------

The cache lock introduced above prevents cache collisions.
However, that setting checks for collisions only when the task accesses the cache file (i.e. ``task.dump()``, ``task.load()`` and ``task.remove()``).
Applications can therefore still execute ``run()`` of the same task at the same time, which wastes time.

The settings in this section prevent ``run()`` of the same task from executing concurrently, for efficiency.

If you try to execute ``run()`` of the same task on multiple worker nodes at the same time, ``run()`` fails on the second and subsequent nodes.
gokart executes other, unaffected tasks in the meantime. Because retries are also configured, the failed task is attempted again later.
When it is retried after the first worker node has completed ``run()`` and created the cache, the second and subsequent nodes do not need to execute ``run()``.
This makes efficient distributed processing possible.
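
The fail-and-retry flow described above can be sketched in plain Python. This is only an illustration, not gokart's implementation: ``threading.Lock`` stands in for the Redis lock, a dict stands in for the cache, and ``TaskLockException`` and ``run_with_lock`` are hypothetical names chosen for this sketch.

```python
import threading


class TaskLockException(Exception):
    """Hypothetical stand-in for the error raised when the task lock is held."""


def run_with_lock(lock, cache, key, compute):
    """Return the cached result, or compute it under a fail-fast lock."""
    if key in cache:
        return cache[key]  # another worker already created the cache
    if not lock.acquire(blocking=False):
        raise TaskLockException(key)  # collision: fail now, retry later
    try:
        cache[key] = compute()
        return cache[key]
    finally:
        lock.release()


lock = threading.Lock()
cache = {}
compute_calls = []


def compute():
    compute_calls.append(1)
    return "computed-by-worker-2"


# Worker 1 holds the lock, so worker 2's first attempt fails immediately.
lock.acquire()
try:
    run_with_lock(lock, cache, "some_task", compute)
    collided = False
except TaskLockException:
    collided = True

# Worker 1 finishes: it writes the cache and releases the lock.
cache["some_task"] = "computed-by-worker-1"
lock.release()

# Worker 2's retry finds the cache and never executes compute().
retried_result = run_with_lock(lock, cache, "some_task", compute)
```

In this sketch the retry reuses worker 1's result, so ``compute()`` is never called on worker 2. A real scheduler would wait ``retry_delay`` seconds between attempts and execute other tasks in the meantime.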


This setting must be applied to each gokart task whose ``run()`` you want to lock.

1. Set normal cache collision lock

Follow the steps in ``How to use`` to set up the cache collision lock.


2. Decorate ``run()`` with ``@RunWithLock``

Decorate the ``run()`` method of each gokart task you want to lock with ``@RunWithLock``.

.. code:: python

    import gokart
    from gokart.run_with_lock import RunWithLock

    class SomeTask(gokart.TaskOnKart):
        # Lock the whole run() so that only one worker executes it at a time.
        @RunWithLock
        def run(self):
            ...

3. Set ``raise_task_lock_exception_on_collision`` parameter to true.

This parameter controls the behavior when the task's lock is held by another application or node.
Setting ``raise_task_lock_exception_on_collision=True`` makes the task fail if its lock is held by another application or node.

The parameter can be set in the config file.

.. code::

    [TaskOnKart]
    redis_host=localhost
    redis_port=6379
    raise_task_lock_exception_on_collision=true

4. Set retry parameters

Set the following parameters so that a failed task is retried.

* ``retry_count``: the maximum number of retries
* ``retry_delay``: the delay before a failed task is retried, in seconds

.. code::

    [scheduler]
    retry_count=10000
    retry_delay=10

    [worker]
    keep_alive=true
10 changes: 3 additions & 7 deletions gokart/task.py
@@ -62,10 +62,6 @@ class TaskOnKart(luigi.Task):
     redis_host = luigi.OptionalParameter(default=None, description='Task lock check is deactivated, when None.', significant=False)
     redis_port = luigi.OptionalParameter(default=None, description='Task lock check is deactivated, when None.', significant=False)
     redis_timeout = luigi.IntParameter(default=180, description='Redis lock will be released after `redis_timeout` seconds', significant=False)
-    raise_task_lock_exception_on_collision: bool = luigi.BoolParameter(
-        default=False,
-        description='True for failing the task immediately when the cache is locked, instead of waiting for the lock to be released',
-        significant=False)
     fail_on_empty_dump: bool = ExplicitBoolParameter(default=False, description='Fail when task dumps empty DF', significant=False)
     store_index_in_feather: bool = ExplicitBoolParameter(default=True,
                                                          description='Wether to store index when using feather as a output object.',
@@ -176,7 +172,7 @@ def make_target(self, relative_file_path: Optional[str] = None, use_unique_id: b
                                                  redis_host=self.redis_host,
                                                  redis_port=self.redis_port,
                                                  redis_timeout=self.redis_timeout,
-                                                 raise_task_lock_exception_on_collision=self.raise_task_lock_exception_on_collision)
+                                                 raise_task_lock_exception_on_collision=False)
         return gokart.target.make_target(file_path=file_path,
                                          unique_id=unique_id,
                                          processor=processor,
@@ -193,7 +189,7 @@ def make_large_data_frame_target(self, relative_file_path: Optional[str] = None,
                                                  redis_host=self.redis_host,
                                                  redis_port=self.redis_port,
                                                  redis_timeout=self.redis_timeout,
-                                                 raise_task_lock_exception_on_collision=self.raise_task_lock_exception_on_collision)
+                                                 raise_task_lock_exception_on_collision=False)
         return gokart.target.make_model_target(file_path=file_path,
                                                temporary_directory=self.local_temporary_directory,
                                                unique_id=unique_id,
@@ -222,7 +218,7 @@ def make_model_target(self,
                                                  redis_host=self.redis_host,
                                                  redis_port=self.redis_port,
                                                  redis_timeout=self.redis_timeout,
-                                                 raise_task_lock_exception_on_collision=self.raise_task_lock_exception_on_collision)
+                                                 raise_task_lock_exception_on_collision=False)
         return gokart.target.make_model_target(file_path=file_path,
                                                temporary_directory=self.local_temporary_directory,
                                                unique_id=unique_id,
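
Going by the removed parameter's description, passing ``raise_task_lock_exception_on_collision=False`` everywhere means a task waits for a held lock to be released instead of failing immediately. A rough sketch of the two behaviors, assuming a ``threading.Lock`` in place of the Redis lock (``acquire_cache_lock`` and ``LockCollisionError`` are hypothetical names, not gokart APIs):

```python
import threading


class LockCollisionError(Exception):
    """Hypothetical error for a lock that is already held (not a gokart class)."""


def acquire_cache_lock(lock, raise_on_collision, timeout=5.0):
    """Take `lock`, either failing fast or waiting, depending on the flag."""
    if raise_on_collision:
        # Fail immediately if someone else holds the lock.
        if not lock.acquire(blocking=False):
            raise LockCollisionError("cache is locked by another worker")
        return
    # Wait until the current holder releases the lock (bounded by `timeout`).
    if not lock.acquire(timeout=timeout):
        raise TimeoutError("lock was not released in time")


lock = threading.Lock()
lock.acquire()  # another worker currently holds the lock

try:
    acquire_cache_lock(lock, raise_on_collision=True)
    failed_fast = False
except LockCollisionError:
    failed_fast = True

# The other worker releases the lock shortly afterwards.
threading.Timer(0.1, lock.release).start()
acquire_cache_lock(lock, raise_on_collision=False)  # blocks until released
now_holding_lock = lock.locked()
lock.release()
```

With the flag fixed to ``False``, only the waiting branch of this sketch remains reachable.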

0 comments on commit 1ee1aba
