Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

set TaskOnKart.raise_task_lock_exception_on_collision as false #342

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 0 additions & 66 deletions docs/using_task_cache_collision_lock.rst
Original file line number Diff line number Diff line change
Expand Up @@ -49,69 +49,3 @@ How to use
3. Done

With the above configuration, all tasks that inherits gokart.TaskOnKart will ask the redis server if any other node is not trying to access the same cache file at the same time whenever they access the file with dump or load.


Advanced: Using efficient task cache collision lock
-----------------------------------------

The cache lock introduced above will prevent cache collision.
However, above setting check collisions only when the task access the cache file (i.e. ``task.dump()``, ``task.load()`` and ``task.remove()``).
This will allow applications to run ``run()`` of same task at the same time, which is not time efficient.

Settings in this section will prevent running ``run()`` at the same time for efficiency.

If you try to run() the same task on multiple worker nodes at the same time, run() will fail on the second and subsequent node's tasks.
gokart will execute other unaffected tasks in the meantime. Since we have also set up the retry process, we will come back to the failed task later.
When it comes back, the first worker node has already completed run() and a cache has been created, so there is no need to run() on the second and subsequent nodes.
In this way, efficient distributed processing is made possible.


This setting must be done to each gokart task which you want to lock the ``run()```.

1. Set normal cache collision lock

Follow the steps in ``How to use`` to set up cache collision lock.


2. Decorate ``run()`` with ``@RunWithLock``

Decorate ``run()`` of your gokart tasks you want to lock with ``@RunWithLock``.

.. code:: python

from gokart.run_with_lock import RunWithLock

class SomeTask(gokart.TaskOnKart):
@RunWithLock
def run(self):
...


3. Set ``raise_task_lock_exception_on_collision`` parameter to true.

This parameter will affect the behavior when the task's lock is taken by other applications or nodes.
Setting ``raise_task_lock_exception_on_collision=True`` will make the task to be failed if the task's lock is taken by others.

The parameter can be set by config file.

.. code::

[TaskOnKart]
redis_host=localhost
redis_port=6379
raise_task_lock_exception_on_collision=true

4. Set retry parameters

Set following parameters to retry when task failed.
* ``retry_count``: the max number of retries
* ``retry_delay``: this value is set in seconds

.. code::

[scheduler]
retry_count=10000
retry_delay=10

[worker]
keep_alive=true
14 changes: 7 additions & 7 deletions gokart/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,7 @@ class TaskOnKart(luigi.Task):
redis_host = luigi.OptionalParameter(default=None, description='Task lock check is deactivated, when None.', significant=False)
redis_port = luigi.OptionalParameter(default=None, description='Task lock check is deactivated, when None.', significant=False)
redis_timeout = luigi.IntParameter(default=180, description='Redis lock will be released after `redis_timeout` seconds', significant=False)
raise_task_lock_exception_on_collision: bool = luigi.BoolParameter(
default=False,
description='True for failing the task immediately when the cache is locked, instead of waiting for the lock to be released',
significant=False)

fail_on_empty_dump: bool = ExplicitBoolParameter(default=False, description='Fail when task dumps empty DF', significant=False)
store_index_in_feather: bool = ExplicitBoolParameter(default=True,
description='Wether to store index when using feather as a output object.',
Expand Down Expand Up @@ -176,7 +173,8 @@ def make_target(self, relative_file_path: Optional[str] = None, use_unique_id: b
redis_host=self.redis_host,
redis_port=self.redis_port,
redis_timeout=self.redis_timeout,
raise_task_lock_exception_on_collision=self.raise_task_lock_exception_on_collision)
raise_task_lock_exception_on_collision=False)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mski-iksm Plz describe why we need to keep raise_task_lock_exception_on_collision for make_redis_params while this is always set to be False

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Hi-king
In the coming feature, TaskOnKart.run() will be wrapped with task collision lock to prevent running same task at the same time.
I want to keep RedisParams.raise_task_lock_exception_on_collision to be used in this new feature.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mski-iksm Thx for clarifying :)


return gokart.target.make_target(file_path=file_path,
unique_id=unique_id,
processor=processor,
Expand All @@ -193,7 +191,8 @@ def make_large_data_frame_target(self, relative_file_path: Optional[str] = None,
redis_host=self.redis_host,
redis_port=self.redis_port,
redis_timeout=self.redis_timeout,
raise_task_lock_exception_on_collision=self.raise_task_lock_exception_on_collision)
raise_task_lock_exception_on_collision=False)

return gokart.target.make_model_target(file_path=file_path,
temporary_directory=self.local_temporary_directory,
unique_id=unique_id,
Expand Down Expand Up @@ -222,7 +221,8 @@ def make_model_target(self,
redis_host=self.redis_host,
redis_port=self.redis_port,
redis_timeout=self.redis_timeout,
raise_task_lock_exception_on_collision=self.raise_task_lock_exception_on_collision)
raise_task_lock_exception_on_collision=False)

return gokart.target.make_model_target(file_path=file_path,
temporary_directory=self.local_temporary_directory,
unique_id=unique_id,
Expand Down