Coro で Producer-Consumer パターン

増補改訂版 Java言語で学ぶデザインパターン入門 マルチスレッド編 を参考に Coro で Producer-Consumer パターンを実装。

Producer-Consumer パターン は メッセージキューだが、キューが一杯のときはキューに入れようとする待たされる。キューが空のときキューを読もうとすると待たされる。


#!/usr/bin/perl
use strict;
use warnings;

package Maker;
use Coro;
use Coro::Timer;

sub new {
    my ( $class, %args ) = @_;
    bless \%args, $class;
}

sub run {
    my $self = shift;
    my $coro = async {
        do {
            my $queue = $self->{queue};

            my $msg;
            $msg = "message1";
            print "Maker: put $msg\n";
            $queue->put($msg);

            $msg = "message2";
            print "Maker: put $msg\n";
            $queue->put($msg);

            $msg = "message3";
            print "Maker: put $msg\n";
            $queue->put($msg);

            $msg = "message4";
            print "Maker: put $msg\n";
            $queue->put($msg);

            $msg = "end";
            print "Maker: put $msg\n";
            $queue->put($msg);
        };
    };
}

package Eater;
use Coro;

sub new {
    my ( $class, %args ) = @_;
    bless \%args, $class;
}

sub run {
    my $self = shift;
    my $coro = async {
        do {
            my $queue = $self->{queue};
            while (1) {
                my $msg = $queue->take;
                print "Eater: take $msg\n";
                last if $msg =~ m/^end$/;
            }
        };
    };

    return $coro;
}

package Queue;
use Coro;
use Coro::Semaphore;
use Coro::Signal;

sub new {
    my ( $class, %args ) = @_;
    my %defaults = ( max => 3 );
    %args = ( %defaults, %args );
    $args{signal}   = Coro::Signal->new;
    $args{queue}    = [];
    $args{put_sem}  = Coro::Semaphore->new;
    $args{take_sem} = Coro::Semaphore->new;
    bless \%args, $class;
}

sub put {
    my $self  = shift;
    my $guard = $self->{put_sem}->guard();

    while ( $self->{max} <= scalar( @{ $self->{queue} } ) ) {
        print qq{no space on the table. waiting for "put"\n};
        $self->{signal}->wait();
    }
    push @{ $self->{queue} }, $_[0];
    $self->{signal}->broadcast();
}

sub take {
    my $self  = shift;
    my $guard = $self->{take_sem}->guard();

    while ( scalar( @{ $self->{queue} } ) == 0 ) {
        print qq{no cake on the table. waiting for "put"\n};
        $self->{signal}->wait();
    }
    my $ret = shift @{ $self->{queue} };
    $self->{signal}->broadcast();
    return $ret;
}

package main;
use Coro;

my $queue = Queue->new( max   => 3 );
my $eater = Eater->new( queue => $queue );
my $maker = Maker->new( queue => $queue );

my $maker_coro = $maker->run();
my $eater_coro = $eater->run();

$maker_coro->join();
$eater_coro->join();

出力

Maker: put message1
Maker: put message2
Maker: put message3
Maker: put message4
no space on the table. waiting for "put"
Eater: take message1
Eater: take message2
Eater: take message3
no cake on the table. waiting for "put"
Maker: put end
Eater: take message4
Eater: take end

pic ファイル

.PS

copy "sequence.pic";

boxwid = 1.3;

# Define the objects
object(M,":maker");
object(T,":table");
object(E,":eater");
step();

# Message sequences
active(M);
active(E);

message(M,T,"put");
active(T);
rmessage(T,M,"");
inactive(T);

message(M,T,"put");
active(T);
rmessage(T,M,"");
inactive(T);

message(M,T,"put");
active(T);
rmessage(T,M,"");
inactive(T);

message(M,T,"put");
active(T);
comment(T,T2,up 0.5 right 0.2,wid 1.5 ht 0.5 "no space on the table." "waiting for \"take\".");
step();
message(E,T,"take");
active(T);
rmessage(T,E,"");
inactive(T);
rmessage(T,M,"");
inactive(T);

message(E,T,"take");
active(T);
rmessage(T,E,"");
inactive(T);

message(E,T,"take");
active(T);
rmessage(T,E,"");
inactive(T);

message(E,T,"take");
active(T);
rmessage(T,E,"");
inactive(T);

message(E,T,"take");
active(T);
comment(T,T2,up 0.5 left 0.2,wid 1.5 ht 0.5 "no cake on the table." "waiting for \"put\".");
step();
message(M,T,"put");
active(T);
rmessage(T,M,"");
inactive(T);
rmessage(T,E,"");
inactive(T);

complete(M);
complete(T);
complete(E);

.PE