Важной задачей в разработке параллельных приложений является выбор средств их программной реализации и исследования.
Эффективное использование многопроцессорных и многомашинных вычислительных комплексов требует наличия средств разработки параллельных программ. Такие средств включают в себя языки параллельного программирования, библиотеки функций и подпрограмм, средства отладки и оценки производительности (профилирования).
В настоящее время созданы десятки систем параллельного программирования (как специализированных, так и универсальных). Среди них можно назвать следующие: OpenMP, [p]threads, shmem (для систем с разделяемой памятью), Occam, Concurrent C, HPF (высокопроизводительный ФОРТРАН), HPC++, Distributed Java (для систем с распределенной памятью), Parlog, Concurrent Prolog (параллельные версии языка ПРОЛОГ), Multilisp (параллельная версия языка ЛИСП).
Однако, наибольшее распространение среди систем параллельного программирования получили две следующие:
MPI (Message Passing Interface - интерфейс передачи сообщений).
Обе системы реализуют модель передачи сообщений, содержат библиотеки функций и подпрограмм для стандартных языков программирования СИ, СИ++, ФОРТРАН, обеспечивают взаимодействия "точка-точка" и групповые. В то же время системы имеют и существенные отличия.
Система разработки и выполнения параллельных программ PVM [http://www.epm.ornl.gov/pvm] разрабатывается и поддерживается в рамках исследовательского проекта с 1989 г. (родоначальниками являются профессор университета Эмори г. Атланта Vaidy Sunderman и сотрудник Окриджской национальной лаборатории Al Geits). В настоящее время в развитии проекта участвуют многие специалисты во всем мире. Основная цель - обеспечение работы параллельных программ в гетерогенных вычислительных средах. В основу PVM положена концепция виртуальной параллельной вычислительной машины, которая объединяет в единый ресурс множество (до тысяч) разнородных узлов. Примечательно, что виртуальная машина допускает динамическое конфигурирование (причем пользователем, а не администратором). Возможно создание приложений как типа SPMD (одна параллельная программа - много потоков данных), та и типа MPMD (много параллельных программ - много потоков данных). Предусмотрены средства корректного преобразования данных при их передаче между узлами различной архитектуры и между программами, написанными на разных языках программирования (СИ/СИ++ и ФОРТРАН). Важно наличие механизма асинхронных уведомлений о значимых событиях в виртуальной машине (добавление/удаление узла, выход узла из строя). PVM изначально является открытой и свободно распространяемой системой.
Стандарт на программный интерфейс передачи сообщений MPI был разработан в 1994 г. группой MPI Forum [http://www.mpi-forum.org], в состав которой входили представители промышленности и академических кругов. В настоящее время принята новая версия стандарта - MPI-2. Основная цель стандарта - достижение максимальной производительности (даже в ущерб гибкости) выполнения параллельных программ на супер-ЭВМ и максимальной степени переносимости таких программ на уровне исходных текстов. MPI ориентирован, в первую очередь, на создание приложений типа SPMD, включает спецификацию 128 функций (в MPI-2 добавлено еще 120). MPI характеризуется статической структурой вычислительной среды, задаваемой до начала выполнения параллельной программы, однако, имеет средства динамического построения на этой структуре виртуальных топологий обмена и создания групп процессов. Имеется множество коммерческих реализаций стандарта MPI для конкретных супер-ЭВМ, существуют также свободно распространяемые версии, наиболее известная из которых - MPICH (MPI Chameleon), ориентированная, в первую очередь, на использование в кластерах рабочих станций [ http://www.mcs.anl.gov/mpi/mpich/download.html].
Сравнение MPI и PVM позволяет сделать вывод о предпочтительности использования MPI в однородных вычислительных средах (машины с массовым параллелизмом и небольшие кластеры из однотипных машин).
Описание на русском языке основ параллельного программирования
и, в частности, средств MPI, можно найти в книге "Немнюгин С.А., Стесик О.Л.
Параллельное программирование для многопроцессорных вычислительных систем.
- СПб: БХВ-Петербург, 2002. - 400с.: ил.".
Параллельная программа, разработанная с использованием
MPI, выполняется в рамках нескольких процессов или
ветвей, функционирующих одновременно. Процессы могут выполняться как
на разных процессорах, так и на одном и том же - для программы это роли не
играет, поскольку в обоих случаях используется единый механизм обмена данными.
Процессы обмениваются друг с другом данными в виде сообщений. Сообщения
характеризуются идентификаторами, которые позволяют программе и функциям
библиотеки связи отличать их друг от друга. Каждый процесс может узнать у
библиотеки связи свой номер внутри группы и, в зависимости от
номера, приступить
к выполнению соответствующей части расчетов.
Ключевым для MPI является понятие области связи (communication domain). При запуске приложения все процессы помещаются в автоматически создаваемую общую область связи MPI_COMM_WORLD. При необходимости приложения могут создавать новые области связи на базе существующих. Процессы, использующие одну область связи, образуют группу. Все области связи имеют независимую друг от друга нумерацию процессов. Каждый процесс может, используя соответствующую функцию MPI, определить свой уникальный номер в группе. Программе пользователя в распоряжение предоставляется коммуникатор - структура данных, описывающая области связи. Многие функции MPI имеют в качестве одного из аргументов коммуникатор, который ограничивает сферу их действия той областью связи, к которой он привязан. Для одной области связи может существовать несколько коммуникаторов.
Функции MPI характеризуются масштабностью выполняемых ими действий и влиянием на ход выполнения процессов.
Локальные функции не инициируют пересылок данных между процессами. Большинство информационных функций является локальными, т.к. копии системных данных хранятся в локально памяти каждого процесса. Функция передачи сообщений MPI_Send и функция синхронизации MPI_Barrier не являются локальными, поскольку производят пересылку. Следует заметить, что, к примеру, функция приема MPI_Recv (парная для MPI_Send) является локальной: она всего лишь пассивно ждет поступления данных, ничего не пытаясь сообщить другим ветвям.
Коллективные функции (например, MPI_Bcast) для выполнения некоторого общего для группы действия должны быть вызваны всеми процессами одной области связи. Несоблюдение этого правила приводит к ошибкам на стадии выполнения программы (как правило, к "зависанию").
Блокирующие функции останавливают (блокируют) выполнение процесса до тех пор, пока производимая ими операция не будет выполнена. Неблокирующие функции возвращают управление немедленно, а выполнение операции продолжается в фоновом режиме. За завершением операции необходимо проследить особо. Неблокирующие функции возвращают "квитанции", которые погашаются при завершении. До погашения квитанции с переменными и массивами, которые были аргументами неблокирующей функции, ничего делать нельзя.
Инициализация коммуникационных средств MPI осуществляется функцией MPI_Init. Эта функция должна быть первой MPI-функцией в программе. Объявление функции имеет следующий вид:
int MPI_Init ( int *argc, char ***argv);
В качестве аргументов функции выступают аргументы функции main программы на языке СИ.
Аварийное завершение параллельного приложения осуществляется функцией MPI_Abort. Эта функция вызывается, если один из процессов в параллельной программе столкнулся с неразрешимой проблемой, связанной с MPI, и требует немедленного завершения всего приложения. Объявление функции имеет следующий вид:
int MPI_Abort ( MPI_Comm comm, int ierr);
Вызов MPI_Abort из любого процесса принудительно завершает работу всех задач, присоединенных к области связи comm. Если в качестве comm указано MPI_COMM_WORLD, будет завершено все приложение целиком. Аргумент ierr является идентификатором ошибки MPI (используется код MPI_ERR_OTHER, если характер ошибки не известен).
Нормальное завершение обменов в MPI осуществляется функцией MPI_Finalize. Объявление функции имеет следующий вид:
int MPI_Finalize(void);
Определение общего количества параллельных процессов в группе осуществляется функцией MPI_Comm_size. Объявление функции имеет следующий вид :
int MPI_Comm_size ( MPI_Comm comm, int *size);
Функция возвращает в size количество процессов (размер группы) для области связи, задаваемой коммуникатором comm.
Для определения идентификатора (номера) процесса в группе используется функция MPI_Comm_rank. Объявление функции имеет следующий вид:
int MPI_Comm_rank ( MPI_Comm comm, int *rank );
Функция возвращает в rank номер процесса в группе, ассоциированной с областью связи, задаваемой коммуникатором.
Для отправления и према сообщений с блокировкой используются функции MPI_Send и MPI_Recv. Объявление этих функций имеет следующий вид:
int MPI_Send ( void *buf, int count, MPI_Datatype datatype, int dest, int msgtag, MPI_Comm comm); int MPI_Recv ( void *buf, int count, MPI_Datatype datatype, int src, int msgtag, MPI_Comm comm, MPI_Status *status);
Аргументы функций:
buf - адрес буфера, содержащий отправляемые (MPI_Send) или принимаемые (MPI_Recv) данные.
count - количество элементов в отправляемом сообщении (MPI_Send) или максимальное количество элементов в приемном буфере (MPI_Recv). Существенно, что count определяет количество элементов сообщения/буфера, а не длину в байтах. Количество реально принятых функцией MPI_Recv элементов данных можно выяснить с помощью функции MPI_Get_Count.
datatype - тип элементов сообщения. MPI_Send и MPI_Recv оперируют одномерными массивами элементов одного типа. Для описания базовых типов языка СИ в MPI определены константы MPI_INT, MPI_CHAR, MPI_DOUBLE и т.п.. Их имена образуются префиксом "MPI_" и именем соответствующего типа (int, char, double и т.п.), записанным заглавными буквами. Пользователю предоставлены не описываемые здесь возможности определять в MPI свои собственные типы данных, например, структуры, после чего MPI сможет обрабатывать их наравне с базовыми. При передаче данных между машинами различной архитектурой автоматически происходит корректное преобразование данных. Возможна также "сырая" пересылка без преобразования данных для типа элементов MPI_BYTE.
dst и src - идентификатор (номер) в группе, определяемой comm, процесса-партнера по обмену сообщением. В качестве этого аргумента функция MPI_Recv может принимать значение MPI_ANY_SOURCE, разрешающее прием сообщения от любого процесса-источника.
msgtag - идентификатор сообщения, произвольное целое число в диапазоне от 0 до 32767. В качестве этого аргумента функция MPI_Recv может принимать значение MPI_ANY_TAG, разрешающее прием сообщения c любым идентификатором.
status - структура, содержащая информацию о принятом сообщении: его идентификатор, номер задачи-источника, код завершения и количество фактически пришедших элементов данных.
Реальное количество элементов данных в сообщении, полученном функцией MPI_Recv, можно определить с помощью функции MPI_Get_Count. Объявление функции имеет следующий вид:
int MPI_Get_Count ( MPI_Status *status, MPI_Datatype datatype, int *count);
Функция возвращает в count количество элементов данных типа datatype в сообщении, описываемом структурой status.
Возврат из функции MPI_Send происходит только тогда, когда все отсылаемые данные из буфера buf будут перемещены в безопасное место (это может быть временный внутренний буфер MPI или даже непосредственно буфер получателя).
Такой режим передачи называется стандартным. Он характеризуется тем, что в ситуации, когда в момент запроса на посылку данных (MPI_Send) еще не был выдан парный запрос на их прием (MPI_Recv), решение о выборе способа передачи данных принимает MPI. Передаваемые данные могут быть перемещены во внутренние буфера MPI для ожидания запроса на их прием, после этого сразу же происходит возврат управления из функции MPI_Send. Однако, может оказаться, что во внутренних буферах MPI недостаточно места для передаваемых данных, тогда MPI заблокирует выполнение MPI_Send до момента окончания реальной передачи данных приемнику (или до появления достаточного места в буферах). Стандартный режим нелокален.
Кроме стандартного существуют еще три режима передачи данных, это находит отражение в правилах именования функций MPI. В общем случае функции обмена данными именуются по следующим правилам (на примере посылки данных):
Инфикс B означает буферизованный режим, в котором передаваемые данные копируются во внутренние буфера MPI для последующей отсылки приемнику. Возврат из функции происходит немедленно после копирования в буфер. Операция закончится неудачно, если в буфере недостаточно места, однако, в распоряжении программиста есть функции для управления размером буфера. Буферизованный режим локален.
Инфикс S означает синхронный режим, в котором возврат из функции происходит только после начала парной операции приема и освобождения для повторного использования буфкра данных прикладной программы. Внутренние буфера MPI в данном режиме не используются. Синхронный режим нелокален.
Инфикс R означает режим по готовности, в котором операция отсылки может стартовать только при наличии ранее выданного парного запроса на прием данных. Если такого запроса нет, то функция завершается ошибочно, иначе далее работа строится по алгоритму буферизованного режима. Режим по готовности нелокален.
Инфикс I модифицирует семантику функций отсылки, делая их неблокирующими (immediate). Неблокирующие версии характеризуются тем, что они только запускают (стартуют) процесс передачи, но не ждут его завершения (в том смысле, как это понимается в каждом режиме). Для проверки окончания операции используются функции MPI_Wait и MPI_Test.
Функции коллективного обмена характеризуются следующими особенностями:
на прием и/или передачу работают одновременно все процессы указываемой через коммуникатор области связи;
функции коллективного взаимодействия выполняют одновременно и прием, и передачу; они имеют большое количество параметров, одна часть которых нужна для приема, а другая - для передачи данных;
как правило, значения всех параметров (за исключением адресов буферов) должны быть идентичными во всех задачах;
MPI назначает идентификатор для сообщений автоматически;
кроме того, сообщения реально передаются не по указываемому
коммуникатору, а по временному коммуникатору-дубликату,
тем самым потоки данных коллективных функций надежно изолируются
друг от друга и от потоков, созданных функциями двухточечного обмена.
Широковещательная рассылка/прием сообщения с блокировкой процессов осуществляется функцией MPI_Bcast. Объявление функции имеет следующий вид:
int MPI_Bcast ( void *buf, int count, MPI_Datatype datatype, int root, MPI_Comm comm);
Процесс с идентификатором root с помощью этой функции рассылает всем процессам в группе comm сообщение из буфера buf длиной count элементов типа datatype. Все процессы (в том числе и процесс root) в группе comm принимают сообщение в буфер buf от процесса с идентификатором root. Значение аргументов count, datatype и root должно быть одинаковым во всех процессах группы comm. Широковещательная рассылка может завершиться удачно только при условии, когда все без исключения процессы в группе осуществят вызов функции MPI_Bcast.
Для сбора однородных данных со всех процессов группы используется функция MPI_Gather (gather - совок). Объявление функции имеет следующий вид:
int MPI_Gather ( void *sbuf, int scount, MPI_Datatype sdatatype, void *rbuf, int rcount, MPI_Datatype rdatatype, int root, MPI_Comm comm);
Каждый процесс в группе comm отсылает процессу с номером root scount элементов данных типа sdatatype. Процесс root принимает от каждого процесса rcount элементов данных типа rdatatype и размещает их последовательно в буфере rbuf (порядок следования "подмассивов" в буфере rbuf соответствует номерам отсылающих процессов). Аргументы rbuf, rcount и rdatatype имеют смысл только для принимающего процесса root. Аргументы scount и sdatatype для всех отсылающих процессов должны быть одинаковыми и совпадать со значениями rcount и rdatatype, соответственно, для root.
Для сбора разного количества элементов данных от разных процессов используется функция MPI_Gatherv ("векторная").
Функция MPI_Allgather аналогична функции MPI_Gather, но прием осуществляется не в одном процессе с идентификатором root , а во всех. При этом каждый процесс имеет специфическое содержимое в передающем буфере, но все получают одинаковое содержимое в буфере приемном. Как и в MPI_Gather, приемный буфер последовательно заполняется элементами данных из всех передающих буферов. Существует "векторный" вариант данной функции (MPI_Allgatherv).
Для одновременной рассылки разных (но однотипных) данных разным процессам в группе используется функция MPI_Scatter (scatter - разбрызгиватель). Объявление функции имеет следующий вид:
int MPI_Scatter ( void *sbuf, int scount, MPI_Datatype sdatatype, void *rbuf, int rcount, MPI_Datatype rdatatype, int root, MPI_Comm comm);
Процесс с идентификатором root с помощью этой функции распределяет по всем процессам в группе comm содержимое буфера sbuf. Каждому процессу отсылается scount элементов типа sdatatype, начиная с (scount*(rank-1))-ого элемента данных sbuf. Каждый процесс (в том числе и процесс root) в группе comm принимает rcount элементов типа rdatatype в буфер rbuf от процесса с идентификатором root. Аргументы sbuf, scount и sdatatype имеют смысл только для отсылающего процесса root. Аргументы rcount и rdatatype для всех принимающих процессов должны быть одинаковыми и совпадать со значениями scount и sdatatype, соответственно, для root.
Для распределения разного количества элементов данных разным процессам используется функция MPI_Scatterv ("векторная").
Функция MPI_Alltoall реализует схему распределения данных "от каждого процесса - всем", совмещая в себя логику действий одновременно функции MPI_Gather и функции MPI_Scatter. Векторный вариант функции - MPI_Alltoallv.
Функции коллективного обмена несовместимы с функциями обмена двухточечного: недопустимым, например, является вызов в одном из процессов функции MPI_Recv для приема широковещательного сообщения, сгенерированного функцией MPI_Bcast.
Для синхронизации работы всех процессов в группе без передачи данных используется функция MPI_Barrier. Объявление функции имеет следующий вид:
int MPI_Barrier ( MPI_Comm comm);
Возврат из функции происходит в момент, когда все процессы в группе вызовут данную функцию.
Глобальными в MPI являются операции (такие, как сложение, умножение, поиск экстремума и т.п.), выполняемые встроенными средствами над элементами данных, поставляемыми разными процессами.
Простейшей функцией, реализующей глобальные вычисления, является функция MPI_Reduce. Объявление функции имеет следующий вид:
int MPI_Reduce ( void *buf, void *res, int count, MPI_Datatype datatype, MPI_op op, int root, MPI_Comm comm);
Данная функция должна быть вызвана в каждом процессе группы comm, при этом в буфере buf должно располагаться count элементов данных типа datatype. В результате в i-ом элементе данных буфера res у принимающего процесса root будет размещен результат применения операции op ко всем i-ым элементам данных буферов buf всех процессов группы comm. Аргументы count, datatype, op и root должны быть одинаковы во всех процессах группы comm. Аргумент res имеет смысл только в принимающем процессе root.
В MPI насчитывается 12 предопределенных операций (аргумент op функции MPI_Reduce), идентифицируемых следующими символьными константами:
MPI_MAX и MPI_MIN - поиск максимума и минимума, соответственно;
MPI_SUM - вычисление суммы;
MPI_PROD - вычисление произведения;
MPI_LAND, MPI_BAND, MPI_LOR, MPI_BOR, MPI_LXOR, MPI_BXOR - логические и битовые операции И, ИЛИ, исключающее ИЛИ;
MPI_MAXLOC, MPI_MINLOC - поиск индексированного максимума и минимума.
Допустимые типы операндов для глобальных операций приведены в таблице 1.
|
Допустимый тип операндов |
MPI_MAX, MPI_MIN |
|
MPI_SUM, MPI_PROD |
|
MPI_LAND, MPI_LOR, MPI_LXOR |
|
MPI_LAND, MPI_LOR, MPI_LXOR |
|
С помощью функций MPI_Op_create и MPI_Op_free пользователь может создавать собственные глобальные операции.
Существуют и более сложные варианты функции MPI_Reduce. Так функция MPI_Allreduce рассылает результат применения глобальной операции не одному (root) процессу, а всем процессам в группе. Функция MPI_Reduce_scatter распределяет массив-результат частями по отдельным процессам. Функция MPI_Scan аналогична функции MPI_Allreduce в том, что каждый процесс в группе получает результирующий массив. Однако, здесь содержимое приемного буфера в процессе k является результатом применения глобальной операции к буферам-источникам процессов с номерами от 0 до k включительно.
Ниже приведен текст MPI-программы умножения матрицы на вектор.
/*----- Умножает матрицу на вектор: C=AxB -----*/ // Для хранения матрицы используется одномерный массив, в котором // последовательно располагаются строки матрицы. // Для обращения к элементу матрицы A, находящемуся в j-ом столбце i-ой // строки используется конструкция A[i*n+j], где n - количество столбцов. // Такой способ хранения позволяет легко декомпозировать матрицу на // "горизонтальные" ленты. #include <stdlib.h> #include <unistd.h> #include <stdio.h> #include <mpi.h> #define _REENTRANT #define N_PER_PROC 100 // Кол-во строк матрицы на процесс int main(int argc, char **argv) { int myrank, total; double *A, *B, *C; // Используются только в root double *a, *b, *c; // Лента матрицы [mxn], вектор [n], рез-т [m] int n; // Размерность квадратной матрицы int m; // Ширина горизонтальной ленты матрицы int i, j; int intBuf[2]; MPI_Init (&argc, &argv); MPI_Comm_size (MPI_COMM_WORLD, &total); MPI_Comm_rank (MPI_COMM_WORLD, &myrank); printf ("Total=%d, rank=%d\n", total, myrank); if (!myrank) { // Подготовка исх. данных (только root) n = N_PER_PROC * total; A = (double *) malloc (sizeof(double)*n*n); B = (double *) malloc (sizeof(double)*n); C = (double *) malloc (sizeof(double)*n); // Инициализация матрицы A и вектора B for (i=0; i<n; i++) { B[i] = (double)i; for (j=0; j<n; j++) A[i*n+j] = (double)(i+j); }; }; if (!myrank) { intBuf[0] = n; intBuf[1] = N_PER_PROC; }; MPI_Bcast((void *)intBuf, 2, MPI_INT, 0, MPI_COMM_WORLD); n = intBuf[0]; m = intBuf[1]; a = (double *) malloc (sizeof(double)*n*m); b = (double *) malloc (sizeof(double)*n); c = (double *) malloc (sizeof(double)*m); if (!myrank) { // Лишнее действие, B не нужен memcpy (b, B, sizeof(double)*n); }; MPI_Bcast((void *)b, n, MPI_DOUBLE, 0, MPI_COMM_WORLD); MPI_Scatter((void *)A, n*m, MPI_DOUBLE, (void *)a, n*m, MPI_DOUBLE, 0, MPI_COMM_WORLD); for (i=0; i<m; i++) { c[i] = 0; for (j=0; j<n; j++) c[i] += a[n*i+j]*b[j]; }; MPI_Gather((void *)c, m, MPI_DOUBLE, (void *)C, m, MPI_DOUBLE, 0, MPI_COMM_WORLD); if (!myrank) for (i=0; i<n; i++) printf ("%g\n", C[i]); MPI_Finalize(); exit(0); }