diff --git a/docs/using_task_cache_collision_lock.rst b/docs/using_task_cache_collision_lock.rst index d057dd43..c7fa151c 100644 --- a/docs/using_task_cache_collision_lock.rst +++ b/docs/using_task_cache_collision_lock.rst @@ -49,69 +49,3 @@ How to use 3. Done With the above configuration, all tasks that inherits gokart.TaskOnKart will ask the redis server if any other node is not trying to access the same cache file at the same time whenever they access the file with dump or load. - - -Advanced: Using efficient task cache collision lock ------------------------------------------ - -The cache lock introduced above will prevent cache collision. -However, above setting check collisions only when the task access the cache file (i.e. ``task.dump()``, ``task.load()`` and ``task.remove()``). -This will allow applications to run ``run()`` of same task at the same time, which is not time efficient. - -Settings in this section will prevent running ``run()`` at the same time for efficiency. - -If you try to run() the same task on multiple worker nodes at the same time, run() will fail on the second and subsequent node's tasks. -gokart will execute other unaffected tasks in the meantime. Since we have also set up the retry process, we will come back to the failed task later. -When it comes back, the first worker node has already completed run() and a cache has been created, so there is no need to run() on the second and subsequent nodes. -In this way, efficient distributed processing is made possible. - - -This setting must be done to each gokart task which you want to lock the ``run()```. - -1. Set normal cache collision lock - - Follow the steps in ``How to use`` to set up cache collision lock. - - -2. Decorate ``run()`` with ``@RunWithLock`` - - Decorate ``run()`` of your gokart tasks you want to lock with ``@RunWithLock``. - - .. code:: python - - from gokart.run_with_lock import RunWithLock - - class SomeTask(gokart.TaskOnKart): - @RunWithLock - def run(self): - ... - - -3. Set ``raise_task_lock_exception_on_collision`` parameter to true. - - This parameter will affect the behavior when the task's lock is taken by other applications or nodes. - Setting ``raise_task_lock_exception_on_collision=True`` will make the task to be failed if the task's lock is taken by others. - - The parameter can be set by config file. - - .. code:: - - [TaskOnKart] - redis_host=localhost - redis_port=6379 - raise_task_lock_exception_on_collision=true - -4. Set retry parameters - - Set following parameters to retry when task failed. - * ``retry_count``: the max number of retries - * ``retry_delay``: this value is set in seconds - - .. code:: - - [scheduler] - retry_count=10000 - retry_delay=10 - - [worker] - keep_alive=true diff --git a/gokart/task.py b/gokart/task.py index def842c7..5248b82c 100644 --- a/gokart/task.py +++ b/gokart/task.py @@ -62,10 +62,6 @@ class TaskOnKart(luigi.Task): redis_host = luigi.OptionalParameter(default=None, description='Task lock check is deactivated, when None.', significant=False) redis_port = luigi.OptionalParameter(default=None, description='Task lock check is deactivated, when None.', significant=False) redis_timeout = luigi.IntParameter(default=180, description='Redis lock will be released after `redis_timeout` seconds', significant=False) - raise_task_lock_exception_on_collision: bool = luigi.BoolParameter( - default=False, - description='True for failing the task immediately when the cache is locked, instead of waiting for the lock to be released', - significant=False) fail_on_empty_dump: bool = ExplicitBoolParameter(default=False, description='Fail when task dumps empty DF', significant=False) store_index_in_feather: bool = ExplicitBoolParameter(default=True, description='Wether to store index when using feather as a output object.', @@ -176,7 +172,7 @@ def make_target(self, relative_file_path: Optional[str] = None, use_unique_id: b redis_host=self.redis_host, redis_port=self.redis_port, redis_timeout=self.redis_timeout, - raise_task_lock_exception_on_collision=self.raise_task_lock_exception_on_collision) + raise_task_lock_exception_on_collision=False) return gokart.target.make_target(file_path=file_path, unique_id=unique_id, processor=processor, @@ -193,7 +189,7 @@ def make_large_data_frame_target(self, relative_file_path: Optional[str] = None, redis_host=self.redis_host, redis_port=self.redis_port, redis_timeout=self.redis_timeout, - raise_task_lock_exception_on_collision=self.raise_task_lock_exception_on_collision) + raise_task_lock_exception_on_collision=False) return gokart.target.make_model_target(file_path=file_path, temporary_directory=self.local_temporary_directory, unique_id=unique_id, @@ -222,7 +218,7 @@ def make_model_target(self, redis_host=self.redis_host, redis_port=self.redis_port, redis_timeout=self.redis_timeout, - raise_task_lock_exception_on_collision=self.raise_task_lock_exception_on_collision) + raise_task_lock_exception_on_collision=False) return gokart.target.make_model_target(file_path=file_path, temporary_directory=self.local_temporary_directory, unique_id=unique_id,