Reactive Ruby with Observable
Just as Ruby’s Enumerable module offers a prepackaged implementation of the GoF Iterator pattern, so too does the Ruby Standard Library’s Observable module deliver the Observer pattern, with a friendly interface that makes pub-sub trivially easy to implement.
The pattern is commonly called pub-sub (for publish-subscribe) or, in Gang of Four terms, the Observer pattern. Its intent is to:
“Define a one-to-many dependency between objects so that when one object changes state, all its dependents are notified and updated automatically.”
There are push and pull implementations of updating within the Observer pattern, but in either case the idea is for updates to be disseminated from a publisher (or “subject” in GoF terms) to an arbitrary number of subscribers (i.e., “observers”).
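The difference between the two styles can be sketched with Observable itself. The thermometer and logger classes below are hypothetical names invented for this illustration: in the push version the new value travels with the notification, while in the pull version the subject passes only itself and each observer asks for what it needs.

```ruby
require "observer"

# Push: the subject sends the new value along with the notification.
class PushThermometer
  include Observable

  def record(degrees)
    changed
    notify_observers(degrees)
  end
end

# Pull: the subject only announces that something changed and hands
# over a reference to itself; observers query it for the details.
class PullThermometer
  include Observable

  attr_reader :degrees

  def record(degrees)
    @degrees = degrees
    changed
    notify_observers(self)
  end
end

# Receives the value directly.
class PushLogger
  attr_reader :seen

  def initialize
    @seen = []
  end

  def update(degrees)
    @seen << degrees
  end
end

# Receives the subject and reads the value from it.
class PullLogger
  attr_reader :seen

  def initialize
    @seen = []
  end

  def update(thermometer)
    @seen << thermometer.degrees
  end
end

push_source = PushThermometer.new
push_logger = PushLogger.new
push_source.add_observer(push_logger)
push_source.record(21)

pull_source = PullThermometer.new
pull_logger = PullLogger.new
pull_source.add_observer(pull_logger)
pull_source.record(19)
```

Push keeps observers decoupled from the subject's interface; pull lets each observer decide how much of the subject's state it cares about.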
Description
Once an observer object has subscribed to an observable object (using #add_observer), it will receive any notifications sent by the observed object. The latter emits a notification using the #notify_observers method. It will only emit a notification if the object’s state has been flagged as changed (the publisher does this by calling #changed beforehand).
All that’s needed after a require "observer" is an include Observable in your publisher, and invocations of #changed and #notify_observers to push updates to subscribers. To add a subscriber, just call #add_observer.
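As a minimal sketch of those steps, here is a complete publisher and subscriber. Newsletter and Reader are hypothetical names used only for this example. Note that the first notify_observers call is ignored because #changed was not called before it.

```ruby
require "observer"

# Hypothetical publisher, for illustration only.
class Newsletter
  include Observable

  def publish(headline)
    changed                     # flag that there is something new to report
    notify_observers(headline)  # calls #update on each observer, then clears the flag
  end
end

# Hypothetical subscriber; #update is the default callback name.
class Reader
  attr_reader :inbox

  def initialize
    @inbox = []
  end

  def update(headline)
    @inbox << headline
  end
end

newsletter = Newsletter.new
reader = Reader.new
newsletter.add_observer(reader)

newsletter.notify_observers("Issue 0")  # no #changed first, so nothing is sent
newsletter.publish("Issue 1")

p reader.inbox  # prints ["Issue 1"]
```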
Implementation
Here’s a sample implementation adapted from the docs.
Responsibilities
First let’s get an overview of the responsibilities in play.
We have a stock ticker, which displays a continuous stream of price updates for a given stock symbol. This is our “publisher”, aka our “subject”, aka our Observable object.
# prints updated stock info to stdout
# publishes price changes to subscribed objects
class StockTicker; end
Then we have our “subscribers”, aka our observers, HighPriceAlerter and LowPriceAlerter, which do exactly what their names say they do:
# subscribe to `StockTicker`
# receive updates from `StockTicker`
# define a callback named #update to be
# invoked when `notify_observers` is called
# print a warning if one is needed
# (i.e. if the price is too high / too low)
class HighPriceAlerter < AbstractAlerter; end
class LowPriceAlerter < AbstractAlerter; end
Lastly, we have a mock price source. In practice something like this could wrap, say, an API call, but here we’ll just use rand for testing.
class StockPrice
  # Ignores its arguments and returns a random integer from 60 to 139
  def self.fetch(*)
    60 + rand(80)
  end
end
Publisher
Our publisher class defines a #run method that, in a continuous loop, polls StockPrice for the current stock price and, whenever the price has changed, updates itself and uses #changed and #notify_observers to publish the change.
require "observer"

class StockTicker
  include Observable

  attr_accessor :last_price
  attr_reader :stock_symbol

  def initialize(stock_symbol:)
    @stock_symbol = stock_symbol
  end

  def run
    loop do
      puts "Current price: #{current_price!}"
      publish_price_change if price_has_changed?
      sleep 1
    end
  end

  def publish_price_change
    register_a_state_change_and_push_notification do
      self.last_price = current_price
    end
  end

  def price_has_changed?
    current_price != last_price
  end

  # The #changed and #notify_observers methods come
  # courtesy of Observable
  def register_a_state_change_and_push_notification
    changed
    yield
    notify_observers(Time.now, current_price)
  end

  def current_price
    @current_price ||= current_price!
  end

  def current_price!
    @current_price = StockPrice.fetch(stock_symbol)
  end
end
Subscribers
Subscriber objects register themselves by accepting a reference to the publisher object and sending #add_observer to it with self.
# An abstract alerter class
class AbstractAlerter
  attr_reader :threshold

  def initialize(stock_ticker:, threshold:)
    subscribe_to(stock_ticker)
    @threshold = threshold
  end

  # StockTicker#add_observer provided by Observable
  def subscribe_to(stock_ticker)
    stock_ticker.add_observer(self)
  end

  def warning_message(time, price)
    time.strftime("[%T]: Price #{warning} #{threshold}: #{price}")
  end
end

# An observer class
class LowPriceAlerter < AbstractAlerter
  # callback for this observer
  def update(time, price)
    return unless price < threshold
    puts warning_message(time, price)
  end

  def warning
    :below
  end
end

# An observer class
class HighPriceAlerter < AbstractAlerter
  # callback for this observer
  def update(time, price)
    return unless price > threshold
    puts warning_message(time, price)
  end

  def warning
    :above
  end
end
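The alerters above rely on the default callback name, #update. Observable#add_observer also takes an optional second argument naming a different callback method, and #delete_observer and #count_observers are available for managing subscriptions. The sketch below uses a stripped-down stand-in publisher (MiniTicker) and a subscriber (PriceLog), both hypothetical names that are not part of the example above.

```ruby
require "observer"

# Hypothetical stand-in for a publisher; it pushes a time and a price.
class MiniTicker
  include Observable

  def publish(price)
    changed
    notify_observers(Time.now, price)
  end
end

# Hypothetical subscriber that registers a custom callback name.
class PriceLog
  attr_reader :entries

  def initialize(ticker)
    @entries = []
    # The second argument tells Observable to call #record_price
    # instead of the default #update.
    ticker.add_observer(self, :record_price)
  end

  def record_price(_time, price)
    @entries << price
  end
end

ticker = MiniTicker.new
log = PriceLog.new(ticker)

ticker.publish(101)
puts ticker.count_observers  # prints 1

ticker.delete_observer(log)  # unsubscribe
ticker.publish(99)           # no longer delivered to log

p log.entries  # prints [101]
```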
Execution
Note that nowhere do these collaborator classes have to imperatively tell each other what to do.
ticker = StockTicker.new(stock_symbol: "GOOG")
LowPriceAlerter.new(stock_ticker: ticker, threshold: 80)
HighPriceAlerter.new(stock_ticker: ticker, threshold: 120)
ticker.run
Neither of our Alerters needs to be told to print a warning, nor does either of them have to be explicitly chosen and delegated to based on the nature of the update.
They each know what to do when they receive an update, they’re subscribed, and they respond automatically whenever an update is pushed.
% ruby stock_ticker.rb
# Current price: 118
# Current price: 92
# Current price: 122
# [23:27:06]: Price above 120: 122
# Current price: 61
# [23:27:07]: Price below 80: 61
# Current price: 104
# Current price: 61
# [23:27:09]: Price below 80: 61
# Current price: 107
# Current price: 108
# . . .