高性能计算-MPI并行程序开发实战(四)

高性能计算-MPI并行程序开发实战(四)

前面的三个部分只介绍了MPI的基本通信语句和相关知识。接下来将使用MPI实现更多的功能。

MPI实现计时功能

在MPI程序中,经常会用到时间函数,如用来统计程序运行的时间,或根据时间的不同选取不同的随机数种子,或根据时间的不同对程序的执行进行控制等。常见的时间函数有gettimeofday等函数。在MPI中,也有MPI专用的时间函数调用方法,即我们介绍的第7个MPI调用接口:

  • MPI_WTIME()
  • double MPI_Wtime(void)

MPI_WTIME()返回一个用浮点数表示的秒数,它表示从过去某一时刻到调用时刻所经历的时间。这样如果需要对代码中特定的部分进行计时,一般采取的方式是:

double startTime, endTime;
...
startTime = MPI_WTime();
//需要计时的部分
endTime = MPI_WTime();
fprintf(stdout, "Blocks took %f seconds.\n", endTime-startTime);

当我们需要得知当前时间的精度时,我们可以使用MPI为我们提供的第8个MPI调用接口:

  • MPI_WTICK()
  • double MPI_Wtick()

MPI_WTICK返回MPI_WTIME的精度,单位是秒,可以认为是一个时钟(Tick-Tock)所占用的时间。
介绍了上面两个MPI调用函数,接下来,我们编写一段代码,来测试MPI的时间函数是否正确:

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include "mpi.h"
#include "test.h"

int main(int argc, char * argv[]) {
    int     err = 0;
    double  t1, t2;
    double  tick;
    int     i;

    MPI_Init(&argc, &argv);
    t1 = MPI_Wtime();           //通过MPI_Wtime()获得当前时间t1
    t2 = MPI_Wtime();           //通过MPI_Wtime()获得当前时间t1
    if (t2 - t1 > 0.1 || t2 - t1 < 0.0) {
        /* 若连续两次调用时间API得到的时间间隔过大,此处为超过0.1s,或者后调用的时间小于先调用的时间,则时间调用有误。*/
        err++;
        fprintf(stderr, "Two successive calls to MPI_Wtime gave strange results: (%f)(%f).\n", t1, t2);
    }
    // 循环测试10次,每次循环调用两次时间函数,两次时间调用的时间间隔是1s
    for (i = 0; i < 10; i++ ) {
        t1 = MPI_Wtime();   //计时开始
        sleep(1);           //睡眠1s
        t2 = MPI_Wtime();   //计时结束
        if (t2 - t1 >= (1.0-0.01) && t2 - t1 <= 5.0) {
            break;
        }
        // 两次计时得到的时间间隔合理,则退出。
        if (t2 - t1 > 5.0) {
            i = 9;
        }
        // 若两次计时得到的时间间隔过大,则改变循环计数变量的值,迫使程序从循环退出。
    }
    // 若计时函数正确,则程序不需要执行10次循环才从循环推出,否则会重复执行10次。
    if (i == 10) {
        // 计时函数不正确
        fprintf(stderr, "Timer around sleep(1) did not give 1 second. gave %f.\n", t2 - t1);
        err++;
    }
    tick = MPI_Wtick();         // 得到一个时钟(Tick-Tock)的时间
    if (tick > 1.0 || tick < 0.0) {
        // 时间太长或为负数,时间不正确,错误
        err++;
        fprintf(stderr, "MPI_Wtick() gave a strange result: (%f).\n", tick);
    }
    MPI_Finalize();
}

程序代码完成后,编译方法参见之前三篇博客的内容,编译及运行命令为:

mpicc MPIWTIME.c -o MPIWTIME
mpiexec MPIWTIME -n 8

由于程序编译及运行较为简单,此处不再详细讲解,作者运行当前程序的环境为神威太湖之光无锡超算集群,使用支持MPI的swcc5(5.421)编译器编译,使用申威LSF作业系统提交作业并执行,此处提交的命令为:

bsub -I -b -q q_sw_expr -host_stack 1024 -share_size 4096 -n 4 -cgsp 64 ./MPI_WTIME

作业提交并运行,效果如图所示:
PARATERA_SWMPI_BSUB_SWRUN

获取机器的名字和MPI版本号

在实际使用MPI编写并行程序的过程中,经常要将一些中间结果或最终结果输出到程序自己创建的文件中,对于在不同机器上的进程,常希望输出的文件名包含该机器名,或者是需要根据不同的机器执行不同的操作,这样仅仅靠进程标识rank是不够的,MPI为此提供了一个专门的调用,使各个进程在运行时可以动态的得到该进程所运行机器的名字。

int MPI_Get_processor_name(char *name, int *resultlen);
int MPI_Get_version(int *version, int *subversion);

MPI_Get_processor_name调用返回调用进程所在机器的名字,MPI_Get_Version返回MPI的主版本号version和次版本号subversion。

是否初始化及错误退出

在MPI程序中唯一一个可以用在MPI_Init之前的调用是MPI_Initialized,它的功能就是判断MPI_INIT是否已经执行。

int MPI_Initialized(int *flag);

MPI_Initialized判断当前进程是否已经调用了MPI_Init,若已调用,则返回flag==true,否则flag==false。在编写MPI程序的过程中,若发现已出现无法恢复的严重错误,因而只好退出MPI程序的执行,MPI提供了这样的调用,并且在退出时可以返回给调用环境一个错误码。

int MPI_Abort(MPI_Comm comm, int errorcode);

MPI_Abort使通信域comm中的所有进程退出,MPI_Abort调用不要求外部环境对返回的错误码采取任何操作。下面的示例会将指定的Master节点杀掉。

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include "mpi.h"

int main(int argc, char * argv[]) {  
    int node, size, i;
    int masternode = 0;
    // 设置缺省初始值
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &node);
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    // 参数校验
    for (i = 1; i < argc; i++) {
        fprintf(stderr, "myid = %d, procs = %d, argv[%d] = %s.\n", node, size, i, argv[i]);
        if ( argv[i] && strcmp("lastmaster", argv[i]) == 0 ) {
            masternode = size - 1;
            // 将最后一个进程设置为master
        }
    }

    if (node == masternode) {
        // 由master进程执行退出操作
        fprintf(stderr, "myid = %d is masternode Abort!\n", node);
        MPI_Abort(MPI_COMM_WORLD, 99);
    } else {
        // 非master进程等待
        fprintf(stderr, "myid = %d is not masternode Barrier!\n", node);
        MPI_Barrier(MPI_COMM_WORLD);
    }
    MPI_Finalize();
}

运行程序,效果如图所示:
MPI_ABORT_RUNNING_RESULT

数据接力传送

下面给出一个数据接力传送的例子,数据的传送过程如下图所示:
DATA_PASSENGE

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include "mpi.h"

int main(int argc, char * argv[]) {
    int rank, value, size;
    MPI_Status mpi_status;
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    // 得到当前进程标识和总的进程数量
    do {
        // 循环执行,直到输入的数据为负数才退出
        if (rank == 0) {
            fprintf(stderr, "\n Please Input New Value: ");
            // 进程0读入要传递的数据
            scanf("%d", &value);
            fprintf(stderr, "%d\t\tread\t\t<-<-\t(%d)\n", rank, value);
            if (size > 1) {
                MPI_Send(&value, 1, MPI_INT, rank + 1, 0, MPI_COMM_WORLD);
                fprintf(stderr, "%d\t\tsend\t\t(%d)->->\t\t%d\n", rank, value, rank+1);
                // 若不少于一个进程,则向下一个进程传递该数据
           }
        } else {
            MPI_Recv(&value, 1, MPI_INT, rank - 1, 0, MPI_COMM_WORLD, &mpi_status);
            // 其他进程从前一个进程接收传递过来的数据
            fprintf(stderr, "%d\t\treceive(%d)<-<-\t\t%d\n", rank, value, rank - 1);
            if ( rank < size - 1 ) {
                MPI_Send(&value, 1, MPI_INT, rank + 1, 0, MPI_COMM_WORLD);
                fprintf(stderr, "%d\t\tsend\t\t(%d)->->\t\t%d\n", rank, value, rank + 1);
                // 若当前进程不是最后一个进程,则将该数据继续向后传递
            }
        }
        MPI_Barrier(MPI_COMM_WORLD);
        // 执行同步,为了将前后两次数据传递分开
    } while(value >= 0);

    MPI_Finalize();
}

运行程序,效果如图所示:
MPI_DATA_PASSENGER_RUNNING_RESULT

任意进程间的相互问候

在许多情况下需要任意两个进程之间都进行数据的交换,下面给出一个例子,任意进程都向其他的进程问好:

ANY_PROCESS_SEND_HELLO

#include <stdio.h>
#include <stdlib.h>
#include "mpi.h"

void Hello(void) {
    // 任意两个进程间交换问候信息,问候信息由发送进程标识和接收进程标识组成。
    int nproc, rank;
    int type = 1;
    int buffer[2], node;
    MPI_Status mpi_status;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &nproc);
    // 得到当前进程标识的总进程数
    if ( rank == 0 ) {
        // 进程0负责打印提示信息
        fprintf(stderr, "\n [MPI Message] Hello from process 0.\n");
        fflush(stdout);
    }

    for ( node = 0; node < nproc; node++ ) {
        // 循环对每个进程进行问候
        if (node != rank) {
            //得到一个和自身不同的进程标识 
            buffer[0] = rank; // 将自身标识放入消息中
            buffer[1] = node; // 将被问候的进程标识放入消息中
            MPI_Send(buffer, 2, MPI_INT, node, type, MPI_COMM_WORLD);   // 首先将问候消息发出
            MPI_Recv(buffer, 2, MPI_INT, node, type, MPI_COMM_WORLD, &mpi_status); // 接收被问候进程对自己发送的问候消息
            if ( (buffer[0] != node) || (buffer[1] != rank) ) {
                // 若接收到的消息的内容不是问候自己的或不是以被问候方的身份问候自己,则出错
                fprintf(stderr, " [MPI Message] Hello: %d != %d or %d != %d.\n", buffer[0], node, buffer[1], rank);
                printf("Mismatch on Hello Process ids; node = %d.\n", node);
            }
            fprintf(stdout, " [MPI Message] Hello from %d to %d.\n", rank, node);
            fflush(stdout);
        }
    }
}

int main(int argc, char **argv) {
    int rank, option, namelen, size;
    char processor_name[MPI_MAX_PROCESSOR_NAME];
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    // 得到当前进程标识和总进程数
    if ( size < 2 ) {
        // 若总进程数小于2,退出
        fprintf(stderr, " [MPI Message] Systest Requires at least 2 processes.\n");
        MPI_Abort(MPI_COMM_WORLD, 1);
    }
    MPI_Get_processor_name(processor_name, &namelen);
    // 得到当前宿主机名称
    fprintf(stderr, " [MPI Message] Process %d is alive on %s.\n", rank, processor_name);
    MPI_Barrier(MPI_COMM_WORLD);    //同步
    Hello();    // 调用问候过程
    MPI_Finalize();
    return EXIT_SUCCESS;
}

运行程序,效果如图所示:
MPI_PROCESS_HELLO_RUNNING_RESULT

任意源和任意标识的使用

在接收操作中,通过使用任意源和任意tag标识,使得该接收操作可以接收任何进程以任何标识发送给本进程的数据,但是该消息的数据类型必须和接收操作的数据类型一致。下面是一个使用任意源和任意标识的例子。Root进程(0号进程)接收来自其他所有进程的消息,然后将各消息的内容,消息来源和消息标识打印出来。如图所示:

MPI_ANY_SOURCE_TAG_EX_IMAGE

#include <stdio.h>
#include <stdlib.h>
#include "mpi.h"

int main(int argc, char **argv) {
    int rank, size, i, buf[1];
    MPI_Status mpi_status;
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    if ( rank == 0 ) {
        for ( i = 0; i < 100 * (size - 1); i++ ) {
            MPI_Recv(buf, 1, MPI_INT, MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &mpi_status);
            fprintf(stderr, " [MPI Message] Message = %d from %d with tag: %d.\n", buf[0], mpi_status.MPI_SOURCE, mpi_status.MPI_TAG);
        }
    } else {
        for( i = 0; i < 100; i++ ) {
            buf[0] = rank+i;
        }
        MPI_Send(buf, 1, MPI_INT, 0, i, MPI_COMM_WORLD);
    }
    MPI_Finalize();
}

运行程序,效果如图所示:

MPI_ANY_SOURCE_TAG_RUNNING_RESULT

安全的MPI程序

编写MPI程序,如果通信调用顺序不当,很容易造成死锁。如下面的例子(部分代码)总会造成死锁:

MPI_Status status;
MPI_Init(&argc, &argv);
MPI_Comm_rank(MPI_COMM_WORLD, &rank, ierr);
if ( rank == 0 ) {
    MPI_Recv(recvbuf, count, MPI_REAL, 1, tag, comm, status, ierr);
    MPI_Send(sendbuf, count, MPI_REAL, 1, tag, comm, ierr);
} else if ( rank == 1 ) {
    MPI_Recv(recvbuf, count, MPI_REAL, 0, tag, comm, status, ierr);
    MPI_Send(sendbuf, count, MPI_REAL, 0, tag, comm, ierr);
}

下图即为总会发生死锁的通信调用次序(参照上面的程序):
MPI_DEADLOCK_COMM_ORDER_EX_IMAGE

进程0的第一条接收语句A能否完成取决于进程1的第二条发送语句D,即A依赖于D。从执行次序上可以明显的看出,进程0向进程1发送消息的语句C的执行又依赖于它前面接收语句A的完成,即C依赖于A;同时,进程1的第一条接收语句B能否完成取决于进程0的第二条发送语句C的执行,即B依赖于C。从执行次序上可以明显地看出,向进程0发送消息的语句D的执行又依赖于B的完成,故有A依赖于D,而D又依赖于B,B依赖于C,C依赖于A,形成了一个环。进程0和进程1相互等待,无法执行,必然导致死锁。若两个进程需要相互交换数据,在两个进程中首先都进行接收调用显然是不合适的。那么,同时先进行发送调用的结果是这样的:

MPI_Status status;
MPI_Init(&argc, &argv);
MPI_Comm_rank(MPI_COMM_WORLD, &rank, ierr);
if ( rank == 0 ) {
    MPI_Send(sendbuf, count, MPI_REAL, 1, tag, comm, ierr);
    MPI_Recv(recvbuf, count, MPI_REAL, 1, tag, comm, status, ierr);
} else if ( rank == 1 ) {
    MPI_Send(sendbuf, count, MPI_REAL, 0, tag, comm, ierr);
    MPI_Recv(recvbuf, count, MPI_REAL, 0, tag, comm, status, ierr);
}

下图即为不安全的通信调用次序(参照上面的程序):
MPI_NOTSAFE_COMM_ORDER_EX_IMAGE

由于进程0或进程1的发送需要系统提供缓冲区,若系统缓冲区不足,则进程0或进程1的发送将无法完成,则进程0和进程1的接收也无法正确完成。所以,对于相互交换数据的进程,直接将发送语句写在前面仍然是不安全的。

下面是一种可以保证消息安全传递的通信调用次序,即当两个进程需要相互交换数据时,一定要将它们的发送和接收操作按照次序进行匹配,即一个进程的发送操作在前,接收操作在后;另一个进程的接收操作在前,发送操作在后,前后两个发送和接收操作要相互匹配。

MPI_Status status;
MPI_Init(&argc, &argv);
MPI_Comm_rank(MPI_COMM_WORLD, &rank, ierr);
if ( rank == 0 ) {
    MPI_Send(sendbuf, count, MPI_REAL, 1, tag, comm, ierr);
    MPI_Recv(recvbuf, count, MPI_REAL, 1, tag, comm, status, ierr);
} else if ( rank == 1 ) {
    MPI_Recv(recvbuf, count, MPI_REAL, 0, tag, comm, status, ierr);
    MPI_Send(sendbuf, count, MPI_REAL, 0, tag, comm, ierr);
}

下图即为安全的通信调用次序(参照上面的程序):
MPI_SAFE_COMM_ORDER_EX_IMAGE

如上图,C的完成只需要A完成,A的完成只要有对应的D存在,不需要系统提供缓冲区也可以进行,这里恰好满足条件,所以A总能够完成,则D也一定能完成。当A和D完成后,B完成只需要对应的C,同时,也不需要缓冲区即可完成。所以B和C也一定能完成。所以这样的通信形式是安全的。A和C,D和B同时互换,原理上,这种情况是一样的,所以,这种通信调用次序也是安全的。