if defined?(EventMachine.library_type) and EventMachine.library_type == :pure_ruby # assume 'em/pure_ruby' was loaded already elsif RUBY_PLATFORM =~ /java/ require 'java' require 'jeventmachine' else begin require 'rubyeventmachine' rescue LoadError warn "Unable to load the EventMachine C extension; To use the pure-ruby reactor, require 'em/pure_ruby'" raise end end require 'em/version' require 'em/pool' require 'em/deferrable' require 'em/future' require 'em/streamer' require 'em/spawnable' require 'em/processes' require 'em/iterator' require 'em/buftok' require 'em/timers' require 'em/protocols' require 'em/connection' require 'em/callback' require 'em/queue' require 'em/channel' require 'em/file_watch' require 'em/process_watch' require 'em/tick_loop' require 'em/resolver' require 'em/completion' require 'em/threaded_resource' require 'shellwords' require 'thread' require 'resolv' # Top-level EventMachine namespace. If you are looking for EventMachine examples, see {file:docs/GettingStarted.md EventMachine tutorial}. # # ## Key methods ## # ### Starting and stopping the event loop ### # # * {EventMachine.run} # * {EventMachine.stop_event_loop} # # ### Implementing clients ### # # * {EventMachine.connect} # # ### Implementing servers ### # # * {EventMachine.start_server} # # ### Working with timers ### # # * {EventMachine.add_timer} # * {EventMachine.add_periodic_timer} # * {EventMachine.cancel_timer} # # ### Working with blocking tasks ### # # * {EventMachine.defer} # * {EventMachine.next_tick} # # ### Efficient proxying ### # # * {EventMachine.enable_proxy} # * {EventMachine.disable_proxy} module EventMachine class << self # Exposed to allow joining on the thread, when run in a multithreaded # environment. Performing other actions on the thread has undefined # semantics (read: a dangerous endevor). # # @return [Thread] attr_reader :reactor_thread end @next_tick_mutex = Mutex.new @reactor_running = false @next_tick_queue = [] @tails = [] @threadpool = @threadqueue = @resultqueue = nil @all_threads_spawned = false # System errnos # @private ERRNOS = Errno::constants.grep(/^E/).inject(Hash.new(:unknown)) { |hash, name| errno = Errno.__send__(:const_get, name) hash[errno::Errno] = errno hash } # Initializes and runs an event loop. This method only returns if code inside the block passed to this method # calls {EventMachine.stop_event_loop}. The block is executed after initializing its internal event loop but *before* running the loop, # therefore this block is the right place to call any code that needs event loop to run, for example, {EventMachine.start_server}, # {EventMachine.connect} or similar methods of libraries that use EventMachine under the hood # (like `EventMachine::HttpRequest.new` or `AMQP.start`). # # Programs that are run for long periods of time (e.g. servers) usually start event loop by calling {EventMachine.run}, and let it # run "forever". It's also possible to use {EventMachine.run} to make a single client-connection to a remote server, # process the data flow from that single connection, and then call {EventMachine.stop_event_loop} to stop, in other words, # to run event loop for a short period of time (necessary to complete some operation) and then shut it down. # # Once event loop is running, it is perfectly possible to start multiple servers and clients simultaneously: content-aware # proxies like [Proxymachine](https://github.com/mojombo/proxymachine) do just that. # # ## Using EventMachine with Ruby on Rails and other Web application frameworks ## # # Standalone applications often run event loop on the main thread, thus blocking for their entire lifespan. In case of Web applications, # if you are running an EventMachine-based app server such as [Thin](http://code.macournoyer.com/thin/) or [Goliath](https://github.com/postrank-labs/goliath/), # they start event loop for you. Servers like Unicorn, Apache Passenger or Mongrel occupy main Ruby thread to serve HTTP(S) requests. This means # that calling {EventMachine.run} on the same thread is not an option (it will result in Web server never binding to the socket). # In that case, start event loop in a separate thread as demonstrated below. # # # @example Starting EventMachine event loop in the current thread to run the "Hello, world"-like Echo server example # # #!/usr/bin/env ruby # # require 'rubygems' # or use Bundler.setup # require 'eventmachine' # # class EchoServer < EM::Connection # def receive_data(data) # send_data(data) # end # end # # EventMachine.run do # EventMachine.start_server("0.0.0.0", 10000, EchoServer) # end # # # @example Starting EventMachine event loop in a separate thread # # # doesn't block current thread, can be used with Ruby on Rails, Sinatra, Merb, Rack # # and any other application server that occupies main Ruby thread. # Thread.new { EventMachine.run } # # # @note This method blocks calling thread. If you need to start EventMachine event loop from a Web app # running on a non event-driven server (Unicorn, Apache Passenger, Mongrel), do it in a separate thread like demonstrated # in one of the examples. # @see file:docs/GettingStarted.md Getting started with EventMachine # @see EventMachine.stop_event_loop def self.run blk=nil, tail=nil, &block # Obsoleted the use_threads mechanism. # 25Nov06: Added the begin/ensure block. We need to be sure that release_machine # gets called even if an exception gets thrown within any of the user code # that the event loop runs. The best way to see this is to run a unit # test with two functions, each of which calls {EventMachine.run} and each of # which throws something inside of #run. Without the ensure, the second test # will start without release_machine being called and will immediately throw # if @reactor_running and @reactor_pid != Process.pid # Reactor was started in a different parent, meaning we have forked. # Clean up reactor state so a new reactor boots up in this child. stop_event_loop release_machine cleanup_machine @reactor_running = false end tail and @tails.unshift(tail) if reactor_running? (b = blk || block) and b.call # next_tick(b) else @conns = {} @acceptors = {} @timers = {} @wrapped_exception = nil @next_tick_queue ||= [] @tails ||= [] begin initialize_event_machine @reactor_pid = Process.pid @reactor_thread = Thread.current @reactor_running = true (b = blk || block) and add_timer(0, b) if @next_tick_queue && !@next_tick_queue.empty? add_timer(0) { signal_loopbreak } end # Rubinius needs to come back into "Ruby space" for GC to work, # so we'll crank the machine here. if defined?(RUBY_ENGINE) && RUBY_ENGINE == "rbx" while run_machine_once; end else run_machine end ensure until @tails.empty? @tails.pop.call end release_machine cleanup_machine @reactor_running = false @reactor_thread = nil end raise @wrapped_exception if @wrapped_exception end end # Sugars a common use case. Will pass the given block to #run, but will terminate # the reactor loop and exit the function as soon as the code in the block completes. # (Normally, {EventMachine.run} keeps running indefinitely, even after the block supplied to it # finishes running, until user code calls {EventMachine.stop}) # def self.run_block &block pr = proc { block.call EventMachine::stop } run(&pr) end # @return [Boolean] true if the calling thread is the same thread as the reactor. def self.reactor_thread? Thread.current == @reactor_thread end # Runs the given callback on the reactor thread, or immediately if called # from the reactor thread. Accepts the same arguments as {EventMachine::Callback} def self.schedule(*a, &b) cb = Callback(*a, &b) if reactor_running? && reactor_thread? cb.call else next_tick { cb.call } end end # Forks a new process, properly stops the reactor and then calls {EventMachine.run} inside of it again, passing your block. def self.fork_reactor &block # This implementation is subject to change, especially if we clean up the relationship # of EM#run to @reactor_running. # Original patch by Aman Gupta. # Kernel.fork do if reactor_running? stop_event_loop release_machine cleanup_machine @reactor_running = false @reactor_thread = nil end run block end end # Clean up Ruby space following a release_machine def self.cleanup_machine if @threadpool && !@threadpool.empty? # Tell the threads to stop @threadpool.each { |t| t.exit } # Join the threads or bump the stragglers one more time @threadpool.each { |t| t.join 0.01 || t.exit } end @threadpool = nil @threadqueue = nil @resultqueue = nil @all_threads_spawned = false @next_tick_queue = [] end # Adds a block to call as the reactor is shutting down. # # These callbacks are called in the _reverse_ order to which they are added. # # @example Scheduling operations to be run when EventMachine event loop is stopped # # EventMachine.run do # EventMachine.add_shutdown_hook { puts "b" } # EventMachine.add_shutdown_hook { puts "a" } # EventMachine.stop # end # # # Outputs: # # a # # b # def self.add_shutdown_hook &block @tails << block end # Adds a one-shot timer to the event loop. # Call it with one or two parameters. The first parameters is a delay-time # expressed in *seconds* (not milliseconds). The second parameter, if # present, must be an object that responds to :call. If 2nd parameter is not given, then you # can also simply pass a block to the method call. # # This method may be called from the block passed to {EventMachine.run} # or from any callback method. It schedules execution of the proc or block # passed to it, after the passage of an interval of time equal to # *at least* the number of seconds specified in the first parameter to # the call. # # {EventMachine.add_timer} is a non-blocking method. Callbacks can and will # be called during the interval of time that the timer is in effect. # There is no built-in limit to the number of timers that can be outstanding at # any given time. # # @example Setting a one-shot timer with EventMachine # # EventMachine.run { # puts "Starting the run now: #{Time.now}" # EventMachine.add_timer 5, proc { puts "Executing timer event: #{Time.now}" } # EventMachine.add_timer(10) { puts "Executing timer event: #{Time.now}" } # } # # @param [Integer] delay Delay in seconds # @see EventMachine::Timer # @see EventMachine.add_periodic_timer def self.add_timer *args, &block interval = args.shift code = args.shift || block if code # check too many timers! s = add_oneshot_timer((interval.to_f * 1000).to_i) @timers[s] = code s end end # Adds a periodic timer to the event loop. # It takes the same parameters as the one-shot timer method, {EventMachine.add_timer}. # This method schedules execution of the given block repeatedly, at intervals # of time *at least* as great as the number of seconds given in the first # parameter to the call. # # @example Write a dollar-sign to stderr every five seconds, without blocking # # EventMachine.run { # EventMachine.add_periodic_timer( 5 ) { $stderr.write "$" } # } # # @param [Integer] delay Delay in seconds # # @see EventMachine::PeriodicTimer # @see EventMachine.add_timer # def self.add_periodic_timer *args, &block interval = args.shift code = args.shift || block EventMachine::PeriodicTimer.new(interval, code) end # Cancel a timer (can be a callback or an {EventMachine::Timer} instance). # # @param [#cancel, #call] timer_or_sig A timer to cancel # @see EventMachine::Timer#cancel def self.cancel_timer timer_or_sig if timer_or_sig.respond_to? :cancel timer_or_sig.cancel else @timers[timer_or_sig] = false if @timers.has_key?(timer_or_sig) end end # Causes the processing loop to stop executing, which will cause all open connections and accepting servers # to be run down and closed. Connection termination callbacks added using {EventMachine.add_shutdown_hook} # will be called as part of running this method. # # When all of this processing is complete, the call to {EventMachine.run} which started the processing loop # will return and program flow will resume from the statement following {EventMachine.run} call. # # @example Stopping a running EventMachine event loop # # require 'rubygems' # require 'eventmachine' # # module Redmond # def post_init # puts "We're sending a dumb HTTP request to the remote peer." # send_data "GET / HTTP/1.1\r\nHost: www.microsoft.com\r\n\r\n" # end # # def receive_data data # puts "We received #{data.length} bytes from the remote peer." # puts "We're going to stop the event loop now." # EventMachine::stop_event_loop # end # # def unbind # puts "A connection has terminated." # end # end # # puts "We're starting the event loop now." # EventMachine.run { # EventMachine.connect "www.microsoft.com", 80, Redmond # } # puts "The event loop has stopped." # # # This program will produce approximately the following output: # # # # We're starting the event loop now. # # We're sending a dumb HTTP request to the remote peer. # # We received 1440 bytes from the remote peer. # # We're going to stop the event loop now. # # A connection has terminated. # # The event loop has stopped. # # def self.stop_event_loop EventMachine::stop end # Initiates a TCP server (socket acceptor) on the specified IP address and port. # # The IP address must be valid on the machine where the program # runs, and the process must be privileged enough to listen # on the specified port (on Unix-like systems, superuser privileges # are usually required to listen on any port lower than 1024). # Only one listener may be running on any given address/port # combination. start_server will fail if the given address and port # are already listening on the machine, either because of a prior call # to {.start_server} or some unrelated process running on the machine. # If {.start_server} succeeds, the new network listener becomes active # immediately and starts accepting connections from remote peers, # and these connections generate callback events that are processed # by the code specified in the handler parameter to {.start_server}. # # The optional handler which is passed to this method is the key # to EventMachine's ability to handle particular network protocols. # The handler parameter passed to start_server must be a Ruby Module # that you must define. When the network server that is started by # start_server accepts a new connection, it instantiates a new # object of an anonymous class that is inherited from {EventMachine::Connection}, # *into which your handler module have been included*. Arguments passed into start_server # after the class name are passed into the constructor during the instantiation. # # Your handler module may override any of the methods in {EventMachine::Connection}, # such as {EventMachine::Connection#receive_data}, in order to implement the specific behavior # of the network protocol. # # Callbacks invoked in response to network events *always* take place # within the execution context of the object derived from {EventMachine::Connection} # extended by your handler module. There is one object per connection, and # all of the callbacks invoked for a particular connection take the form # of instance methods called against the corresponding {EventMachine::Connection} # object. Therefore, you are free to define whatever instance variables you # wish, in order to contain the per-connection state required by the network protocol you are # implementing. # # {EventMachine.start_server} is usually called inside the block passed to {EventMachine.run}, # but it can be called from any EventMachine callback. {EventMachine.start_server} will fail # unless the EventMachine event loop is currently running (which is why # it's often called in the block suppled to {EventMachine.run}). # # You may call start_server any number of times to start up network # listeners on different address/port combinations. The servers will # all run simultaneously. More interestingly, each individual call to start_server # can specify a different handler module and thus implement a different # network protocol from all the others. # # @example # # require 'rubygems' # require 'eventmachine' # # # Here is an example of a server that counts lines of input from the remote # # peer and sends back the total number of lines received, after each line. # # Try the example with more than one client connection opened via telnet, # # and you will see that the line count increments independently on each # # of the client connections. Also very important to note, is that the # # handler for the receive_data function, which our handler redefines, may # # not assume that the data it receives observes any kind of message boundaries. # # Also, to use this example, be sure to change the server and port parameters # # to the start_server call to values appropriate for your environment. # module LineCounter # MaxLinesPerConnection = 10 # # def post_init # puts "Received a new connection" # @data_received = "" # @line_count = 0 # end # # def receive_data data # @data_received << data # while @data_received.slice!( /^[^\n]*[\n]/m ) # @line_count += 1 # send_data "received #{@line_count} lines so far\r\n" # @line_count == MaxLinesPerConnection and close_connection_after_writing # end # end # end # # EventMachine.run { # host, port = "192.168.0.100", 8090 # EventMachine.start_server host, port, LineCounter # puts "Now accepting connections on address #{host}, port #{port}..." # EventMachine.add_periodic_timer(10) { $stderr.write "*" } # } # # @param [String] server Host to bind to. # @param [Integer] port Port to bind to. # @param [Module, Class] handler A module or class that implements connection callbacks # # @note Don't forget that in order to bind to ports < 1024 on Linux, *BSD and Mac OS X your process must have superuser privileges. # # @see file:docs/GettingStarted.md EventMachine tutorial # @see EventMachine.stop_server def self.start_server server, port=nil, handler=nil, *args, &block begin port = Integer(port) rescue ArgumentError, TypeError # there was no port, so server must be a unix domain socket # the port argument is actually the handler, and the handler is one of the args args.unshift handler if handler handler = port port = nil end if port klass = klass_from_handler(Connection, handler, *args) s = if port start_tcp_server server, port else start_unix_server server end @acceptors[s] = [klass,args,block] s end # Attach to an existing socket's file descriptor. The socket may have been # started with {EventMachine.start_server}. def self.attach_server sock, handler=nil, *args, &block klass = klass_from_handler(Connection, handler, *args) sd = sock.respond_to?(:fileno) ? sock.fileno : sock s = attach_sd(sd) @acceptors[s] = [klass,args,block,sock] s end # Stop a TCP server socket that was started with {EventMachine.start_server}. # @see EventMachine.start_server def self.stop_server signature EventMachine::stop_tcp_server signature end # Start a Unix-domain server. # # Note that this is an alias for {EventMachine.start_server}, which can be used to start both # TCP and Unix-domain servers. # # @see EventMachine.start_server def self.start_unix_domain_server filename, *args, &block start_server filename, *args, &block end # Initiates a TCP connection to a remote server and sets up event handling for the connection. # {EventMachine.connect} requires event loop to be running (see {EventMachine.run}). # # {EventMachine.connect} takes the IP address (or hostname) and # port of the remote server you want to connect to. # It also takes an optional handler (a module or a subclass of {EventMachine::Connection}) which you must define, that # contains the callbacks that will be invoked by the event loop on behalf of the connection. # # Learn more about connection lifecycle callbacks in the {file:docs/GettingStarted.md EventMachine tutorial} and # {file:docs/ConnectionLifecycleCallbacks.md Connection lifecycle guide}. # # # @example # # # Here's a program which connects to a web server, sends a naive # # request, parses the HTTP header of the response, and then # # (antisocially) ends the event loop, which automatically drops the connection # # (and incidentally calls the connection's unbind method). # module DumbHttpClient # def post_init # send_data "GET / HTTP/1.1\r\nHost: _\r\n\r\n" # @data = "" # @parsed = false # end # # def receive_data data # @data << data # if !@parsed and @data =~ /[\n][\r]*[\n]/m # @parsed = true # puts "RECEIVED HTTP HEADER:" # $`.each {|line| puts ">>> #{line}" } # # puts "Now we'll terminate the loop, which will also close the connection" # EventMachine::stop_event_loop # end # end # # def unbind # puts "A connection has terminated" # end # end # # EventMachine.run { # EventMachine.connect "www.bayshorenetworks.com", 80, DumbHttpClient # } # puts "The event loop has ended" # # # @example Defining protocol handler as a class # # class MyProtocolHandler < EventMachine::Connection # def initialize *args # super # # whatever else you want to do here # end # # # ... # end # # # @param [String] server Host to connect to # @param [Integer] port Port to connect to # @param [Module, Class] handler A module or class that implements connection lifecycle callbacks # # @see EventMachine.start_server # @see file:docs/GettingStarted.md EventMachine tutorial def self.connect server, port=nil, handler=nil, *args, &blk # EventMachine::connect initiates a TCP connection to a remote # server and sets up event-handling for the connection. # It internally creates an object that should not be handled # by the caller. HOWEVER, it's often convenient to get the # object to set up interfacing to other objects in the system. # We return the newly-created anonymous-class object to the caller. # It's expected that a considerable amount of code will depend # on this behavior, so don't change it. # # Ok, added support for a user-defined block, 13Apr06. # This leads us to an interesting choice because of the # presence of the post_init call, which happens in the # initialize method of the new object. We call the user's # block and pass the new object to it. This is a great # way to do protocol-specific initiation. It happens # AFTER post_init has been called on the object, which I # certainly hope is the right choice. # Don't change this lightly, because accepted connections # are different from connected ones and we don't want # to have them behave differently with respect to post_init # if at all possible. bind_connect nil, nil, server, port, handler, *args, &blk end # This method is like {EventMachine.connect}, but allows for a local address/port # to bind the connection to. # # @see EventMachine.connect def self.bind_connect bind_addr, bind_port, server, port=nil, handler=nil, *args begin port = Integer(port) rescue ArgumentError, TypeError # there was no port, so server must be a unix domain socket # the port argument is actually the handler, and the handler is one of the args args.unshift handler if handler handler = port port = nil end if port klass = klass_from_handler(Connection, handler, *args) s = if port if bind_addr bind_connect_server bind_addr, bind_port.to_i, server, port else connect_server server, port end else connect_unix_server server end c = klass.new s, *args @conns[s] = c block_given? and yield c c end # {EventMachine.watch} registers a given file descriptor or IO object with the eventloop. The # file descriptor will not be modified (it will remain blocking or non-blocking). # # The eventloop can be used to process readable and writable events on the file descriptor, using # {EventMachine::Connection#notify_readable=} and {EventMachine::Connection#notify_writable=} # # {EventMachine::Connection#notify_readable?} and {EventMachine::Connection#notify_writable?} can be used # to check what events are enabled on the connection. # # To detach the file descriptor, use {EventMachine::Connection#detach} # # @example # # module SimpleHttpClient # def notify_readable # header = @io.readline # # if header == "\r\n" # # detach returns the file descriptor number (fd == @io.fileno) # fd = detach # end # rescue EOFError # detach # end # # def unbind # EM.next_tick do # # socket is detached from the eventloop, but still open # data = @io.read # end # end # end # # EventMachine.run { # sock = TCPSocket.new('site.com', 80) # sock.write("GET / HTTP/1.0\r\n\r\n") # conn = EventMachine.watch(sock, SimpleHttpClient) # conn.notify_readable = true # } # # @author Riham Aldakkak (eSpace Technologies) def EventMachine::watch io, handler=nil, *args, &blk attach_io io, true, handler, *args, &blk end # Attaches an IO object or file descriptor to the eventloop as a regular connection. # The file descriptor will be set as non-blocking, and EventMachine will process # receive_data and send_data events on it as it would for any other connection. # # To watch a fd instead, use {EventMachine.watch}, which will not alter the state of the socket # and fire notify_readable and notify_writable events instead. def EventMachine::attach io, handler=nil, *args, &blk attach_io io, false, handler, *args, &blk end # @private def EventMachine::attach_io io, watch_mode, handler=nil, *args klass = klass_from_handler(Connection, handler, *args) if !watch_mode and klass.public_instance_methods.any?{|m| [:notify_readable, :notify_writable].include? m.to_sym } raise ArgumentError, "notify_readable/writable with EM.attach is not supported. Use EM.watch(io){ |c| c.notify_readable = true }" end if io.respond_to?(:fileno) # getDescriptorByFileno deprecated in JRuby 1.7.x, removed in JRuby 9000 if defined?(JRuby) && JRuby.runtime.respond_to?(:getDescriptorByFileno) fd = JRuby.runtime.getDescriptorByFileno(io.fileno).getChannel else fd = io.fileno end else fd = io end s = attach_fd fd, watch_mode c = klass.new s, *args c.instance_variable_set(:@io, io) c.instance_variable_set(:@watch_mode, watch_mode) c.instance_variable_set(:@fd, fd) @conns[s] = c block_given? and yield c c end # Connect to a given host/port and re-use the provided {EventMachine::Connection} instance. # Consider also {EventMachine::Connection#reconnect}. # # @see EventMachine::Connection#reconnect def self.reconnect server, port, handler # Observe, the test for already-connected FAILS if we call a reconnect inside post_init, # because we haven't set up the connection in @conns by that point. # RESIST THE TEMPTATION to "fix" this problem by redefining the behavior of post_init. # # Changed 22Nov06: if called on an already-connected handler, just return the # handler and do nothing more. Originally this condition raised an exception. # We may want to change it yet again and call the block, if any. raise "invalid handler" unless handler.respond_to?(:connection_completed) #raise "still connected" if @conns.has_key?(handler.signature) return handler if @conns.has_key?(handler.signature) s = if port connect_server server, port else connect_unix_server server end handler.signature = s @conns[s] = handler block_given? and yield handler handler end # Make a connection to a Unix-domain socket. This method is simply an alias for {.connect}, # which can connect to both TCP and Unix-domain sockets. Make sure that your process has sufficient # permissions to open the socket it is given. # # @param [String] socketname Unix domain socket (local fully-qualified path) you want to connect to. # # @note UNIX sockets, as the name suggests, are not available on Microsoft Windows. def self.connect_unix_domain socketname, *args, &blk connect socketname, *args, &blk end # Used for UDP-based protocols. Its usage is similar to that of {EventMachine.start_server}. # # This method will create a new UDP (datagram) socket and # bind it to the address and port that you specify. # The normal callbacks (see {EventMachine.start_server}) will # be called as events of interest occur on the newly-created # socket, but there are some differences in how they behave. # # {Connection#receive_data} will be called when a datagram packet # is received on the socket, but unlike TCP sockets, the message # boundaries of the received data will be respected. In other words, # if the remote peer sent you a datagram of a particular size, # you may rely on {Connection#receive_data} to give you the # exact data in the packet, with the original data length. # Also observe that Connection#receive_data may be called with a # *zero-length* data payload, since empty datagrams are permitted in UDP. # # {Connection#send_data} is available with UDP packets as with TCP, # but there is an important difference. Because UDP communications # are *connectionless*, there is no implicit recipient for the packets you # send. Ordinarily you must specify the recipient for each packet you send. # However, EventMachine provides for the typical pattern of receiving a UDP datagram # from a remote peer, performing some operation, and then sending # one or more packets in response to the same remote peer. # To support this model easily, just use {Connection#send_data} # in the code that you supply for {Connection#receive_data}. # # EventMachine will provide an implicit return address for any messages sent to # {Connection#send_data} within the context of a {Connection#receive_data} callback, # and your response will automatically go to the correct remote peer. # # Observe that the port number that you supply to {EventMachine.open_datagram_socket} # may be zero. In this case, EventMachine will create a UDP socket # that is bound to an [ephemeral port](http://en.wikipedia.org/wiki/Ephemeral_port). # This is not appropriate for servers that must publish a well-known # port to which remote peers may send datagrams. But it can be useful # for clients that send datagrams to other servers. # If you do this, you will receive any responses from the remote # servers through the normal {Connection#receive_data} callback. # Observe that you will probably have issues with firewalls blocking # the ephemeral port numbers, so this technique is most appropriate for LANs. # # If you wish to send datagrams to arbitrary remote peers (not # necessarily ones that have sent data to which you are responding), # then see {Connection#send_datagram}. # # DO NOT call send_data from a datagram socket outside of a {Connection#receive_data} method. Use {Connection#send_datagram}. # If you do use {Connection#send_data} outside of a {Connection#receive_data} method, you'll get a confusing error # because there is no "peer," as #send_data requires (inside of {EventMachine::Connection#receive_data}, # {EventMachine::Connection#send_data} "fakes" the peer as described above). # # @param [String] address IP address # @param [String] port Port # @param [Class, Module] handler A class or a module that implements connection lifecycle callbacks. def self.open_datagram_socket address, port, handler=nil, *args # Replaced the implementation on 01Oct06. Thanks to Tobias Gustafsson for pointing # out that this originally did not take a class but only a module. klass = klass_from_handler(Connection, handler, *args) s = open_udp_socket address, port.to_i c = klass.new s, *args @conns[s] = c block_given? and yield c c end # For advanced users. This function sets the default timer granularity, which by default is # slightly smaller than 100 milliseconds. Call this function to set a higher or lower granularity. # The function affects the behavior of {EventMachine.add_timer} and {EventMachine.add_periodic_timer}. # Most applications will not need to call this function. # # Avoid setting the quantum to very low values because that may reduce performance under some extreme conditions. # We recommend that you not use values lower than 10. # # This method only can be used if event loop is running. # # @param [Integer] mills New timer granularity, in milliseconds # # @see EventMachine.add_timer # @see EventMachine.add_periodic_timer # @see EventMachine::Timer # @see EventMachine.run def self.set_quantum mills set_timer_quantum mills.to_i end # Sets the maximum number of timers and periodic timers that may be outstanding at any # given time. You only need to call {.set_max_timers} if you need more than the default # number of timers, which on most platforms is 1000. # # @note This method has to be used *before* event loop is started. # # @param [Integer] ct Maximum number of timers that may be outstanding at any given time # # @see EventMachine.add_timer # @see EventMachine.add_periodic_timer # @see EventMachine::Timer def self.set_max_timers ct set_max_timer_count ct end # Gets the current maximum number of allowed timers # # @return [Integer] Maximum number of timers that may be outstanding at any given time def self.get_max_timers get_max_timer_count end # Returns the total number of connections (file descriptors) currently held by the reactor. # Note that a tick must pass after the 'initiation' of a connection for this number to increment. # It's usually accurate, but don't rely on the exact precision of this number unless you really know EM internals. # # @example # # EventMachine.run { # EventMachine.connect("rubyeventmachine.com", 80) # # count will be 0 in this case, because connection is not # # established yet # count = EventMachine.connection_count # } # # # @example # # EventMachine.run { # EventMachine.connect("rubyeventmachine.com", 80) # # EventMachine.next_tick { # # In this example, count will be 1 since the connection has been established in # # the next loop of the reactor. # count = EventMachine.connection_count # } # } # # @return [Integer] Number of connections currently held by the reactor. def self.connection_count self.get_connection_count end # The is the responder for the loopback-signalled event. # It can be fired either by code running on a separate thread ({EventMachine.defer}) or on # the main thread ({EventMachine.next_tick}). # It will often happen that a next_tick handler will reschedule itself. We # consume a copy of the tick queue so that tick events scheduled by tick events # have to wait for the next pass through the reactor core. # # @private def self.run_deferred_callbacks until (@resultqueue ||= []).empty? result,cback = @resultqueue.pop cback.call result if cback end # Capture the size at the start of this tick... size = @next_tick_mutex.synchronize { @next_tick_queue.size } size.times do |i| callback = @next_tick_mutex.synchronize { @next_tick_queue.shift } begin callback.call rescue exception_raised = true raise ensure # This is a little nasty. The problem is, if an exception occurs during # the callback, then we need to send a signal to the reactor to actually # do some work during the next_tick. The only mechanism we have from the # ruby side is next_tick itself, although ideally, we'd just drop a byte # on the loopback descriptor. next_tick {} if exception_raised end end end # EventMachine.defer is used for integrating blocking operations into EventMachine's control flow. # The action of {.defer} is to take the block specified in the first parameter (the "operation") # and schedule it for asynchronous execution on an internal thread pool maintained by EventMachine. # When the operation completes, it will pass the result computed by the block (if any) back to the # EventMachine reactor. Then, EventMachine calls the block specified in the second parameter to # {.defer} (the "callback"), as part of its normal event handling loop. The result computed by the # operation block is passed as a parameter to the callback. You may omit the callback parameter if # you don't need to execute any code after the operation completes. If the operation raises an # unhandled exception, the exception will be passed to the third parameter to {.defer} (the # "errback"), as part of its normal event handling loop. If no errback is provided, the exception # will be allowed to blow through to the main thread immediately. # # ## Caveats ## # # Note carefully that the code in your deferred operation will be executed on a separate # thread from the main EventMachine processing and all other Ruby threads that may exist in # your program. Also, multiple deferred operations may be running at once! Therefore, you # are responsible for ensuring that your operation code is threadsafe. # # Don't write a deferred operation that will block forever. If so, the current implementation will # not detect the problem, and the thread will never be returned to the pool. EventMachine limits # the number of threads in its pool, so if you do this enough times, your subsequent deferred # operations won't get a chance to run. # # The threads within the EventMachine's thread pool have abort_on_exception set to true. As a result, # if an unhandled exception is raised by the deferred operation and an errback is not provided, it # will blow through to the main thread immediately. If the main thread is within an indiscriminate # rescue block at that time, the exception could be handled improperly by the main thread. # # @example # # operation = proc { # # perform a long-running operation here, such as a database query. # "result" # as usual, the last expression evaluated in the block will be the return value. # } # callback = proc {|result| # # do something with result here, such as send it back to a network client. # } # errback = proc {|error| # # do something with error here, such as re-raising or logging. # } # # EventMachine.defer(operation, callback, errback) # # @param [#call] op An operation you want to offload to EventMachine thread pool # @param [#call] callback A callback that will be run on the event loop thread after `operation` finishes. # @param [#call] errback An errback that will be run on the event loop thread after `operation` raises an exception. # # @see EventMachine.threadpool_size def self.defer op = nil, callback = nil, errback = nil, &blk # OBSERVE that #next_tick hacks into this mechanism, so don't make any changes here # without syncing there. # # Running with $VERBOSE set to true gives a warning unless all ivars are defined when # they appear in rvalues. But we DON'T ever want to initialize @threadqueue unless we # need it, because the Ruby threads are so heavyweight. We end up with this bizarre # way of initializing @threadqueue because EventMachine is a Module, not a Class, and # has no constructor. unless @threadpool @threadpool = [] @threadqueue = ::Queue.new @resultqueue = ::Queue.new spawn_threadpool end @threadqueue << [op||blk,callback,errback] end # @private def self.spawn_threadpool until @threadpool.size == @threadpool_size.to_i thread = Thread.new do Thread.current.abort_on_exception = true while true begin op, cback, eback = *@threadqueue.pop rescue ThreadError $stderr.puts $!.message break # Ruby 2.0 may fail at Queue.pop end begin result = op.call @resultqueue << [result, cback] rescue Exception => error raise error unless eback @resultqueue << [error, eback] end signal_loopbreak end end @threadpool << thread end @all_threads_spawned = true end ## # Returns +true+ if all deferred actions are done executing and their # callbacks have been fired. # def self.defers_finished? return false if @threadpool and !@all_threads_spawned return false if @threadqueue and not @threadqueue.empty? return false if @resultqueue and not @resultqueue.empty? return false if @threadpool and @threadqueue.num_waiting != @threadpool.size return true end class << self # @private attr_reader :threadpool # Size of the EventMachine.defer threadpool (defaults to 20) # @return [Number] attr_accessor :threadpool_size EventMachine.threadpool_size = 20 end # Schedules a proc for execution immediately after the next "turn" through the reactor # core. An advanced technique, this can be useful for improving memory management and/or # application responsiveness, especially when scheduling large amounts of data for # writing to a network connection. # # This method takes either a single argument (which must be a callable object) or a block. # # @param [#call] pr A callable object to run def self.next_tick pr=nil, &block # This works by adding to the @resultqueue that's used for #defer. # The general idea is that next_tick is used when we want to give the reactor a chance # to let other operations run, either to balance the load out more evenly, or to let # outbound network buffers drain, or both. So we probably do NOT want to block, and # we probably do NOT want to be spinning any threads. A program that uses next_tick # but not #defer shouldn't suffer the penalty of having Ruby threads running. They're # extremely expensive even if they're just sleeping. raise ArgumentError, "no proc or block given" unless ((pr && pr.respond_to?(:call)) or block) @next_tick_mutex.synchronize do @next_tick_queue << ( pr || block ) end signal_loopbreak if reactor_running? end # A wrapper over the setuid system call. Particularly useful when opening a network # server on a privileged port because you can use this call to drop privileges # after opening the port. Also very useful after a call to {.set_descriptor_table_size}, # which generally requires that you start your process with root privileges. # # This method is intended for use in enforcing security requirements, consequently # it will throw a fatal error and end your program if it fails. # # @param [String] username The effective name of the user whose privilege-level your process should attain. # # @note This method has no effective implementation on Windows or in the pure-Ruby # implementation of EventMachine def self.set_effective_user username EventMachine::setuid_string username end # Sets the maximum number of file or socket descriptors that your process may open. # If you call this method with no arguments, it will simply return # the current size of the descriptor table without attempting to change it. # # The new limit on open descriptors **only** applies to sockets and other descriptors # that belong to EventMachine. It has **no effect** on the number of descriptors # you can create in ordinary Ruby code. # # Not available on all platforms. Increasing the number of descriptors beyond its # default limit usually requires superuser privileges. (See {.set_effective_user} # for a way to drop superuser privileges while your program is running.) # # @param [Integer] n_descriptors The maximum number of file or socket descriptors that your process may open # @return [Integer] The new descriptor table size. def self.set_descriptor_table_size n_descriptors=nil EventMachine::set_rlimit_nofile n_descriptors end # Runs an external process. # # @example # # module RubyCounter # def post_init # # count up to 5 # send_data "5\n" # end # def receive_data data # puts "ruby sent me: #{data}" # end # def unbind # puts "ruby died with exit status: #{get_status.exitstatus}" # end # end # # EventMachine.run { # EventMachine.popen("ruby -e' $stdout.sync = true; gets.to_i.times{ |i| puts i+1; sleep 1 } '", RubyCounter) # } # # @note This method is not supported on Microsoft Windows # @see EventMachine::DeferrableChildProcess # @see EventMachine.system def self.popen cmd, handler=nil, *args # At this moment, it's only available on Unix. # Perhaps misnamed since the underlying function uses socketpair and is full-duplex. klass = klass_from_handler(Connection, handler, *args) w = case cmd when Array cmd when String Shellwords::shellwords( cmd ) end w.unshift( w.first ) if w.first s = invoke_popen( w ) c = klass.new s, *args @conns[s] = c yield(c) if block_given? c end # Tells you whether the EventMachine reactor loop is currently running. # # Useful when writing libraries that want to run event-driven code, but may # be running in programs that are already event-driven. In such cases, if {EventMachine.reactor_running?} # returns false, your code can invoke {EventMachine.run} and run your application code inside # the block passed to that method. If this method returns true, just # execute your event-aware code. # # @return [Boolean] true if the EventMachine reactor loop is currently running def self.reactor_running? @reactor_running && Process.pid == @reactor_pid end # (Experimental) # # @private def self.open_keyboard handler=nil, *args klass = klass_from_handler(Connection, handler, *args) s = read_keyboard c = klass.new s, *args @conns[s] = c block_given? and yield c c end # EventMachine's file monitoring API. Currently supported are the following events # on individual files, using inotify on Linux systems, and kqueue for *BSD and Mac OS X: # # * File modified (written to) # * File moved/renamed # * File deleted # # EventMachine::watch_file takes a filename and a handler Module containing your custom callback methods. # This will setup the low level monitoring on the specified file, and create a new EventMachine::FileWatch # object with your Module mixed in. FileWatch is a subclass of {EventMachine::Connection}, so callbacks on this object # work in the familiar way. The callbacks that will be fired by EventMachine are: # # * file_modified # * file_moved # * file_deleted # # You can access the filename being monitored from within this object using {FileWatch#path}. # # When a file is deleted, {FileWatch#stop_watching} will be called after your file_deleted callback, # to clean up the underlying monitoring and remove EventMachine's reference to the now-useless {FileWatch} instance. # This will in turn call unbind, if you wish to use it. # # The corresponding system-level Errno will be raised when attempting to monitor non-existent files, # files with wrong permissions, or if an error occurs dealing with inotify/kqueue. # # @example # # # Before running this example, make sure we have a file to monitor: # # $ echo "bar" > /tmp/foo # # module Handler # def file_modified # puts "#{path} modified" # end # # def file_moved # puts "#{path} moved" # end # # def file_deleted # puts "#{path} deleted" # end # # def unbind # puts "#{path} monitoring ceased" # end # end # # # for efficient file watching, use kqueue on Mac OS X # EventMachine.kqueue = true if EventMachine.kqueue? # # EventMachine.run { # EventMachine.watch_file("/tmp/foo", Handler) # } # # # $ echo "baz" >> /tmp/foo => "/tmp/foo modified" # # $ mv /tmp/foo /tmp/oof => "/tmp/foo moved" # # $ rm /tmp/oof => "/tmp/foo deleted" # # @note The ability to pick up on the new filename after a rename is not yet supported. # Calling #path will always return the filename you originally used. # # @param [String] filename Local path to the file to watch. # @param [Class, Module] handler A class or module that implements event handlers associated with the file. def self.watch_file(filename, handler=nil, *args) klass = klass_from_handler(FileWatch, handler, *args) s = EM::watch_filename(filename) c = klass.new s, *args # we have to set the path like this because of how Connection.new works c.instance_variable_set("@path", filename) @conns[s] = c block_given? and yield c c end # EventMachine's process monitoring API. On Mac OS X and *BSD this method is implemented using kqueue. # # @example # # module ProcessWatcher # def process_exited # put 'the forked child died!' # end # end # # pid = fork{ sleep } # # EventMachine.run { # EventMachine.watch_process(pid, ProcessWatcher) # EventMachine.add_timer(1){ Process.kill('TERM', pid) } # } # # @param [Integer] pid PID of the process to watch. # @param [Class, Module] handler A class or module that implements event handlers associated with the file. def self.watch_process(pid, handler=nil, *args) pid = pid.to_i klass = klass_from_handler(ProcessWatch, handler, *args) s = EM::watch_pid(pid) c = klass.new s, *args # we have to set the path like this because of how Connection.new works c.instance_variable_set("@pid", pid) @conns[s] = c block_given? and yield c c end # Catch-all for errors raised during event loop callbacks. # # @example # # EventMachine.error_handler{ |e| # puts "Error raised during event loop: #{e.message}" # } # # @param [#call] cb Global catch-all errback def self.error_handler cb = nil, &blk if cb or blk @error_handler = cb || blk elsif instance_variable_defined? :@error_handler remove_instance_variable :@error_handler end end # This method allows for direct writing of incoming data back out to another descriptor, at the C++ level in the reactor. # This is very efficient and especially useful for proxies where high performance is required. Propogating data from a server response # all the way up to Ruby, and then back down to the reactor to be sent back to the client, is often unnecessary and # incurs a significant performance decrease. # # The two arguments are instance of {EventMachine::Connection} subclasses, 'from' and 'to'. 'from' is the connection whose inbound data you want # relayed back out. 'to' is the connection to write it to. # # Once you call this method, the 'from' connection will no longer get receive_data callbacks from the reactor, # except in the case that 'to' connection has already closed when attempting to write to it. You can see # in the example, that proxy_target_unbound will be called when this occurs. After that, further incoming # data will be passed into receive_data as normal. # # Note also that this feature supports different types of descriptors: TCP, UDP, and pipes. You can relay # data from one kind to another, for example, feed a pipe from a UDP stream. # # @example # # module ProxyConnection # def initialize(client, request) # @client, @request = client, request # end # # def post_init # EM::enable_proxy(self, @client) # end # # def connection_completed # send_data @request # end # # def proxy_target_unbound # close_connection # end # # def unbind # @client.close_connection_after_writing # end # end # # module ProxyServer # def receive_data(data) # (@buf ||= "") << data # if @buf =~ /\r\n\r\n/ # all http headers received # EventMachine.connect("10.0.0.15", 80, ProxyConnection, self, data) # end # end # end # # EventMachine.run { # EventMachine.start_server("127.0.0.1", 8080, ProxyServer) # } # # @param [EventMachine::Connection] from Source of data to be proxies/streamed. # @param [EventMachine::Connection] to Destination of data to be proxies/streamed. # @param [Integer] bufsize Buffer size to use # @param [Integer] length Maximum number of bytes to proxy. # # @see EventMachine.disable_proxy def self.enable_proxy(from, to, bufsize=0, length=0) EM::start_proxy(from.signature, to.signature, bufsize, length) end # Takes just one argument, a {Connection} that has proxying enabled via {EventMachine.enable_proxy}. # Calling this method will remove that functionality and your connection will begin receiving # data via {Connection#receive_data} again. # # @param [EventMachine::Connection] from Source of data that is being proxied # @see EventMachine.enable_proxy def self.disable_proxy(from) EM::stop_proxy(from.signature) end # Retrieve the heartbeat interval. This is how often EventMachine will check for dead connections # that have had an inactivity timeout set via {Connection#set_comm_inactivity_timeout}. # Default is 2 seconds. # # @return [Integer] Heartbeat interval, in seconds def self.heartbeat_interval EM::get_heartbeat_interval end # Set the heartbeat interval. This is how often EventMachine will check for dead connections # that have had an inactivity timeout set via {Connection#set_comm_inactivity_timeout}. # Takes a Numeric number of seconds. Default is 2. # # @param [Integer] time Heartbeat interval, in seconds def self.heartbeat_interval=(time) EM::set_heartbeat_interval time.to_f end # @private def self.event_callback conn_binding, opcode, data # # Changed 27Dec07: Eliminated the hookable error handling. # No one was using it, and it degraded performance significantly. # It's in original_event_callback, which is dead code. # # Changed 25Jul08: Added a partial solution to the problem of exceptions # raised in user-written event-handlers. If such exceptions are not caught, # we must cause the reactor to stop, and then re-raise the exception. # Otherwise, the reactor doesn't stop and it's left on the call stack. # This is partial because we only added it to #unbind, where it's critical # (to keep unbind handlers from being re-entered when a stopping reactor # runs down open connections). It should go on the other calls to user # code, but the performance impact may be too large. # if opcode == ConnectionUnbound if c = @conns.delete( conn_binding ) begin if c.original_method(:unbind).arity != 0 c.unbind(data == 0 ? nil : EventMachine::ERRNOS[data]) else c.unbind end # If this is an attached (but not watched) connection, close the underlying io object. if c.instance_variable_defined?(:@io) and !c.instance_variable_get(:@watch_mode) io = c.instance_variable_get(:@io) begin io.close rescue Errno::EBADF, IOError end end # As noted above, unbind absolutely must not raise an exception or the reactor will crash. # If there is no EM.error_handler, or if the error_handler retrows, then stop the reactor, # stash the exception in $wrapped_exception, and the exception will be raised after the # reactor is cleaned up (see the last line of self.run). rescue Exception => error if instance_variable_defined? :@error_handler begin @error_handler.call error # No need to stop unless error_handler rethrows rescue Exception => error @wrapped_exception = error stop end else @wrapped_exception = error stop end end elsif c = @acceptors.delete( conn_binding ) # no-op else if $! # Bubble user generated errors. @wrapped_exception = $! stop else raise ConnectionNotBound, "received ConnectionUnbound for an unknown signature: #{conn_binding}" end end elsif opcode == ConnectionAccepted accep,args,blk = @acceptors[conn_binding] raise NoHandlerForAcceptedConnection unless accep c = accep.new data, *args @conns[data] = c blk and blk.call(c) c # (needed?) ## # The remaining code is a fallback for the pure ruby and java reactors. # In the C++ reactor, these events are handled in the C event_callback() in rubymain.cpp elsif opcode == ConnectionCompleted c = @conns[conn_binding] or raise ConnectionNotBound, "received ConnectionCompleted for unknown signature: #{conn_binding}" c.connection_completed elsif opcode == SslHandshakeCompleted c = @conns[conn_binding] or raise ConnectionNotBound, "received SslHandshakeCompleted for unknown signature: #{conn_binding}" c.ssl_handshake_completed elsif opcode == SslVerify c = @conns[conn_binding] or raise ConnectionNotBound, "received SslVerify for unknown signature: #{conn_binding}" c.close_connection if c.ssl_verify_peer(data) == false elsif opcode == TimerFired t = @timers.delete( data ) return if t == false # timer cancelled t or raise UnknownTimerFired, "timer data: #{data}" t.call elsif opcode == ConnectionData c = @conns[conn_binding] or raise ConnectionNotBound, "received data #{data} for unknown signature: #{conn_binding}" c.receive_data data elsif opcode == LoopbreakSignalled run_deferred_callbacks elsif opcode == ConnectionNotifyReadable c = @conns[conn_binding] or raise ConnectionNotBound c.notify_readable elsif opcode == ConnectionNotifyWritable c = @conns[conn_binding] or raise ConnectionNotBound c.notify_writable end end # # # @private def self._open_file_for_writing filename, handler=nil klass = klass_from_handler(Connection, handler) s = _write_file filename c = klass.new s @conns[s] = c block_given? and yield c c end # @private def self.klass_from_handler(klass = Connection, handler = nil, *args) klass = if handler and handler.is_a?(Class) raise ArgumentError, "must provide module or subclass of #{klass.name}" unless klass >= handler handler elsif handler if defined?(handler::EM_CONNECTION_CLASS) handler::EM_CONNECTION_CLASS else handler::const_set(:EM_CONNECTION_CLASS, Class.new(klass) {include handler}) end else klass end arity = klass.instance_method(:initialize).arity expected = arity >= 0 ? arity : -(arity + 1) if (arity >= 0 and args.size != expected) or (arity < 0 and args.size < expected) raise ArgumentError, "wrong number of arguments for #{klass}#initialize (#{args.size} for #{expected})" end klass end end # module EventMachine # Alias for {EventMachine} EM = EventMachine # Alias for {EventMachine::Protocols} EM::P = EventMachine::Protocols