Skip to content
/ agent Public

A thread-safe wrapper for stateful objects

License

Notifications You must be signed in to change notification settings

lbarasti/agent

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

7 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

GitHub release Build Status License Docs

agent

An Agent is a wrapper that makes it thread-safe to share object references across your application, without having to worry about concurrent access and destructive assignment.

It's an ideal companion for immutable data structures, but promises thread-safe access and modifications on mutable objects, too, provided that the user only manipulates state within an Agent's methods.

Installation

  • Add the dependency to your shard.yml:
dependencies:
  agent:
    github: lbarasti/agent
  • Run shards install

Usage

Let's define an Agent wrapping a hash.

require "agent"

concurrent_hash = Agent.new({"a" => 1, "c" => 41})

We can now traverse the hash safely with Agent.get.

concurrent_hash.get { |h| h["a"] } # => 1

Everything we do within the block passed to Agent#get is guaranteed to be thread-safe and consistent.

To update the hash in a thread-safe fashion we call Agent#update.

concurrent_hash.update { |h|
  h["b"] = 12
  h
} # => Agent::Result::Submitted

The block passed to Agent#update will run asynchronously, but any calls following it are guaranteed to see the updated version of the wrapped object - provided that the update was successful.

concurrent_hash.get { |h| h["b"] } # => 12

If you want synchronously fetch and update the state of the agent, then Agent#get_and_update will serve your purpose.

concurrent_hash.get_and_update { |h|
  old_b = h["b"]
  h["b"] = old_b + 1
  {h["b"] , h}
} # => 13

Agent#get_and_update expects a block of type

Hash(String, Int32) -> {Q, Hash(String, Int32)}

where Q is a generic type and is the type of the returned value. This means you can return any transformation of the current Agent's state and alter the state in a single pass.

Error handling and timeouts

Errors are handled within the Agent, and surfaced as Agent::Result::Error values. For example, if we try to fetch a value for a non existing key, the KeyError exception turns into an Agent::Result::Error.

concurrent_hash.get { |h| h["non-existing"] } # => Agent::Result::Error

If you'd rather deal with the exception yourself, check out the ! variant of Agent's getter methods.

concurrent_hash.get! { |h| h["non-existing"] } # raises Exception("Error")

In order to give responsiveness guarantees to the client's code, Agent's operations support timing out. The default timeout is 5 seconds, but you can pass a custom timeout on each operation.

concurrent_hash.get {
  sleep 3.seconds # simulates a time consuming operation
}

concurrent_hash.get(max_wait: 1.second) { |h|
  h["b"]
} # => Agent::Result::Timeout

concurrent_hash.get!(max_wait: 1.second) { |h|
  h["b"]
} # raises Exception("Timeout")

Agents in multi-threaded runtime

As of Crystal 0.34.0, by default, your code will be compiled to run on a single thread. In this scenario, using Agents still makes sense if you access or modify objects from different fibers. If that's not the case, then the only perk of adopting Agents is that your code will be future proof.

To see how multi-threading and concurrency can break the correctness of your application, think about the behaviour of the following code, where we spawn 10 fibers, and each one concurrently updates the value of a counter 1024 times.

done = Channel(Nil).new(10)
counter = 0
(1..10).each {
  spawn {
    (1..1024).each {
      counter += 1
    }
    done.send nil
  }
}
10.times { done.receive }
puts counter # => ?

Or just check out this repository and run

crystal build -Dpreview_mt examples/breaking_counter.cr
CRYSTAL_WORKERS=4 ./breaking_counter

You'll notice unpredictable results in the final count.

We can fix this with an Agent.

done = Channel(Nil).new(10)
counter = Agent.new(0)

(1..10).each {
  spawn {
    (1..1024).each {
      counter.update { |x| x + 1 }
    }
    done.send nil
  }
}
10.times { done.receive }
puts counter.get # => 1024 * 10

Now the final value for counter will always equal 10240, no matter the number of runtime threads.

FAQ

How does this differ from Atomic(T) (docs)?

Only primitive integer types, reference types or nilable reference types can be used with an Atomic type.
On the other hand, you can wrap any type in an Agent.

Are Agent updates atomic, i.e. do either all the instructions in a block take effect or none of it does?

No, atomicity is not guaranteed. In particular, if an exception is raised within a given get / update block, then any side-effecting operation preceding the exception will not be reverted.

Relying on immutable data structures and avoiding side-effects in Agent's get / update operations are good mitigations for the lack of atomicity.

My code uses immutable data structures such as these ones. Are these not thread-safe by definition?

Immutable data structures are thread-safe in the sense that you can safely access them from different fibers, but they are subject to the so-called lost update problem, where changes made by a fiber will not be recorded by another one - think of the case where multiple fibers close over the same variable, and then destructively assign values to such variable, concurrently.

You can dodge the lost-update bullet by making sure that all the updates to your immutable data structure happen in a single fiber, but that's not always possible or desirable. Furthermore, you might still have to implement custom logic to ensure transactionality - think of the scenario where a fiber wants to increment a counter by 1, but first has to fetch the current value of the counter. In a parallel¹ execution, the counter might change between the fetch and the set statement

¹ Things will be fine in a concurrent but not parallel execution, as the fiber will not yield control until after the update.

Development

Just check out the repository and run crystal spec to run the tests.

Contributing

  1. Fork it (https://github.com/lbarasti/agent/fork)
  2. Create your feature branch (git checkout -b my-new-feature)
  3. Commit your changes (git commit -am 'Add some feature')
  4. Push to the branch (git push origin my-new-feature)
  5. Create a new Pull Request

Contributors