Changeset 8f0a211 for GenericIO.cxx
- Timestamp:
- 04/12/16 19:18:46 (9 years ago)
- Branches:
- master, pympi
- Children:
- 14e73bb
- Parents:
- a4fee13
- git-author:
- Hal Finkel <hfinkel@…> (04/12/16 19:18:46)
- git-committer:
- Hal Finkel <hfinkel@…> (04/12/16 19:18:46)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
GenericIO.cxx
ra4fee13 r8f0a211 739 739 740 740 template <bool IsBigEndian> 741 void GenericIO::readHeaderLeader(void *GHPtr, bool MustMatch, int SplitNRanks, 742 string &LocalFileName, uint64_t &HeaderSize, vector<char> &Header) { 741 void GenericIO::readHeaderLeader(void *GHPtr, MismatchBehavior MB, int NRanks, 742 int Rank, int SplitNRanks, 743 string &LocalFileName, uint64_t &HeaderSize, 744 vector<char> &Header) { 743 745 GlobalHeader<IsBigEndian> &GH = *(GlobalHeader<IsBigEndian> *) GHPtr; 744 746 745 if (M ustMatch) {747 if (MB == MismatchDisallowed) { 746 748 if (SplitNRanks != (int) GH.NRanks) { 747 749 stringstream ss; … … 777 779 } 778 780 #endif 781 } else if (MB == MismatchRedistribute && !Redistributing) { 782 Redistributing = true; 783 784 int NFileRanks = RankMap.empty() ? (int) GH.NRanks : (int) RankMap.size(); 785 int NFileRanksPerRank = NFileRanks/NRanks; 786 int NRemFileRank = NFileRanks % NRanks; 787 788 if (!NFileRanksPerRank) { 789 // We have only the remainder, so the last NRemFileRank ranks get one 790 // file rank, and the others don't. 791 if (NRemFileRank && NRanks - Rank <= NRemFileRank) 792 SourceRanks.push_back(NRanks - (Rank + 1)); 793 } else { 794 // Since NRemFileRank < NRanks, and we don't want to put any extra memory 795 // load on rank 0 (because rank 0's memory load is normally higher than 796 // the other ranks anyway), the last NRemFileRank will each take 797 // (NFileRanksPerRank+1) file ranks. 798 799 int FirstFileRank = 0, LastFileRank = NFileRanksPerRank - 1; 800 for (int i = 1; i <= Rank; ++i) { 801 FirstFileRank = LastFileRank + 1; 802 LastFileRank = FirstFileRank + NFileRanksPerRank - 1; 803 804 if (NRemFileRank && NRanks - i <= NRemFileRank) 805 ++LastFileRank; 806 } 807 808 for (int i = FirstFileRank; i <= LastFileRank; ++i) 809 SourceRanks.push_back(i); 810 } 779 811 } 780 812 … … 791 823 // Note: Errors from this function should be recoverable. This means that if 792 824 // one rank throws an exception, then all ranks should. 793 void GenericIO::openAndReadHeader( bool MustMatch, int EffRank, bool CheckPartMap) {825 void GenericIO::openAndReadHeader(MismatchBehavior MB, int EffRank, bool CheckPartMap) { 794 826 int NRanks, Rank; 795 827 #ifndef GENERICIO_NO_MPI … … 802 834 803 835 if (EffRank == -1) 804 EffRank = Rank;836 EffRank = MB == MismatchRedistribute ? 0 : Rank; 805 837 806 838 if (RankMap.empty() && CheckPartMap) { … … 814 846 GenericIO GIO(FileName, FileIOType); 815 847 #endif 816 GIO.openAndReadHeader( true, 0, false);848 GIO.openAndReadHeader(MismatchDisallowed, 0, false); 817 849 RanksInMap = GIO.readNumElems(); 818 850 … … 846 878 LocalFileName = FileName; 847 879 #ifndef GENERICIO_NO_MPI 848 MPI_Comm_dup( Comm, &SplitComm);880 MPI_Comm_dup(MB == MismatchRedistribute ? MPI_COMM_SELF : Comm, &SplitComm); 849 881 #endif 850 882 } else { … … 853 885 LocalFileName = ss.str(); 854 886 #ifndef GENERICIO_NO_MPI 887 if (MB == MismatchRedistribute) { 888 MPI_Comm_dup(MPI_COMM_SELF, &SplitComm); 889 } else { 855 890 #ifdef __bgq__ 856 MPI_Barrier(Comm); 857 #endif 858 MPI_Comm_split(Comm, RankMap[EffRank], Rank, &SplitComm); 891 MPI_Barrier(Comm); 892 #endif 893 MPI_Comm_split(Comm, RankMap[EffRank], Rank, &SplitComm); 894 } 859 895 #endif 860 896 } … … 897 933 898 934 if (string(GH.Magic, GH.Magic + MagicSize - 1) == MagicLE) { 899 readHeaderLeader<false>(&GH, M ustMatch, SplitNRanks, LocalFileName,935 readHeaderLeader<false>(&GH, MB, NRanks, Rank, SplitNRanks, LocalFileName, 900 936 HeaderSize, Header); 901 937 } else if (string(GH.Magic, GH.Magic + MagicSize - 1) == MagicBE) { 902 readHeaderLeader<true>(&GH, M ustMatch, SplitNRanks, LocalFileName,938 readHeaderLeader<true>(&GH, MB, NRanks, Rank, SplitNRanks, LocalFileName, 903 939 HeaderSize, Header); 904 940 } else { … … 936 972 #endif 937 973 938 939 974 FH.getHeaderCache().clear(); 940 975 … … 946 981 947 982 #ifndef GENERICIO_NO_MPI 948 MPI_Barrier(Comm); 983 if (!DisableCollErrChecking) 984 MPI_Barrier(Comm); 949 985 950 986 if (FileIOType == FileIOMPI) … … 958 994 try { 959 995 FH.get()->open(LocalFileName, true); 960 MPI_Allreduce(&OpenErr, &TotOpenErr, 1, MPI_INT, MPI_SUM, Comm); 996 MPI_Allreduce(&OpenErr, &TotOpenErr, 1, MPI_INT, MPI_SUM, 997 DisableCollErrChecking ? MPI_COMM_SELF : Comm); 961 998 } catch (...) { 962 999 OpenErr = 1; 963 MPI_Allreduce(&OpenErr, &TotOpenErr, 1, MPI_INT, MPI_SUM, Comm); 1000 MPI_Allreduce(&OpenErr, &TotOpenErr, 1, MPI_INT, MPI_SUM, 1001 DisableCollErrChecking ? MPI_COMM_SELF : Comm); 964 1002 throw; 965 1003 } … … 1097 1135 } 1098 1136 1099 openAndReadHeader( false, EffRank, false);1137 openAndReadHeader(MismatchAllowed, EffRank, false); 1100 1138 1101 1139 assert(FH.getHeaderCache().size() && "HeaderCache must not be empty"); … … 1116 1154 1117 1155 size_t GenericIO::readNumElems(int EffRank) { 1156 if (EffRank == -1 && Redistributing) { 1157 DisableCollErrChecking = true; 1158 1159 size_t TotalSize = 0; 1160 for (int i = 0, ie = SourceRanks.size(); i != ie; ++i) 1161 TotalSize += readNumElems(SourceRanks[i]); 1162 1163 DisableCollErrChecking = false; 1164 return TotalSize; 1165 } 1166 1118 1167 if (FH.isBigEndian()) 1119 1168 return readNumElems<true>(EffRank); … … 1131 1180 } 1132 1181 1133 openAndReadHeader(false, EffRank, false); 1182 openAndReadHeader(Redistributing ? MismatchRedistribute : MismatchAllowed, 1183 EffRank, false); 1134 1184 1135 1185 assert(FH.getHeaderCache().size() && "HeaderCache must not be empty"); … … 1146 1196 1147 1197 void GenericIO::readCoords(int Coords[3], int EffRank) { 1198 if (EffRank == -1 && Redistributing) { 1199 std::fill(Coords, Coords + 3, 0); 1200 return; 1201 } 1202 1148 1203 if (FH.isBigEndian()) 1149 1204 readCoords<true>(Coords, EffRank); … … 1162 1217 } 1163 1218 1164 openAndReadHeader( false, EffRank, false);1219 openAndReadHeader(MismatchAllowed, EffRank, false); 1165 1220 1166 1221 assert(FH.getHeaderCache().size() && "HeaderCache must not be empty"); … … 1178 1233 1179 1234 void GenericIO::readData(int EffRank, bool PrintStats, bool CollStats) { 1235 int Rank; 1236 #ifndef GENERICIO_NO_MPI 1237 MPI_Comm_rank(Comm, &Rank); 1238 #else 1239 Rank = 0; 1240 #endif 1241 1242 uint64_t TotalReadSize = 0; 1243 #ifndef GENERICIO_NO_MPI 1244 double StartTime = MPI_Wtime(); 1245 #else 1246 double StartTime = double(clock())/CLOCKS_PER_SEC; 1247 #endif 1248 1249 int NErrs[3] = { 0, 0, 0 }; 1250 1251 if (EffRank == -1 && Redistributing) { 1252 DisableCollErrChecking = true; 1253 1254 size_t RowOffset = 0; 1255 for (int i = 0, ie = SourceRanks.size(); i != ie; ++i) { 1256 readData(SourceRanks[i], RowOffset, Rank, TotalReadSize, NErrs); 1257 RowOffset += readNumElems(SourceRanks[i]); 1258 } 1259 1260 DisableCollErrChecking = false; 1261 } else { 1262 readData(EffRank, 0, Rank, TotalReadSize, NErrs); 1263 } 1264 1265 int AllNErrs[3]; 1266 #ifndef GENERICIO_NO_MPI 1267 MPI_Allreduce(NErrs, AllNErrs, 3, MPI_INT, MPI_SUM, Comm); 1268 #else 1269 AllNErrs[0] = NErrs[0]; AllNErrs[1] = NErrs[1]; AllNErrs[2] = NErrs[2]; 1270 #endif 1271 1272 if (AllNErrs[0] > 0 || AllNErrs[1] > 0 || AllNErrs[2] > 0) { 1273 stringstream ss; 1274 ss << "Experienced " << AllNErrs[0] << " I/O error(s), " << 1275 AllNErrs[1] << " CRC error(s) and " << AllNErrs[2] << 1276 " decompression CRC error(s) reading: " << OpenFileName; 1277 throw runtime_error(ss.str()); 1278 } 1279 1280 #ifndef GENERICIO_NO_MPI 1281 MPI_Barrier(Comm); 1282 #endif 1283 1284 #ifndef GENERICIO_NO_MPI 1285 double EndTime = MPI_Wtime(); 1286 #else 1287 double EndTime = double(clock())/CLOCKS_PER_SEC; 1288 #endif 1289 1290 double TotalTime = EndTime - StartTime; 1291 double MaxTotalTime; 1292 #ifndef GENERICIO_NO_MPI 1293 if (CollStats) 1294 MPI_Reduce(&TotalTime, &MaxTotalTime, 1, MPI_DOUBLE, MPI_MAX, 0, Comm); 1295 else 1296 #endif 1297 MaxTotalTime = TotalTime; 1298 1299 uint64_t AllTotalReadSize; 1300 #ifndef GENERICIO_NO_MPI 1301 if (CollStats) 1302 MPI_Reduce(&TotalReadSize, &AllTotalReadSize, 1, MPI_UINT64_T, MPI_SUM, 0, Comm); 1303 else 1304 #endif 1305 AllTotalReadSize = TotalReadSize; 1306 1307 if (Rank == 0 && PrintStats) { 1308 double Rate = ((double) AllTotalReadSize) / MaxTotalTime / (1024.*1024.); 1309 cout << "Read " << Vars.size() << " variables from " << FileName << 1310 " (" << AllTotalReadSize << " bytes) in " << MaxTotalTime << "s: " << 1311 Rate << " MB/s [excluding header read]" << endl; 1312 } 1313 } 1314 1315 void GenericIO::readData(int EffRank, size_t RowOffset, int Rank, 1316 uint64_t &TotalReadSize, int NErrs[3]) { 1180 1317 if (FH.isBigEndian()) 1181 readData<true>(EffRank, PrintStats, CollStats);1318 readData<true>(EffRank, RowOffset, Rank, TotalReadSize, NErrs); 1182 1319 else 1183 readData<false>(EffRank, PrintStats, CollStats);1320 readData<false>(EffRank, RowOffset, Rank, TotalReadSize, NErrs); 1184 1321 } 1185 1322 … … 1187 1324 // one rank throws an exception, then all ranks should. 1188 1325 template <bool IsBigEndian> 1189 void GenericIO::readData(int EffRank, bool PrintStats, bool CollStats) { 1190 int Rank; 1191 #ifndef GENERICIO_NO_MPI 1192 MPI_Comm_rank(Comm, &Rank); 1193 #else 1194 Rank = 0; 1195 #endif 1196 1197 openAndReadHeader(false, EffRank, false); 1326 void GenericIO::readData(int EffRank, size_t RowOffset, int Rank, 1327 uint64_t &TotalReadSize, int NErrs[3]) { 1328 openAndReadHeader(Redistributing ? MismatchRedistribute : MismatchAllowed, 1329 EffRank, false); 1198 1330 1199 1331 assert(FH.getHeaderCache().size() && "HeaderCache must not be empty"); … … 1210 1342 RankIndex*GH->RanksSize]; 1211 1343 1212 uint64_t TotalReadSize = 0;1213 #ifndef GENERICIO_NO_MPI1214 double StartTime = MPI_Wtime();1215 #else1216 double StartTime = double(clock())/CLOCKS_PER_SEC;1217 #endif1218 1219 int NErrs[3] = { 0, 0, 0 };1220 1344 for (size_t i = 0; i < Vars.size(); ++i) { 1221 1345 uint64_t Offset = RH->Start; … … 1263 1387 } 1264 1388 1389 size_t VarOffset = RowOffset*Vars[i].Size; 1390 void *VarData = ((char *) Vars[i].Data) + VarOffset; 1391 1265 1392 vector<unsigned char> LData; 1266 void *Data = Var s[i].Data;1393 void *Data = VarData; 1267 1394 bool HasExtraSpace = Vars[i].HasExtraSpace; 1268 1395 if (offsetof_safe(GH, BlocksStart) < GH->GlobalHeaderSize && … … 1403 1530 1404 1531 blosc_decompress(&LData[0] + sizeof(CompressHeader<IsBigEndian>), 1405 Var s[i].Data, Vars[i].Size*RH->NElems);1406 1407 if (CH->OrigCRC != crc64_omp(Var s[i].Data, Vars[i].Size*RH->NElems)) {1532 VarData, Vars[i].Size*RH->NElems); 1533 1534 if (CH->OrigCRC != crc64_omp(VarData, Vars[i].Size*RH->NElems)) { 1408 1535 ++NErrs[2]; 1409 1536 break; … … 1414 1541 if (IsBigEndian != isBigEndian()) 1415 1542 for (size_t j = 0; j < RH->NElems; ++j) { 1416 char *Offset = ((char *) Var s[i].Data) + j*Vars[i].Size;1543 char *Offset = ((char *) VarData) + j*Vars[i].Size; 1417 1544 bswap(Offset, Vars[i].Size); 1418 1545 } … … 1451 1578 break; 1452 1579 } 1453 1454 int AllNErrs[3];1455 #ifndef GENERICIO_NO_MPI1456 MPI_Allreduce(NErrs, AllNErrs, 3, MPI_INT, MPI_SUM, Comm);1457 #else1458 AllNErrs[0] = NErrs[0]; AllNErrs[1] = NErrs[1]; AllNErrs[2] = NErrs[2];1459 #endif1460 1461 if (AllNErrs[0] > 0 || AllNErrs[1] > 0 || AllNErrs[2] > 0) {1462 stringstream ss;1463 ss << "Experienced " << AllNErrs[0] << " I/O error(s), " <<1464 AllNErrs[1] << " CRC error(s) and " << AllNErrs[2] <<1465 " decompression CRC error(s) reading: " << OpenFileName;1466 throw runtime_error(ss.str());1467 }1468 1469 #ifndef GENERICIO_NO_MPI1470 MPI_Barrier(Comm);1471 #endif1472 1473 #ifndef GENERICIO_NO_MPI1474 double EndTime = MPI_Wtime();1475 #else1476 double EndTime = double(clock())/CLOCKS_PER_SEC;1477 #endif1478 1479 double TotalTime = EndTime - StartTime;1480 double MaxTotalTime;1481 #ifndef GENERICIO_NO_MPI1482 if (CollStats)1483 MPI_Reduce(&TotalTime, &MaxTotalTime, 1, MPI_DOUBLE, MPI_MAX, 0, Comm);1484 else1485 #endif1486 MaxTotalTime = TotalTime;1487 1488 uint64_t AllTotalReadSize;1489 #ifndef GENERICIO_NO_MPI1490 if (CollStats)1491 MPI_Reduce(&TotalReadSize, &AllTotalReadSize, 1, MPI_UINT64_T, MPI_SUM, 0, Comm);1492 else1493 #endif1494 AllTotalReadSize = TotalReadSize;1495 1496 if (Rank == 0 && PrintStats) {1497 double Rate = ((double) AllTotalReadSize) / MaxTotalTime / (1024.*1024.);1498 cout << "Read " << Vars.size() << " variables from " << FileName <<1499 " (" << AllTotalReadSize << " bytes) in " << MaxTotalTime << "s: " <<1500 Rate << " MB/s [excluding header read]" << endl;1501 }1502 1503 1580 } 1504 1581
Note: See TracChangeset
for help on using the changeset viewer.