Coro で Worker-Thread パターン
増補改訂版 Java言語で学ぶデザインパターン入門 マルチスレッド編 を参考に Coro で Worker-Thread パターンを実装。
Worker-Thread パターンはワーカースレッドが依頼を待ち、依頼が来たら処理する。
#!/usr/bin/perl
use strict;
use warnings;

# Worker-Thread pattern implemented with Coro coroutines.
#
#   Client  - produces Request objects and puts them on a Channel.
#   Channel - bounded FIFO queue that also owns the pool of Workers.
#   Worker  - takes requests from the Channel and executes them.
#   Request - one unit of work.

package Client;
use Coro;
use Coro::Timer;

# Constructor.
# Named arguments read by run(): name, num_of_request, channel.
sub new {
    my ( $class, %args ) = @_;
    return bless \%args, $class;
}

# Starts a coroutine that issues num_of_request requests to the channel,
# sleeping 0, 1 or 2 seconds after each one.
# Returns the coroutine so the caller can join() it.
sub run {
    my $self = shift;

    my $coro = async {
        for my $count ( 1 .. $self->{num_of_request} ) {
            print $self->{name}, " request $count\n";
            my $text    = $self->{name} . "'s request $count";
            my $request = Request->new( request => $text );
            $self->{channel}->put_request($request);
            Coro::Timer::sleep( int( rand(3) ) );
        }
    };

    return $coro;
}

package Worker;
use Coro;

# Constructor.
# Named arguments read by run(): name, channel.
sub new {
    my ( $class, %args ) = @_;
    return bless \%args, $class;
}

# Starts a coroutine that repeatedly takes a request from the channel and
# executes it. The loop ends when take_request() returns undef, which the
# channel uses to tell a worker to shut down.
# Returns the coroutine.
sub run {
    my $self = shift;

    my $coro = async {
        while ( defined( my $request = $self->{channel}->take_request() ) ) {
            print $self->{name}, qq{ execute "},
                $request->get_request_name(), qq{"\n};
            $request->execute();
        }
    };

    return $coro;
}

package Request;
use Coro;
use Coro::Timer;

# Constructor.
# Named argument: request - display name of the request
# (defaults to 'request').
sub new {
    my ( $class, %args ) = @_;
    my %defaults = ( request => 'request' );
    %args = ( %defaults, %args );
    return bless \%args, $class;
}

# Simulates the work of the request by sleeping for one second.
sub execute {
    my $self = shift;
    Coro::Timer::sleep(1);
}

# Returns the display name given to the constructor.
sub get_request_name {
    my $self = shift;
    return $self->{request};
}

package Channel;
use Coro;
use Coro::Semaphore;
use Coro::Signal;
use Scalar::Util qw(blessed);

# Constructor.
# Named arguments:
#   max_request   - maximum number of queued requests (default 10)
#   num_of_thread - number of workers to create (default 3)
# The workers are created here but not started; see start_workers().
# Each worker keeps a reference back to this channel.
sub new {
    my ( $class, %args ) = @_;
    my %defaults = ( max_request => 10, num_of_thread => 3 );
    %args = ( %defaults, %args );

    $args{signal}  = Coro::Signal->new;
    $args{queue}   = [];
    $args{workers} = [];
    $args{coros}   = [];

    my $self = bless \%args, $class;
    for my $count ( 1 .. $args{num_of_thread} ) {
        push @{ $args{workers} },
            Worker->new( name => "worker-$count", channel => $self );
    }

    return $self;
}

# Starts one coroutine per worker and remembers the coroutines.
sub start_workers {
    my $self = shift;
    for my $worker ( @{ $self->{workers} } ) {
        push @{ $self->{coros} }, $worker->run();
    }
}

# Cancels every worker coroutine immediately. Requests that are still
# queued, or that a worker is currently executing, are abandoned.
# Use drain_workers() to let the workers finish the queue first.
sub stop_workers {
    my $self = shift;

    my @coros = @{ $self->{coros} };
    $self->{coros} = [];    # a repeated call has nothing left to cancel

    for my $coro (@coros) {
        $coro->cancel();
    }
}

# Graceful shutdown: lets the workers finish every request that is already
# queued, then waits until all worker coroutines have terminated.
#
# One stop marker per worker is appended to the queue. Because the queue is
# FIFO, every request queued before this call is taken before any marker.
# Requests put after this call are not processed.
sub drain_workers {
    my $self = shift;

    my @coros = @{ $self->{coros} };
    $self->{coros} = [];
    return if !@coros;

    my $marker = bless {}, 'Channel::StopMarker';
    push @{ $self->{queue} }, ($marker) x scalar(@coros);
    $self->{signal}->broadcast();

    for my $coro (@coros) {
        $coro->join();
    }

    return;
}

# Appends a request to the queue and wakes the waiting workers.
# If the queue already holds max_request entries the request is dropped
# with a warning and an empty list / undef is returned.
# Returns 1 when the request was queued.
sub put_request {
    my ( $self, $request ) = @_;
    my $guard = $self->_get_guard();

    if ( $self->{max_request} <= scalar @{ $self->{queue} } ) {
        warn "no space in queue";
        return;
    }

    push @{ $self->{queue} }, $request;
    $self->{signal}->broadcast();
    return 1;
}

# Blocks the calling coroutine until the queue is non-empty, then removes
# and returns the oldest request.
# Returns undef when the entry taken is a stop marker queued by
# drain_workers(), which tells the calling worker to terminate.
sub take_request {
    my $self  = shift;
    my $guard = $self->_get_guard();

    # broadcast() is not remembered, so the queue is re-checked after
    # every wake-up.
    while ( !@{ $self->{queue} } ) {
        $self->{signal}->wait();
    }

    my $request = shift @{ $self->{queue} };
    return if ref($request) eq 'Channel::StopMarker';
    return $request;
}

# Returns a guard for a semaphore dedicated to the calling method.
# The semaphore is stored in the object under "<method name>_sem" and is
# created on first use; the guard releases it when it goes out of scope.
# Dies if that slot already holds something that is not a Coro::Semaphore.
sub _get_guard {
    my $self = shift;

    # Frame 1 is the method that called _get_guard, e.g. Channel::put_request.
    my $name = ( caller 1 )[3];
    $name =~ s/.*:://;
    $name .= '_sem';

    if ( !exists $self->{$name} ) {
        $self->{$name} = Coro::Semaphore->new;
    }
    elsif ( !( blessed( $self->{$name} )
            && $self->{$name}->isa('Coro::Semaphore') ) )
    {
        die "$name is not Coro::Semaphore";
    }

    return $self->{$name}->guard();
}

package main;
use Coro;
use Coro::Timer;

my $channel = Channel->new( max_request => 50, num_of_thread => 2 );
$channel->start_workers();

my @clients = (
    Client->new( name => 'Alice', num_of_request => 3, channel => $channel ),
    Client->new( name => 'Bobby', num_of_request => 5, channel => $channel ),
    Client->new( name => 'Chris', num_of_request => 7, channel => $channel ),
);

# Start every client first, then wait for all of them to finish.
my @client_coros = map { $_->run() } @clients;
for my $client_coro (@client_coros) {
    $client_coro->join();
}

# Wait for the workers to consume everything that was queued instead of
# cancelling them after a fixed delay, which could drop pending requests.
$channel->drain_workers();
出力
Alice request 1
Bobby request 1
Chris request 1
worker-1 execute "Alice's request 1"
worker-2 execute "Bobby's request 1"
Bobby request 2
worker-2 execute "Chris's request 1"
worker-1 execute "Bobby's request 2"
Chris request 2
Bobby request 3
Bobby request 4
Alice request 2
worker-2 execute "Chris's request 2"
Chris request 3
worker-1 execute "Bobby's request 3"
Chris request 4
Chris request 5
Chris request 6
Alice request 3
worker-2 execute "Bobby's request 4"
worker-1 execute "Alice's request 2"
Bobby request 5
worker-1 execute "Chris's request 3"
worker-2 execute "Chris's request 4"
Chris request 7
worker-2 execute "Chris's request 5"
worker-1 execute "Chris's request 6"
worker-2 execute "Alice's request 3"
worker-1 execute "Bobby's request 5"
worker-2 execute "Chris's request 7"
pic ファイル
.PS
# Sequence diagram of the Worker-Thread example: a client creates a request
# and puts it on the channel; a worker takes it from the channel and
# executes it.
# The macros used below come from the copied file "sequence.pic".
copy "sequence.pic";
boxwid = 1.3;

# Define the objects
object(CL,":client");
# R is only reserved here; it is introduced later by create_message().
placeholder_object(R);
object(CH,":channel");
object(W,":worker");
step();

# Message sequences
active(CL);
active(CH);
active(W);
# The client creates the request object.
create_message(CL,R,":request");
# The worker asks the channel for a request; the client puts one.
message(W,CH,"take_request");
message(CL,CH,"put_request");
# Return messages from the channel to the client and to the worker.
rmessage(CH,CL);
rmessage(CH,W);
# The worker executes the request; R is active while it runs.
message(W,R,"execute");
active(R);
step();
rmessage(R,W,"");
inactive(R);
step();
# Close the lifeline of every object.
complete(CL);
complete(R);
complete(CH);
complete(W);
.PE
参考
増補改訂版 Java言語で学ぶデザインパターン入門 マルチスレッド編
増補改訂版 Java言語で学ぶデザインパターン入門 マルチスレッド編
- 作者: 結城浩
- 出版社/メーカー: ソフトバンククリエイティブ
- 発売日: 2006/03/21
- メディア: 大型本
- 購入: 15人 クリック: 287回
- この商品を含むブログ (206件) を見る