MPI в качестве высокоуровневого средства разработки параллельных программ

 

1. Выбор средств параллельного программирования

Важной задачей в разработке параллельных приложений является выбор средств их программной реализации и исследования.

Эффективное использование многопроцессорных и многомашинных вычислительных комплексов требует наличия средств разработки параллельных программ. Такие средств включают в себя языки параллельного программирования, библиотеки функций и подпрограмм, средства отладки и оценки производительности (профилирования).

В настоящее время созданы десятки систем параллельного программирования (как специализированных, так и универсальных). Среди них можно назвать следующие: OpenMP, [p]threads, shmem (для систем с разделяемой памятью), Occam, Concurrent C, HPF (высокопроизводительный ФОРТРАН), HPC++, Distributed Java (для систем с распределенной памятью), Parlog, Concurrent Prolog (параллельные версии языка ПРОЛОГ),  Multilisp (параллельная версия языка ЛИСП).

Однако, наибольшее распространение среди систем параллельного программирования получили две следующие:

Обе системы реализуют модель передачи сообщений, содержат библиотеки функций и подпрограмм для стандартных языков программирования СИ, СИ++, ФОРТРАН, обеспечивают взаимодействия "точка-точка" и групповые. В то же время системы имеют и существенные отличия.

Система разработки и выполнения параллельных программ  PVM [http://www.epm.ornl.gov/pvm] разрабатывается и поддерживается в рамках исследовательского проекта с 1989 г. (родоначальниками являются профессор университета Эмори г. Атланта  Vaidy Sunderman и сотрудник Окриджской национальной лаборатории Al Geits). В настоящее время в развитии проекта участвуют многие специалисты во всем мире. Основная цель - обеспечение работы параллельных программ в гетерогенных вычислительных средах. В основу  PVM  положена концепция виртуальной параллельной вычислительной машины, которая объединяет в единый ресурс множество (до тысяч) разнородных узлов. Примечательно, что виртуальная машина допускает динамическое конфигурирование (причем пользователем, а не администратором). Возможно создание приложений как типа SPMD (одна параллельная программа - много потоков данных), та и типа MPMD (много параллельных программ - много потоков данных). Предусмотрены средства корректного преобразования данных при их передаче между узлами различной архитектуры и между программами, написанными на разных языках программирования (СИ/СИ++ и ФОРТРАН). Важно наличие механизма асинхронных уведомлений о значимых событиях в виртуальной машине (добавление/удаление узла, выход узла из строя). PVM изначально является открытой и свободно распространяемой системой.

Стандарт на программный интерфейс передачи сообщений  MPI был разработан в 1994 г. группой  MPI Forum [http://www.mpi-forum.org], в состав которой входили представители промышленности и академических кругов. В настоящее время принята новая версия стандарта - MPI-2. Основная цель стандарта - достижение максимальной производительности (даже в ущерб гибкости) выполнения параллельных программ на супер-ЭВМ и максимальной степени переносимости таких программ на уровне исходных текстов.  MPI  ориентирован, в первую очередь, на создание приложений типа  SPMD, включает спецификацию 128 функций (в MPI-2 добавлено еще 120). MPI характеризуется статической структурой вычислительной среды, задаваемой до начала выполнения параллельной программы, однако, имеет средства динамического построения на этой структуре виртуальных топологий обмена и создания групп процессов. Имеется множество коммерческих реализаций стандарта MPI для конкретных супер-ЭВМ, существуют также свободно распространяемые версии, наиболее известная из которых - MPICH (MPI Chameleon), ориентированная, в первую очередь, на использование в кластерах рабочих станций [ http://www.mcs.anl.gov/mpi/mpich/download.html].

Сравнение  MPI  и  PVM позволяет сделать вывод о предпочтительности использования MPI  в однородных вычислительных средах (машины с массовым параллелизмом и небольшие кластеры из однотипных машин).

2. Программирование в  MPI

Описание на русском языке основ параллельного программирования и, в частности, средств MPI, можно найти в книге "Немнюгин С.А., Стесик О.Л. Параллельное программирование для многопроцессорных вычислительных систем. - СПб: БХВ-Петербург, 2002. - 400с.: ил.".
Параллельная программа, разработанная с использованием  MPI, выполняется в рамках нескольких процессов или ветвей, функционирующих одновременно. Процессы могут выполняться как на разных процессорах, так и на одном и том же - для программы это роли не играет, поскольку в обоих случаях используется единый механизм обмена данными. Процессы обмениваются друг с другом данными в виде сообщений. Сообщения характеризуются  идентификаторами, которые позволяют программе и функциям библиотеки связи отличать их друг от друга. Каждый процесс может узнать у библиотеки связи свой номер внутри группы и, в зависимости от номера, приступить к выполнению соответствующей части расчетов.

Ключевым для MPI является понятие области связи (communication domain). При запуске приложения все процессы помещаются в автоматически создаваемую общую область связи  MPI_COMM_WORLD. При необходимости  приложения могут создавать новые области связи на базе существующих. Процессы, использующие одну область связи, образуют группу. Все области связи имеют независимую друг от друга нумерацию процессов. Каждый процесс может, используя соответствующую функцию  MPI, определить свой уникальный номер в группе. Программе пользователя в распоряжение предоставляется коммуникатор - структура данных, описывающая области связи. Многие функции MPI имеют в качестве одного из аргументов коммуникатор, который ограничивает сферу их действия той областью связи, к которой он привязан. Для одной области связи может существовать несколько коммуникаторов.

3. Типы функций

Функции MPI характеризуются масштабностью выполняемых ими действий и влиянием на ход выполнения процессов.

Локальные функции не инициируют пересылок данных между процессами. Большинство информационных функций является локальными, т.к. копии системных данных хранятся в локально памяти каждого процесса. Функция передачи сообщений MPI_Send и функция синхронизации MPI_Barrier не являются локальными, поскольку производят пересылку. Следует заметить, что, к примеру, функция приема MPI_Recv (парная для MPI_Send) является локальной: она всего лишь пассивно ждет поступления данных, ничего не пытаясь сообщить другим ветвям.

Коллективные функции (например,  MPI_Bcast) для выполнения некоторого общего для группы действия должны быть вызваны всеми процессами одной области связи. Несоблюдение этого правила приводит к ошибкам на стадии выполнения программы (как правило, к "зависанию").

Блокирующие функции останавливают (блокируют) выполнение процесса до тех пор, пока производимая ими операция не будет выполнена. Неблокирующие функции возвращают управление немедленно, а выполнение операции продолжается в фоновом режиме. За завершением операции необходимо проследить особо. Неблокирующие функции возвращают "квитанции", которые погашаются при завершении. До погашения квитанции с переменными и массивами, которые были аргументами неблокирующей функции, ничего делать нельзя.

4. Функции инициализации и завершения

Инициализация коммуникационных средств  MPI  осуществляется функцией  MPI_Init. Эта функция должна быть первой  MPI-функцией в программе. Объявление функции имеет следующий вид:

int  MPI_Init ( 
  int *argc, 
  char ***argv);

В качестве аргументов функции выступают аргументы функции  main программы на языке СИ.

Аварийное завершение параллельного приложения осуществляется функцией MPI_Abort. Эта функция вызывается, если один из процессов в параллельной программе столкнулся с неразрешимой проблемой, связанной с MPI, и требует немедленного завершения всего приложения. Объявление функции имеет следующий вид:

int  MPI_Abort (
  MPI_Comm comm,
  int ierr);

Вызов MPI_Abort из любого процесса принудительно завершает работу всех задач, присоединенных к  области связи  comm. Если в качестве  comm указано MPI_COMM_WORLD, будет завершено все приложение целиком. Аргумент  ierr является идентификатором ошибки  MPI (используется код MPI_ERR_OTHER, если характер ошибки не известен).

Нормальное завершение обменов в  MPI осуществляется функцией MPI_Finalize. Объявление функции имеет следующий вид:

int  MPI_Finalize(void);

Определение общего количества параллельных процессов в группе осуществляется функцией  MPI_Comm_size. Объявление функции имеет следующий вид :

int MPI_Comm_size (
  MPI_Comm comm,
  int *size);

Функция возвращает в  size количество процессов (размер группы) для области связи, задаваемой коммуникатором  comm.

Для определения идентификатора (номера) процесса в группе используется функция  MPI_Comm_rank. Объявление функции имеет следующий вид:

int MPI_Comm_rank (
  MPI_Comm comm,
  int *rank );

Функция возвращает в rank номер процесса в группе, ассоциированной с областью связи, задаваемой коммуникатором.

5. Функции взаимодействия типа "точка-точка"

Для отправления и према сообщений с блокировкой используются функции  MPI_Send и MPI_Recv. Объявление этих функций имеет следующий вид:

int MPI_Send (
  void *buf,
  int count,
  MPI_Datatype datatype,
  int dest,
  int msgtag,
  MPI_Comm comm);

int MPI_Recv (
  void *buf,
  int count,
  MPI_Datatype datatype,
  int src,
  int msgtag,
  MPI_Comm comm,
  MPI_Status *status); 

Аргументы функций:

Реальное количество элементов данных в сообщении, полученном функцией MPI_Recv, можно определить с помощью функции  MPI_Get_Count. Объявление функции имеет следующий вид:

int MPI_Get_Count (
  MPI_Status *status,
  MPI_Datatype datatype,
  int *count);

Функция возвращает в  count количество элементов данных типа  datatype  в сообщении, описываемом структурой  status.

Возврат из функции MPI_Send происходит только тогда, когда все отсылаемые данные из буфера buf будут перемещены в безопасное место (это может быть временный внутренний буфер MPI или даже непосредственно буфер получателя).

Такой режим передачи называется стандартным. Он характеризуется тем, что в ситуации, когда в момент запроса на посылку данных (MPI_Send) еще не был выдан парный запрос на их прием (MPI_Recv), решение о выборе способа передачи данных принимает MPI. Передаваемые данные могут быть перемещены во внутренние буфера MPI для ожидания запроса на их прием, после этого сразу же происходит возврат управления из функции MPI_Send. Однако, может оказаться, что во внутренних буферах MPI недостаточно места для передаваемых данных, тогда MPI заблокирует выполнение MPI_Send до момента окончания реальной передачи данных приемнику (или до появления достаточного места в буферах). Стандартный режим нелокален.

Кроме стандартного существуют еще три режима передачи данных, это находит отражение в правилах именования функций MPI. В общем случае функции обмена данными именуются по следующим правилам (на примере посылки данных):

MPI_[I][R|S|B]Send

Инфикс B означает буферизованный режим, в котором передаваемые данные копируются во внутренние буфера MPI для последующей отсылки приемнику. Возврат из функции происходит немедленно после копирования в буфер. Операция закончится неудачно, если в буфере недостаточно места, однако, в распоряжении программиста есть функции для управления размером буфера. Буферизованный режим локален.

Инфикс S означает синхронный режим, в котором возврат из функции происходит только после начала парной операции приема и освобождения для повторного использования буфкра данных прикладной программы. Внутренние буфера MPI в данном режиме не используются. Синхронный режим нелокален.

Инфикс R означает режим по готовности, в котором операция отсылки может стартовать только при наличии ранее выданного парного запроса на прием данных. Если такого запроса нет, то функция завершается ошибочно, иначе далее работа строится по алгоритму буферизованного режима. Режим по готовности нелокален.

Инфикс I модифицирует семантику функций отсылки, делая их неблокирующими (immediate). Неблокирующие версии характеризуются тем, что они только запускают (стартуют) процесс передачи, но не ждут его завершения (в том смысле, как это понимается в каждом режиме). Для проверки окончания операции используются функции MPI_Wait и MPI_Test.

6. Функции коллективного взаимодействия

Функции коллективного обмена характеризуются следующими особенностями:

Широковещательная рассылка/прием сообщения с блокировкой процессов осуществляется функцией  MPI_Bcast. Объявление функции имеет следующий вид:

int  MPI_Bcast (
  void *buf,
  int count,
  MPI_Datatype datatype,
   int root,
  MPI_Comm comm);

Процесс с идентификатором root с помощью этой функции  рассылает всем процессам в группе  comm сообщение из буфера  buf  длиной  count  элементов типа  datatype. Все процессы (в том числе и процесс  root) в группе  comm принимают сообщение в буфер buf от процесса с идентификатором  root. Значение аргументов  count, datatype  и  root  должно быть одинаковым во всех процессах группы  comm. Широковещательная рассылка может завершиться удачно только при условии, когда все без исключения процессы в группе осуществят вызов функции  MPI_Bcast.

Для сбора однородных данных со всех процессов группы используется функция MPI_Gather (gather -  совок). Объявление функции имеет следующий вид:

int  MPI_Gather (
  void *sbuf,
  int scount,
  MPI_Datatype sdatatype, 
  void *rbuf,
  int rcount,
  MPI_Datatype rdatatype,
  int root,
  MPI_Comm comm);

Каждый процесс в группе  comm отсылает процессу с номером  root scount  элементов данных типа  sdatatype. Процесс root принимает от каждого процесса  rcount элементов данных типа rdatatype и размещает их последовательно в буфере rbuf (порядок следования "подмассивов" в буфере rbuf  соответствует номерам отсылающих процессов). Аргументы  rbuf, rcount и rdatatype имеют смысл только для принимающего процесса root. Аргументы  scount и sdatatype для всех отсылающих процессов должны быть одинаковыми и совпадать со значениями rcount и rdatatype, соответственно, для  root.

Для сбора разного количества элементов данных от разных процессов используется функция MPI_Gatherv  ("векторная").

Функция  MPI_Allgather аналогична функции MPI_Gather, но прием осуществляется не в одном процессе с идентификатором  root , а во  всех. При этом каждый процесс имеет специфическое содержимое в передающем буфере, но все получают одинаковое содержимое в буфере приемном. Как и в MPI_Gather, приемный буфер последовательно заполняется элементами данных из всех передающих буферов.  Существует "векторный" вариант  данной функции (MPI_Allgatherv). 

Для одновременной рассылки разных (но однотипных) данных разным процессам в группе используется функция  MPI_Scatter (scatter -  разбрызгиватель). Объявление функции имеет следующий вид:

int  MPI_Scatter (
  void *sbuf,
  int  scount,
  MPI_Datatype sdatatype, 
  void *rbuf,
  int rcount,
  MPI_Datatype rdatatype,
  int root,
  MPI_Comm comm); 
 

Процесс с идентификатором root с помощью этой функции  распределяет по всем процессам в группе  comm содержимое буфера  sbuf. Каждому процессу отсылается  scount  элементов типа  sdatatype, начиная с (scount*(rank-1))-ого элемента данных  sbuf. Каждый процесс (в том числе и процесс  root) в группе  comm принимает rcount  элементов типа  rdatatype в буфер rbuf от процесса с идентификатором  root. Аргументы  sbuf, scount и sdatatype имеют смысл только для  отсылающего процесса root. Аргументы  rcount и rdatatype для всех  принимающих процессов должны быть одинаковыми и совпадать со значениями scount и sdatatype, соответственно, для  root.

Для распределения разного количества элементов данных разным процессам используется функция MPI_Scatterv  ("векторная").

Функция  MPI_Alltoall реализует схему распределения данных "от каждого процесса - всем", совмещая в себя логику действий одновременно функции  MPI_Gather  и функции  MPI_Scatter. Векторный вариант  функции - MPI_Alltoallv.

Функции коллективного обмена несовместимы с функциями обмена двухточечного: недопустимым, например, является вызов в одном из процессов функции MPI_Recv для приема широковещательного сообщения, сгенерированного функцией MPI_Bcast.

Для синхронизации работы всех процессов в группе без передачи данных используется функция MPI_Barrier. Объявление функции имеет следующий вид:

int MPI_Barrier (
  MPI_Comm comm);

Возврат из функции происходит в момент, когда все процессы в группе вызовут данную функцию.

7. Глобальные операции

Глобальными в  MPI являются операции (такие, как сложение, умножение, поиск экстремума и т.п.), выполняемые встроенными средствами над элементами данных, поставляемыми разными процессами.

Простейшей функцией, реализующей глобальные вычисления, является функция MPI_Reduce. Объявление функции имеет следующий вид:

int  MPI_Reduce (
  void *buf,
  void *res,
  int count, 
  MPI_Datatype datatype, 
  MPI_op op,
  int root,
  MPI_Comm comm);

Данная функция должна быть вызвана в каждом процессе группы  comm, при этом в буфере  buf должно располагаться count элементов данных типа  datatype. В результате в  i-ом элементе данных буфера  res  у принимающего процесса  root будет размещен результат применения операции  op ко всем i-ым элементам данных буферов  buf всех процессов группы  comm. Аргументы  count, datatype, op  и root должны быть одинаковы во всех процессах группы comm. Аргумент res имеет смысл только в принимающем процессе  root.

В  MPI насчитывается 12 предопределенных операций (аргумент  op  функции  MPI_Reduce), идентифицируемых следующими символьными константами:

Допустимые типы операндов для глобальных операций приведены в таблице 1.

Таблица 1
 
Операция
Допустимый тип операндов

MPI_MAX, MPI_MIN

целые и вещественные

MPI_SUM, MPI_PROD

целые, вещественные, комплексные

MPI_LAND, MPI_LOR, MPI_LXOR

целые и логические

MPI_LAND, MPI_LOR, MPI_LXOR

целые (в т.ч. байтовые)
 

С помощью функций MPI_Op_create и MPI_Op_free пользователь может создавать собственные глобальные операции.

Существуют и более сложные варианты функции  MPI_Reduce. Так функция MPI_Allreduce рассылает результат применения глобальной операции не одному (root) процессу, а всем процессам в группе. Функция MPI_Reduce_scatter распределяет массив-результат частями по отдельным процессам. Функция MPI_Scan аналогична функции MPI_Allreduce в том, что каждый процесс в группе получает результирующий массив. Однако, здесь содержимое приемного буфера в процессе  k является результатом применения глобальной операции к буферам-источникам процессов с номерами от 0 до  k включительно.

Пример MPI-программы

Ниже приведен текст MPI-программы умножения матрицы на вектор.

/*----- Умножает матрицу на вектор: C=AxB -----*/

// Для хранения матрицы используется одномерный	массив, в котором
// последовательно располагаются строки матрицы.
// Для обращения к элементу матрицы A, находящемуся в j-ом столбце i-ой
// строки используется конструкция A[i*n+j], где n - количество столбцов.
// Такой способ хранения позволяет легко декомпозировать матрицу на
// "горизонтальные" ленты.

#include <stdlib.h>
#include <unistd.h>
#include <stdio.h>

#include <mpi.h>
#define _REENTRANT

#define N_PER_PROC 100	// Кол-во строк матрицы на процесс

int main(int argc, char **argv) {
  int myrank, total;

  double *A, *B, *C;	// Используются только в root

  double *a, *b, *c;	// Лента матрицы [mxn], вектор [n], рез-т [m]

  int n;    // Размерность квадратной матрицы
  int m;    // Ширина горизонтальной ленты матрицы
  int i, j;
  int intBuf[2];

  MPI_Init (&argc, &argv);
  MPI_Comm_size (MPI_COMM_WORLD, &total);
  MPI_Comm_rank (MPI_COMM_WORLD, &myrank);
  printf ("Total=%d, rank=%d\n", total, myrank);

  if (!myrank) {  
    // Подготовка исх. данных (только root)
    n = N_PER_PROC * total;
    A = (double *) malloc (sizeof(double)*n*n);
    B = (double *) malloc (sizeof(double)*n);
    C = (double *) malloc (sizeof(double)*n);
    // Инициализация матрицы A и вектора B
    for (i=0; i<n; i++) {
      B[i] = (double)i;
      for (j=0; j<n; j++)
	A[i*n+j] = (double)(i+j);
      };  
    };

  if (!myrank) {  
    intBuf[0] = n;
    intBuf[1] = N_PER_PROC;  
    };
  MPI_Bcast((void *)intBuf, 2, MPI_INT, 0, MPI_COMM_WORLD);
  n = intBuf[0];
  m = intBuf[1];

  a = (double *) malloc (sizeof(double)*n*m);
  b = (double *) malloc (sizeof(double)*n);
  c = (double *) malloc (sizeof(double)*m);

  if (!myrank) {	// Лишнее действие, B не нужен 
    memcpy (b, B, sizeof(double)*n); 
    };
  MPI_Bcast((void *)b, n, MPI_DOUBLE, 0, MPI_COMM_WORLD);

  MPI_Scatter((void *)A, n*m, MPI_DOUBLE,
	      (void *)a, n*m, MPI_DOUBLE, 0, MPI_COMM_WORLD);

  for (i=0; i<m; i++) {
    c[i] = 0;
    for (j=0; j<n; j++)
      c[i] += a[n*i+j]*b[j];
    };

  MPI_Gather((void *)c, m, MPI_DOUBLE,
	    (void *)C, m, MPI_DOUBLE, 0, MPI_COMM_WORLD);

  if (!myrank)  
    for (i=0; i<n; i++)
      printf ("%g\n", C[i]);

  MPI_Finalize();

  exit(0);
  }