
Add TTL to Operator RPC Client #259

Closed
wants to merge 34 commits into from

Conversation

@emlautarom1 (Contributor) commented Jun 20, 2024

Current Behavior

Currently, failed messages are stored in a queue to be processed later by a background thread. The current design does not take the age of a message into consideration, meaning that old messages can linger for a long time. See #212 for more details.

New Behavior

This PR completely changes the design of the RPC client:

  • The client requires an RpcClient that performs the actual RPC call. Go's native rpc.Client implements this interface.
  • Instead of using a background thread to process failed messages from a queue, the client blocks while trying to send each message, applying a provided retry strategy (see the sketch below).
  • The provided default strategy retries messages at most 10 times, with a delay of 2 seconds between calls, dropping messages older than 24 hours (an arbitrary, very conservative value to be discussed).
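
To make the second point concrete, here is a minimal sketch of the new design. Identifiers such as RetryStrategy, RpcClient, and sendWithRetry mirror names discussed in this PR, but the exact signatures, fields, and package layout are assumptions, not the actual diff:

	package operator

	import (
		"net/rpc"
		"time"
	)

	// RetryStrategy decides whether a failed call should be retried, given when
	// the message was first submitted and the error of the last attempt.
	type RetryStrategy func(submittedAt time.Time, err error) bool

	// RpcClient is the minimal interface the aggregator client depends on;
	// Go's native *rpc.Client satisfies it.
	type RpcClient interface {
		Call(serviceMethod string, args any, reply any) error
	}

	var _ RpcClient = (*rpc.Client)(nil)

	// AggregatorRpcClient blocks on each send, retrying according to a strategy
	// created per message (e.g. by DefaultAggregatorRpcRetry).
	type AggregatorRpcClient struct {
		rpcClient        RpcClient
		newRetryStrategy func() RetryStrategy
	}

	func (a *AggregatorRpcClient) sendWithRetry(method string, args any, reply any) error {
		shouldRetry := a.newRetryStrategy()
		submittedAt := time.Now()
		err := a.rpcClient.Call(method, args, reply)
		for err != nil && shouldRetry(submittedAt, err) {
			err = a.rpcClient.Call(method, args, reply)
		}
		return err
	}

Callers keep the existing fire-and-forget pattern (e.g. go client.SendX(...)); each goroutine simply blocks in its own retry loop instead of handing the message to a shared queue.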

The reasoning behind these changes is as follows:

  • Usages of the client are usually "fire and forget" (e.g. go client.SendX), so we can handle retrying on each individual goroutine without requiring a background goroutine.
  • Given this usage, my understanding is that the order of messages is not relevant, so it makes no sense to enforce it (e.g. if two parallel goroutines use the client, there is no guarantee as to which call will be executed first).
  • The original design had the retry mechanism deeply embedded, while this design allows for extension without modification.
  • Since we don't change users of the client (i.e. we preserve all "fire and forget"s), we're still at risk of unbounded memory usage (the old design had this property plus an unbounded queue of messages), though a reasonable retry strategy should mitigate this issue.

Breaking Changes

If message ordering is a requirement, then this is a breaking change, since we now explicitly do not guarantee it (the old design may have tried, unsuccessfully, to guarantee it).

Since we're now adding a TTL to messages, messages that previously would have been sent after long delays will now be dropped altogether.

As usual with refactors of this kind, there might be other implicit details that have been unintentionally changed, so a thorough review is required.

Comment on lines +176 to +177
// TODO: We never close `httpRpcClient`
httpRpcClient, err := NewHTTPAggregatorRpcClient(c.AggregatorServerIpPortAddress, operatorId, registryCoordinatorAddress, logger)
@emlautarom1 (Contributor, Author) commented:

Open for discussion: how do we want to handle this dependency, in particular closing it?

listener RpcClientEventListener
type RpcClient interface {
	Call(serviceMethod string, args any, reply any) error
	// TODO: Do we also want a `Close` method?
@emlautarom1 (Contributor, Author) commented:

I'm against the idea of adding a Close method here, but it would allow us to add a Close method to the AggregatorRpcClient so that it closes the provided RpcClient. Ideally, a component should not be responsible for closing its provided dependencies, but it could be a quick and dirty solution.

Comment on lines +220 to +222
nodeConfig, _, _ := genOperatorConfig(t, ctx, "3", mainnetAnvil, rollupAnvils, rabbitMq)
operator := startOperator(t, ctx, nodeConfig)

@emlautarom1 (Contributor, Author) commented:

We had to change the order of initialization in the integration test: if the operator cannot connect to the aggregator on start, it will quickly crash. Note that this has no effect during processing: if the connection to the aggregator fails, requests will be retried according to the provided strategy.

Comment on lines 94 to 101
// By default, retry with a delay of 2 seconds between calls,
// at most 10 times, and only if the error is recent enough (24 hours)
// TODO: Discuss the "recent enough" part
func DefaultAggregatorRpcRetry() RetryStrategy {
	return RetryAnd(
		RetryWithDelay(2*time.Second), RetryAnd(
			RetryAtMost(10),
			RetryIfRecentEnough(24*time.Hour)))
}
@emlautarom1 (Contributor, Author) commented:

What would be a reasonable TTL for messages? This is a very conservative default just to start the discussion.

@taco-paco (Contributor) commented Jun 21, 2024

I guess let me first start with the current design and the misunderstandings regarding it, and then I will share my opinion about your approach.

There have been some evolutions since the first rework, but I will describe the current one:
The operator makes a call to the client via one of the SendSigned* methods.
We get to sendOperatorMessage. If rpcClient isn't initialized in onTick yet, we add the message to the queue. If we do have a client, we move on to the RPC call. In case of an error, we check if the client was shut down, and if it was, we drop it so it can be re-initialized in onTick. Then we add the unsuccessful message to the queue.
And here is the important point for us, one that has a continuation in your implementation:
that's it: we return and free the goroutine. After that, resending is the responsibility of the onTick method, which runs on a single goroutine.
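
A rough sketch of the flow just described (field and method names follow this comment; the actual implementation may differ):

	package operator

	import (
		"errors"
		"net/rpc"
		"sync"
	)

	// message is a placeholder for a queued RPC call (hypothetical).
	type message struct {
		method string
		args   any
		reply  any
	}

	type AggregatorRpcClient struct {
		mu             sync.Mutex
		rpcClient      *rpc.Client // nil until onTick (re)initializes it
		unsentMessages []message
	}

	func (c *AggregatorRpcClient) sendOperatorMessage(msg message) {
		c.mu.Lock()
		defer c.mu.Unlock()

		if c.rpcClient == nil {
			// No client yet: queue the message and let onTick handle it later.
			c.unsentMessages = append(c.unsentMessages, msg)
			return
		}

		if err := c.rpcClient.Call(msg.method, msg.args, msg.reply); err != nil {
			if errors.Is(err, rpc.ErrShutdown) {
				// Drop the client so onTick can re-initialize it.
				c.rpcClient = nil
			}
			// Queue the failed message; resending it is the job of onTick,
			// which runs on a single goroutine once per ResendInterval.
			c.unsentMessages = append(c.unsentMessages, msg)
		}
	}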

Now we come to this part:

The current design does not take the age of a message into consideration

We were aware of this problem, and in one of the iterations @Hyodar introduced message expiration here, so a message is a candidate for resending only 10 times.

Now, about this proposal and its important flaw, which is the reason why, in my opinion, we can't proceed with it. It pretty much comes down to this part in the SendSigned* methods:

	for err != nil && shouldRetry(submittedAt, err) {
		a.listener.IncErroredCheckpointSubmissions(retried)
		err = action()
		retried = true
	}

Instead of one goroutine trying to resend a message once per ResendInterval, we keep all goroutines alive for up to roughly 20 seconds (10 retries with a 2-second delay) while they try to resend the message. Meanwhile, new messages to the operator keep coming, creating even more goroutines if the aggregator is down. Unlike the current implementation described above, this solution is not scalable and is very resource-demanding, especially with the fast TX rate from multiple rollups that we are eager to support.
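
For a rough sense of scale (purely illustrative numbers, not measurements): if the aggregator is down and the operator emits, say, 50 messages per second, then with each goroutine held for up to ~20 seconds there would be on the order of 1,000 live goroutines at any moment under this proposal, whereas the current design keeps a single resend goroutine and instead grows the in-memory queue by roughly the same 1,000 messages over that window.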

Overall I'm open to discussion here, but I would also like to hear @Hyodar's opinion on this radical change, and I would propose holding off on this PR until then.

@taco-paco (Contributor) left a comment:

Due to the comment above

@emlautarom1 (Contributor, Author) commented Jun 21, 2024

that's it: we return and free the goroutine. After that, resending is the responsibility of the onTick method, which runs on a single goroutine.

Instead of one goroutine trying to resend a message once per ResendInterval, we keep all goroutines alive for up to roughly 20 seconds (10 retries with a 2-second delay) while they try to resend the message. Meanwhile, new messages to the operator keep coming, creating even more goroutines if the aggregator is down. Unlike the current implementation described above, this solution is not scalable and is very resource-demanding, especially with the fast TX rate from multiple rollups that we are eager to support.

Appreciate the feedback. This is mentioned in the PR description, but I believe it is mostly a tradeoff:

  • In the current design, the unsentMessages queue grows unbounded, and we retry messages one at a time.
  • In the proposed design, the number of goroutines grows unbounded, although each has a fixed size, and we retry messages independently of each other.

We were aware of this problem, and in one of the iterations @Hyodar introduced message expiration here, so a message is a candidate for resending only 10 times.

It was my understanding that #212 was asking for three retry conditions: try at most 10 times, with a timeout of 2 seconds between each retry, while the message is recent enough (TTL). Note that TTL != (retry count * timeout), since a request could take several seconds to complete. If this is not correct, that is, if the TTL is already implemented as retry count * timeout, then this PR is not required.
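
To put rough numbers on that (illustrative, not measured): with 10 retries and a 2-second delay, the retry window is at least 20 seconds, but if each RPC attempt itself blocks for, say, 30 seconds before failing, the last retry can happen more than 5 minutes after the message was first submitted. A count-based limit therefore does not bound the age of a message, while a TTL check does. A minimal sketch of the two conditions, under assumed signatures:

	package operator

	import "time"

	// Sketch only: an assumed shape for the two retry conditions discussed above.
	type RetryStrategy func(submittedAt time.Time, err error) bool

	// RetryAtMost bounds the number of attempts, but says nothing about how much
	// wall-clock time those attempts take (the counter lives in the closure, so a
	// fresh strategy is assumed per message).
	func RetryAtMost(n int) RetryStrategy {
		attempts := 0
		return func(submittedAt time.Time, err error) bool {
			attempts++
			return attempts <= n
		}
	}

	// RetryIfRecentEnough bounds the age of the message itself, regardless of how
	// many attempts were made or how long each attempt took.
	func RetryIfRecentEnough(ttl time.Duration) RetryStrategy {
		return func(submittedAt time.Time, err error) bool {
			return time.Since(submittedAt) < ttl
		}
	}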

@Hyodar (Contributor) commented Jul 1, 2024

I don't particularly dislike this (the scalability concern is worth noting, though maybe not that much; we'd need a benchmark for that), but I think it's indeed not necessary, and the current solution is already more robust in this sense.

In #212 I already described and suggested solutions. IMO, for the operator we just need to tidy up the retry mechanism a bit, maybe, and use a queue that automatically drops expired messages; for the aggregator, just ignore old messages and fix the timing check. Those are simple changes and should be tackled before anything more complex.

@emlautarom1 (Contributor, Author) commented:

In #212 I already described and suggested solutions. IMO, for the operator we just need to tidy up the retry mechanism a bit, maybe, and use a queue that automatically drops expired messages

Maybe I'm missing something, but we're already dropping messages after 10 retries with a 2-second delay in between. Do you suggest adding an additional goroutine that periodically inspects the queue and removes messages that are too old?

for the aggregator just ignore old messages and fix the timing check

Moving the discussion regarding the aggregator back to #212.
