588 lines
22 KiB
Ruby
588 lines
22 KiB
Ruby
require 'concurrent/configuration'
|
|
require 'concurrent/atomic/atomic_reference'
|
|
require 'concurrent/atomic/thread_local_var'
|
|
require 'concurrent/collection/copy_on_write_observer_set'
|
|
require 'concurrent/concern/observable'
|
|
require 'concurrent/synchronization'
|
|
|
|
module Concurrent
|
|
|
|
# `Agent` is inspired by Clojure's [agent](http://clojure.org/agents)
|
|
# function. An agent is a shared, mutable variable providing independent,
|
|
# uncoordinated, *asynchronous* change of individual values. Best used when
|
|
# the value will undergo frequent, complex updates. Suitable when the result
|
|
# of an update does not need to be known immediately. `Agent` is (mostly)
|
|
# functionally equivalent to Clojure's agent, except where the runtime
|
|
# prevents parity.
|
|
#
|
|
# Agents are reactive, not autonomous - there is no imperative message loop
|
|
# and no blocking receive. The state of an Agent should be itself immutable
|
|
# and the `#value` of an Agent is always immediately available for reading by
|
|
# any thread without any messages, i.e. observation does not require
|
|
# cooperation or coordination.
|
|
#
|
|
# Agent action dispatches are made using the various `#send` methods. These
|
|
# methods always return immediately. At some point later, in another thread,
|
|
# the following will happen:
|
|
#
|
|
# 1. The given `action` will be applied to the state of the Agent and the
|
|
# `args`, if any were supplied.
|
|
# 2. The return value of `action` will be passed to the validator lambda,
|
|
# if one has been set on the Agent.
|
|
# 3. If the validator succeeds or if no validator was given, the return value
|
|
# of the given `action` will become the new `#value` of the Agent. See
|
|
# `#initialize` for details.
|
|
# 4. If any observers were added to the Agent, they will be notified. See
|
|
# `#add_observer` for details.
|
|
# 5. If during the `action` execution any other dispatches are made (directly
|
|
# or indirectly), they will be held until after the `#value` of the Agent
|
|
# has been changed.
|
|
#
|
|
# If any exceptions are thrown by an action function, no nested dispatches
|
|
# will occur, and the exception will be cached in the Agent itself. When an
|
|
# Agent has errors cached, any subsequent interactions will immediately throw
|
|
# an exception, until the agent's errors are cleared. Agent errors can be
|
|
# examined with `#error` and the agent restarted with `#restart`.
|
|
#
|
|
# The actions of all Agents get interleaved amongst threads in a thread pool.
|
|
# At any point in time, at most one action for each Agent is being executed.
|
|
# Actions dispatched to an agent from another single agent or thread will
|
|
# occur in the order they were sent, potentially interleaved with actions
|
|
# dispatched to the same agent from other sources. The `#send` method should
|
|
# be used for actions that are CPU limited, while the `#send_off` method is
|
|
# appropriate for actions that may block on IO.
|
|
#
|
|
# Unlike in Clojure, `Agent` cannot participate in `Concurrent::TVar` transactions.
|
|
#
|
|
# ## Example
|
|
#
|
|
# ```
|
|
# def next_fibonacci(set = nil)
|
|
# return [0, 1] if set.nil?
|
|
# set + [set[-2..-1].reduce{|sum,x| sum + x }]
|
|
# end
|
|
#
|
|
# # create an agent with an initial value
|
|
# agent = Concurrent::Agent.new(next_fibonacci)
|
|
#
|
|
# # send a few update requests
|
|
# 5.times do
|
|
# agent.send{|set| next_fibonacci(set) }
|
|
# end
|
|
#
|
|
# # wait for them to complete
|
|
# agent.await
|
|
#
|
|
# # get the current value
|
|
# agent.value #=> [0, 1, 1, 2, 3, 5, 8]
|
|
# ```
|
|
#
|
|
# ## Observation
|
|
#
|
|
# Agents support observers through the {Concurrent::Observable} mixin module.
|
|
# Notification of observers occurs every time an action dispatch returns and
|
|
# the new value is successfully validated. Observation will *not* occur if the
|
|
# action raises an exception, if validation fails, or when a {#restart} occurs.
|
|
#
|
|
# When notified the observer will receive three arguments: `time`, `old_value`,
|
|
# and `new_value`. The `time` argument is the time at which the value change
|
|
# occurred. The `old_value` is the value of the Agent when the action began
|
|
# processing. The `new_value` is the value to which the Agent was set when the
|
|
# action completed. Note that `old_value` and `new_value` may be the same.
|
|
# This is not an error. It simply means that the action returned the same
|
|
# value.
|
|
#
|
|
# ## Nested Actions
|
|
#
|
|
# It is possible for an Agent action to post further actions back to itself.
|
|
# The nested actions will be enqueued normally then processed *after* the
|
|
# outer action completes, in the order they were sent, possibly interleaved
|
|
# with action dispatches from other threads. Nested actions never deadlock
|
|
# with one another and a failure in a nested action will never affect the
|
|
# outer action.
|
|
#
|
|
# Nested actions can be called using the Agent reference from the enclosing
|
|
# scope or by passing the reference in as a "send" argument. Nested actions
|
|
# cannot be post using `self` from within the action block/proc/lambda; `self`
|
|
# in this context will not reference the Agent. The preferred method for
|
|
# dispatching nested actions is to pass the Agent as an argument. This allows
|
|
# Ruby to more effectively manage the closing scope.
|
|
#
|
|
# Prefer this:
|
|
#
|
|
# ```
|
|
# agent = Concurrent::Agent.new(0)
|
|
# agent.send(agent) do |value, this|
|
|
# this.send {|v| v + 42 }
|
|
# 3.14
|
|
# end
|
|
# agent.value #=> 45.14
|
|
# ```
|
|
#
|
|
# Over this:
|
|
#
|
|
# ```
|
|
# agent = Concurrent::Agent.new(0)
|
|
# agent.send do |value|
|
|
# agent.send {|v| v + 42 }
|
|
# 3.14
|
|
# end
|
|
# ```
|
|
#
|
|
# @!macro agent_await_warning
|
|
#
|
|
# **NOTE** Never, *under any circumstances*, call any of the "await" methods
|
|
# ({#await}, {#await_for}, {#await_for!}, and {#wait}) from within an action
|
|
# block/proc/lambda. The call will block the Agent and will always fail.
|
|
# Calling either {#await} or {#wait} (with a timeout of `nil`) will
|
|
# hopelessly deadlock the Agent with no possibility of recovery.
|
|
#
|
|
# @!macro thread_safe_variable_comparison
|
|
#
|
|
# @see http://clojure.org/Agents Clojure Agents
|
|
# @see http://clojure.org/state Values and Change - Clojure's approach to Identity and State
|
|
class Agent < Synchronization::LockableObject
|
|
include Concern::Observable
|
|
|
|
ERROR_MODES = [:continue, :fail].freeze
|
|
private_constant :ERROR_MODES
|
|
|
|
AWAIT_FLAG = ::Object.new
|
|
private_constant :AWAIT_FLAG
|
|
|
|
AWAIT_ACTION = ->(value, latch) { latch.count_down; AWAIT_FLAG }
|
|
private_constant :AWAIT_ACTION
|
|
|
|
DEFAULT_ERROR_HANDLER = ->(agent, error) { nil }
|
|
private_constant :DEFAULT_ERROR_HANDLER
|
|
|
|
DEFAULT_VALIDATOR = ->(value) { true }
|
|
private_constant :DEFAULT_VALIDATOR
|
|
|
|
Job = Struct.new(:action, :args, :executor, :caller)
|
|
private_constant :Job
|
|
|
|
# Raised during action processing or any other time in an Agent's lifecycle.
|
|
class Error < StandardError
|
|
def initialize(message = nil)
|
|
message ||= 'agent must be restarted before jobs can post'
|
|
super(message)
|
|
end
|
|
end
|
|
|
|
# Raised when a new value obtained during action processing or at `#restart`
|
|
# fails validation.
|
|
class ValidationError < Error
|
|
def initialize(message = nil)
|
|
message ||= 'invalid value'
|
|
super(message)
|
|
end
|
|
end
|
|
|
|
# The error mode this Agent is operating in. See {#initialize} for details.
|
|
attr_reader :error_mode
|
|
|
|
# Create a new `Agent` with the given initial value and options.
|
|
#
|
|
# The `:validator` option must be `nil` or a side-effect free proc/lambda
|
|
# which takes one argument. On any intended value change the validator, if
|
|
# provided, will be called. If the new value is invalid the validator should
|
|
# return `false` or raise an error.
|
|
#
|
|
# The `:error_handler` option must be `nil` or a proc/lambda which takes two
|
|
# arguments. When an action raises an error or validation fails, either by
|
|
# returning false or raising an error, the error handler will be called. The
|
|
# arguments to the error handler will be a reference to the agent itself and
|
|
# the error object which was raised.
|
|
#
|
|
# The `:error_mode` may be either `:continue` (the default if an error
|
|
# handler is given) or `:fail` (the default if error handler nil or not
|
|
# given).
|
|
#
|
|
# If an action being run by the agent throws an error or doesn't pass
|
|
# validation the error handler, if present, will be called. After the
|
|
# handler executes if the error mode is `:continue` the Agent will continue
|
|
# as if neither the action that caused the error nor the error itself ever
|
|
# happened.
|
|
#
|
|
# If the mode is `:fail` the Agent will become {#failed?} and will stop
|
|
# accepting new action dispatches. Any previously queued actions will be
|
|
# held until {#restart} is called. The {#value} method will still work,
|
|
# returning the value of the Agent before the error.
|
|
#
|
|
# @param [Object] initial the initial value
|
|
# @param [Hash] opts the configuration options
|
|
#
|
|
# @option opts [Symbol] :error_mode either `:continue` or `:fail`
|
|
# @option opts [nil, Proc] :error_handler the (optional) error handler
|
|
# @option opts [nil, Proc] :validator the (optional) validation procedure
|
|
def initialize(initial, opts = {})
|
|
super()
|
|
synchronize { ns_initialize(initial, opts) }
|
|
end
|
|
|
|
# The current value (state) of the Agent, irrespective of any pending or
|
|
# in-progress actions. The value is always available and is non-blocking.
|
|
#
|
|
# @return [Object] the current value
|
|
def value
|
|
@current.value # TODO (pitr 12-Sep-2015): broken unsafe read?
|
|
end
|
|
|
|
alias_method :deref, :value
|
|
|
|
# When {#failed?} and {#error_mode} is `:fail`, returns the error object
|
|
# which caused the failure, else `nil`. When {#error_mode} is `:continue`
|
|
# will *always* return `nil`.
|
|
#
|
|
# @return [nil, Error] the error which caused the failure when {#failed?}
|
|
def error
|
|
@error.value
|
|
end
|
|
|
|
alias_method :reason, :error
|
|
|
|
# @!macro agent_send
|
|
#
|
|
# Dispatches an action to the Agent and returns immediately. Subsequently,
|
|
# in a thread from a thread pool, the {#value} will be set to the return
|
|
# value of the action. Action dispatches are only allowed when the Agent
|
|
# is not {#failed?}.
|
|
#
|
|
# The action must be a block/proc/lambda which takes 1 or more arguments.
|
|
# The first argument is the current {#value} of the Agent. Any arguments
|
|
# passed to the send method via the `args` parameter will be passed to the
|
|
# action as the remaining arguments. The action must return the new value
|
|
# of the Agent.
|
|
#
|
|
# * {#send} and {#send!} should be used for actions that are CPU limited
|
|
# * {#send_off}, {#send_off!}, and {#<<} are appropriate for actions that
|
|
# may block on IO
|
|
# * {#send_via} and {#send_via!} are used when a specific executor is to
|
|
# be used for the action
|
|
#
|
|
# @param [Array<Object>] args zero or more arguments to be passed to
|
|
# the action
|
|
# @param [Proc] action the action dispatch to be enqueued
|
|
#
|
|
# @yield [agent, value, *args] process the old value and return the new
|
|
# @yieldparam [Object] value the current {#value} of the Agent
|
|
# @yieldparam [Array<Object>] args zero or more arguments to pass to the
|
|
# action
|
|
# @yieldreturn [Object] the new value of the Agent
|
|
#
|
|
# @!macro send_return
|
|
# @return [Boolean] true if the action is successfully enqueued, false if
|
|
# the Agent is {#failed?}
|
|
def send(*args, &action)
|
|
enqueue_action_job(action, args, Concurrent.global_fast_executor)
|
|
end
|
|
|
|
# @!macro agent_send
|
|
#
|
|
# @!macro send_bang_return_and_raise
|
|
# @return [Boolean] true if the action is successfully enqueued
|
|
# @raise [Concurrent::Agent::Error] if the Agent is {#failed?}
|
|
def send!(*args, &action)
|
|
raise Error.new unless send(*args, &action)
|
|
true
|
|
end
|
|
|
|
# @!macro agent_send
|
|
# @!macro send_return
|
|
def send_off(*args, &action)
|
|
enqueue_action_job(action, args, Concurrent.global_io_executor)
|
|
end
|
|
|
|
alias_method :post, :send_off
|
|
|
|
# @!macro agent_send
|
|
# @!macro send_bang_return_and_raise
|
|
def send_off!(*args, &action)
|
|
raise Error.new unless send_off(*args, &action)
|
|
true
|
|
end
|
|
|
|
# @!macro agent_send
|
|
# @!macro send_return
|
|
# @param [Concurrent::ExecutorService] executor the executor on which the
|
|
# action is to be dispatched
|
|
def send_via(executor, *args, &action)
|
|
enqueue_action_job(action, args, executor)
|
|
end
|
|
|
|
# @!macro agent_send
|
|
# @!macro send_bang_return_and_raise
|
|
# @param [Concurrent::ExecutorService] executor the executor on which the
|
|
# action is to be dispatched
|
|
def send_via!(executor, *args, &action)
|
|
raise Error.new unless send_via(executor, *args, &action)
|
|
true
|
|
end
|
|
|
|
# Dispatches an action to the Agent and returns immediately. Subsequently,
|
|
# in a thread from a thread pool, the {#value} will be set to the return
|
|
# value of the action. Appropriate for actions that may block on IO.
|
|
#
|
|
# @param [Proc] action the action dispatch to be enqueued
|
|
# @return [Concurrent::Agent] self
|
|
# @see #send_off
|
|
def <<(action)
|
|
send_off(&action)
|
|
self
|
|
end
|
|
|
|
# Blocks the current thread (indefinitely!) until all actions dispatched
|
|
# thus far, from this thread or nested by the Agent, have occurred. Will
|
|
# block when {#failed?}. Will never return if a failed Agent is {#restart}
|
|
# with `:clear_actions` true.
|
|
#
|
|
# Returns a reference to `self` to support method chaining:
|
|
#
|
|
# ```
|
|
# current_value = agent.await.value
|
|
# ```
|
|
#
|
|
# @return [Boolean] self
|
|
#
|
|
# @!macro agent_await_warning
|
|
def await
|
|
wait(nil)
|
|
self
|
|
end
|
|
|
|
# Blocks the current thread until all actions dispatched thus far, from this
|
|
# thread or nested by the Agent, have occurred, or the timeout (in seconds)
|
|
# has elapsed.
|
|
#
|
|
# @param [Float] timeout the maximum number of seconds to wait
|
|
# @return [Boolean] true if all actions complete before timeout else false
|
|
#
|
|
# @!macro agent_await_warning
|
|
def await_for(timeout)
|
|
wait(timeout.to_f)
|
|
end
|
|
|
|
# Blocks the current thread until all actions dispatched thus far, from this
|
|
# thread or nested by the Agent, have occurred, or the timeout (in seconds)
|
|
# has elapsed.
|
|
#
|
|
# @param [Float] timeout the maximum number of seconds to wait
|
|
# @return [Boolean] true if all actions complete before timeout
|
|
#
|
|
# @raise [Concurrent::TimeoutError] when timout is reached
|
|
#
|
|
# @!macro agent_await_warning
|
|
def await_for!(timeout)
|
|
raise Concurrent::TimeoutError unless wait(timeout.to_f)
|
|
true
|
|
end
|
|
|
|
# Blocks the current thread until all actions dispatched thus far, from this
|
|
# thread or nested by the Agent, have occurred, or the timeout (in seconds)
|
|
# has elapsed. Will block indefinitely when timeout is nil or not given.
|
|
#
|
|
# Provided mainly for consistency with other classes in this library. Prefer
|
|
# the various `await` methods instead.
|
|
#
|
|
# @param [Float] timeout the maximum number of seconds to wait
|
|
# @return [Boolean] true if all actions complete before timeout else false
|
|
#
|
|
# @!macro agent_await_warning
|
|
def wait(timeout = nil)
|
|
latch = Concurrent::CountDownLatch.new(1)
|
|
enqueue_await_job(latch)
|
|
latch.wait(timeout)
|
|
end
|
|
|
|
# Is the Agent in a failed state?
|
|
#
|
|
# @see #restart
|
|
def failed?
|
|
!@error.value.nil?
|
|
end
|
|
|
|
alias_method :stopped?, :failed?
|
|
|
|
# When an Agent is {#failed?}, changes the Agent {#value} to `new_value`
|
|
# then un-fails the Agent so that action dispatches are allowed again. If
|
|
# the `:clear_actions` option is give and true, any actions queued on the
|
|
# Agent that were being held while it was failed will be discarded,
|
|
# otherwise those held actions will proceed. The `new_value` must pass the
|
|
# validator if any, or `restart` will raise an exception and the Agent will
|
|
# remain failed with its old {#value} and {#error}. Observers, if any, will
|
|
# not be notified of the new state.
|
|
#
|
|
# @param [Object] new_value the new value for the Agent once restarted
|
|
# @param [Hash] opts the configuration options
|
|
# @option opts [Symbol] :clear_actions true if all enqueued but unprocessed
|
|
# actions should be discarded on restart, else false (default: false)
|
|
# @return [Boolean] true
|
|
#
|
|
# @raise [Concurrent:AgentError] when not failed
|
|
def restart(new_value, opts = {})
|
|
clear_actions = opts.fetch(:clear_actions, false)
|
|
synchronize do
|
|
raise Error.new('agent is not failed') unless failed?
|
|
raise ValidationError unless ns_validate(new_value)
|
|
@current.value = new_value
|
|
@error.value = nil
|
|
@queue.clear if clear_actions
|
|
ns_post_next_job unless @queue.empty?
|
|
end
|
|
true
|
|
end
|
|
|
|
class << self
|
|
|
|
# Blocks the current thread (indefinitely!) until all actions dispatched
|
|
# thus far to all the given Agents, from this thread or nested by the
|
|
# given Agents, have occurred. Will block when any of the agents are
|
|
# failed. Will never return if a failed Agent is restart with
|
|
# `:clear_actions` true.
|
|
#
|
|
# @param [Array<Concurrent::Agent>] agents the Agents on which to wait
|
|
# @return [Boolean] true
|
|
#
|
|
# @!macro agent_await_warning
|
|
def await(*agents)
|
|
agents.each { |agent| agent.await }
|
|
true
|
|
end
|
|
|
|
# Blocks the current thread until all actions dispatched thus far to all
|
|
# the given Agents, from this thread or nested by the given Agents, have
|
|
# occurred, or the timeout (in seconds) has elapsed.
|
|
#
|
|
# @param [Float] timeout the maximum number of seconds to wait
|
|
# @param [Array<Concurrent::Agent>] agents the Agents on which to wait
|
|
# @return [Boolean] true if all actions complete before timeout else false
|
|
#
|
|
# @!macro agent_await_warning
|
|
def await_for(timeout, *agents)
|
|
end_at = Concurrent.monotonic_time + timeout.to_f
|
|
ok = agents.length.times do |i|
|
|
break false if (delay = end_at - Concurrent.monotonic_time) < 0
|
|
break false unless agents[i].await_for(delay)
|
|
end
|
|
!!ok
|
|
end
|
|
|
|
# Blocks the current thread until all actions dispatched thus far to all
|
|
# the given Agents, from this thread or nested by the given Agents, have
|
|
# occurred, or the timeout (in seconds) has elapsed.
|
|
#
|
|
# @param [Float] timeout the maximum number of seconds to wait
|
|
# @param [Array<Concurrent::Agent>] agents the Agents on which to wait
|
|
# @return [Boolean] true if all actions complete before timeout
|
|
#
|
|
# @raise [Concurrent::TimeoutError] when timout is reached
|
|
# @!macro agent_await_warning
|
|
def await_for!(timeout, *agents)
|
|
raise Concurrent::TimeoutError unless await_for(timeout, *agents)
|
|
true
|
|
end
|
|
end
|
|
|
|
private
|
|
|
|
def ns_initialize(initial, opts)
|
|
@error_mode = opts[:error_mode]
|
|
@error_handler = opts[:error_handler]
|
|
|
|
if @error_mode && !ERROR_MODES.include?(@error_mode)
|
|
raise ArgumentError.new('unrecognized error mode')
|
|
elsif @error_mode.nil?
|
|
@error_mode = @error_handler ? :continue : :fail
|
|
end
|
|
|
|
@error_handler ||= DEFAULT_ERROR_HANDLER
|
|
@validator = opts.fetch(:validator, DEFAULT_VALIDATOR)
|
|
@current = Concurrent::AtomicReference.new(initial)
|
|
@error = Concurrent::AtomicReference.new(nil)
|
|
@caller = Concurrent::ThreadLocalVar.new(nil)
|
|
@queue = []
|
|
|
|
self.observers = Collection::CopyOnNotifyObserverSet.new
|
|
end
|
|
|
|
def enqueue_action_job(action, args, executor)
|
|
raise ArgumentError.new('no action given') unless action
|
|
job = Job.new(action, args, executor, @caller.value || Thread.current.object_id)
|
|
synchronize { ns_enqueue_job(job) }
|
|
end
|
|
|
|
def enqueue_await_job(latch)
|
|
synchronize do
|
|
if (index = ns_find_last_job_for_thread)
|
|
job = Job.new(AWAIT_ACTION, [latch], Concurrent.global_immediate_executor,
|
|
Thread.current.object_id)
|
|
ns_enqueue_job(job, index+1)
|
|
else
|
|
latch.count_down
|
|
true
|
|
end
|
|
end
|
|
end
|
|
|
|
def ns_enqueue_job(job, index = nil)
|
|
# a non-nil index means this is an await job
|
|
return false if index.nil? && failed?
|
|
index ||= @queue.length
|
|
@queue.insert(index, job)
|
|
# if this is the only job, post to executor
|
|
ns_post_next_job if @queue.length == 1
|
|
true
|
|
end
|
|
|
|
def ns_post_next_job
|
|
@queue.first.executor.post { execute_next_job }
|
|
end
|
|
|
|
def execute_next_job
|
|
job = synchronize { @queue.first }
|
|
old_value = @current.value
|
|
|
|
@caller.value = job.caller # for nested actions
|
|
new_value = job.action.call(old_value, *job.args)
|
|
@caller.value = nil
|
|
|
|
return if new_value == AWAIT_FLAG
|
|
|
|
if ns_validate(new_value)
|
|
@current.value = new_value
|
|
observers.notify_observers(Time.now, old_value, new_value)
|
|
else
|
|
handle_error(ValidationError.new)
|
|
end
|
|
rescue => error
|
|
handle_error(error)
|
|
ensure
|
|
synchronize do
|
|
@queue.shift
|
|
unless failed? || @queue.empty?
|
|
ns_post_next_job
|
|
end
|
|
end
|
|
end
|
|
|
|
def ns_validate(value)
|
|
@validator.call(value)
|
|
rescue
|
|
false
|
|
end
|
|
|
|
def handle_error(error)
|
|
# stop new jobs from posting
|
|
@error.value = error if @error_mode == :fail
|
|
@error_handler.call(self, error)
|
|
rescue
|
|
# do nothing
|
|
end
|
|
|
|
def ns_find_last_job_for_thread
|
|
@queue.rindex { |job| job.caller == Thread.current.object_id }
|
|
end
|
|
end
|
|
end
|