class Supply
Asynchronous data stream with multiple subscribers
A supply is a thread-safe, asynchronous data stream like a Channel, but it can have multiple subscribers (taps) that all get the same values flowing through the supply.
It is a thread-safe implementation of the Observer Pattern, and central to supporting reactive programming in Perl 6.
There are two types of Supplies: live
and on demand
. When tapping into a live
supply, the tap will only see values that are flowing through the supply after the tap has been created. Such supplies are normally infinite in nature, such as mouse movements. Closing such a tap does not stop mouse events from occurring, it just means that the values will go by unseen. All tappers see the same flow of values.
A tap on an on demand
supply will initiate the production of values, and tapping the supply again may result in a new set of values. For example, Supply.interval
produces a fresh timer with the appropriate interval each time it is tapped. If the tap is closed, the timer simply stops emitting values to that tap.
A live
Supply
is obtained from the Supplier factory method Supply
. New values are emitted by calling emit
on the Supplier
object.
my = Supplier.new;my = .Supply;.tap(-> );.emit(42); # Will cause the tap to output "42"
The live method returns True
on live supplies. Factory methods such as interval, from-list will return on demand supplies.
A live Supply
that keeps values until tapped the first time can be created with Supplier::Preserving.
Further examples can be found in the concurrency page.
Methods that return Taps
method tap
method tap(Supply: = -> $ ,:,:,:--> Tap)
Creates a new tap (a kind of subscription if you will), in addition to all existing taps. The first positional argument is a piece of code that will be called when a new value becomes available through the emit
call.
The &done
callback can be called in a number of cases: if a supply block is being tapped, when a done
routine is reached; if a supply block is being tapped, it will be automatically triggered if the supply block reaches the end; if the done
method is called on the parent Supplier
(in the case of a supply block, if there are multiple Suppliers referenced by whenever
, they must all have their done
method invoked for this to trigger the &done
callback of the tap as the block will then reach its end).
The &quit
callback is called if the tap is on a supply block which exits with an error. It is also called if the quit
method is invoked on the parent Supplier
(in the case of a supply block any one Supplier
quitting with an uncaught exception will call the &quit
callback as the block will exit with an error). The error is passed as a parameter to the callback.
The &tap
callback is called once the Tap object is created, which is passed as a parameter to the callback. The callback is called ahead of emit
/done
/quit
, providing a reliable way to get the Tap
object. One case where this is useful is when the Supply
begins emitting values synchronously, since the call to .tap
won't return the Tap
object until it is done emitting, preventing it from being stopped if needed.
Method tap
returns an object of type Tap, on which you can call the close
method to cancel the subscription.
my = Supply.from-list(0 .. 5);my = .tap(-> , done => );
Produces:
012345no more ticks
method act
method act(Supply: --> Tap)
Creates a tap on the given supply with the given code. Differently from tap
, the given code is guaranteed to be only executed by one thread at a time.
Utility methods
method Capture
Defined as:
method Capture(Supply --> Capture)
Equivalent to calling .List.Capture
on the invocant.
method Channel
method Channel(Supply: --> Channel)
Returns a Channel object that will receive all future values from the supply, and will be close
d when the Supply is done, and quit (shut down with error) when the supply is quit.
method Promise
method Promise(Supply: --> Promise)
Returns a Promise that will be kept when the Supply
is done
. If the Supply
also emits any values, then the Promise
will be kept with the final value. Otherwise, it will be kept with Nil
. If the Supply
ends with a quit
instead of a done
, then the Promise
will be broken with that exception.
my = Supplier.new;my = .Supply;my = .Promise;.then(-> );.emit('cha'); # not output yet.done(); # got cha
The Promise
method is most useful when dealing with supplies that will tend to produce just one value, when only the final value is of interest, or when only completion (successful or not) is relevant.
method live
method live(Supply: --> Bool)
Returns True
if the supply is "live", that is, values are emitted to taps as soon as they arrive. Always returns True
in the default Supply
(but for example on the supply returned from Supply.from-list
it's False
).
say Supplier.new.Supply.live; # OUTPUT: «True»
method schedule-on
method schedule-on(Supply: Scheduler )
Runs the emit, done and quit callbacks on the specified scheduler.
This is useful for GUI toolkits that require certain actions to be run from the GUI thread.
Methods that wait until the supply is done
method wait
method wait(Supply:)
Taps the Supply
it is called on, and blocks execution until the either the supply is done
(in which case it evaluates to the final value that was emitted on the Supply
, or Nil
if not value was emitted) or quit
(in which case it will throw the exception that was passed to quit
).
my = Supplier.new;start.Supply.wait;say "Two seconds: done";
method list
method list(Supply: --> List)
Taps the Supply
it is called on, and returns a lazy list that will be reified as the Supply
emits values. The list will be terminated once the Supply
is done
. If the Supply
quit
s, then an exception will be thrown once that point in the lazy list is reached.
method grab
method grab(Supply: --> Supply)
Taps the Supply
it is called on. When it is done
, calls &when-done
and then emits the list of values that it returns on the result Supply
. If the original Supply
quit
s, then the exception is immediately conveyed on the return Supply
.
my = Supply.from-list(4, 10, 3, 2);my = .grab();.tap(); # OUTPUT: «19»
method reverse
method reverse(Supply: --> Supply)
Taps the Supply
it is called on. Once that Supply
emits done
, all of the values it emitted will be emitted on the returned Supply
in reverse order. If the original Supply
quit
s, then the exception is immediately conveyed on the return Supply
.
my = Supply.from-list(1, 2, 3);my = .reverse;.tap(); # OUTPUT: «321»
method sort
method sort(Supply: ? --> Supply)
Taps the Supply
it is called on. Once that Supply
emits done
, all of the values that it emitted will be sorted, and the results emitted on the returned Supply
in the sorted order. Optionally accepts a comparator Block. If the original Supply
quit
s, then the exception is immediately conveyed on the return Supply
.
my = Supply.from-list(4, 10, 3, 2);my = .sort();.tap(); # OUTPUT: «23410»
Methods that return another Supply
method from-list
method from-list(Supply: + --> Supply)
Creates an on-demand supply from the values passed to this method.
my = Supply.from-list(1, 2, 3);.tap(); # OUTPUT: «123»
method share
method share(Supply: --> Supply)
Creates a live supply from an on-demand supply, thus making it possible to share the values of the on-demand supply on multiple taps, instead of each tap seeing its own copy of all values from the on-demand supply.
# this says in turn: "first 1" "first 2" "second 2" "first 3" "second 3"my = Supply.interval(1).share;.tap: ;sleep 1.1;.tap: ;sleep 2
method flat
method flat(Supply: --> Supply)
Creates a supply on which all of the values seen in the given supply are flattened before being emitted again.
method do
method do(Supply: --> Supply)
Creates a supply to which all values seen in the given supply, are emitted again. The given code, executed for its side-effects only, is guaranteed to be only executed by one thread at a time.
method on-close
method on-close(Supply: --> Supply)
Returns a new Supply
which will run &on-close
whenever a Tap of that Supply
is closed. This includes if further operations are chained on to the Supply
. (for example, $supply.on-close(&on-close).map(*.uc)
). When using a react
or supply
block, using the CLOSE phaser is usually a better choice.
my = Supplier.new;my = .Supply.on-close().tap(-> ,done => ,quit => -> ,);.emit('Perl 6');.close; # OUTPUT: «Tap closed»
method interval
method interval(Supply: , = 0, : = --> Supply)
Creates a supply that emits a value every $interval
seconds, starting $delay
seconds from the call. The emitted value is an integer, starting from 0, and is incremented by one for each value emitted.
Implementations may treat too-small values as lowest resolution they support, possibly warning in such situations; e.g. treating 0.0001
as 0.001
. For 6.d language version, the minimal value specified is 0.001
.
method grep
method grep(Supply: Mu --> Supply)
Creates a new supply that only emits those values from the original supply that smartmatch against $test
.
my = Supplier.new;my = .Supply;my = .grep(Int);.tap();.emit() for 1, 'a string', 3.14159; # prints only 1
method map
method map(Supply: --> Supply)
Returns a new supply that maps each value of the given supply through &mapper
and emits it to the new supply.
my = Supplier.new;my = .Supply;my = .map(-> );.tap();.emit(4); # RESULT: «8»
method batch
method batch(Supply: :, : --> Supply)
Creates a new supply that batches the values of the given supply by either the number of elements in the batch (using :elems
) or the maximum number of seconds (using the :seconds
) or both. Any remaining values are emitted in a final batch when the supply is done.
method elems
method elems(Supply: ? --> Supply)
Creates a new supply in which changes to the number of values seen are emitted. It optionally also takes an interval (in seconds) if you only want to be updated every so many seconds.
method head
method head(Supply: Int(Cool) = 1 --> Supply)
Creates a "head" supply with the same semantics as List.head.
my = Supply.from-list(4, 10, 3, 2);my = .head(2);.tap(); # OUTPUT: «410»
method tail
method tail(Supply: Int(Cool) = 1 --> Supply)
Creates a "tail" supply with the same semantics as List.tail.
my = Supply.from-list(4, 10, 3, 2);my = .tail(2);.tap(); # OUTPUT: «32»
method rotor
method rotor(Supply: --> Supply)
Creates a "rotoring" supply with the same semantics as List.rotor.
method delayed
method delayed(Supply: , : = --> Supply)
Creates a new supply in which all values flowing through the given supply are emitted, but with the given delay in seconds.
method throttle
method throttle(Supply:, # values / time or simultaneous processingor , # time-unit / code to process simultaneously= 0, # initial delay before starting, in seconds:, # supply to emit control messages on (optional):, # supply to tap status messages from (optional):, # supply to bleed messages to (optional):, # bleed when so many buffered (optional):, # scheduler to use, default $*SCHEDULER--> Supply)
Produces a Supply
from a given Supply, but makes sure the number of messages passed through, is limited.
It has two modes of operation: per time-unit or by maximum number of execution of a block of code: this is determined by the second positional parameter.
The first positional parameter specifies the limit that should be applied.
If the second positional parameter is a Callable
, then the limit indicates the maximum number of parallel processes executing the Callable, which is given the value that was received. The emitted values in this case will be the Promise
s that were obtained from start
ing the Callable.
If the second positional parameter is a numeric value, it is interpreted as the time-unit (in seconds). If you specify .1 as the value, then it makes sure you don't exceed the limit for every tenth of a second.
If the limit is exceeded, then incoming messages are buffered until there is room to pass on / execute the Callable again.
The third positional parameter is optional: it indicates the number of seconds the throttle will wait before passing on any values.
The :control
named parameter optionally specifies a Supply that you can use to control the throttle while it is in operation. Messages that can be sent, are strings in the form of "key:value". Please see below for the types of messages that you can send to control the throttle.
The :status
named parameter optionally specifies a Supply that will receive any status messages. If specified, it will at least send one status message after the original Supply is exhausted. See status message below.
The :bleed
named parameter optionally specifies a Supply that will receive any values that were either explicitly bled (with the bleed control message), or automatically bled (if there's a vent-at active).
The :vent-at
named parameter indicates the number of values that may be buffered before any additional value will be routed to the :bleed
Supply. Defaults to 0 if not specified (causing no automatic bleeding to happen). Only makes sense if a :bleed
Supply has also been specified.
The :scheduler
named parameter indicates the scheduler to be used. Defaults to $*SCHEDULER
.
control messages
These messages can be sent to the :control
Supply. A control message consists of a string of the form "key: value", e.g. "limit: 4".
limit
Change the number of messages (as initially given in the first positional) to the value given.
bleed
Route the given number of buffered messages to the :bleed
Supply.
vent-at
Change the maximum number of buffered values before automatic bleeding takes place. If the value is lower than before, will cause immediate rerouting of buffered values to match the new maximum.
status
Send a status message to the :status
Supply with the given id.
status message
The status return message is a hash with the following keys:
allowed
The current number of messages / callables that is still allowed to be passed / executed.
bled
The number of messages routed to the :bleed
Supply.
buffered
The number of messages currently buffered because of overflow.
emitted
The number of messages emitted (passed through).
id
The id of this status message (a monotonically increasing number). Handy if you want to log status messages.
limit
The current limit that is being applied.
vent-at
The maximum number of messages that may be buffered before they're automatically re-routed to the :bleed
Supply.
Examples
Have a simple piece of code announce when it starts running asynchronously, wait a random amount of time, then announce when it is done. Do this 6 times, but don't let more than 3 of them run simultaneously.
my = Supply.from-list(^6); # set up supplymy = .throttle: 3, # only allow 3 at a time# don't need ; because } at end of line.wait; # wait for the supply to be done
and the result of one run will be:
running 0running 1running 2done 2running 3done 1running 4done 4running 5done 0done 3done 5
method stable
method stable(Supply: , : = --> Supply)
Creates a new supply that only passes on a value flowing through the given supply if it wasn't superseded by another value in the given $time
(in seconds). Optionally uses another scheduler than the default scheduler, using the :scheduler
parameter.
To clarify the above, if, during the timeout $time
, additional values are emitted to the Supplier
all but the last one will be thrown away. Each time an additional value is emitted to the Supplier
, during the timeout, $time
is reset.
This method can be quite useful when handling UI input, where it is not desired to perform an operation until the user has stopped typing for a while rather than on every keystroke.
my = Supplier.new;my = .Supply;.tap(-> );.emit(42);my Supply = .stable(5);.tap(-> );sleep(3);.emit(43); # will not be seen by $supply2 but will reset $time.emit(44);sleep(10);# OUTPUT: «Supply1 got: 42Supply1 got: 43Supply1 got: 44Supply2 got: 44»
As can be seen above, $supply1
received all values emitted to the Supplier
while $supply2
only received one value. The 43 was thrown away because it was followed by another 'last' value 44 which was retained and sent to $supply2
after approximately eight seconds, this due to the fact that the timeout $time
was reset after three seconds.
method reduce
method reduce(Supply: --> Supply)
Creates a "reducing" supply with the same semantics as List.reduce.
my = Supply.from-list(1..5).reduce();.tap(-> ); # OUTPUT: «15»
method produce
method produce(Supply: --> Supply)
Creates a "producing" supply with the same semantics as List.produce.
my = Supply.from-list(1..5).produce();.tap(-> ); # OUTPUT: «1361015»
method lines
method lines(Supply: : = True --> Supply)
Creates a supply that will emit the characters coming in line by line from a supply that's usually created by some asynchronous I/O operation. The optional :chomp
parameter indicates whether to remove line separators: the default is True
.
method words
method words(Supply: --> Supply)
Creates a supply that will emit the characters coming in word for word from a supply that's usually created by some asynchronous I/O operation.
my = Supply.from-list("Hello Word!".comb);my = .words;.tap(); # OUTPUT: «HelloWord!»
method unique
method unique(Supply: :, :, : --> Supply)
Creates a supply that only provides unique values, as defined by the optional :as
and :with
parameters (same as with List.unique). The optional :expires
parameter how long to wait (in seconds) before "resetting" and not considering a value to have been seen, even if it's the same as an old value.
method squish
method squish(Supply: :, : --> Supply)
Creates a supply that only provides unique values, as defined by the optional :as
and :with
parameters (same as with List.squish).
method max
method max(Supply: = :<cmp> --> Supply)
Creates a supply that only emits values from the given supply if they are larger than any value seen before. In other words, from a continuously ascending supply it will emit all the values. From a continuously descending supply it will only emit the first value. The optional parameter specifies the comparator, just as with Any.max.
method min
method min(Supply: = :<cmp> --> Supply)
Creates a supply that only emits values from the given supply if they are smaller than any value seen before. In other words, from a continuously descending supply it will emit all the values. From a continuously ascending supply it will only emit the first value. The optional parameter specifies the comparator, just as with Any.min.
method minmax
method minmax(Supply: = :<cmp> --> Supply)
Creates a supply that emits a Range every time a new minimum or maximum values is seen from the given supply. The optional parameter specifies the comparator, just as with Any.minmax.
method skip
method skip(Supply: Int(Cool) = 1 --> Supply)
Returns a new Supply
which will emit all values from the given Supply
except for the first $number
values, which will be thrown away.
my = Supplier.new;my = .Supply;= .skip(3);.tap();.emit() for 1..10; # OUTPUT: «45678910»
method start
method start(Supply: --> Supply)
Creates a supply of supplies. For each value in the original supply, the code object is scheduled on another thread, and returns a supply either of a single value (if the code succeeds), or one that quits without a value (if the code fails).
This is useful for asynchronously starting work that you don't block on.
Use migrate
to join the values into a single supply again.
method migrate
method migrate(Supply: --> Supply)
Takes a Supply
which itself has values that are of type Supply
as input. Each time the outer Supply
emits a new Supply
, this will be tapped and its values emitted. Any previously tapped Supply
will be closed. This is useful for migrating between different data sources, and only paying attention to the latest one.
For example, imagine an application where the user can switch between different stocks. When they switch to a new one, a connection is established to a web socket to get the latest values, and any previous connection should be closed. Each stream of values coming over the web socket would be represented as a Supply, which themselves are emitted into a Supply of latest data sources to watch. The migrate
method could be used to flatten this supply of supplies into a single Supply of the current values that the user cares about.
Here is a simple simulation of such a program:
my Supplier .= new;sub watch-stock().Supply.migrate.tap: *.say;watch-stock('GOOG');sleep 3;watch-stock('AAPL');sleep 3;
Which produces output like:
Starting to watch GOOGGOOG: 111.67GOOG: 111.20GOOG: 111.37Lost interest in GOOGStarting to watch AAPLAAPL: 111.55AAPL: 111.6AAPL: 111.6
Methods that combine supplies
method merge
method merge(Supply --> Supply)
Creates a supply to which any value seen from the given supplies, is emitted. The resulting supply is done Only when all given supplies are done. Can also be called as a class method.
method zip
method zip(Supply , : = &[,] --> Supply)
Creates a supply that emits combined values as soon as there is a new value seen on all of the supplies. By default, Lists are created, but this can be changed by specifying your own combiner with the :with
parameter. The resulting supply is done as soon as any of the given supplies are done. Can also be called as a class method.
method zip-latest
method zip-latest(Supply , : = &[,], : --> Supply)
Creates a supply that emits combined values as soon as there is a new value seen on any of the supplies. By default, Lists are created, but this can be changed by specifying your own combiner with the :with
parameter. The optional :initial parameter can be used to indicate the initial state of the combined values. By default, all supplies have to have at least one value emitted on them before the first combined values is emitted on the resulting supply. The resulting supply is done as soon as any of the given supplies are done. Can also be called as a class method.
I/O features exposed as supplies
sub signal
sub signal(*, : = )
Creates a supply for the Signal enums (such as SIGINT) specified, and an optional :scheduler
parameter. Any signals received, will be emitted on the supply. For example:
signal(SIGINT).tap( );
would catch Control-C, thank you, and then exit.
To go from a signal number to a Signal, you can do something like this:
signal(Signal(2)).tap( -> );
The list of supported signals can be found by checking Signal::.keys
(as you would any enum). For more details on how enums work see enum.
Note: Rakudo versions up to 2018.05 had a bug due to which numeric values of signals were incorrect on some systems. For example, Signal(10)
was returning SIGBUS
even if it was actually SIGUSR1
on a particular system. That being said, using signal(SIGUSR1)
was working as expected on all Rakudo versions except 2018.04, 2018.04.1 and 2018.05, where the intended behavior can be achieved by using signal(SIGBUS)
instead. These issues are resolved in Rakudo releases after 2018.05.
method IO::Notification.watch-path
method watch-path( --> Supply)
Creates a supply to which the OS will emit values to indicate changes on the filesystem for the given path. Also has a shortcut with the watch
method on an IO object, like this:
IO::Notification.watch-path(".").act( );".".IO.watch.act( ); # same