MPI 实现直接选择排序


选择排序

1/主线程完成读数据 分发 回收和过程控制

2/从线程完成从自己的数据集上选出最小的数值

3/主线程 从收到的数据中选出最小的更新 通知相应线程 发来新的最小值 选择出下一个最小值 循环至完成

  1. #include "mpi.h"   
  2. #include <unistd.h>   
  3. #include <fcntl.h>   
  4. #include <ctype.h>   
  5. #include <stdio.h>   
  6. #include <stdlib.h>   
  7.   
  8. #define TAG 0   
  9. #define INT_MAX 999999999   
  10. #define ROOT 0    
  11.   
  12. int readValue(char* file,int* values) {  
  13.     int fd,size,count=0;  
  14.     char buffer[80],num[11];  
  15.     fd=open(file,O_RDONLY);  
  16.     do {  
  17.         size=read(fd,buffer,sizeof(buffer));  
  18.         int j=0;  
  19.         for(int i=0;i<size;++i) {  
  20.             if(buffer[i]<'9'&&buffer[i]>'0') {  
  21.                 num[j++]=buffer[i];  
  22.             } else {  
  23.                 num[j]='\0';  
  24.                 values[count++]=atoi(num);  
  25.                 j=0;      
  26.             }  
  27.         }  
  28.     } while(size!=0);  
  29.     close(fd);  
  30.     return count;  
  31. }  
  32. void defineIntVector(MPI_Datatype* type,int length) {  
  33.     MPI_Type_vector(length,1,1,MPI_INT,type);  
  34.     MPI_Type_commit(type);  
  35. }  
  36. int getMin(int *mp,int *array,int end,int ex) {  
  37.     int min=array[0];  
  38.     *mp=0;  
  39.     for(int i=1;i<=end;++i) {  
  40.         if(min>array[i]) {  
  41.             min=array[i];  
  42.             *mp=i;  
  43.         }  
  44.     }  
  45.     if(ex) {  
  46.         array[*mp]=array[end];  
  47.         array[end]=min;  
  48.     }  
  49.     return min;  
  50. }  
  51. int main(int argc,char *argv[]) {  
  52.     int *values;  
  53.     int *mins;  
  54.     int self,size,length,part,min,mp;  
  55.     MPI_Datatype MPI_VECTOR;  
  56.     MPI_Status status;  
  57.     MPI_Init(&argc,&argv);  
  58.     MPI_Comm_rank(MPI_COMM_WORLD,&self);  
  59.     MPI_Comm_size(MPI_COMM_WORLD,&size);  
  60.     //read value and broadcast to other processes   
  61.     if(0==self) {  
  62.         values=(int *)malloc(100*sizeof(int));  
  63.         length=readValue("/home/gt/parellel/sort/data.in",values);  
  64.         //each process recieves a part of all data data expands to a std length   
  65.         part=length/(size-1);  
  66.         if(0==length%part) {  
  67.         } else {  
  68.             part+=1;  
  69.             for(int i=length;i<part*(size-1);++i) {  
  70.                 values[i]=INT_MAX;  
  71.             }  
  72.         }  
  73.         //broadcast length of each part   
  74.         MPI_Bcast(&part,1,MPI_INT,ROOT,MPI_COMM_WORLD);  
  75.         //minium value from each process   
  76.         defineIntVector(&MPI_VECTOR,part);  
  77.         //send out all the data   
  78.         for(int i=1;i<size;++i) {  
  79.             MPI_Ssend(&values[(i-1)*part],1,MPI_VECTOR,i,TAG,MPI_COMM_WORLD);  
  80.         }  
  81.         //gather all min from process   
  82.         //first   
  83.         mins=(int *)malloc(size*sizeof(int));  
  84.         min=INT_MAX;  
  85.         MPI_Gather(&min,1,MPI_INT,mins,1,MPI_INT,ROOT,MPI_COMM_WORLD);  
  86.         MPI_Barrier(MPI_COMM_WORLD);  
  87.         values[0]=getMin(&mp,mins,size-1,0);  
  88.         for(int i=1;i<length;++i) {  
  89.             //inform correspond process to provide another min   
  90.             MPI_Ssend(&mp,1,MPI_INT,mp,TAG,MPI_COMM_WORLD);  
  91.             //get result   
  92.             MPI_Recv(&mins[mp],1,MPI_INT,mp,TAG,MPI_COMM_WORLD,&status);  
  93.             values[i]=getMin(&mp,mins,size-1,0);  
  94.         }  
  95.         for(int i=1;i<size;++i) {  
  96.             mp=INT_MAX;  
  97.             MPI_Ssend(&mp,1,MPI_INT,i,TAG,MPI_COMM_WORLD);  
  98.         }  
  99.         for(int i=0;i<length;++i) {  
  100.             printf("%d ",values[i]);  
  101.         }  
  102.         printf("\n");  
  103.         free(mins);  
  104.     } else {  
  105.         MPI_Bcast(&part,1,MPI_INT,ROOT,MPI_COMM_WORLD);  
  106.         values=(int *)malloc(part*sizeof(int));  
  107.         defineIntVector(&MPI_VECTOR,part);  
  108.         MPI_Recv(values,1,MPI_VECTOR,ROOT,TAG,MPI_COMM_WORLD,&status);  
  109.         min=getMin(&mp,values,part-1,1);  
  110.         --part;  
  111.         MPI_Gather(&min,1,MPI_INT,mins,1,MPI_INT,ROOT,MPI_COMM_WORLD);  
  112.         MPI_Barrier(MPI_COMM_WORLD);  
  113.         while(1) {  
  114.             MPI_Recv(&mp,1,MPI_INT,ROOT,TAG,MPI_COMM_WORLD,&status);  
  115.             if(INT_MAX==mp) {//signal for end process   
  116.                 break;  
  117.             }  
  118.             min=getMin(&mp,values,part-1,1);  
  119.             --part;  
  120.             MPI_Ssend(&min,1,MPI_INT,ROOT,TAG,MPI_COMM_WORLD);  
  121.         }  
  122.     }  
  123.     free(values);  
  124.     MPI_Finalize();  
  125.     return 0;  
  126. }  

相关内容