Skip to content

Commit

Permalink
add (#341)
Browse files Browse the repository at this point in the history
  • Loading branch information
mski-iksm authored Jan 10, 2024
1 parent f5f3956 commit bedeb6c
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 15 deletions.
6 changes: 3 additions & 3 deletions docs/using_task_cache_collision_lock.rst
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,10 @@ This setting must be done to each gokart task which you want to lock the ``run()
...
3. Set ``redis_fail_on_collision`` parameter to true.
3. Set ``raise_task_lock_exception_on_collision`` parameter to true.

This parameter will affect the behavior when the task's lock is taken by other applications or nodes.
Setting ``redis_fail_on_collision=True`` will make the task to be failed if the task's lock is taken by others.
Setting ``raise_task_lock_exception_on_collision=True`` will make the task to be failed if the task's lock is taken by others.

The parameter can be set by config file.

Expand All @@ -99,7 +99,7 @@ This setting must be done to each gokart task which you want to lock the ``run()
[TaskOnKart]
redis_host=localhost
redis_port=6379
redis_fail_on_collision=true
raise_task_lock_exception_on_collision=true
4. Set retry parameters

Expand Down
8 changes: 4 additions & 4 deletions gokart/redis_lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class RedisParams(NamedTuple):
redis_timeout: Optional[int]
redis_key: str
should_redis_lock: bool
redis_fail_on_collision: bool
raise_task_lock_exception_on_collision: bool
lock_extend_seconds: int


Expand Down Expand Up @@ -50,7 +50,7 @@ def _extend_lock(redis_lock: redis.lock.Lock, redis_timeout: int):

def _set_redis_lock(redis_params: RedisParams) -> redis.lock.Lock:
redis_client = RedisClient(host=redis_params.redis_host, port=redis_params.redis_port).get_redis_client()
blocking = not redis_params.redis_fail_on_collision
blocking = not redis_params.raise_task_lock_exception_on_collision
redis_lock = redis.lock.Lock(redis=redis_client, name=redis_params.redis_key, timeout=redis_params.redis_timeout, thread_local=False)
if not redis_lock.acquire(blocking=blocking):
raise TaskLockException('Lock already taken by other task.')
Expand Down Expand Up @@ -168,7 +168,7 @@ def make_redis_params(file_path: str,
redis_host: Optional[str] = None,
redis_port: Optional[int] = None,
redis_timeout: Optional[int] = None,
redis_fail_on_collision: bool = False,
raise_task_lock_exception_on_collision: bool = False,
lock_extend_seconds: int = 10):
redis_key = make_redis_key(file_path, unique_id)
should_redis_lock = redis_host is not None and redis_port is not None
Expand All @@ -179,6 +179,6 @@ def make_redis_params(file_path: str,
redis_key=redis_key,
should_redis_lock=should_redis_lock,
redis_timeout=redis_timeout,
redis_fail_on_collision=redis_fail_on_collision,
raise_task_lock_exception_on_collision=raise_task_lock_exception_on_collision,
lock_extend_seconds=lock_extend_seconds)
return redis_params
8 changes: 4 additions & 4 deletions gokart/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class TaskOnKart(luigi.Task):
redis_host = luigi.OptionalParameter(default=None, description='Task lock check is deactivated, when None.', significant=False)
redis_port = luigi.OptionalParameter(default=None, description='Task lock check is deactivated, when None.', significant=False)
redis_timeout = luigi.IntParameter(default=180, description='Redis lock will be released after `redis_timeout` seconds', significant=False)
redis_fail_on_collision: bool = luigi.BoolParameter(
raise_task_lock_exception_on_collision: bool = luigi.BoolParameter(
default=False,
description='True for failing the task immediately when the cache is locked, instead of waiting for the lock to be released',
significant=False)
Expand Down Expand Up @@ -176,7 +176,7 @@ def make_target(self, relative_file_path: Optional[str] = None, use_unique_id: b
redis_host=self.redis_host,
redis_port=self.redis_port,
redis_timeout=self.redis_timeout,
redis_fail_on_collision=self.redis_fail_on_collision)
raise_task_lock_exception_on_collision=self.raise_task_lock_exception_on_collision)
return gokart.target.make_target(file_path=file_path,
unique_id=unique_id,
processor=processor,
Expand All @@ -193,7 +193,7 @@ def make_large_data_frame_target(self, relative_file_path: Optional[str] = None,
redis_host=self.redis_host,
redis_port=self.redis_port,
redis_timeout=self.redis_timeout,
redis_fail_on_collision=self.redis_fail_on_collision)
raise_task_lock_exception_on_collision=self.raise_task_lock_exception_on_collision)
return gokart.target.make_model_target(file_path=file_path,
temporary_directory=self.local_temporary_directory,
unique_id=unique_id,
Expand Down Expand Up @@ -222,7 +222,7 @@ def make_model_target(self,
redis_host=self.redis_host,
redis_port=self.redis_port,
redis_timeout=self.redis_timeout,
redis_fail_on_collision=self.redis_fail_on_collision)
raise_task_lock_exception_on_collision=self.raise_task_lock_exception_on_collision)
return gokart.target.make_model_target(file_path=file_path,
temporary_directory=self.local_temporary_directory,
unique_id=unique_id,
Expand Down
8 changes: 4 additions & 4 deletions test/test_redis_lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -456,13 +456,13 @@ def test_make_redis_params_with_valid_host(self):
redis_host='0.0.0.0',
redis_port='12345',
redis_timeout=180,
redis_fail_on_collision=False)
raise_task_lock_exception_on_collision=False)
expected = RedisParams(redis_host='0.0.0.0',
redis_port='12345',
redis_key='aaa_123',
should_redis_lock=True,
redis_timeout=180,
redis_fail_on_collision=False,
raise_task_lock_exception_on_collision=False,
lock_extend_seconds=10)
self.assertEqual(result, expected)

Expand All @@ -472,13 +472,13 @@ def test_make_redis_params_with_no_host(self):
redis_host=None,
redis_port='12345',
redis_timeout=180,
redis_fail_on_collision=False)
raise_task_lock_exception_on_collision=False)
expected = RedisParams(redis_host=None,
redis_port='12345',
redis_key='aaa_123',
should_redis_lock=False,
redis_timeout=180,
redis_fail_on_collision=False,
raise_task_lock_exception_on_collision=False,
lock_extend_seconds=10)
self.assertEqual(result, expected)

Expand Down

0 comments on commit bedeb6c

Please sign in to comment.