MPI 实现并行直接插入排序


话说 直插排序串行只需要十几行 并行就大不一样。。。

基本思路是

1/主线程控制数据读取 分发和汇总

2/从线程中数据按线程号非降有序 每次有新数据 主线程广播 从线程按照数据大小判断是否在自己处理范围内 如果在的话 在自己保存的数组中进行直接插入

3/最后 主线程按序收集显示所有数据

  1. #include "mpi.h"   
  2. #include <unistd.h>   
  3. #include <fcntl.h>   
  4. #include <ctype.h>   
  5. #include <stdio.h>   
  6. #include <stdlib.h>   
  7.   
  8. #define END 0   
  9. #define INIT 1   
  10. #define DATA 2   
  11. #define ROOT 0   
  12.   
  13. int readValue(char* file,int* values)  
  14. {  
  15.     int fd,size,count=0;  
  16.     char buffer[80],num[11];  
  17.     fd=open(file,O_RDONLY);  
  18.     do  
  19.     {  
  20.         size=read(fd,buffer,sizeof(buffer));  
  21.         int j=0;  
  22.         for(int i=0;i<size;++i)  
  23.         {  
  24.             if(buffer[i]<'9'&&buffer[i]>'0')  
  25.             {  
  26.                 num[j++]=buffer[i];  
  27.             }else  
  28.             {  
  29.                 num[j]='\0';  
  30.                 values[count++]=atoi(num);  
  31.                 j=0;      
  32.             }  
  33.         }  
  34.     }while(size!=0);  
  35.     close(fd);  
  36.     return count;  
  37. }  
  38. void insert(int* arr,int pos,int temp)  
  39. {  
  40.     while(pos>=0&&temp<arr[pos])  
  41.     {  
  42.         arr[pos+1]=arr[pos];  
  43.         --pos;  
  44.     }  
  45.     arr[pos+1]=temp;  
  46. }  
  47. void serial(int* arr,int size)  
  48. {  
  49.     for(int i=1;i<size;++i)  
  50.     {  
  51.         insert(arr,i-1,arr[i]);  
  52.     }  
  53. }  
  54.   
  55. int main(int argc,char *argv[])  
  56. {  
  57.     int* values;  
  58.     int self,size,length,temp;  
  59.     MPI_Init(&argc,&argv);  
  60.     MPI_Comm_size(MPI_COMM_WORLD,&size);  
  61.     MPI_Comm_rank(MPI_COMM_WORLD,&self);  
  62.     MPI_Status status;  
  63.     if(self==0)  
  64.     {  
  65.         //read value from file   
  66.         values=(int*)malloc(100*sizeof(int));  
  67.         length=readValue("/home/gt/parellel/sort/data.in",values);  
  68.         //initial root value of each process   
  69.         serial(values,size);  
  70.         //broadcast the max length of each process   
  71.         MPI_Bcast(&length,1,MPI_INT,ROOT,MPI_COMM_WORLD);  
  72.         //the boundary   
  73.         for(int i=1;i<size;++i)  
  74.         {  
  75.             MPI_Ssend(&values[i-1],1,MPI_INT,i,INIT,MPI_COMM_WORLD);  
  76.         }  
  77.         MPI_Recv(&temp,1,MPI_INT,self+1,DATA,MPI_COMM_WORLD,&status);  
  78.         //broadcast each value and do insert in each process   
  79.         for(int i=size-1;i<length;++i)  
  80.         {  
  81.             temp=values[i];  
  82.             MPI_Bcast(&temp,1,MPI_INT,ROOT,MPI_COMM_WORLD);  
  83.             printf("broadcast %d \n",values[i]);  
  84.             //MPI_Barrier(MPI_COMM_WORLD);   
  85.         }  
  86.           
  87.         //ends   
  88.         printf("ends \n");  
  89.         temp=END;  
  90.         MPI_Bcast(&temp,1,MPI_INT,ROOT,MPI_COMM_WORLD);  
  91.         //recieve from each process in order   
  92.         int pos=0;  
  93.         int len=0;  
  94.         for(int i=1;i<size;++i)  
  95.         {  
  96.             MPI_Recv(&len,1,MPI_INT,i,DATA,MPI_COMM_WORLD,&status);  
  97.             for(int j=0;j<len;++j)  
  98.             {  
  99.                 MPI_Recv(&temp,1,MPI_INT,i,DATA,MPI_COMM_WORLD,&status);  
  100.                 values[pos++]=temp;  
  101.             }  
  102.         }  
  103.         for(int i=0;i<length;++i)  
  104.         {  
  105.             printf("%d \n",values[i]);  
  106.         }  
  107.         free(values);  
  108.     }  
  109.     else  
  110.     {     
  111.         MPI_Request request;  
  112.         //recieve max length   
  113.         MPI_Bcast(&length,1,MPI_INT,ROOT,MPI_COMM_WORLD);  
  114.         //position   
  115.         int pos=0;  
  116.         //post irecv   
  117.         MPI_Irecv(&temp,1,MPI_INT,ROOT,INIT,MPI_COMM_WORLD,&request);  
  118.         //data on this process   
  119.         int* local=(int*) malloc(length*sizeof(int)+1);  
  120.         //wait for the recieve   
  121.         MPI_Wait(&request,&status);  
  122.         //exchange upper limit   
  123.         int upper=999999;//max of int   
  124.         if(self!=size-1)MPI_Irecv(&upper,1,MPI_INT,self+1,DATA,MPI_COMM_WORLD,&request);  
  125.         MPI_Ssend(&temp,1,MPI_INT,self-1,DATA,MPI_COMM_WORLD);  
  126.         if(self!=size-1)MPI_Wait(&request,&status);  
  127.         local[pos++]=temp;  
  128.         MPI_Bcast(&temp,1,MPI_INT,ROOT,MPI_COMM_WORLD);  
  129.         //MPI_Barrier(MPI_COMM_WORLD);   
  130.         while(temp!=END)  
  131.         {  
  132.             printf("%d recieve %d \n",self,temp);  
  133.             if(temp<=upper&&(self==1||local[0]<temp))  
  134.             {  
  135.                 printf("%d insert %d \n",self,temp);  
  136.                 local[pos++]=temp;  
  137.                 insert(local,pos-2,temp);  
  138.             }  
  139.             MPI_Bcast(&temp,1,MPI_INT,ROOT,MPI_COMM_WORLD);  
  140.             //MPI_Barrier(MPI_COMM_WORLD);   
  141.         }  
  142.         //ends   
  143.         //sends the data in order to root process   
  144.         MPI_Ssend(&pos,1,MPI_INT,ROOT,DATA,MPI_COMM_WORLD);  
  145.         for(int i=0;i<pos;++i)     
  146.         {  
  147.             MPI_Ssend(&local[i],1,MPI_INT,ROOT,DATA,MPI_COMM_WORLD);  
  148.             printf("% d send back %d \n",self,local[i]);  
  149.         }  
  150.         free(local);  
  151.     }  
  152.     MPI_Finalize();  
  153.     return 0;  
  154. }  

相关内容