Changeset 8f0a211 for GenericIO.cxx


Ignore:
Timestamp:
04/12/16 19:18:46 (9 years ago)
Author:
Hal Finkel <hfinkel@…>
Branches:
master, pympi
Children:
14e73bb
Parents:
a4fee13
git-author:
Hal Finkel <hfinkel@…> (04/12/16 19:18:46)
git-committer:
Hal Finkel <hfinkel@…> (04/12/16 19:18:46)
Message:

Add redistribution support

File:
1 edited

Legend:

Unmodified
Added
Removed
  • GenericIO.cxx

    ra4fee13 r8f0a211  
    739739 
    740740template <bool IsBigEndian> 
    741 void GenericIO::readHeaderLeader(void *GHPtr, bool MustMatch, int SplitNRanks, 
    742                                  string &LocalFileName, uint64_t &HeaderSize, vector<char> &Header) { 
     741void GenericIO::readHeaderLeader(void *GHPtr, MismatchBehavior MB, int NRanks, 
     742                                 int Rank, int SplitNRanks, 
     743                                 string &LocalFileName, uint64_t &HeaderSize, 
     744                                 vector<char> &Header) { 
    743745  GlobalHeader<IsBigEndian> &GH = *(GlobalHeader<IsBigEndian> *) GHPtr; 
    744746 
    745   if (MustMatch) { 
     747  if (MB == MismatchDisallowed) { 
    746748    if (SplitNRanks != (int) GH.NRanks) { 
    747749      stringstream ss; 
     
    777779    } 
    778780#endif 
     781  } else if (MB == MismatchRedistribute && !Redistributing) { 
     782    Redistributing = true; 
     783 
     784    int NFileRanks = RankMap.empty() ? (int) GH.NRanks : (int) RankMap.size(); 
     785    int NFileRanksPerRank = NFileRanks/NRanks; 
     786    int NRemFileRank = NFileRanks % NRanks; 
     787 
     788    if (!NFileRanksPerRank) { 
     789      // We have only the remainder, so the last NRemFileRank ranks get one 
     790      // file rank, and the others don't. 
     791      if (NRemFileRank && NRanks - Rank <= NRemFileRank) 
     792        SourceRanks.push_back(NRanks - (Rank + 1)); 
     793    } else { 
     794      // Since NRemFileRank < NRanks, and we don't want to put any extra memory 
     795      // load on rank 0 (because rank 0's memory load is normally higher than 
     796      // the other ranks anyway), the last NRemFileRank will each take 
     797      // (NFileRanksPerRank+1) file ranks. 
     798 
     799      int FirstFileRank = 0, LastFileRank = NFileRanksPerRank - 1; 
     800      for (int i = 1; i <= Rank; ++i) { 
     801        FirstFileRank = LastFileRank + 1; 
     802        LastFileRank  = FirstFileRank + NFileRanksPerRank - 1; 
     803 
     804        if (NRemFileRank && NRanks - i <= NRemFileRank) 
     805          ++LastFileRank; 
     806      } 
     807 
     808      for (int i = FirstFileRank; i <= LastFileRank; ++i) 
     809        SourceRanks.push_back(i); 
     810    } 
    779811  } 
    780812 
     
    791823// Note: Errors from this function should be recoverable. This means that if 
    792824// one rank throws an exception, then all ranks should. 
    793 void GenericIO::openAndReadHeader(bool MustMatch, int EffRank, bool CheckPartMap) { 
     825void GenericIO::openAndReadHeader(MismatchBehavior MB, int EffRank, bool CheckPartMap) { 
    794826  int NRanks, Rank; 
    795827#ifndef GENERICIO_NO_MPI 
     
    802834 
    803835  if (EffRank == -1) 
    804     EffRank = Rank; 
     836    EffRank = MB == MismatchRedistribute ? 0 : Rank; 
    805837 
    806838  if (RankMap.empty() && CheckPartMap) { 
     
    814846        GenericIO GIO(FileName, FileIOType); 
    815847#endif 
    816         GIO.openAndReadHeader(true, 0, false); 
     848        GIO.openAndReadHeader(MismatchDisallowed, 0, false); 
    817849        RanksInMap = GIO.readNumElems(); 
    818850 
     
    846878    LocalFileName = FileName; 
    847879#ifndef GENERICIO_NO_MPI 
    848     MPI_Comm_dup(Comm, &SplitComm); 
     880    MPI_Comm_dup(MB == MismatchRedistribute ? MPI_COMM_SELF : Comm, &SplitComm); 
    849881#endif 
    850882  } else { 
     
    853885    LocalFileName = ss.str(); 
    854886#ifndef GENERICIO_NO_MPI 
     887    if (MB == MismatchRedistribute) { 
     888      MPI_Comm_dup(MPI_COMM_SELF, &SplitComm); 
     889    } else { 
    855890#ifdef __bgq__ 
    856     MPI_Barrier(Comm); 
    857 #endif 
    858     MPI_Comm_split(Comm, RankMap[EffRank], Rank, &SplitComm); 
     891      MPI_Barrier(Comm); 
     892#endif 
     893      MPI_Comm_split(Comm, RankMap[EffRank], Rank, &SplitComm); 
     894    } 
    859895#endif 
    860896  } 
     
    897933 
    898934      if (string(GH.Magic, GH.Magic + MagicSize - 1) == MagicLE) { 
    899         readHeaderLeader<false>(&GH, MustMatch, SplitNRanks, LocalFileName, 
     935        readHeaderLeader<false>(&GH, MB, NRanks, Rank, SplitNRanks, LocalFileName, 
    900936                                HeaderSize, Header); 
    901937      } else if (string(GH.Magic, GH.Magic + MagicSize - 1) == MagicBE) { 
    902         readHeaderLeader<true>(&GH, MustMatch, SplitNRanks, LocalFileName, 
     938        readHeaderLeader<true>(&GH, MB, NRanks, Rank, SplitNRanks, LocalFileName, 
    903939                               HeaderSize, Header); 
    904940      } else { 
     
    936972#endif 
    937973 
    938  
    939974  FH.getHeaderCache().clear(); 
    940975 
     
    946981 
    947982#ifndef GENERICIO_NO_MPI 
    948   MPI_Barrier(Comm); 
     983  if (!DisableCollErrChecking) 
     984    MPI_Barrier(Comm); 
    949985 
    950986  if (FileIOType == FileIOMPI) 
     
    958994  try { 
    959995    FH.get()->open(LocalFileName, true); 
    960     MPI_Allreduce(&OpenErr, &TotOpenErr, 1, MPI_INT, MPI_SUM, Comm); 
     996    MPI_Allreduce(&OpenErr, &TotOpenErr, 1, MPI_INT, MPI_SUM, 
     997                  DisableCollErrChecking ? MPI_COMM_SELF : Comm); 
    961998  } catch (...) { 
    962999    OpenErr = 1; 
    963     MPI_Allreduce(&OpenErr, &TotOpenErr, 1, MPI_INT, MPI_SUM, Comm); 
     1000    MPI_Allreduce(&OpenErr, &TotOpenErr, 1, MPI_INT, MPI_SUM, 
     1001                  DisableCollErrChecking ? MPI_COMM_SELF : Comm); 
    9641002    throw; 
    9651003  } 
     
    10971135  } 
    10981136 
    1099   openAndReadHeader(false, EffRank, false); 
     1137  openAndReadHeader(MismatchAllowed, EffRank, false); 
    11001138 
    11011139  assert(FH.getHeaderCache().size() && "HeaderCache must not be empty"); 
     
    11161154 
    11171155size_t GenericIO::readNumElems(int EffRank) { 
     1156  if (EffRank == -1 && Redistributing) { 
     1157    DisableCollErrChecking = true; 
     1158 
     1159    size_t TotalSize = 0; 
     1160    for (int i = 0, ie = SourceRanks.size(); i != ie; ++i) 
     1161      TotalSize += readNumElems(SourceRanks[i]); 
     1162 
     1163    DisableCollErrChecking = false; 
     1164    return TotalSize; 
     1165  } 
     1166 
    11181167  if (FH.isBigEndian()) 
    11191168    return readNumElems<true>(EffRank); 
     
    11311180  } 
    11321181 
    1133   openAndReadHeader(false, EffRank, false); 
     1182  openAndReadHeader(Redistributing ? MismatchRedistribute : MismatchAllowed, 
     1183                    EffRank, false); 
    11341184 
    11351185  assert(FH.getHeaderCache().size() && "HeaderCache must not be empty"); 
     
    11461196 
    11471197void GenericIO::readCoords(int Coords[3], int EffRank) { 
     1198  if (EffRank == -1 && Redistributing) { 
     1199    std::fill(Coords, Coords + 3, 0); 
     1200    return; 
     1201  } 
     1202 
    11481203  if (FH.isBigEndian()) 
    11491204    readCoords<true>(Coords, EffRank); 
     
    11621217  } 
    11631218 
    1164   openAndReadHeader(false, EffRank, false); 
     1219  openAndReadHeader(MismatchAllowed, EffRank, false); 
    11651220 
    11661221  assert(FH.getHeaderCache().size() && "HeaderCache must not be empty"); 
     
    11781233 
    11791234void GenericIO::readData(int EffRank, bool PrintStats, bool CollStats) { 
     1235  int Rank; 
     1236#ifndef GENERICIO_NO_MPI 
     1237  MPI_Comm_rank(Comm, &Rank); 
     1238#else 
     1239  Rank = 0; 
     1240#endif 
     1241 
     1242  uint64_t TotalReadSize = 0; 
     1243#ifndef GENERICIO_NO_MPI 
     1244  double StartTime = MPI_Wtime(); 
     1245#else 
     1246  double StartTime = double(clock())/CLOCKS_PER_SEC; 
     1247#endif 
     1248 
     1249  int NErrs[3] = { 0, 0, 0 }; 
     1250 
     1251  if (EffRank == -1 && Redistributing) { 
     1252    DisableCollErrChecking = true; 
     1253 
     1254    size_t RowOffset = 0; 
     1255    for (int i = 0, ie = SourceRanks.size(); i != ie; ++i) { 
     1256      readData(SourceRanks[i], RowOffset, Rank, TotalReadSize, NErrs); 
     1257      RowOffset += readNumElems(SourceRanks[i]); 
     1258    } 
     1259 
     1260    DisableCollErrChecking = false; 
     1261  } else { 
     1262    readData(EffRank, 0, Rank, TotalReadSize, NErrs); 
     1263  } 
     1264 
     1265  int AllNErrs[3]; 
     1266#ifndef GENERICIO_NO_MPI 
     1267  MPI_Allreduce(NErrs, AllNErrs, 3, MPI_INT, MPI_SUM, Comm); 
     1268#else 
     1269  AllNErrs[0] = NErrs[0]; AllNErrs[1] = NErrs[1]; AllNErrs[2] = NErrs[2]; 
     1270#endif 
     1271 
     1272  if (AllNErrs[0] > 0 || AllNErrs[1] > 0 || AllNErrs[2] > 0) { 
     1273    stringstream ss; 
     1274    ss << "Experienced " << AllNErrs[0] << " I/O error(s), " << 
     1275          AllNErrs[1] << " CRC error(s) and " << AllNErrs[2] << 
     1276          " decompression CRC error(s) reading: " << OpenFileName; 
     1277    throw runtime_error(ss.str()); 
     1278  } 
     1279 
     1280#ifndef GENERICIO_NO_MPI 
     1281  MPI_Barrier(Comm); 
     1282#endif 
     1283 
     1284#ifndef GENERICIO_NO_MPI 
     1285  double EndTime = MPI_Wtime(); 
     1286#else 
     1287  double EndTime = double(clock())/CLOCKS_PER_SEC; 
     1288#endif 
     1289 
     1290  double TotalTime = EndTime - StartTime; 
     1291  double MaxTotalTime; 
     1292#ifndef GENERICIO_NO_MPI 
     1293  if (CollStats) 
     1294    MPI_Reduce(&TotalTime, &MaxTotalTime, 1, MPI_DOUBLE, MPI_MAX, 0, Comm); 
     1295  else 
     1296#endif 
     1297  MaxTotalTime = TotalTime; 
     1298 
     1299  uint64_t AllTotalReadSize; 
     1300#ifndef GENERICIO_NO_MPI 
     1301  if (CollStats) 
     1302    MPI_Reduce(&TotalReadSize, &AllTotalReadSize, 1, MPI_UINT64_T, MPI_SUM, 0, Comm); 
     1303  else 
     1304#endif 
     1305  AllTotalReadSize = TotalReadSize; 
     1306 
     1307  if (Rank == 0 && PrintStats) { 
     1308    double Rate = ((double) AllTotalReadSize) / MaxTotalTime / (1024.*1024.); 
     1309    cout << "Read " << Vars.size() << " variables from " << FileName << 
     1310            " (" << AllTotalReadSize << " bytes) in " << MaxTotalTime << "s: " << 
     1311            Rate << " MB/s [excluding header read]" << endl; 
     1312  } 
     1313} 
     1314 
     1315void GenericIO::readData(int EffRank, size_t RowOffset, int Rank, 
     1316                         uint64_t &TotalReadSize, int NErrs[3]) { 
    11801317  if (FH.isBigEndian()) 
    1181     readData<true>(EffRank, PrintStats, CollStats); 
     1318    readData<true>(EffRank, RowOffset, Rank, TotalReadSize, NErrs); 
    11821319  else 
    1183     readData<false>(EffRank, PrintStats, CollStats); 
     1320    readData<false>(EffRank, RowOffset, Rank, TotalReadSize, NErrs); 
    11841321} 
    11851322 
     
    11871324// one rank throws an exception, then all ranks should. 
    11881325template <bool IsBigEndian> 
    1189 void GenericIO::readData(int EffRank, bool PrintStats, bool CollStats) { 
    1190   int Rank; 
    1191 #ifndef GENERICIO_NO_MPI 
    1192   MPI_Comm_rank(Comm, &Rank); 
    1193 #else 
    1194   Rank = 0; 
    1195 #endif 
    1196  
    1197   openAndReadHeader(false, EffRank, false); 
     1326void GenericIO::readData(int EffRank, size_t RowOffset, int Rank, 
     1327                         uint64_t &TotalReadSize, int NErrs[3]) { 
     1328  openAndReadHeader(Redistributing ? MismatchRedistribute : MismatchAllowed, 
     1329                    EffRank, false); 
    11981330 
    11991331  assert(FH.getHeaderCache().size() && "HeaderCache must not be empty"); 
     
    12101342                                               RankIndex*GH->RanksSize]; 
    12111343 
    1212   uint64_t TotalReadSize = 0; 
    1213 #ifndef GENERICIO_NO_MPI 
    1214   double StartTime = MPI_Wtime(); 
    1215 #else 
    1216   double StartTime = double(clock())/CLOCKS_PER_SEC; 
    1217 #endif 
    1218  
    1219   int NErrs[3] = { 0, 0, 0 }; 
    12201344  for (size_t i = 0; i < Vars.size(); ++i) { 
    12211345    uint64_t Offset = RH->Start; 
     
    12631387      } 
    12641388 
     1389      size_t VarOffset = RowOffset*Vars[i].Size; 
     1390      void *VarData = ((char *) Vars[i].Data) + VarOffset; 
     1391 
    12651392      vector<unsigned char> LData; 
    1266       void *Data = Vars[i].Data; 
     1393      void *Data = VarData; 
    12671394      bool HasExtraSpace = Vars[i].HasExtraSpace; 
    12681395      if (offsetof_safe(GH, BlocksStart) < GH->GlobalHeaderSize && 
     
    14031530 
    14041531        blosc_decompress(&LData[0] + sizeof(CompressHeader<IsBigEndian>), 
    1405                          Vars[i].Data, Vars[i].Size*RH->NElems); 
    1406  
    1407         if (CH->OrigCRC != crc64_omp(Vars[i].Data, Vars[i].Size*RH->NElems)) { 
     1532                         VarData, Vars[i].Size*RH->NElems); 
     1533 
     1534        if (CH->OrigCRC != crc64_omp(VarData, Vars[i].Size*RH->NElems)) { 
    14081535          ++NErrs[2]; 
    14091536          break; 
     
    14141541      if (IsBigEndian != isBigEndian()) 
    14151542        for (size_t j = 0; j < RH->NElems; ++j) { 
    1416           char *Offset = ((char *) Vars[i].Data) + j*Vars[i].Size; 
     1543          char *Offset = ((char *) VarData) + j*Vars[i].Size; 
    14171544          bswap(Offset, Vars[i].Size); 
    14181545        } 
     
    14511578      break; 
    14521579  } 
    1453  
    1454   int AllNErrs[3]; 
    1455 #ifndef GENERICIO_NO_MPI 
    1456   MPI_Allreduce(NErrs, AllNErrs, 3, MPI_INT, MPI_SUM, Comm); 
    1457 #else 
    1458   AllNErrs[0] = NErrs[0]; AllNErrs[1] = NErrs[1]; AllNErrs[2] = NErrs[2]; 
    1459 #endif 
    1460  
    1461   if (AllNErrs[0] > 0 || AllNErrs[1] > 0 || AllNErrs[2] > 0) { 
    1462     stringstream ss; 
    1463     ss << "Experienced " << AllNErrs[0] << " I/O error(s), " << 
    1464           AllNErrs[1] << " CRC error(s) and " << AllNErrs[2] << 
    1465           " decompression CRC error(s) reading: " << OpenFileName; 
    1466     throw runtime_error(ss.str()); 
    1467   } 
    1468  
    1469 #ifndef GENERICIO_NO_MPI 
    1470   MPI_Barrier(Comm); 
    1471 #endif 
    1472  
    1473 #ifndef GENERICIO_NO_MPI 
    1474   double EndTime = MPI_Wtime(); 
    1475 #else 
    1476   double EndTime = double(clock())/CLOCKS_PER_SEC; 
    1477 #endif 
    1478  
    1479   double TotalTime = EndTime - StartTime; 
    1480   double MaxTotalTime; 
    1481 #ifndef GENERICIO_NO_MPI 
    1482   if (CollStats) 
    1483     MPI_Reduce(&TotalTime, &MaxTotalTime, 1, MPI_DOUBLE, MPI_MAX, 0, Comm); 
    1484   else 
    1485 #endif 
    1486   MaxTotalTime = TotalTime; 
    1487  
    1488   uint64_t AllTotalReadSize; 
    1489 #ifndef GENERICIO_NO_MPI 
    1490   if (CollStats) 
    1491     MPI_Reduce(&TotalReadSize, &AllTotalReadSize, 1, MPI_UINT64_T, MPI_SUM, 0, Comm); 
    1492   else 
    1493 #endif 
    1494   AllTotalReadSize = TotalReadSize; 
    1495  
    1496   if (Rank == 0 && PrintStats) { 
    1497     double Rate = ((double) AllTotalReadSize) / MaxTotalTime / (1024.*1024.); 
    1498     cout << "Read " << Vars.size() << " variables from " << FileName << 
    1499             " (" << AllTotalReadSize << " bytes) in " << MaxTotalTime << "s: " << 
    1500             Rate << " MB/s [excluding header read]" << endl; 
    1501   } 
    1502  
    15031580} 
    15041581 
Note: See TracChangeset for help on using the changeset viewer.