Coro で Guarded-Suspension パターン (Coro::Signal使用)

増補改訂版 Java言語で学ぶデザインパターン入門 マルチスレッド編 の Guarded-Suspension パターンを Coro で実装。前回は Coro::Channelを使ったが、Coro::Channelを使うとキューが一杯になると、ブロックしてしまう。

use strict;
use warnings;
use Coro;
use Coro::Channel;

my $max = 2;
my $ch  = Coro::Channel->new($max);

my $put_coro = async {
    do {
        for my $count ( 1 .. 10 ) {
            print "send $count\n";
            $ch->put($count);
        }
    };
};

my $get_coro = async {
    do {
        while (1) {
            my $count = $ch->get;
            print "recv $count\n";
        }
    };
};

$put_coro->join();
$get_coro->cancel();

上のスクリプトを実行すると、2 つメッセージを送ると送信側がブロックするのがわかる。

send 1
send 2
recv 1
recv 2
send 3
send 4
recv 3
recv 4
send 5
send 6
recv 5
recv 6
send 7
send 8
recv 7
recv 8
send 9
send 10
recv 9
recv 10

そこで、書籍のとおりにシグナル (この例では Coro::Signal) を使って Guarded-Suspension パターンを実装する。キューが一杯になると破棄している。

use strict;
use warnings;

package Client;
use Coro;
use Coro::Timer;

sub new {
    my ( $class, %args ) = @_;
    bless \%args, $class;
}

sub run {
    my $self = shift;
    my $coro = async {
        do {
            my $queue = $self->{queue};

            my $msg;
            $msg = "message1";
            print "Client: put $msg\n";
            $queue->put($msg);

            $msg = "message2";
            print "Client: put $msg\n";
            $queue->put($msg);

            print "Client: sleep...\n";
            Coro::Timer::sleep(1);

            $msg = "end";
            print "Client: put $msg\n";
            $queue->put($msg);
        };
    };
    return $coro;
}

package Server;
use Coro;

sub new {
    my ( $class, %args ) = @_;
    bless \%args, $class;
}

sub run {
    my $self = shift;
    my $coro = async {
        do {
            my $queue = $self->{queue};
            while (1) {
                my $msg = $queue->get;
                print "Server: get $msg\n";
                last if $msg =~ m/^end$/;
            }
        };
    };

    return $coro;
}

package Queue;
use Coro;
use Coro::Semaphore;
use Coro::Signal;

sub new {
    my ( $class, %args ) = @_;
    my %defaults = ( max => 3 );
    %args = ( %defaults, %args );
    $args{signal}  = Coro::Signal->new;
    $args{queue}   = [];
    $args{put_sem} = Coro::Semaphore->new;
    $args{get_sem} = Coro::Semaphore->new;
    bless \%args, $class;
}

sub put {
    my $self  = shift;
    my $guard = $self->{put_sem}->guard();

    if ($self->{max} <= scalar( @{ $self->{queue} } )) {
        warn "no space in queue";
        return;
    }
    push @{ $self->{queue} }, $_[0];
    $self->{signal}->broadcast();
}

sub get {
    my $self  = shift;
    my $guard = $self->{get_sem}->guard();

    while ( scalar( @{ $self->{queue} } ) == 0 ) {
        $self->{signal}->wait();
    }
    return shift @{ $self->{queue} };
}

package main;
use Coro;

my $queue = Queue->new( max => 5 );
my $server = Server->new( queue => $queue );
my $client = Client->new( queue => $queue );

my $client_coro = $client->run();
my $server_coro = $server->run();

$client_coro->join();
$server_coro->join();

出力

Client: put message1
Client: put message2
Client: sleep...
Server: get message1
Server: get message2
Client: put end
Server: get end