xref: /Universal-ctags/Units/parser-perl6.r/perl6-bunch2.d/input.pm (revision 6451d5eeb6083c954b0b2b2eda601575ca4af2e6)
1# Anything that can be subscribed to does this role. It provides the basic
2# supply management infrastructure, as well as various coercions that
3# turn Supply-like things into something else and convenience forms of calls
4# to SupplyOperations.
5
6my class SupplyOperations is repr('Uninstantiable') { ... }
7my class X::Supply::Combinator is Exception {
8    has $.combinator;
9    method message() { "Can only use $!combinator to combine defined Supply objects" }
10}
11
12my class Tap {
13    has &.emit;
14    has &.done;
15    has &.quit;
16    has &.closing;
17    has $.supply;
18
19    method close (Tap:D:) { $!supply.close(self) }
20}
21
22my role Supply {
23    has $!tappers_lock = Lock.new;
24    has @!tappers;
25    has $!been_tapped;
26    has @!paused;
27
28    method tap(Supply:D: &emit = -> $ { }, :&done,:&quit={die $_},:&closing) {
29        my $tap = Tap.new(:&emit, :&done, :&quit, :&closing, :supply(self));
30        $!tappers_lock.protect({
31            @!tappers.push($tap);
32            if @!paused -> \todo {
33                $tap.emit().($_) for todo;
34                @!paused = ();
35            }
36            $!been_tapped = True;
37        });
38        $tap
39    }
40
41    proto method close(|) { * }
42    multi method close(Supply:D:) { self.close($_) for self.tappers }
43    multi method close(Supply:D: Tap $t) {
44        my $found;
45        $!tappers_lock.protect({
46            @!tappers .= grep( { $_ === $t ?? !($found = True) !! True } );
47        });
48        if $t.closing -> &closing {
49            closing();
50        }
51        $found // False;
52    }
53
54    method tappers(Supply:D:) {
55        # Shallow clone to provide safe snapshot.
56        my @tappers;
57        $!tappers_lock.protect({ @tappers = @!tappers });
58        @tappers
59    }
60
61    method emit(Supply:D: \msg) {
62        if self.tappers -> \tappers {
63            .emit().(msg) for tappers;
64        }
65        elsif !$!been_tapped {
66            $!tappers_lock.protect({ @!paused.push: msg });
67        }
68        Nil;
69    }
70
71    method more(Supply:D: \msg) {
72        DEPRECATED('emit', |<2014.10 2015.09>);
73        self.emit(msg);
74    }
75
76    method done(Supply:D:) {
77        for self.tappers -> $t {
78            my $l = $t.done();
79            $l() if $l;
80        }
81        Nil;
82    }
83
84    method quit(Supply:D: $ex) {
85        for self.tappers -> $t {
86            my $f = $t.quit();
87            $f($ex) if $f;
88        }
89        Nil;
90    }
91
92    method taps(Supply:D:) { +@!tappers }
93    method live(Supply:D:) { True };
94
95    method Supply(Supply:) { self }
96    method Channel(Supply:D:) {
97        my $c = Channel.new();
98        self.tap( -> \val { $c.send(val) },
99          done => { $c.close },
100          quit => -> $ex { $c.quit($ex) });
101        $c
102    }
103
104    method Promise(Supply:D:) {
105        my $l = Lock.new;
106        my $p = Promise.new;
107        my $v = $p.vow;
108        my $t = self.tap(
109          -> \val {
110              $l.protect( {
111                  if $p.status == Planned {
112                      $v.keep(val);
113                      $t.close()
114                  }
115              } );
116          },
117          done => { $v.break("No value received") },
118          quit => -> \ex {
119              $l.protect( {
120                  if $p.status == Planned {
121                      $v.break(ex);
122                      $t.close()
123                  }
124              } );
125          },
126        );
127        $p
128    }
129
130    method wait(Supply:D:) {
131        my $l = Lock.new;
132        my $p = Promise.new;
133        my $t = self.tap( -> \val {},
134          done => {
135              $l.protect( {
136                  if $p.status == Planned {
137                      $p.keep(True);
138                      $t.close()
139                  }
140              } );
141          },
142          quit => -> \ex {
143              $l.protect( {
144                  if $p.status == Planned {
145                      $p.break(ex);
146                      $t.close()
147                  }
148              } );
149          },
150        );
151        $p.result
152    }
153
154    method list(Supply:D:) {
155        # Use a Channel to handle any asynchrony.
156        self.Channel.list;
157    }
158
159    method on-demand(Supply:U: |c)       { SupplyOperations.on-demand(|c) }
160    method from-list(Supply:U: |c)       { SupplyOperations.from-list(|c) }
161    method interval(Supply:U: |c)        { SupplyOperations.interval(|c) }
162    method flat(Supply:D: )              { SupplyOperations.flat(self) }
163    method grep(Supply:D: Mu $test)      { SupplyOperations.grep(self, $test) }
164    method map(Supply:D: &mapper)        { SupplyOperations.map(self, &mapper) }
165    method schedule-on(Supply:D: Scheduler $scheduler) {
166        SupplyOperations.schedule-on(self, $scheduler);
167    }
168    method start(Supply:D: &startee)     { SupplyOperations.start(self, &startee) }
169    method stable(Supply:D: $time, :$scheduler = $*SCHEDULER) {
170        SupplyOperations.stable(self, $time, :$scheduler);
171    }
172    method delay(Supply:D: $time, :$scheduler = $*SCHEDULER) {
173        DEPRECATED('delayed', '2015.02', '2015.09');
174        SupplyOperations.delayed(self, $time, :$scheduler);
175    }
176    method delayed(Supply:D: $time, :$scheduler = $*SCHEDULER) {
177        SupplyOperations.delayed(self, $time, :$scheduler)
178    }
179    method migrate(Supply:D: )           { SupplyOperations.migrate(self) }
180
181    multi method classify(Supply:D: &mapper )  {
182        SupplyOperations.classify(self, &mapper);
183    }
184    multi method classify(Supply:D: %mapper )  {
185        SupplyOperations.classify(self, { %mapper{$^a} });
186    }
187    multi method classify(Supply:D: @mapper )  {
188        SupplyOperations.classify(self, { @mapper[$^a] });
189    }
190
191    proto method categorize (|) { * }
192    multi method categorize(Supply:D: &mapper )  {
193        SupplyOperations.classify(self, &mapper, :multi);
194    }
195    multi method categorize(Supply:D: %mapper )  {
196        SupplyOperations.classify(self, { %mapper{$^a} }, :multi);
197    }
198    multi method categorize(Supply:D: @mapper )  {
199        SupplyOperations.classify(self, { @mapper[$^a] }, :multi);
200    }
201
202    method act(Supply:D: &actor) {
203        self.do(&actor).tap(|%_) # need "do" for serializing callbacks
204    }
205
206    method do(Supply:D $self: &side_effect) {
207        on -> $res {
208            $self => -> \val { side_effect(val); $res.emit(val) }
209        }
210    }
211
212    method unique(Supply:D $self: :&as, :&with, :$expires) {
213        on -> $res {
214            $self => do {
215                if $expires {
216                    if &with and &with !=== &[===] {
217                        my @seen;  # really Mu, but doesn't work in settings
218                        my Mu $target;
219                        &as
220                          ?? -> \val {
221                              my $now := now;
222                              $target = &as(val);
223                              my $index =
224                                @seen.first-index({&with($target,$_[0])});
225                              if $index.defined {
226                                  if $now > @seen[$index][1] {  # expired
227                                      @seen[$index][1] = $now+$expires;
228                                      $res.emit(val);
229                                  }
230                              }
231                              else {
232                                  @seen.push: [$target, $now+$expires];
233                                  $res.emit(val);
234                              }
235                          }
236                          !! -> \val {
237                              my $now := now;
238                              my $index =
239                                @seen.first-index({&with(val,$_[0])});
240                              if $index.defined {
241                                  if $now > @seen[$index][1] {  # expired
242                                      @seen[$index][1] = $now+$expires;
243                                      $res.emit(val);
244                                  }
245                              }
246                              else {
247                                  @seen.push: [val, $now+$expires];
248                                  $res.emit(val);
249                              }
250                          };
251                    }
252                    else {
253                        my $seen := nqp::hash();
254                        my str $target;
255                        &as
256                          ?? -> \val {
257                              my $now := now;
258                              $target = nqp::unbox_s(&as(val).WHICH);
259                              if !nqp::existskey($seen,$target) ||
260                                $now > nqp::atkey($seen,$target) { #expired
261                                  $res.emit(val);
262                                  nqp::bindkey($seen,$target,$now+$expires);
263                              }
264                          }
265                          !! -> \val {
266                              my $now := now;
267                              $target = nqp::unbox_s(val.WHICH);
268                              if !nqp::existskey($seen,$target) ||
269                                $now > nqp::atkey($seen,$target) { #expired
270                                  $res.emit(val);
271                                  nqp::bindkey($seen,$target,$now+$expires);
272                              }
273                          };
274                    }
275                }
276                else { # !$!expires
277                    if &with and &with !=== &[===] {
278                        my @seen;  # really Mu, but doesn't work in settings
279                        my Mu $target;
280                        &as
281                          ?? -> \val {
282                              $target = &as(val);
283                              if @seen.first({ &with($target,$_) } ) =:= Nil {
284                                  @seen.push($target);
285                                  $res.emit(val);
286                              }
287                          }
288                          !! -> \val {
289                              if @seen.first({ &with(val,$_) } ) =:= Nil {
290                                  @seen.push(val);
291                                  $res.emit(val);
292                              }
293                          };
294                    }
295                    else {
296                        my $seen := nqp::hash();
297                        my str $target;
298                        &as
299                          ?? -> \val {
300                              $target = nqp::unbox_s(&as(val).WHICH);
301                              unless nqp::existskey($seen, $target) {
302                                  nqp::bindkey($seen, $target, 1);
303                                  $res.emit(val);
304                              }
305                          }
306                          !! -> \val {
307                              $target = nqp::unbox_s(val.WHICH);
308                              unless nqp::existskey($seen, $target) {
309                                  nqp::bindkey($seen, $target, 1);
310                                  $res.emit(val);
311                              }
312                          };
313                    }
314                }
315            }
316        }
317    }
318
319    method squish(Supply:D $self: :&as, :&with is copy) {
320        &with //= &[===];
321        on -> $res {
322            my @secret;
323            $self => do {
324                my Mu $last = @secret;
325                my Mu $target;
326                &as
327                  ?? -> \val {
328                      $target = &as(val);
329                      unless &with($target,$last) {
330                          $last = $target;
331                          $res.emit(val);
332                      }
333                  }
334                  !! -> \val {
335                      unless &with(val,$last) {
336                          $last = val;
337                          $res.emit(val);
338                      }
339                  };
340            }
341        }
342    }
343
344    proto method rotor(|) {*}
345    multi method rotor(Supply:D:) {
346        DEPRECATED('.rotor( $elems => -$gap )',|<2015.04 2015.09>);
347        self.rotor( (2 => -1) );
348    }
349    multi method rotor(Supply:D $self: *@cycle, :$partial) {
350        my @c := @cycle.infinite ?? @cycle !! @cycle xx *;
351
352        on -> $res {
353            $self => do {
354                my Int $elems;
355                my Int $gap;
356                my int $to-skip;
357                my int $skip;
358                sub next-batch() {
359                    given @c.shift {
360                        when Pair {
361                            $elems   = +.key;
362                            $gap     = +.value;
363                            $to-skip = $gap > 0 ?? $gap !! 0;
364                        }
365                        default {
366                            $elems   = +$_;
367                            $gap     = 0;
368                            $to-skip = 0;
369                        }
370                    }
371                }
372                next-batch;
373
374                my @batched;
375                sub flush() {
376                    $res.emit( [@batched] );
377                    @batched.splice( 0, +@batched + $gap );
378                    $skip = $to-skip;
379                }
380
381                {
382                    emit => -> \val {
383                        @batched.push: val unless $skip && $skip--;
384                        if @batched.elems == $elems {
385                            flush;
386                            next-batch;
387                        }
388                    },
389                    done => {
390                        flush if @batched and $partial;
391                        $res.done;
392                    }
393                }
394            }
395        }
396    }
397
398    method batch(Supply:D $self: :$elems, :$seconds ) {
399
400        return $self if (!$elems or $elems == 1) and !$seconds;  # nothing to do
401
402        on -> $res {
403            $self => do {
404                my @batched;
405                my $last_time;
406                sub flush {
407                    $res.emit([@batched]);
408                    @batched = ();
409                }
410
411                {
412                    emit => do {
413                        if $seconds {
414                            $last_time = time div $seconds;
415
416                            $elems # and $seconds
417                              ??  -> \val {
418                                  my $this_time = time div $seconds;
419                                  if $this_time != $last_time {
420                                      flush if @batched;
421                                      $last_time = $this_time;
422                                      @batched.push: val;
423                                  }
424                                  else {
425                                      @batched.push: val;
426                                      flush if @batched.elems == $elems;
427                                  }
428                              }
429                              !! -> \val {
430                                  my $this_time = time div $seconds;
431                                  if $this_time != $last_time {
432                                      flush if @batched;
433                                      $last_time = $this_time;
434                                  }
435                                  @batched.push: val;
436                              }
437                        }
438                        else { # just $elems
439                            -> \val {
440                                @batched.push: val;
441                                flush if @batched.elems == $elems;
442                            }
443                        }
444                    },
445                    done => {
446                        flush if @batched;
447                        $res.done;
448                    }
449                }
450            }
451        }
452    }
453
454    method lines(Supply:D $self: :$chomp = True ) {
455
456        on -> $res {
457            $self => do {
458                my str $str;
459                my int $chars;
460                my int $left;
461                my int $pos;
462                my int $nextpos;
463                my int $found;
464                my int $cr;
465                my int $crlf;
466
467                {
468                    emit => -> \val {
469                        $str   = $str ~ nqp::unbox_s(val);
470                        $chars = nqp::chars($str);
471                        $pos   = 0;
472
473                        while ($left = $chars - $pos) > 0 {
474                            $nextpos = nqp::findcclass(
475                              nqp::const::CCLASS_NEWLINE, $str, $pos, $left
476                            );
477
478                            # no trailing line delimiter, so go buffer
479                            last unless nqp::iscclass(
480                              nqp::const::CCLASS_NEWLINE, $str, $nextpos
481                            );
482
483                            # potentially broken CRLF, so go buffer
484                            $cr = nqp::ordat($str, $nextpos) == 13;    # CR
485                            last if $cr == 1 and $nextpos + 1 == $chars;
486
487                            $crlf = $cr
488                              && nqp::ordat($str, $nextpos + 1) == 10; # LF
489
490                            if $chomp {
491                                $res.emit( ($found = $nextpos - $pos)
492                                  ?? nqp::box_s(
493                                       nqp::substr($str, $pos, $found), Str)
494                                  !! ''
495                                );
496                                $pos = $nextpos + 1 + $crlf;
497                            }
498                            else {
499                                $found = $nextpos - $pos + 1 + $crlf;
500                                $res.emit( nqp::box_s(
501                                  nqp::substr($str, $pos, $found), Str)
502                                );
503                                $pos = $pos + $found;
504                            }
505                        }
506                        $str = $pos < $chars
507                          ?? nqp::substr($str,$pos)
508                          !! '';
509                    },
510                    done => {
511                        if $str {
512                            $chars = nqp::chars($str);
513                            $res.emit( $chomp
514                              && nqp::ordat($str, $chars - 1) == 13    # CR
515                              ?? nqp::box_s(nqp::substr($str,0,$chars - 1),Str)
516                              !! nqp::box_s($str, Str)
517                            );
518                        }
519                        $res.done;
520                    }
521                }
522            }
523        }
524    }
525
526    method words(Supply:D $self:) {
527
528        on -> $res {
529            $self => do {
530                my str $str;
531                my int $chars;
532                my int $left;
533                my int $pos;
534                my int $nextpos;
535                my int $found;
536                my int $cr;
537                my int $crlf;
538
539                {
540                    emit => -> \val {
541                        $str   = $str ~ nqp::unbox_s(val);
542                        $chars = nqp::chars($str);
543                        $pos   = nqp::findnotcclass(
544                          nqp::const::CCLASS_WHITESPACE, $str, 0, $chars);
545
546                        while ($left = $chars - $pos) > 0 {
547                            $nextpos = nqp::findcclass(
548                              nqp::const::CCLASS_WHITESPACE, $str, $pos, $left
549                            );
550
551                            last unless $left = $chars - $nextpos; # broken word
552
553                            $res.emit( nqp::box_s(
554                              nqp::substr( $str, $pos, $nextpos - $pos ), Str)
555                            );
556
557                            $pos = nqp::findnotcclass(
558                              nqp::const::CCLASS_WHITESPACE,$str,$nextpos,$left);
559                        }
560                        $str = $pos < $chars
561                          ?? nqp::substr($str,$pos)
562                          !! '';
563                    },
564                    done => {
565                        $res.emit( nqp::box_s($str, Str) ) if $str;
566                        $res.done;
567                    }
568                }
569            }
570        }
571    }
572
573    method elems(Supply:D $self: $seconds? ) {
574
575        on -> $res {
576            $self => do {
577                my $elems = 0;
578                my $last_time;
579                my $last_elems;
580
581                {
582                    emit => do {
583                        if $seconds {
584                            $last_time  = time div $seconds;
585                            $last_elems = $elems;
586                            -> \val {
587                                  $last_elems = ++$elems;
588                                  my $this_time = time div $seconds;
589                                  if $this_time != $last_time {
590                                      $res.emit($elems);
591                                      $last_time = $this_time;
592                                  }
593                            }
594                        }
595                        else {
596                            -> \val { $res.emit(++$elems) }
597                        }
598                    },
599                    done => {
600                        $res.emit($elems) if $seconds and $elems != $last_elems;
601                        $res.done;
602                    }
603                }
604            }
605        }
606    }
607
608    method last(Supply:D $self: Int $number = 1) {  # should be Natural
609        on -> $res {
610            $self => do {
611                my @seen;
612                {
613                    emit => $number == 1
614                      ?? -> \val { @seen[0] = val }
615                      !! -> \val {
616                          @seen.shift if +@seen == $number;
617                          @seen.push: val;
618                      },
619                    done => {
620                        $res.emit($_) for @seen;
621                        $res.done;
622                    }
623                }
624            }
625        }
626    }
627
628    method min(Supply:D $self: &by = &infix:<cmp>) {
629        my &cmp = &by.arity == 2 ?? &by !! { by($^a) cmp by($^b) }
630        on -> $res {
631            $self => do {
632                my $min;
633                {
634                    emit => -> \val {
635                        if val.defined and !$min.defined || cmp(val,$min) < 0 {
636                            $res.emit( $min = val );
637                        }
638                    },
639                    done => { $res.done }
640                }
641            }
642        }
643    }
644
645    method max(Supply:D $self: &by = &infix:<cmp>) {
646        my &cmp = &by.arity == 2 ?? &by !! { by($^a) cmp by($^b) }
647        on -> $res {
648            $self => do {
649                my $max;
650                {
651                    emit => -> \val {
652                        if val.defined and !$max.defined || cmp(val,$max) > 0 {
653                            $res.emit( $max = val );
654                        }
655                    },
656                    done => { $res.done }
657                }
658            }
659        }
660    }
661
662    method minmax(Supply:D $self: &by = &infix:<cmp>) {
663        my &cmp = &by.arity == 2 ?? &by !! { by($^a) cmp by($^b) }
664        on -> $res {
665            $self => do {
666                my $min;
667                my $max;
668                {
669                    emit => -> \val {
670                        if val.defined {
671                            if !$min.defined {
672                                $res.emit( Range.new($min = val, $max = val) );
673                            }
674                            elsif cmp(val,$min) < 0 {
675                                $res.emit( Range.new( $min = val, $max ) );
676                            }
677                            elsif cmp(val,$max) > 0 {
678                                $res.emit( Range.new( $min, $max = val ) );
679                            }
680                        }
681                    },
682                    done => { $res.done }
683                }
684            }
685        }
686    }
687
688    method reduce(Supply:D $self: &with) {
689        on -> $res {
690            $self => do {
691                my $notfirst;
692                my $reduced;
693                {
694                    emit => -> \val {
695                        $reduced = $notfirst ?? with($reduced,val) !! val;
696                        $res.emit($reduced);
697                        once $notfirst = True;
698                    },
699                    done => { $res.done }
700                }
701            }
702        }
703    }
704
705    method grab(Supply:D $self: &when_done) {
706        on -> $res {
707            $self => do {
708                my @seen;
709                {
710                    emit => -> \val { @seen.push: val },
711                    done => {
712                        $res.emit($_) for when_done(@seen);
713                        $res.done;
714                    }
715                }
716            }
717        }
718    }
719
720    method reverse(Supply:D:)                 { self.grab( {.reverse} ) }
721    method sort(Supply:D: &by = &infix:<cmp>) { self.grab( {.sort(&by)} ) }
722
723    method merge(*@s) {
724        @s.unshift(self) if self.DEFINITE;  # add if instance method
725        return Supply unless +@s;           # nothing to be done
726
727        X::Supply::Combinator.new(
728           combinator => 'merge'
729        ).throw if NOT_ALL_DEFINED_TYPE(@s,Supply);
730
731        return @s[0]  if +@s == 1;          # nothing to be done
732
733        my $dones = 0;
734        on -> $res {
735            @s => {
736                emit => -> \val { $res.emit(val) },
737                done => { $res.done() if ++$dones == +@s }
738            },
739        }
740    }
741
742    method zip(*@s, :&with is copy = &[,]) {
743        @s.unshift(self) if self.DEFINITE;  # add if instance method
744        return Supply unless +@s;           # nothing to be done
745
746        X::Supply::Combinator.new(
747           combinator => 'zip'
748        ).throw if NOT_ALL_DEFINED_TYPE(@s,Supply);
749
750        return @s[0]  if +@s == 1;          # nothing to be done
751
752        my @values = ( [] xx +@s );
753        on -> $res {
754            @s => -> $val, $index {
755                @values[$index].push($val);
756                if all(@values) {
757                    $res.emit( [[&with]] @values>>.shift );
758                }
759            }
760        }
761    }
762
763    method zip-latest(*@s, :&with is copy = &[,], :$initial ) {
764        @s.unshift(self) if self.DEFINITE;  # add if instance method
765        return Supply unless +@s;           # nothing to do.
766
767        X::Supply::Combinator.new(
768           combinator => 'zip-latest'
769        ).throw if NOT_ALL_DEFINED_TYPE(@s,Supply);
770
771        return @s[0] if +@s == 1;           # nothing to do.
772
773        my @values;
774
775        my $uninitialised = +@s; # how many supplies have yet to emit until we
776                                 # can start emitting, too?
777
778        if $initial {
779            @values = @$initial;
780            $uninitialised = 0 max $uninitialised - @$initial;
781        }
782
783        my $dones = 0;
784
785        on -> $res {
786            @s => do {
787                {
788                emit => -> $val, $index {
789                    if $uninitialised > 0 && not @values.EXISTS-POS($index) {
790                        --$uninitialised;
791                    }
792                    @values[$index] = $val;
793                    unless $uninitialised {
794                        $res.emit( [[&with]] @values );
795                    }
796                },
797                done => { $res.done() if ++$dones == +@s }
798                }
799            }
800        }
801    }
802
803    method for(Supply:U: |c) {
804        DEPRECATED('from-list',|<2015.01 2015.09>);
805        SupplyOperations.from-list(|c);
806    }
807    method on_demand(Supply:U: |c)       {
808        DEPRECATED('on-demand',|<2015.03 2015.09>);
809        SupplyOperations.on-demand(|c);
810    }
811    method schedule_on(Supply:D: Scheduler $scheduler) {
812        DEPRECATED('schedule-on',|<2015.03 2015.09>);
813        SupplyOperations.schedule-on(self, $scheduler);
814    }
815    method uniq(Supply:D: |c) {
816        DEPRECATED('unique', |<2014.11 2015.09>);
817        self.unique(|c);
818    }
819}
820
821# The on meta-combinator provides a mechanism for implementing thread-safe
822# combinators on Supplies. It subscribes to a bunch of sources, but will
823# only let one of the specified callbacks to handle their emit/done/quit run
824# at a time. A little bit actor-like.
825my class X::Supply::On::BadSetup is Exception {
826    method message() {
827        "on requires a callable that returns a list of pairs with Supply keys"
828    }
829}
830my class X::Supply::On::NoEmit is Exception {
831    method message() {
832        "on requires that emit be specified for each supply"
833    }
834}
835sub on(&setup) {
836    my class OnSupply does Supply {
837        has &!setup;
838        has Bool $!live = False;
839
840        submethod BUILD(:&!setup) { }
841
842        method !add_source(
843          $source, $lock, $index, :&done is copy, :&quit is copy,
844          :&emit is copy, :&more   # more deprecated, emit must be changeable
845        ) {
846            DEPRECATED('emit => {...}', |<2014.10 2015.09>) if &more;
847            $!live ||= True if $source.live;
848            &emit //= &more // X::Supply::On::NoEmit.new.throw;
849            &done //= { self.done };
850            &quit //= -> $ex { self.quit($ex) };
851
852            my &tap_emit = &emit.arity == 2
853              ?? -> \val {
854                  $lock.protect({ emit(val,$index) });
855                  CATCH { default { self.quit($_) } }
856              }
857              !!  -> \val {
858                  $lock.protect({ emit(val) });
859                  CATCH { default { self.quit($_) } }
860              };
861
862            my &tap_done = &done.arity == 1
863              ?? {
864                  $lock.protect({ done($index) });
865                  CATCH { default { self.quit($_) } }
866              }
867              !! {
868                  $lock.protect({ done() });
869                  CATCH { default { self.quit($_) } }
870              };
871
872            my &tap_quit = &quit.arity == 2
873              ?? -> $ex {
874                  $lock.protect({ quit($ex,$index) });
875                  CATCH { default { self.quit($_) } }
876              }
877              !! -> $ex {
878                  $lock.protect({ quit($ex) });
879                  CATCH { default { self.quit($_) } }
880              };
881
882            $source.tap( &tap_emit, done => &tap_done, quit => &tap_quit );
883        }
884
885        method live { $!live }
886        method tap(|c) {
887            my @to_close;
888            my $sub = self.Supply::tap( |c, closing => {.close for @to_close});
889            my @tappers = &!setup(self);
890            my $lock    = Lock.new;
891
892            sub add ($source, $what, $index?) {
893                unless nqp::istype($source,Supply) {
894                    X::Supply::On::BadSetup.new.throw;
895                }
896                given $what {
897                    when EnumMap {
898                        @to_close.push(self!add_source($source, $lock, $index, |$what));
899                    }
900                    when Callable {
901                        @to_close.push(self!add_source($source, $lock, $index, emit => $what));
902                    }
903                    default {
904                        X::Supply::On::BadSetup.new.throw;
905                    }
906                }
907            }
908
909            for @tappers -> $tap {
910                unless nqp::istype($tap,Pair) {
911                    X::Supply::On::BadSetup.new.throw;
912                }
913                given $tap.key {
914                    when Positional {
915                        my $todo := $tap.value;
916                        for .list.kv -> $index, $supply {
917                            add( $supply, $todo, $index );
918                        }
919                    }
920                    when Supply {
921                        add( $_, $tap.value );
922                    }
923                    default {
924                        X::Supply::On::BadSetup.new.throw;
925                    }
926                }
927            }
928            $sub
929        }
930
931        method emit(\msg) {
932            for self.tappers {
933                .emit().(msg)
934            }
935            Nil;
936        }
937
938        method done() {
939            for self.tappers {
940                if .done -> $l { $l() }
941            }
942            Nil;
943        }
944
945        method quit($ex) {
946            for self.tappers {
947                if .quit -> $t { $t($ex) }
948            }
949            Nil;
950        }
951    }
952
953    OnSupply.new(:&setup)
954}
955
956# vim: ft=perl6 expandtab sw=4
957