上一篇日志说过了,内置的 Thread 模块无法解决线程排队、变量共享的问题,但是在使用多线程的过程中,这两个又是比较常见的需求。线程数无法控制,处理速度和内存消耗两者无法平衡;变量无法共享,无法让线程向同一个数据结构传递返回值。但是 Perl 有庞大的第三方库,CPAN 上有一个备受匿名和尚们推崇的模块能够很好地解决这些问题,这个模块就是传说中的 Parallel::ForkManager 。严格意义上说这个模块是通过 Fork 进程而不是创建线程来实现并行处理的,不过在实际应用过程中,这两者其实没什么区别。

Perl多线程

Parallel::ForkManager 代码也只有寥寥的200多行,内容并不艰深。

对于线程排队,ForkManager 只是在 fork 进程的方法里添加了一个判断,如果当前子进程数量>=事先定义的数量,就调用等待方法。

while ($s->{max_proc} && ( keys %{ $s->{processes} } ) >= $s->{max_proc}) {
    $s->on_wait;
    $s->wait_one_child(defined $s->{on_wait_period} ? &WNOHANG : undef);
  };

对于变量共享, 它利用 Storable 模块 将子线程的数据结构序列化之后转换成文本,然后主线程再反其道行之将文本读取到内存,这样就绕开了复杂的 variable lock,当然,因为涉及到磁盘IO,性能也会打折扣。这个实现方式也有很大的局限,严格说来并不算是所谓的变量共享,我在最后会讨论这个问题。

if ( $s->{in_child} ) {
    if (defined($r)) {  # store the child's data structure
      my $storable_tempfile = File::Spec->catfile($s->{tempdir}, 'Parallel-ForkManager-' . $s->{parent_pid} . '-' . $$ . '.txt');
      my $stored = eval { return &store($r, $storable_tempfile); };

      # handle Storables errors, IE logcarp or carp returning undef, or die (via logcroak or croak)
      if (not $stored or $@) {
        warn(qq|The storable module was unable to store the child's data structure to the temp file "$storable_tempfile":  | . join(', ', $@));
      }
    }
    CORE::exit($x || 0);
  }

下面是一个简单的示例:

use strict;
use warnings;
use Parallel::ForkManager;


# 指定进程峰值为100
my $pm = Parallel::ForkManager->new(100);
# 所有子进程返回的数据结构都会传回到这个hash里
my %persons;

# 进程结束时运行的callback,将子进程返回的数据结构传入%persons,
# 这是“共享数据结构”的关键
$pm -> run_on_finish( sub {
        my $person_info = pop @_;
        while ( my ($key,$value) = each %$person_info ) {
            $persons{$key} = $value;
        }
    });

for my $email ( @emails ) {
    # 创建进程
    $pm->start and next;
    # 实际需要运行的代码
    my $data = get_person_info($email);
    
    # 结束进程并返回$data。
    # $data是一个array ref,其中有进程id,进程退出码等信息,
    # 其中最后一个item是子进程返回的数据结构。
    $pm->finish(0, $data);
}

$pm->wait_all_children;

# 伪sub
sub get_person_info {
    my $email = shift @_;
    # bla bla bla
    return \%person_info;
}

上面代码的关键在于在创建PM对象的时候设定进程最大值,以及设定 run_on_finish 的 callback 。之前提到利用 run_on_finish 有一定局限,如果我希望在子线程的代码中加入条件判断,将不同类别的数据输出到两个不同的共享hash就不怎么好处理了。此外,如果子进程需要实时从共享hash 中读取数据,ForkManager 是无法做到的。如果不想自己写代码,可以利用另外一个CPAN模块,下一篇再说。