Coro で Worker-Thread パターン

『増補改訂版 Java言語で学ぶデザインパターン入門 マルチスレッド編』を参考に、Coro で Worker-Thread パターンを実装した。

Worker-Thread パターンでは、ワーカースレッドが依頼を待ち、依頼が来たらそれを取り出して処理する。

#!/usr/bin/perl
use strict;
use warnings;

package Client;
use Coro;
use Coro::Timer;

# Client: produces requests and hands them to a channel.

# Constructor. The named arguments become the object's fields as-is;
# run() reads the fields name, num_of_request and channel.
sub new {
    my $class  = shift;
    my %fields = @_;
    return bless \%fields, $class;
}

# Start a coroutine that submits num_of_request requests to the channel,
# sleeping 0, 1 or 2 seconds (chosen at random) after each submission.
# Returns the coroutine object so the caller can join() it.
sub run {
    my ($client) = @_;

    return async {
        my $total = $client->{num_of_request};
        foreach my $seq ( 1 .. $total ) {
            print $client->{name}, " request $seq\n";
            my $label   = $client->{name} . "'s request $seq";
            my $request = Request->new( request => $label );
            $client->{channel}->put_request($request);
            Coro::Timer::sleep( int( rand(3) ) );
        }
    };
}

package Worker;
use Coro;

# Worker: repeatedly takes a request from its channel and executes it.

# Constructor. The named arguments become the object's fields as-is;
# run() reads the fields name and channel.
sub new {
    my $class  = shift;
    my %fields = @_;
    return bless \%fields, $class;
}

# Start a coroutine that loops forever: take the next request from the
# channel, print which request is being executed, then execute it.
# The loop has no exit of its own. Returns the coroutine object.
sub run {
    my ($worker) = @_;

    return async {
        while (1) {
            my $job = $worker->{channel}->take_request();
            print $worker->{name}, qq{ execute "}, $job->get_request_name(), qq{"\n};
            $job->execute();
        }
    };
}

package Request;
use Coro;
use Coro::Timer;

# Request: a unit of work carrying a descriptive name.

# Constructor. Accepts named arguments; the "request" field (the name
# reported by get_request_name) defaults to the string 'request'.
sub new {
    my $class = shift;
    my $self  = { request => 'request', @_ };
    return bless $self, $class;
}

# Perform the work: here simply a one-second Coro::Timer sleep.
sub execute {
    my ($request) = @_;

    Coro::Timer::sleep(1);
}

# Return the request's name (the "request" field).
sub get_request_name {
    my ($request) = @_;

    return $request->{request};
}

package Channel;
use Coro;
use Coro::Semaphore;
use Coro::Signal;
use Scalar::Util ();

# Channel: a bounded FIFO of requests shared by clients (producers) and a
# fixed set of Worker objects (consumers).

# Constructor.
#   max_request   - maximum number of queued requests (default 10)
#   num_of_thread - number of Worker objects to create (default 3)
# The fields signal, queue, workers and coros are internal and are always
# initialised here, overriding same-named arguments.
sub new {
    my ( $class, %args ) = @_;
    my %defaults = ( max_request => 10, num_of_thread => 3 );
    my $self = bless { %defaults, %args }, $class;

    $self->{signal}  = Coro::Signal->new;
    $self->{queue}   = [];
    $self->{workers} = [];
    $self->{coros}   = [];

    for my $count ( 1 .. $self->{num_of_thread} ) {
        push @{ $self->{workers} },
            Worker->new( name => "worker-$count", channel => $self );
    }

    return $self;
}

# Start one coroutine per worker and remember it for stop_workers().
# Does nothing if the workers are already running, so a repeated call
# cannot spawn a second coroutine for the same Worker object.
sub start_workers {
    my $self = shift;
    return if @{ $self->{coros} };

    for my $worker ( @{ $self->{workers} } ) {
        push @{ $self->{coros} }, $worker->run();
    }
    return;
}

# Cancel every running worker coroutine. The list is emptied as it is
# walked, so a later start_workers() starts fresh coroutines and a second
# stop_workers() is a no-op.
sub stop_workers {
    my $self = shift;
    for my $coro ( splice @{ $self->{coros} } ) {
        $coro->cancel();
    }
    return;
}

# Append a request to the queue and signal waiting workers.
# Returns 1 when the request was queued. Returns nothing (false), after a
# warning, when the request is undefined or the queue already holds
# max_request entries; the request is dropped in that case.
sub put_request {
    my ( $self, $request ) = @_;
    my $guard = $self->_get_guard();

    # An undefined entry would make the worker that dequeues it die on its
    # first method call, so refuse it here.
    if ( !defined $request ) {
        warn "undefined request ignored";
        return;
    }
    if ( @{ $self->{queue} } >= $self->{max_request} ) {
        warn "no space in queue";
        return;
    }

    push @{ $self->{queue} }, $request;
    $self->{signal}->broadcast();
    return 1;
}

# Remove and return the oldest queued request, waiting on the signal while
# the queue is empty. The guard is held for the whole call, including the
# wait, so other callers of take_request queue up on the semaphore.
sub take_request {
    my $self  = shift;
    my $guard = $self->_get_guard();

    # Re-check after every wake-up: the queue may still be empty.
    while ( !@{ $self->{queue} } ) {
        $self->{signal}->wait();
    }
    return shift @{ $self->{queue} };
}

# Return a guard on a semaphore dedicated to the calling method.
# The semaphore is stored in the object under "<method>_sem" and created
# on first use, so put_request and take_request each get their own.
# Dies if there is no calling sub, or if that field holds something that
# is not a Coro::Semaphore.
sub _get_guard {
    my $self = shift;
    my $name = ( caller 1 )[3];
    if ( !defined $name ) {
        die "_get_guard must be called from inside a method";
    }
    $name =~ s/.*:://;
    $name .= '_sem';

    if ( !exists $self->{$name} ) {
        $self->{$name} = Coro::Semaphore->new;
    }

    # isa() rather than a string comparison on ref(), so that subclasses of
    # Coro::Semaphore are accepted too.
    my $semaphore = $self->{$name};
    if ( !( Scalar::Util::blessed($semaphore) && $semaphore->isa('Coro::Semaphore') ) ) {
        die "$name is not Coro::Semaphore";
    }

    return $semaphore->guard();
}

package main;
use Coro;
use Coro::Timer;

# Driver: two workers serve three clients that submit 3, 5 and 7 requests.
my $channel = Channel->new( max_request => 50, num_of_thread => 2 );
$channel->start_workers();

my @clients = (
    Client->new( name => 'Alice', num_of_request => 3, channel => $channel ),
    Client->new( name => 'Bobby', num_of_request => 5, channel => $channel ),
    Client->new( name => 'Chris', num_of_request => 7, channel => $channel ),
);

# Start every client first, then wait for all of them to finish submitting.
my @client_coros = map { $_->run() } @clients;
$_->join() for @client_coros;

# The clients being done does not mean the work is done: requests may still
# be queued. Poll until the queue is empty so that cancelling the workers
# does not discard them. The deadline keeps the script from hanging forever
# if the workers have stopped consuming.
my $give_up_at = time + 30;
while ( @{ $channel->{queue} } && time < $give_up_at ) {
    Coro::Timer::sleep(0.1);
}

# Grace period for requests already taken off the queue: Request::execute
# sleeps for one second, so two seconds lets in-flight work finish.
Coro::Timer::sleep(2);
$channel->stop_workers();

出力

Alice request 1
Bobby request 1
Chris request 1
worker-1 execute "Alice's request 1"
worker-2 execute "Bobby's request 1"
Bobby request 2
worker-2 execute "Chris's request 1"
worker-1 execute "Bobby's request 2"
Chris request 2
Bobby request 3
Bobby request 4
Alice request 2
worker-2 execute "Chris's request 2"
Chris request 3
worker-1 execute "Bobby's request 3"
Chris request 4
Chris request 5
Chris request 6
Alice request 3
worker-2 execute "Bobby's request 4"
worker-1 execute "Alice's request 2"
Bobby request 5
worker-1 execute "Chris's request 3"
worker-2 execute "Chris's request 4"
Chris request 7
worker-2 execute "Chris's request 5"
worker-1 execute "Chris's request 6"
worker-2 execute "Alice's request 3"
worker-1 execute "Bobby's request 5"
worker-2 execute "Chris's request 7"

pic ファイル

.PS

# Sequence diagram for the Worker-Thread example: the participants are a
# client, a request, a channel and a worker.
# The macros called below (object, message, step, ...) are not defined in
# this file; they are expected to come from the copied "sequence.pic"
# (presumably the UMLGraph sequence-diagram macro library -- confirm).
copy "sequence.pic";

# Box width used for the boxes drawn below (pic's boxwid variable).
boxwid = 1.3;

# Define the objects
# Declaration order is client, request, channel, worker. R is declared via
# placeholder_object because the request is only created later, by
# create_message (presumably this just reserves its column -- confirm).
object(CL,":client");
placeholder_object(R);
object(CH,":channel");
object(W,":worker");
step();

# Message sequences
# Client, channel and worker are marked active from the start.
active(CL);
active(CH);
active(W);

# The client creates the request object.
create_message(CL,R,":request");

# The worker asks the channel for a request and the client puts one in;
# the two rmessage calls are the corresponding returns, to the client and
# to the worker.
message(W,CH,"take_request");
message(CL,CH,"put_request");
rmessage(CH,CL);
rmessage(CH,W);

# The worker executes the request; the request is active only between the
# execute message and its return.
message(W,R,"execute");
active(R);
step();
rmessage(R,W,"");
inactive(R);

# Close every object's lifeline.
step();
complete(CL);
complete(R);
complete(CH);
complete(W);

.PE