# Anything that can be subscribed to does this role. It provides the basic
# supply management infrastructure, as well as various coercions that
# turn Supply-like things into something else and convenience forms of calls
# to SupplyOperations.

my class SupplyOperations is repr('Uninstantiable') { ... }

# Thrown by the combinators (merge, zip, zip-latest) when one of the things
# to be combined is not a defined Supply.
my class X::Supply::Combinator is Exception {
    has $.combinator;
    method message() { "Can only use $!combinator to combine defined Supply objects" }
}

# A single subscription to a Supply: the callbacks that were registered with
# "tap", plus the Supply they were registered on.
my class Tap {
    has &.emit;
    has &.done;
    has &.quit;
    has &.closing;
    has $.supply;

    # Unsubscribe by asking the owning Supply to close this tap.
    method close (Tap:D:) { $!supply.close(self) }
}

my role Supply {
    has $!tappers_lock = Lock.new;  # guards @!tappers and @!paused
    has @!tappers;                  # currently registered Tap objects
    has $!been_tapped;              # set once the first tap was registered
    has @!paused;                   # values emitted before the first tap

    # Subscribe to this Supply.  Returns the new Tap.  Values that were
    # emitted before anything was tapped are replayed to this tap first.
    method tap(Supply:D: &emit = -> $ { }, :&done,:&quit={die $_},:&closing) {
        my $tap = Tap.new(:&emit, :&done, :&quit, :&closing, :supply(self));
        $!tappers_lock.protect({
            @!tappers.push($tap);
            if @!paused -> \todo {
                $tap.emit().($_) for todo;
                @!paused = ();
            }
            $!been_tapped = True;
        });
        $tap
    }

    # Without arguments: close every current tap.  With a Tap: remove that
    # tap, run its "closing" callback (if any) and return whether the tap
    # was actually registered.
    proto method close(|) { * }
    multi method close(Supply:D:) { self.close($_) for self.tappers }
    multi method close(Supply:D: Tap $t) {
        my $found;
        $!tappers_lock.protect({
            @!tappers .= grep( { $_ === $t ?? !($found = True) !! True } );
        });
        if $t.closing -> &closing {
            closing();
        }
        $found // False;
    }

    method tappers(Supply:D:) {
        # Shallow clone to provide safe snapshot.
        my @tappers;
        $!tappers_lock.protect({ @tappers = @!tappers });
        @tappers
    }

    # Send a value to all current taps.  If there are none and nothing has
    # ever tapped this Supply, the value is buffered until the first tap.
    method emit(Supply:D: \msg) {
        if self.tappers -> \tappers {
            .emit().(msg) for tappers;
        }
        elsif !$!been_tapped {
            $!tappers_lock.protect({ @!paused.push: msg });
        }
        Nil;
    }

    # Deprecated name for "emit".
    method more(Supply:D: \msg) {
        DEPRECATED('emit', |<2014.10 2015.09>);
        self.emit(msg);
    }

    # Invoke the "done" callback of every current tap that has one.
    method done(Supply:D:) {
        for self.tappers -> $t {
            my $l = $t.done();
            $l() if $l;
        }
        Nil;
    }

    # Invoke the "quit" callback of every current tap that has one, passing
    # it the given exception.
    method quit(Supply:D: $ex) {
        for self.tappers -> $t {
            my $f = $t.quit();
            $f($ex) if $f;
        }
        Nil;
    }

    method taps(Supply:D:) { +@!tappers }   # number of registered taps
    method live(Supply:D:) { True };

    # Coercions.
    method Supply(Supply:) { self }

    # A Channel that receives every value; it is closed on "done" and put
    # into the quit state on "quit".
    method Channel(Supply:D:) {
        my $c = Channel.new();
        self.tap( -> \val { $c.send(val) },
          done => { $c.close },
          quit => -> $ex { $c.quit($ex) });
        $c
    }

    # A Promise that is kept with the first value emitted, or broken when
    # the Supply is done without a value or quits.
    method Promise(Supply:D:) {
        my $l = Lock.new;
        my $p = Promise.new;
        my $v = $p.vow;
        my $t = self.tap(
          -> \val {
              $l.protect( {
                  if $p.status == Planned {
                      $v.keep(val);
                      $t.close()
                  }
              } );
          },
          done => { $v.break("No value received") },
          quit => -> \ex {
              $l.protect( {
                  if $p.status == Planned {
                      $v.break(ex);
                      $t.close()
                  }
              } );
          },
        );
        $p
    }

    # Block until the Supply is done (or has quit); all values are ignored.
    # The return value is the result of the internal Promise.
    method wait(Supply:D:) {
        my $l = Lock.new;
        my $p = Promise.new;
        my $t = self.tap( -> \val {},
          done => {
              $l.protect( {
                  if $p.status == Planned {
                      $p.keep(True);
                      $t.close()
                  }
              } );
          },
          quit => -> \ex {
              $l.protect( {
                  if $p.status == Planned {
                      $p.break(ex);
                      $t.close()
                  }
              } );
          },
        );
        $p.result
    }

    method list(Supply:D:) {
        # Use a Channel to handle any asynchrony.
        self.Channel.list;
    }

    # Convenience forms that simply forward to SupplyOperations.
    method on-demand(Supply:U: |c)       { SupplyOperations.on-demand(|c) }
    method from-list(Supply:U: |c)       { SupplyOperations.from-list(|c) }
    method interval(Supply:U: |c)        { SupplyOperations.interval(|c) }
    method flat(Supply:D: )              { SupplyOperations.flat(self) }
    method grep(Supply:D: Mu $test)      { SupplyOperations.grep(self, $test) }
    method map(Supply:D: &mapper)        { SupplyOperations.map(self, &mapper) }
    method schedule-on(Supply:D: Scheduler $scheduler) {
        SupplyOperations.schedule-on(self, $scheduler);
    }
    method start(Supply:D: &startee)     { SupplyOperations.start(self, &startee) }
    method stable(Supply:D: $time, :$scheduler = $*SCHEDULER) {
        SupplyOperations.stable(self, $time, :$scheduler);
    }
    method delay(Supply:D: $time, :$scheduler = $*SCHEDULER) {
        DEPRECATED('delayed', '2015.02', '2015.09');
        SupplyOperations.delayed(self, $time, :$scheduler);
    }
    method delayed(Supply:D: $time, :$scheduler = $*SCHEDULER) {
        SupplyOperations.delayed(self, $time, :$scheduler)
    }
    method migrate(Supply:D: )           { SupplyOperations.migrate(self) }

    # classify: the mapper may be a Callable, a hash or an array; the latter
    # two are wrapped in a lookup closure.
    multi method classify(Supply:D: &mapper )  {
        SupplyOperations.classify(self, &mapper);
    }
    multi method classify(Supply:D: %mapper )  {
        SupplyOperations.classify(self, { %mapper{$^a} });
    }
    multi method classify(Supply:D: @mapper )  {
        SupplyOperations.classify(self, { @mapper[$^a] });
    }

    # categorize: same as classify, but passes :multi.
    proto method categorize (|) { * }
    multi method categorize(Supply:D: &mapper )  {
        SupplyOperations.classify(self, &mapper, :multi);
    }
    multi method categorize(Supply:D: %mapper )  {
        SupplyOperations.classify(self, { %mapper{$^a} }, :multi);
    }
    multi method categorize(Supply:D: @mapper )  {
        SupplyOperations.classify(self, { @mapper[$^a] }, :multi);
    }

    # Tap with the given actor; any named arguments are passed on to "tap".
    method act(Supply:D: &actor) {
        self.do(&actor).tap(|%_) # need "do" for serializing callbacks
    }

    # A Supply that runs &side_effect on each value before passing the
    # value on unchanged.
    method do(Supply:D $self: &side_effect) {
        on -> $res {
            $self => -> \val { side_effect(val); $res.emit(val) }
        }
    }

    # A Supply that only passes on values not seen before.
    #   :&as       transforms a value before it is compared
    #   :&with     comparator; when absent or &[===] the .WHICH of the value
    #              is used as key in a native hash, otherwise a linear search
    #              through the values seen so far is done
    #   :$expires  a seen value is passed on again once this much time has
    #              passed since it was last passed on
    method unique(Supply:D $self: :&as, :&with, :$expires) {
        on -> $res {
            $self => do {
                if $expires {
                    if &with and &with !=== &[===] {
                        my @seen;  # really Mu, but doesn't work in settings
                        my Mu $target;
                        &as
                          ?? -> \val {
                              my $now := now;
                              $target = &as(val);
                              my $index =
                                @seen.first-index({&with($target,$_[0])});
                              if $index.defined {
                                  if $now > @seen[$index][1] {  # expired
                                      @seen[$index][1] = $now+$expires;
                                      $res.emit(val);
                                  }
                              }
                              else {
                                  @seen.push: [$target, $now+$expires];
                                  $res.emit(val);
                              }
                          }
                          !! -> \val {
                              my $now := now;
                              my $index =
                                @seen.first-index({&with(val,$_[0])});
                              if $index.defined {
                                  if $now > @seen[$index][1] {  # expired
                                      @seen[$index][1] = $now+$expires;
                                      $res.emit(val);
                                  }
                              }
                              else {
                                  @seen.push: [val, $now+$expires];
                                  $res.emit(val);
                              }
                          };
                    }
                    else {
                        my $seen := nqp::hash();
                        my str $target;
                        &as
                          ?? -> \val {
                              my $now := now;
                              $target = nqp::unbox_s(&as(val).WHICH);
                              if !nqp::existskey($seen,$target) ||
                                $now > nqp::atkey($seen,$target) {  #expired
                                  $res.emit(val);
                                  nqp::bindkey($seen,$target,$now+$expires);
                              }
                          }
                          !! -> \val {
                              my $now := now;
                              $target = nqp::unbox_s(val.WHICH);
                              if !nqp::existskey($seen,$target) ||
                                $now > nqp::atkey($seen,$target) {  #expired
                                  $res.emit(val);
                                  nqp::bindkey($seen,$target,$now+$expires);
                              }
                          };
                    }
                }
                else { # !$!expires
                    if &with and &with !=== &[===] {
                        my @seen;  # really Mu, but doesn't work in settings
                        my Mu $target;
                        &as
                          ?? -> \val {
                              $target = &as(val);
                              if @seen.first({ &with($target,$_) } ) =:= Nil {
                                  @seen.push($target);
                                  $res.emit(val);
                              }
                          }
                          !! -> \val {
                              if @seen.first({ &with(val,$_) } ) =:= Nil {
                                  @seen.push(val);
                                  $res.emit(val);
                              }
                          };
                    }
                    else {
                        my $seen := nqp::hash();
                        my str $target;
                        &as
                          ?? -> \val {
                              $target = nqp::unbox_s(&as(val).WHICH);
                              unless nqp::existskey($seen, $target) {
                                  nqp::bindkey($seen, $target, 1);
                                  $res.emit(val);
                              }
                          }
                          !! -> \val {
                              $target = nqp::unbox_s(val.WHICH);
                              unless nqp::existskey($seen, $target) {
                                  nqp::bindkey($seen, $target, 1);
                                  $res.emit(val);
                              }
                          };
                    }
                }
            }
        }
    }

    # A Supply that drops a value when it compares equal (using :&with,
    # default &[===], after the optional :&as transformation) to the value
    # that was passed on last.
    method squish(Supply:D $self: :&as, :&with is copy) {
        &with //= &[===];
        on -> $res {
            my @secret;
            $self => do {
                # @secret is a fresh object used as the initial "previous"
                # value -- presumably so that the first real value never
                # compares equal to it.
                my Mu $last = @secret;
                my Mu $target;
                &as
                  ?? -> \val {
                      $target = &as(val);
                      unless &with($target,$last) {
                          $last = $target;
                          $res.emit(val);
                      }
                  }
                  !! -> \val {
                      unless &with(val,$last) {
                          $last = val;
                          $res.emit(val);
                      }
                  };
            }
        }
    }

    # A Supply of batches (arrays) of values.  Each entry of @cycle is
    # either a number of elements, or a Pair "elems => gap".  A positive gap
    # skips that many values after a batch; a negative gap keeps that many
    # values at the end of a batch as the start of the next one.  The cycle
    # is repeated unless it is already infinite.  With :partial an
    # incomplete last batch is emitted when the source is done.
    proto method rotor(|) {*}
    multi method rotor(Supply:D:) {
        DEPRECATED('.rotor( $elems => -$gap )',|<2015.04 2015.09>);
        self.rotor( (2 => -1) );
    }
    multi method rotor(Supply:D $self: *@cycle, :$partial) {
        my @c := @cycle.infinite ?? @cycle !! @cycle xx *;

        on -> $res {
            $self => do {
                my Int $elems;
                my Int $gap;
                my int $to-skip;
                my int $skip;
                # Take the next entry of the cycle and set up $elems, $gap
                # and $to-skip for the batch that is about to be collected.
                sub next-batch() {
                    given @c.shift {
                        when Pair {
                            $elems   = +.key;
                            $gap     = +.value;
                            $to-skip = $gap > 0 ?? $gap !! 0;
                        }
                        default {
                            $elems   = +$_;
                            $gap     = 0;
                            $to-skip = 0;
                        }
                    }
                }
                next-batch;

                my @batched;
                # Emit the current batch; with a negative gap the last
                # -$gap values stay in @batched for the next batch.
                sub flush() {
                    $res.emit( [@batched] );
                    @batched.splice( 0, +@batched + $gap );
                    $skip = $to-skip;
                }

                {
                    emit => -> \val {
                        @batched.push: val unless $skip && $skip--;
                        if @batched.elems == $elems {
                            flush;
                            next-batch;
                        }
                    },
                    done => {
                        flush if @batched and $partial;
                        $res.done;
                    }
                }
            }
        }
    }

    # A Supply of batches (arrays) of values.  A batch is emitted when it
    # holds :$elems values and/or when the value of "time div $seconds" has
    # changed since the previous value was received.  Whatever is left is
    # emitted when the source is done.
    method batch(Supply:D $self: :$elems, :$seconds ) {

        return $self if (!$elems or $elems == 1) and !$seconds;  # nothing to do

        on -> $res {
            $self => do {
                my @batched;
                my $last_time;
                sub flush {
                    $res.emit([@batched]);
                    @batched = ();
                }

                {
                    emit => do {
                        if $seconds {
                            $last_time = time div $seconds;

                            $elems # and $seconds
                              ??  -> \val {
                                  my $this_time = time div $seconds;
                                  if $this_time != $last_time {
                                      flush if @batched;
                                      $last_time = $this_time;
                                      @batched.push: val;
                                  }
                                  else {
                                      @batched.push: val;
                                      flush if @batched.elems == $elems;
                                  }
                              }
                              !! -> \val {
                                  my $this_time = time div $seconds;
                                  if $this_time != $last_time {
                                      flush if @batched;
                                      $last_time = $this_time;
                                  }
                                  @batched.push: val;
                              }
                        }
                        else { # just $elems
                            -> \val {
                                @batched.push: val;
                                flush if @batched.elems == $elems;
                            }
                        }
                    },
                    done => {
                        flush if @batched;
                        $res.done;
                    }
                }
            }
        }
    }

    # A Supply of lines, built from the chunks of text emitted by the
    # source.  Text after the last line delimiter (and a trailing CR that
    # might be the first half of a CRLF) is buffered until the next chunk
    # arrives or the source is done.  With :chomp (the default) the line
    # delimiters are removed.
    method lines(Supply:D $self: :$chomp = True ) {

        on -> $res {
            $self => do {
                my str $str;
                my int $chars;
                my int $left;
                my int $pos;
                my int $nextpos;
                my int $found;
                my int $cr;
                my int $crlf;

                {
                    emit => -> \val {
                        $str   = $str ~ nqp::unbox_s(val);
                        $chars = nqp::chars($str);
                        $pos   = 0;

                        while ($left = $chars - $pos) > 0 {
                            $nextpos = nqp::findcclass(
                              nqp::const::CCLASS_NEWLINE, $str, $pos, $left
                            );

                            # no trailing line delimiter, so go buffer
                            last unless nqp::iscclass(
                              nqp::const::CCLASS_NEWLINE, $str, $nextpos
                            );

                            # potentially broken CRLF, so go buffer
                            $cr = nqp::ordat($str, $nextpos) == 13;    # CR
                            last if $cr == 1 and $nextpos + 1 == $chars;

                            $crlf = $cr
                              && nqp::ordat($str, $nextpos + 1) == 10; # LF

                            if $chomp {
                                $res.emit( ($found = $nextpos - $pos)
                                  ?? nqp::box_s(
                                       nqp::substr($str, $pos, $found), Str)
                                  !! ''
                                );
                                $pos = $nextpos + 1 + $crlf;
                            }
                            else {
                                $found = $nextpos - $pos + 1 + $crlf;
                                $res.emit( nqp::box_s(
                                  nqp::substr($str, $pos, $found), Str)
                                );
                                $pos = $pos + $found;
                            }
                        }
                        $str = $pos < $chars
                          ?? nqp::substr($str,$pos)
                          !! '';
                    },
                    done => {
                        if $str {
                            $chars = nqp::chars($str);
                            $res.emit( $chomp
                              && nqp::ordat($str, $chars - 1) == 13    # CR
                              ?? nqp::box_s(nqp::substr($str,0,$chars - 1),Str)
                              !! nqp::box_s($str, Str)
                            );
                        }
                        $res.done;
                    }
                }
            }
        }
    }

    # A Supply of whitespace separated words, built from the chunks of text
    # emitted by the source.  A word that runs up to the end of a chunk may
    # be continued in the next chunk, so it is buffered until more text
    # arrives or the source is done.
    method words(Supply:D $self:) {

        on -> $res {
            $self => do {
                my str $str;
                my int $chars;
                my int $left;
                my int $pos;
                my int $nextpos;

                {
                    emit => -> \val {
                        $str   = $str ~ nqp::unbox_s(val);
                        $chars = nqp::chars($str);
                        $pos   = nqp::findnotcclass(
                          nqp::const::CCLASS_WHITESPACE, $str, 0, $chars);

                        while ($left = $chars - $pos) > 0 {
                            $nextpos = nqp::findcclass(
                              nqp::const::CCLASS_WHITESPACE, $str, $pos, $left
                            );

                            last unless $left = $chars - $nextpos; # broken word

                            $res.emit( nqp::box_s(
                              nqp::substr( $str, $pos, $nextpos - $pos ), Str)
                            );

                            $pos = nqp::findnotcclass(
                              nqp::const::CCLASS_WHITESPACE,$str,$nextpos,$left);
                        }
                        $str = $pos < $chars
                          ?? nqp::substr($str,$pos)
                          !! '';
                    },
                    done => {
                        $res.emit( nqp::box_s($str, Str) ) if $str;
                        $res.done;
                    }
                }
            }
        }
    }

    # A Supply of the number of values seen so far.  Without $seconds the
    # count is emitted for every value.  With $seconds the count is only
    # emitted when the value of "time div $seconds" has changed since the
    # last emitted count; a count that was not emitted yet is emitted when
    # the source is done.
    method elems(Supply:D $self: $seconds? ) {

        on -> $res {
            $self => do {
                my $elems = 0;
                my $last_time;
                my $last_elems;   # the count that was emitted last

                {
                    emit => do {
                        if $seconds {
                            $last_time  = time div $seconds;
                            $last_elems = $elems;
                            -> \val {
                                ++$elems;
                                my $this_time = time div $seconds;
                                if $this_time != $last_time {
                                    $res.emit($elems);
                                    # Only remember counts that were really
                                    # emitted, so that "done" can tell
                                    # whether a final count is still due.
                                    $last_elems = $elems;
                                    $last_time  = $this_time;
                                }
                            }
                        }
                        else {
                            -> \val { $res.emit(++$elems) }
                        }
                    },
                    done => {
                        $res.emit($elems) if $seconds and $elems != $last_elems;
                        $res.done;
                    }
                }
            }
        }
    }

    # A Supply that, once the source is done, emits the last $number values
    # that were seen.
    method last(Supply:D $self: Int $number = 1) {  # should be Natural
        on -> $res {
            $self => do {
                my @seen;
                {
                    emit => $number == 1
                      ?? -> \val { @seen[0] = val }
                      !! -> \val {
                          @seen.shift if +@seen == $number;
                          @seen.push: val;
                      },
                    done => {
                        $res.emit($_) for @seen;
                        $res.done;
                    }
                }
            }
        }
    }

    # A Supply that emits a defined value whenever it is smaller than all
    # values seen before.  &by is either a comparator (arity 2), or a
    # Callable that produces the value to be compared with "cmp".
    method min(Supply:D $self: &by = &infix:<cmp>) {
        my &cmp = &by.arity == 2 ?? &by !! { by($^a) cmp by($^b) }
        on -> $res {
            $self => do {
                my $min;
                {
                    emit => -> \val {
                        if val.defined and !$min.defined || cmp(val,$min) < 0 {
                            $res.emit( $min = val );
                        }
                    },
                    done => { $res.done }
                }
            }
        }
    }

    # Like "min", but for values larger than all values seen before.
    method max(Supply:D $self: &by = &infix:<cmp>) {
        my &cmp = &by.arity == 2 ?? &by !! { by($^a) cmp by($^b) }
        on -> $res {
            $self => do {
                my $max;
                {
                    emit => -> \val {
                        if val.defined and !$max.defined || cmp(val,$max) > 0 {
                            $res.emit( $max = val );
                        }
                    },
                    done => { $res.done }
                }
            }
        }
    }

    # A Supply that emits a Range of the smallest and largest value seen so
    # far, each time a defined value changes either of them.
    method minmax(Supply:D $self: &by = &infix:<cmp>) {
        my &cmp = &by.arity == 2 ?? &by !! { by($^a) cmp by($^b) }
        on -> $res {
            $self => do {
                my $min;
                my $max;
                {
                    emit => -> \val {
                        if val.defined {
                            if !$min.defined {
                                $res.emit( Range.new($min = val, $max = val) );
                            }
                            elsif cmp(val,$min) < 0 {
                                $res.emit( Range.new( $min = val, $max ) );
                            }
                            elsif cmp(val,$max) > 0 {
                                $res.emit( Range.new( $min, $max = val ) );
                            }
                        }
                    },
                    done => { $res.done }
                }
            }
        }
    }

    # A Supply of running reductions: the first value as is, then the
    # result of calling &with with the previous result and each new value.
    method reduce(Supply:D $self: &with) {
        on -> $res {
            $self => do {
                my $notfirst;
                my $reduced;
                {
                    emit => -> \val {
                        $reduced = $notfirst ?? with($reduced,val) !! val;
                        $res.emit($reduced);
                        once $notfirst = True;
                    },
                    done => { $res.done }
                }
            }
        }
    }

    # Collect all values; once the source is done, call &when_done with
    # them and emit each value of its result.
    method grab(Supply:D $self: &when_done) {
        on -> $res {
            $self => do {
                my @seen;
                {
                    emit => -> \val { @seen.push: val },
                    done => {
                        $res.emit($_) for when_done(@seen);
                        $res.done;
                    }
                }
            }
        }
    }

    method reverse(Supply:D:)                 { self.grab( {.reverse} ) }
    method sort(Supply:D: &by = &infix:<cmp>) { self.grab( {.sort(&by)} ) }

    # A Supply of all values of the given supplies (and the invocant, when
    # called on an instance), in the order in which they arrive.  It is done
    # once all of the supplies are done.
    method merge(*@s) {
        @s.unshift(self) if self.DEFINITE;  # add if instance method
        return Supply unless +@s;           # nothing to be done

        X::Supply::Combinator.new(
           combinator => 'merge'
        ).throw if NOT_ALL_DEFINED_TYPE(@s,Supply);

        return @s[0]  if +@s == 1;          # nothing to be done

        my $dones = 0;
        on -> $res {
            @s => {
                emit => -> \val { $res.emit(val) },
                done => { $res.done() if ++$dones == +@s }
            },
        }
    }

    # A Supply that combines, using :&with (default &[,]), the next value of
    # each of the given supplies as soon as every one of them has a value
    # waiting.
    method zip(*@s, :&with is copy = &[,]) {
        @s.unshift(self) if self.DEFINITE;  # add if instance method
        return Supply unless +@s;           # nothing to be done

        X::Supply::Combinator.new(
           combinator => 'zip'
        ).throw if NOT_ALL_DEFINED_TYPE(@s,Supply);

        return @s[0]  if +@s == 1;          # nothing to be done

        my @values = ( [] xx +@s );
        on -> $res {
            @s => -> $val, $index {
                @values[$index].push($val);
                if all(@values) {
                    $res.emit( [[&with]] @values>>.shift );
                }
            }
        }
    }

    # A Supply that combines, using :&with (default &[,]), the most recent
    # value of each of the given supplies every time one of them emits.
    # Nothing is emitted until all supplies have a value; :$initial may
    # provide start values.  It is done once all of the supplies are done.
    method zip-latest(*@s, :&with is copy = &[,], :$initial ) {
        @s.unshift(self) if self.DEFINITE;  # add if instance method
        return Supply unless +@s;           # nothing to do.

        X::Supply::Combinator.new(
           combinator => 'zip-latest'
        ).throw if NOT_ALL_DEFINED_TYPE(@s,Supply);

        return @s[0] if +@s == 1;           # nothing to do.

        my @values;

        my $uninitialised = +@s; # how many supplies have yet to emit until we
                                 # can start emitting, too?

        if $initial {
            @values = @$initial;
            $uninitialised = 0 max $uninitialised - @$initial;
        }

        my $dones = 0;

        on -> $res {
            @s => do {
                {
                    emit => -> $val, $index {
                        if $uninitialised > 0 && not @values.EXISTS-POS($index) {
                            --$uninitialised;
                        }
                        @values[$index] = $val;
                        unless $uninitialised {
                            $res.emit( [[&with]] @values );
                        }
                    },
                    done => { $res.done() if ++$dones == +@s }
                }
            }
        }
    }

    # Deprecated names.
    method for(Supply:U: |c) {
        DEPRECATED('from-list',|<2015.01 2015.09>);
        SupplyOperations.from-list(|c);
    }
    method on_demand(Supply:U: |c)       {
        DEPRECATED('on-demand',|<2015.03 2015.09>);
        SupplyOperations.on-demand(|c);
    }
    method schedule_on(Supply:D: Scheduler $scheduler) {
        DEPRECATED('schedule-on',|<2015.03 2015.09>);
        SupplyOperations.schedule-on(self, $scheduler);
    }
    method uniq(Supply:D: |c) {
        DEPRECATED('unique', |<2014.11 2015.09>);
        self.unique(|c);
    }
}

# The on meta-combinator provides a mechanism for implementing thread-safe
# combinators on Supplies. It subscribes to a bunch of sources, but will
# only let one of the specified callbacks to handle their emit/done/quit run
# at a time. A little bit actor-like.
# Thrown when the setup callable given to "on" returns something that is not
# a Pair, a Pair with a key that is neither a Supply nor a Positional of
# supplies, or a Pair with a value that is neither an EnumMap nor a Callable.
my class X::Supply::On::BadSetup is Exception {
    method message() {
        "on requires a callable that returns a list of pairs with Supply keys"
    }
}

# Thrown when the callbacks specified for a source do not include "emit".
my class X::Supply::On::NoEmit is Exception {
    method message() {
        "on requires that emit be specified for each supply"
    }
}

# Create a Supply from a setup callable.  Each time the result is tapped,
# &setup is called with the resulting Supply and must return a list of pairs:
# the key is a source Supply (or a Positional of them), the value is either
# an "emit" callback or an EnumMap of emit / done / quit callbacks.  All of
# the callbacks of one tapping are run under the same lock.
sub on(&setup) {
    my class OnSupply does Supply {
        has &!setup;
        has Bool $!live = False;

        submethod BUILD(:&!setup) { }

        # Tap a single source and return the resulting Tap.  The callbacks
        # are run under $lock, and are also given $index when they accept
        # one more positional argument than the plain callback would.  An
        # exception thrown by a callback makes this Supply quit.
        method !add_source(
          $source, $lock, $index, :&done is copy, :&quit is copy,
          :&emit is copy, :&more   # more deprecated, emit must be changeable
        ) {
            DEPRECATED('emit => {...}', |<2014.10 2015.09>) if &more;
            $!live = True if $source.live;

            # Defaults: emit is mandatory, done and quit are propagated.
            &emit //= &more // X::Supply::On::NoEmit.new.throw;
            &done //= { self.done };
            &quit //= -> $ex { self.quit($ex) };

            # Decide once, by arity, whether a callback wants the index.
            my &forward_emit = &emit.arity == 2
              ?? -> \val { emit(val,$index) }
              !! -> \val { emit(val) };
            my &forward_done = &done.arity == 1
              ?? -> { done($index) }
              !! -> { done() };
            my &forward_quit = &quit.arity == 2
              ?? -> $ex { quit($ex,$index) }
              !! -> $ex { quit($ex) };

            # The callbacks that are actually handed to the source.
            my &tap_emit = -> \val {
                $lock.protect({ forward_emit(val) });
                CATCH { default { self.quit($_) } }
            };
            my &tap_done = {
                $lock.protect({ forward_done() });
                CATCH { default { self.quit($_) } }
            };
            my &tap_quit = -> $ex {
                $lock.protect({ forward_quit($ex) });
                CATCH { default { self.quit($_) } }
            };

            $source.tap( &tap_emit, done => &tap_done, quit => &tap_quit );
        }

        # True if at least one of the sources tapped so far was live.
        method live { $!live }

        # Register the tap on this Supply, then run the setup and tap all of
        # the sources it specifies.  Closing the returned tap closes the
        # taps on the sources.
        method tap(|c) {
            my @source_taps;
            my $own_tap = self.Supply::tap(
              |c, closing => {.close for @source_taps}
            );
            my @wiring = &!setup(self);
            my $lock   = Lock.new;

            # Tap one source with whatever was specified for it.
            sub add ($source, $what, $index?) {
                X::Supply::On::BadSetup.new.throw
                  unless nqp::istype($source,Supply);

                if $what ~~ EnumMap {
                    @source_taps.push(
                      self!add_source($source, $lock, $index, |$what));
                }
                elsif $what ~~ Callable {
                    @source_taps.push(
                      self!add_source($source, $lock, $index, emit => $what));
                }
                else {
                    X::Supply::On::BadSetup.new.throw;
                }
            }

            for @wiring -> $pair {
                X::Supply::On::BadSetup.new.throw
                  unless nqp::istype($pair,Pair);

                my \src = $pair.key;
                if src ~~ Positional {
                    # Several sources sharing the same callbacks: each one
                    # is tapped with its position in the list as index.
                    my $todo := $pair.value;
                    for src.list.kv -> $index, $supply {
                        add( $supply, $todo, $index );
                    }
                }
                elsif src ~~ Supply {
                    add( src, $pair.value );
                }
                else {
                    X::Supply::On::BadSetup.new.throw;
                }
            }
            $own_tap
        }

        # Pass a value on to all current taps.
        method emit(\msg) {
            for self.tappers -> $t {
                $t.emit().(msg);
            }
            Nil;
        }

        # Run the "done" callback of all current taps that have one.
        method done() {
            for self.tappers -> $t {
                my $cb = $t.done;
                $cb() if $cb;
            }
            Nil;
        }

        # Run the "quit" callback of all current taps that have one.
        method quit($ex) {
            for self.tappers -> $t {
                my $cb = $t.quit;
                $cb($ex) if $cb;
            }
            Nil;
        }
    }

    OnSupply.new(:&setup)
}

# vim: ft=perl6 expandtab sw=4