method throttle

Documentation for method throttle assembled from the following types:

class Supply

From Supply

(Supply) method throttle

method throttle(Supply:D:
  $limit,                 # values / time or simultaneous processing 
  $seconds or $callable,  # time-unit / code to process simultaneously 
  $delay = 0,             # initial delay before starting, in seconds 
  :$control,              # supply to emit control messages on (optional) 
  :$status,               # supply to tap status messages from (optional) 
  :$bleed,                # supply to bleed messages to (optional) 
  :$vent-at,              # bleed when so many buffered (optional) 
  :$scheduler,            # scheduler to use, default $*SCHEDULER 
  --> Supply:D)

Produces a Supply from a given Supply, but limits the number of messages passed through.

It has two modes of operation: limiting the number of messages per time-unit, or limiting the number of simultaneous executions of a block of code. The mode is determined by the second positional parameter.

The first positional parameter specifies the limit that should be applied.

If the second positional parameter is a Callable, then the limit indicates the maximum number of parallel processes executing the Callable, which is given the value that was received. The emitted values in this case will be the Promises that were obtained from starting the Callable.
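
The following is a minimal sketch of consuming the emitted Promises in Callable mode. The variable name @results and the multiply-by-ten block are made up for illustration, and the sketch assumes every Promise is kept (a broken Promise would make await throw). Because the Callable may finish in any order, the results are sorted before printing.

```raku
my @results;                       # hypothetical name, for illustration only
react {
    # at most 2 invocations of the block run at the same time
    whenever Supply.from-list(1..4).throttle(2, { $_ * 10 }) -> $promise {
        # each emitted value is a Promise; await it to get the block's result
        @results.push: await $promise;
    }
}
say @results.sort;                 # completion order is not guaranteed, so sort
```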

If the second positional parameter is a numeric value, it is interpreted as the time-unit (in seconds). For example, a value of .1 ensures that the limit is not exceeded in any tenth of a second.
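
As a rough illustration of the time-unit mode, the sketch below lets at most 2 values through per half second. The names $start and @seen are made up for this example, and the printed timings will vary from run to run, so none are shown here.

```raku
my $start = now;                   # hypothetical name: reference point for timing
my @seen;                          # hypothetical name: values in arrival order
react {
    # pass on at most 2 values per 0.5 seconds; the rest are buffered
    whenever Supply.from-list(^6).throttle(2, 0.5) -> $value {
        @seen.push: $value;
        say "$value arrived after { (now - $start).round(0.1) }s";
    }
}
```

In this mode the values themselves are emitted (not Promises), and buffered values are passed on in the order in which they were received.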

If the limit is exceeded, incoming messages are buffered until there is room to pass them on, or to execute the Callable again.

The third positional parameter is optional: it indicates the number of seconds the throttle will wait before passing on any values.

The :control named parameter optionally specifies a Supply that you can use to control the throttle while it is in operation. Messages that can be sent are strings of the form "key: value". See "control messages" below for the types of messages that you can send to control the throttle.

The :status named parameter optionally specifies a Supply that will receive any status messages. If specified, at least one status message will be sent after the original Supply is exhausted. See "status message" below.

The :bleed named parameter optionally specifies a Supply that will receive any values that were either explicitly bled (with the bleed control message), or automatically bled (if there's a vent-at active).

The :vent-at named parameter indicates the number of values that may be buffered before any additional value will be routed to the :bleed Supply. Defaults to 0 if not specified (causing no automatic bleeding to happen). Only makes sense if a :bleed Supply has also been specified.

The :scheduler named parameter indicates the scheduler to be used. Defaults to $*SCHEDULER.

control messages

These messages can be sent to the :control Supply. A control message consists of a string of the form "key: value", e.g. "limit: 4".

limit

Change the number of messages (as initially given in the first positional) to the value given.

bleed

Route the given number of buffered messages to the :bleed Supply.

vent-at

Change the maximum number of buffered values before automatic bleeding takes place. If the value is lower than before, buffered values are immediately rerouted to match the new maximum.

status

Send a status message to the :status Supply with the given id.

status message

The status return message is a hash with the following keys:

allowed

The current number of messages / callables that are still allowed to be passed / executed.

bled

The number of messages routed to the :bleed Supply.

buffered

The number of messages currently buffered because of overflow.

emitted

The number of messages emitted (passed through).

id

The id of this status message (a monotonically increasing number). Handy if you want to log status messages.

limit

The current limit that is being applied.

vent-at

The maximum number of messages that may be buffered before they're automatically re-routed to the :bleed Supply.

Examples

Have a simple piece of code announce when it starts running asynchronously, wait a random amount of time, then announce when it is done. Do this 6 times, but don't let more than 3 of them run simultaneously.

my $s = Supply.from-list(^6);  # set up supply 
my $t = $s.throttle: 3,        # only allow 3 at a time 
{                              # code block to run 
    say "running $_";          # announce we've started 
    sleep rand;                # wait some random time 
    say "done $_"              # announce we're done 
}                              # don't need ; because } at end of line 
$t.wait;                       # wait for the supply to be done

The output of one run might be (the order varies because of the random waits):

running 0
running 1
running 2
done 2
running 3
done 1
running 4
done 4
running 5
done 0
done 3
done 5