Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Cache eviction of past executions #504

Draft
wants to merge 5 commits into
base: master
Choose a base branch
from

Conversation

MorpheusXAUT
Copy link
Contributor

TL;DR

This PR implements the second part of the cache eviction RFC, evicting stored data of a past (task) execution.
A new CacheService has been added for interacting with Flyte's cache via flyteadmin.

Type

  • Bug Fix
  • Feature
  • Plugin

Are all requirements met?

  • Code completed
  • Smoke tested
  • Unit tests added
  • Code documentation added
  • Any pending items have an associated Issue

Complete description

The new CacheService exposes two endpoints, EvictExecutionCache and EvictTaskExecutionCache. These two endpoints allow for the respective (already completed) execution's cached output data to be removed from datacatalog.
flyteadmin retrieves the stored NodeExecutions and their associated TaskExecutions from its database, traversing the executions and evicting the cached data as indicated by the serialized TaskNodeMetadata.
flyteadmin acquires a reservation for the cached data from datacatalog before updating the NodeExecution's metadata/closure and removing the artifact data from datacatalog.
Both cache eviction endpoints share mostly the same implementation and are developed to be idempotent - evicting the cache of an execution without any stored data still completes successfully. Partial failures are supported, if some task's cache can't be evicted, the rest of the deletion will still continue and an appropriate error is returned to the client. The client is expected to display any encountered errors to the user accordingly and prompt for a retry (already evicted data will be skipped when retrying).

I've tested cache eviction with some basic cached workflows available in the flytesnacks repository, a demo-workflow created in a flytepropeller PR as well as a slightly more complex company-internal workflow. Additionally, I've tried to cover as many aspects as possible with unit tests.

The PR was created as a draft until the flyteidl, flyteplugins, flytepropeller and flytestdlib PRs have been merged and respective versions have been published.

Tracking Issue

flyteorg/flyte#2867

Follow-up issue

NA

@codecov
Copy link

codecov bot commented Dec 15, 2022

Codecov Report

Merging #504 (650613d) into master (d2215ed) will decrease coverage by 0.17%.
The diff coverage is 54.72%.

@@            Coverage Diff             @@
##           master     #504      +/-   ##
==========================================
- Coverage   60.07%   59.90%   -0.18%     
==========================================
  Files         168      169       +1     
  Lines       14997    15388     +391     
==========================================
+ Hits         9010     9218     +208     
- Misses       5196     5383     +187     
+ Partials      791      787       -4     
Flag Coverage Δ
unittests 59.90% <54.72%> (-0.18%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

Impacted Files Coverage Δ
pkg/manager/impl/util/shared.go 43.80% <0.00%> (-21.72%) ⬇️
pkg/repositories/gormimpl/node_execution_repo.go 64.19% <0.00%> (-3.34%) ⬇️
pkg/manager/impl/cache_manager.go 71.46% <71.46%> (ø)
pkg/manager/impl/node_execution_manager.go 70.96% <94.11%> (+1.03%) ⬆️
pkg/manager/impl/task_execution_manager.go 73.94% <100.00%> (+4.66%) ⬆️

Help us with your feedback. Take ten seconds to tell us how you rate us. Have a feature suggestion? Share it here.

Nick Müller added 5 commits January 4, 2023 15:36
Allows for fields to be explicity set/updated to nil

Signed-off-by: Nick Müller <nmueller@blackshark.ai>
Allows for re-use by cache manager

Signed-off-by: Nick Müller <nmueller@blackshark.ai>
Added endpoint for evicting execution cache
Added endpoint for evicting task execution cache

Signed-off-by: Nick Müller <nmueller@blackshark.ai>
Signed-off-by: Nick Müller <nmueller@blackshark.ai>
…epropeller and flytestdlib

Signed-off-by: Nick Müller <nmueller@blackshark.ai>
@MorpheusXAUT MorpheusXAUT force-pushed the cache-eviction-past-executions branch from 67e4383 to 650613d Compare January 4, 2023 15:33
@MorpheusXAUT
Copy link
Contributor Author

Rebased onto current master branch.

@pmahindrakar-oss
Copy link
Contributor

pmahindrakar-oss commented Jan 6, 2023

@MorpheusXAUT thanks for making the PR for this RFC. This PR makes multiple calls to datacatalog services to clear cache. Would be preferable if we could batch these calls by introducing a batch API in datacatalog to do the same.
Also may be not much of concern right now since this API is not on critical path, but this API could take a while if datacatcatalog takes a long time in any situation and cause grpc deadline expiry/timeouts. May be an async api would be useful in such a case

Let us know what you think. cc: @hamersaw @katrogan

@MorpheusXAUT
Copy link
Contributor Author

@MorpheusXAUT thanks for making the PR for this RFC. This PR makes multiple calls to datacatalog services to clear cache. Would be preferable if we could batch these calls by introducing a batch API in datacatalog to do the same.

Can do if we want to reduce individual API requests. Would you rather keep the current (non-bulk) new endpoint around as well or have me replace it completely with a bulk one?

Also may be not much of concern right now since this API is not on critical path, but this API could take a while if datacatcatalog takes a long time in any situation and cause grpc deadline expiry/timeouts. May be an async api would be useful in such a case

I was a bit afraid of timeouts as well, especially given large workflows, however as we've originally agreed on propagating results back to the user, that was the first I've come up with.
If we wanted to do it async, we'd probably need to assign some sort of "eviction ID" that clients could then poll to retrieve the status (requiring state to be tracked in a DB somewhere as well), unless there's some other way do keep it async and the user in the loop you could think of 🤔

@pmahindrakar-oss
Copy link
Contributor

pmahindrakar-oss commented Jan 9, 2023

Can do if we want to reduce individual API requests. Would you rather keep the current (non-bulk) new endpoint around as well or have me replace it completely with a bulk one?

Eventually using the bulk one would be what we should encourage the API users to use. If its maintainable we can keep the non-bulk version aswell but have a note in the API definition to use bulk version in most of the cases. Also do you think any reason someone would prefer to use bulk versus non-bulk

I was a bit afraid of timeouts as well, especially given large workflows, however as we've originally flyteorg/flyte#2633 (comment) on propagating results back to the user, that was the first I've come up with.
If we wanted to do it async, we'd probably need to assign some sort of "eviction ID" that clients could then poll to retrieve the status (requiring state to be tracked in a DB somewhere as well), unless there's some other way do keep it async and the user in the loop you could think of

With multiple service calls being involved in single API, i am afraid that some client calls could timeout where some others would still be ongoing unless we cleanly handle context cancellation if the parent api call times out. i am assuming this is handled by context passing automatically but just checking what happens in such cases.

eg : the api caller wait for 15 secs and timesout whereas the api implementations is still waiting on results from the downstream service may be with higher api timeout(datacatalog in our case for cache cleanup)

Also I think we dont have an infra to setup async flyteadmin api tasks and hence what you are suggesting would be using such a solution . Adding @katrogan to see if we ever designed for such usecases

@MorpheusXAUT
Copy link
Contributor Author

Eventually using the bulk one would be what we should encourage the API users to use. If its maintainable we can keep the non-bulk version aswell but have a note in the API definition to use bulk version in most of the cases. Also do you think any reason someone would prefer to use bulk versus non-bulk

Since both endpoints would be new, I think we could even remove the (not yet released) non-bulk version if we want to encourage users to use the bulk one anyways.
Should be relatively easy to maintain though, since we can just internally create a slice with one entry and re-use the bulk code.

With multiple service calls being involved in single API, i am afraid that some client calls could timeout where some others would still be ongoing unless we cleanly handle context cancellation if the parent api call times out. i am assuming this is handled by context passing automatically but just checking what happens in such cases.

That's a good point. I believe everything we use should support cancellation properly, however I haven't tested that myself either. The request context is being passed to all relevant parts though, so if they support it, it should happen.

Also I think we dont have an infra to setup async flyteadmin api tasks and hence what you are suggesting would be using such a solution . Adding @katrogan to see if we ever designed for such usecases

I didn't come across anything similar to what I described so far, but I'm also happy for suggestions on how to best implement that - the task ID/polling idea is just the first thing that came to mind.

@pmahindrakar-oss
Copy link
Contributor

pmahindrakar-oss commented Jan 12, 2023

Sounds good @MorpheusXAUT . Its ok to keep both and have it internally use the new bulk api of datacatalog.

We can check the behavior for timeouts through probably any golang client(flytectl by calling this new api) and setting context.WithTimeout and checking the behavior of the child rpc call if its getting cancelled too . It will probably leave the clearing of the cache half way but if the bulk api is idempotent then it would just continue from where it left off earlier in the next retry of the api

Regarding the design on async flyteadmin api tasks , we can do that as a followup cc : @katrogan

@MorpheusXAUT
Copy link
Contributor Author

MorpheusXAUT commented Jan 12, 2023

@pmahindrakar-oss Alright, sounds good to me, will add a bulk endpoint to datacatalog as a start, should hopefully be able to do so by the start of next week.
I'll also do some timeout testing, but I believe your assumptions are right. The current implementation already has idempotency, so adding that to the bulk endpoint should not be hard and allow us to easily continue cache eviction in case we run into timeouts.

Regarding the async implementation - happy to discuss further on how to best implement this. Is this something we want to absolutely have for the first version, or something we could improve in a future update to make the process more resilient?
@hamersaw also suggested the idea of offloading work to the clients, iterating the cached data in console/flytectl instead of in admin, which would make timeouts less of an issue, but requires duplicated work to implement traversing the workflow to find cached data on each client separately...

@katrogan
Copy link
Contributor

+1 for deferring the async implementation, I think this would be great to have down the road, especially if we can make it generalizable for other async actions like egress events (which are currently lossy) - having this functionality would be broadly useful for flyteadmin. Since the batch call should ideally be retryable in case of failures this seems like a good compromise on performance for now

@MorpheusXAUT
Copy link
Contributor Author

MorpheusXAUT commented Jan 19, 2023

@pmahindrakar-oss @katrogan Wanted to check back with you real quick before I rewrite this to use the bulk implementation of datacatalog endpoints.
Aside from bulk-deleting artifacts, I've also added bulk-acquiring and -releasing reservations to datacatalog: flyteorg/flyteidl@e9fb728#diff-3a79245dec4421c9ab4792e66d499076393dee184f774ff537084c55cbf7bb6d
flyteorg/datacatalog@2e80fba
Note that these bulk-operations are not implemented using transactions as that would require quite some changes to the gormrepo implementations.

flyteadmin would then continue walking the workflow and build a list of artifacts to:

  1. bulk-acquire reservation for
  2. filter out artifacts a reservation could not be acquired for, add them to the errors list
  3. bulk-delete artifacts
  4. bulk-release reservations

Do we still want to delete the database models before (while traversing the workflow), or should this be handled in the bulk-operation at the end as well?
Removing them while iterating would be a bit easier since all required information is already right there, but might lead to some DB/datacatalog inconsistencies if the artifact delete bulk-operation fails afterwards.

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants