Reactive Ruby with Observable

Just as Ruby’s Enumerable module offers a prepackaged implementation of the GoF Iterator pattern, the Ruby Standard Library’s Observable module delivers the Observer pattern through a friendly interface that makes publish-subscribe trivially easy to implement.

The pattern is commonly called pub-sub (for publish-subscribe) or, in Gang of Four terms, the Observer pattern. Its intent is to:

Define a one-to-many dependency between objects so that when one object changes state, all its dependents are notified and updated automatically.

There are push and pull implementations of updating within the Observer pattern, but in either case the idea is for updates to be disseminated from a publisher (or “subject” in GoF terms) to an arbitrary number of subscribers (i.e., “observers”).
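To make the push/pull distinction concrete, here is a minimal sketch using Observable. The thermometer and display classes are hypothetical names invented for illustration. In the push variant the new value travels with the notification; in the pull variant the observer receives the publisher and queries it for whatever it needs.

```ruby
require "observer"

# Push style (hypothetical class): the new value is sent with the notification.
class PushThermometer
  include Observable

  def record(value)
    changed
    notify_observers(value)
  end
end

# Pull style (hypothetical class): observers get the publisher and read from it.
class PullThermometer
  include Observable

  attr_reader :temperature

  def record(value)
    @temperature = value
    changed
    notify_observers(self)
  end
end

class PushDisplay
  attr_reader :last_seen

  # Receives the value directly.
  def update(value)
    @last_seen = value
  end
end

class PullDisplay
  attr_reader :last_seen

  # Receives the publisher and pulls the value from it.
  def update(thermometer)
    @last_seen = thermometer.temperature
  end
end

push_source = PushThermometer.new
push_display = PushDisplay.new
push_source.add_observer(push_display)
push_source.record(21)

pull_source = PullThermometer.new
pull_display = PullDisplay.new
pull_source.add_observer(pull_display)
pull_source.record(19)

puts push_display.last_seen # prints 21
puts pull_display.last_seen # prints 19
```

Push keeps observers decoupled from the publisher's interface, while pull lets each observer decide which parts of the publisher's state it cares about.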

Description

Once an observer object has subscribed to an observable object (using #add_observer), it receives every notification the observed object sends. The observed object emits a notification with the #notify_observers method, but only if its state has been marked as changed, which is done by calling #changed beforehand.

All that’s needed after a require "observer" is an include Observable in your publisher, and invocations of #changed and #notify_observers to push updates to subscribers. To add a subscriber, just call #add_observer.
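The following minimal sketch shows that handshake in isolation. Counter and Recorder are hypothetical classes made up for this example. It illustrates that #notify_observers does nothing unless #changed was called first, and that the changed flag is reset once observers have been notified.

```ruby
require "observer"

# Hypothetical publisher used only to illustrate the changed/notify handshake.
class Counter
  include Observable

  attr_reader :count

  def initialize
    @count = 0
  end

  # Marks the state as changed, so observers are notified.
  def increment
    @count += 1
    changed
    notify_observers(@count)
  end

  # Skips #changed, so the notify_observers call is a no-op.
  def increment_silently
    @count += 1
    notify_observers(@count)
  end
end

# Hypothetical subscriber that records every value it is sent.
class Recorder
  attr_reader :received

  def initialize
    @received = []
  end

  def update(count)
    @received << count
  end
end

counter = Counter.new
recorder = Recorder.new
counter.add_observer(recorder)

counter.increment_silently # count is 1, nobody is told
counter.increment          # count is 2, observers are notified

p recorder.received # prints [2]
p counter.changed?  # prints false (the flag resets after notification)
```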

Implementation

Here’s a sample implementation adapted from the docs.

Responsibilities

First let’s get an overview of the responsibilities in play.

We have a stock ticker, which displays a continuous stream of price updates for a given stock symbol. This is our “publisher”, aka our “subject”, aka our Observable object.

# prints updated stock info to stdout
# publishes price changes to subscribed objects
class StockTicker; end

Then we have our “subscribers”, aka our observers, HighPriceAlerter and LowPriceAlerter, which do exactly what their names say they do:

# subscribe to `StockTicker`
# receive updates from `StockTicker`
# define a callback named #update to be
# invoked when `notify_observers` is called
# print a warning if one is needed
#   (i.e. if the price is too high / too low)
class HighPriceAlerter < AbstractAlerter; end
class LowPriceAlerter < AbstractAlerter; end

Lastly, we have a mock price source. In practice something like this could wrap, say, an API call, but here we’ll just use rand for testing.

class StockPrice
  def self.fetch(*)
    60 + rand(80)
  end
end

Publisher

Our publisher class defines a #run method that, in a continuous loop, polls StockPrice for the current stock price and, whenever the price differs from the last published one, uses #changed and #notify_observers to publish the change.

require "observer"

class StockTicker
  include Observable

  attr_accessor :last_price
  attr_reader :stock_symbol

  def initialize(stock_symbol:)
    @stock_symbol = stock_symbol
  end

  def run
    loop do
      puts "Current price: #{current_price!}"
      publish_price_change if price_has_changed?
      sleep 1
    end
  end

  def publish_price_change
    register_a_state_change_and_push_notification do
      self.last_price = current_price
    end
  end

  def price_has_changed?
    current_price != last_price
  end

  # The #changed and #notify_observers methods come
  # courtesy of Observable
  def register_a_state_change_and_push_notification
    changed
    yield
    notify_observers(Time.now, current_price)
  end

  def current_price
    @current_price ||= current_price!
  end

  def current_price!
    @current_price = StockPrice.fetch(stock_symbol)
  end
end

Subscribers

Subscriber objects register themselves by accepting a reference to the publisher object and sending #add_observer to it with self.

# An abstract alerter class
class AbstractAlerter
  attr_reader :threshold

  def initialize(stock_ticker:, threshold:)
    subscribe_to(stock_ticker)
    @threshold = threshold
  end

  # StockTicker#add_observer provided by Observable
  def subscribe_to(stock_ticker)
    stock_ticker.add_observer(self)
  end

  def warning_message(time, price)
    time.strftime("[%T]: Price #{warning} #{threshold}: #{price}")
  end
end

# An observer class
class LowPriceAlerter < AbstractAlerter
  # callback for this observer
  def update(time, price)
    return unless price < threshold
    puts warning_message(time, price)
  end

  def warning
    :below
  end
end

# An observer class
class HighPriceAlerter < AbstractAlerter
  # callback for this observer
  def update(time, price)
    return unless price > threshold
    puts warning_message(time, price)
  end

  def warning
    :above
  end
end
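The alerters above rely on the default #update callback. Observable#add_observer also accepts an optional second argument naming a different callback method, and #delete_observer unsubscribes an observer again. A small sketch, with Newsletter and Inbox as hypothetical classes invented for illustration:

```ruby
require "observer"

# Hypothetical publisher.
class Newsletter
  include Observable

  def publish(headline)
    changed
    notify_observers(headline)
  end
end

# Hypothetical subscriber whose callback is not named #update.
class Inbox
  attr_reader :messages

  def initialize
    @messages = []
  end

  def deliver(headline)
    @messages << headline
  end
end

newsletter = Newsletter.new
inbox = Inbox.new

# The second argument names the callback to invoke instead of :update.
newsletter.add_observer(inbox, :deliver)
newsletter.publish("First issue")

# After unsubscribing, further notifications no longer reach the inbox.
newsletter.delete_observer(inbox)
newsletter.publish("Second issue")

p inbox.messages             # prints ["First issue"]
p newsletter.count_observers # prints 0
```

A custom callback name can be handy when an object observes several publishers and wants a separate handler for each.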

Execution

Note that nowhere do these collaborator classes have to imperatively tell each other what to do.

ticker = StockTicker.new(stock_symbol: "GOOG")

LowPriceAlerter.new(stock_ticker: ticker, threshold: 80)
HighPriceAlerter.new(stock_ticker: ticker, threshold: 120)

ticker.run

Neither of our Alerters needs to be told to print a warning, nor does either of them have to be explicitly chosen and delegated to based on the nature of the update.

They each know what to do when they receive an update, they’re subscribed, and they respond automatically whenever an update is pushed.

% ruby stock_ticker.rb

# Current price: 118
# Current price: 92
# Current price: 122
# [23:27:06]: Price above 120: 122
# Current price: 61
# [23:27:07]: Price below 80: 61
# Current price: 104
# Current price: 61
# [23:27:09]: Price below 80: 61
# Current price: 107
# Current price: 108
# . . .
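Because StockTicker#run loops forever over random prices, its output differs on every run. For a repeatable check of the same idea, one option is a finite publisher fed with a fixed list of prices. The sketch below is an assumption-laden stand-in, not the code above: ScriptedTicker and ThresholdRecorder are hypothetical classes, and the recorder collects alerts in an array instead of printing them.

```ruby
require "observer"

# Hypothetical finite stand-in for StockTicker: publishes each scripted
# price once, skipping a price that repeats the previous one.
class ScriptedTicker
  include Observable

  def initialize(prices)
    @prices = prices
  end

  def run
    last_price = nil
    @prices.each do |price|
      next if price == last_price

      last_price = price
      changed
      notify_observers(price)
    end
  end
end

# Hypothetical observer that records alerts rather than printing them.
class ThresholdRecorder
  attr_reader :alerts

  # comparison is :< or :>, applied as `price <comparison> threshold`
  def initialize(ticker, comparison, threshold)
    @comparison = comparison
    @threshold = threshold
    @alerts = []
    ticker.add_observer(self)
  end

  def update(price)
    @alerts << price if price.public_send(@comparison, @threshold)
  end
end

ticker = ScriptedTicker.new([118, 92, 122, 61, 104, 61])
low = ThresholdRecorder.new(ticker, :<, 80)
high = ThresholdRecorder.new(ticker, :>, 120)

ticker.run

p low.alerts  # prints [61, 61]
p high.alerts # prints [122]
```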