source: GenericIO.cxx @ 00587dc

Revision 00587dc, 41.5 KB checked in by Hal Finkel <hfinkel@…>, 10 years ago (diff)

Initial Commit (gio-base-20150317)

  • Property mode set to 100644
Line 
1#define _XOPEN_SOURCE 600
2#include "CRC64.h"
3#include "GenericIO.h"
4
5extern "C" {
6#include "blosc.h"
7}
8
#include <algorithm>
#include <cassert>
#include <climits>
#include <cstddef>
#include <cstring>
#include <fstream>
#include <sstream>
#include <stdexcept>
16
17#ifndef GENERICIO_NO_MPI
18#include <ctime>
19#endif
20
21#include <sys/types.h>
22#include <sys/stat.h>
23#include <fcntl.h>
24#include <errno.h>
25
26#ifdef __bgq__
27#include <spi/include/kernel/location.h>
28#include <spi/include/kernel/process.h>
29#include <firmware/include/personality.h>
30#endif
31
32#ifndef MPI_UINT64_T
33#define MPI_UINT64_T (sizeof(long) == 8 ? MPI_LONG : MPI_LONG_LONG)
34#endif
35
36using namespace std;
37
38namespace gio {
39
40
41#ifndef GENERICIO_NO_MPI
42GenericFileIO_MPI::~GenericFileIO_MPI() {
43  (void) MPI_File_close(&FH);
44}
45
46void GenericFileIO_MPI::open(const std::string &FN, bool ForReading) {
47  FileName = FN;
48
49  int amode = ForReading ? MPI_MODE_RDONLY : (MPI_MODE_WRONLY | MPI_MODE_CREATE);
50  if (MPI_File_open(Comm, const_cast<char *>(FileName.c_str()), amode,
51                    MPI_INFO_NULL, &FH) != MPI_SUCCESS)
52    throw runtime_error((!ForReading ? "Unable to create the file: " :
53                                       "Unable to open the file: ") +
54                        FileName);
55}
56
57void GenericFileIO_MPI::setSize(size_t sz) {
58  if (MPI_File_set_size(FH, sz) != MPI_SUCCESS)
59    throw runtime_error("Unable to set size for file: " + FileName);
60}
61
62void GenericFileIO_MPI::read(void *buf, size_t count, off_t offset,
63                             const std::string &D) {
64  while (count > 0) {
65    MPI_Status status;
66    if (MPI_File_read_at(FH, offset, buf, count, MPI_BYTE, &status) != MPI_SUCCESS)
67      throw runtime_error("Unable to read " + D + " from file: " + FileName);
68
69    int scount;
70    (void) MPI_Get_count(&status, MPI_BYTE, &scount);
71
72    count -= scount;
73    buf = ((char *) buf) + scount;
74    offset += scount;
75  }
76}
77
78void GenericFileIO_MPI::write(const void *buf, size_t count, off_t offset,
79                              const std::string &D) {
80  while (count > 0) {
81    MPI_Status status;
82    if (MPI_File_write_at(FH, offset, (void *) buf, count, MPI_BYTE, &status) != MPI_SUCCESS)
83      throw runtime_error("Unable to write " + D + " to file: " + FileName);
84
85    int scount;
86    (void) MPI_Get_count(&status, MPI_BYTE, &scount);
87
88    count -= scount;
89    buf = ((char *) buf) + scount;
90    offset += scount;
91  }
92}
93
94void GenericFileIO_MPICollective::read(void *buf, size_t count, off_t offset,
95                             const std::string &D) {
96  int Continue = 0;
97
98  do {
99    MPI_Status status;
100    if (MPI_File_read_at_all(FH, offset, buf, count, MPI_BYTE, &status) != MPI_SUCCESS)
101      throw runtime_error("Unable to read " + D + " from file: " + FileName);
102
103    int scount;
104    (void) MPI_Get_count(&status, MPI_BYTE, &scount);
105
106    count -= scount;
107    buf = ((char *) buf) + scount;
108    offset += scount;
109
110    int NeedContinue = (count > 0);
111    MPI_Allreduce(&NeedContinue, &Continue, 1, MPI_INT, MPI_SUM, Comm);
112  } while (Continue);
113}
114
115void GenericFileIO_MPICollective::write(const void *buf, size_t count, off_t offset,
116                              const std::string &D) {
117  int Continue = 0;
118
119  do {
120    MPI_Status status;
121    if (MPI_File_write_at_all(FH, offset, (void *) buf, count, MPI_BYTE, &status) != MPI_SUCCESS)
122      throw runtime_error("Unable to write " + D + " to file: " + FileName);
123
124    int scount;
125    (void) MPI_Get_count(&status, MPI_BYTE, &scount);
126
127    count -= scount;
128    buf = ((char *) buf) + scount;
129    offset += scount;
130
131    int NeedContinue = (count > 0);
132    MPI_Allreduce(&NeedContinue, &Continue, 1, MPI_INT, MPI_SUM, Comm);
133  } while (Continue);
134}
135#endif
136
137GenericFileIO_POSIX::~GenericFileIO_POSIX() {
138  if (FH != -1) close(FH);
139}
140
141void GenericFileIO_POSIX::open(const std::string &FN, bool ForReading) {
142  FileName = FN;
143
144  int flags = ForReading ? O_RDONLY : (O_WRONLY | O_CREAT);
145  int mode = S_IRUSR | S_IWUSR | S_IRGRP;
146  errno = 0;
147  if ((FH = ::open(FileName.c_str(), flags, mode)) == -1)
148    throw runtime_error((!ForReading ? "Unable to create the file: " :
149                                       "Unable to open the file: ") +
150                        FileName + ": " + strerror(errno));
151}
152
153void GenericFileIO_POSIX::setSize(size_t sz) {
154  if (ftruncate(FH, sz) == -1)
155    throw runtime_error("Unable to set size for file: " + FileName);
156}
157
158void GenericFileIO_POSIX::read(void *buf, size_t count, off_t offset,
159                               const std::string &D) {
160  while (count > 0) {
161    ssize_t scount;
162    errno = 0;
163    if ((scount = pread(FH, buf, count, offset)) == -1) {
164      if (errno == EINTR)
165        continue;
166
167      throw runtime_error("Unable to read " + D + " from file: " + FileName);
168    }
169
170    count -= scount;
171    buf = ((char *) buf) + scount;
172    offset += scount;
173  }
174}
175
176void GenericFileIO_POSIX::write(const void *buf, size_t count, off_t offset,
177                                const std::string &D) {
178  while (count > 0) {
179    ssize_t scount;
180    errno = 0;
181    if ((scount = pwrite(FH, buf, count, offset)) == -1) {
182      if (errno == EINTR)
183        continue;
184
185      throw runtime_error("Unable to write " + D + " to file: " + FileName);
186    }
187
188    count -= scount;
189    buf = ((char *) buf) + scount;
190    offset += scount;
191  }
192}
193
// Returns true when the host stores the most significant byte of a 32-bit
// integer at the lowest address (i.e. the byte at the lowest address of the
// value 1 is zero).
static bool isBigEndian() {
  const uint32_t Probe = 1;
  unsigned char LowestAddressByte;
  std::memcpy(&LowestAddressByte, &Probe, sizeof(LowestAddressByte));
  return LowestAddressByte == 0;
}
198
// Number of bytes occupied by each CRC stored in the file (one follows the
// header and one follows each block of variable data).
static const size_t CRCSize = 8;

// Size in bytes of the Magic field at the start of GlobalHeader. The two
// identifiers below are seven characters plus the terminating NUL; which one
// is used when writing depends on isBigEndian(), and a reader that finds the
// other one reports "wrong endianness".
static const size_t MagicSize = 8;
static const char *MagicBE = "HACC01B";
static const char *MagicLE = "HACC01L";
204
// First record of every file. The struct is packed and is read from / written
// to the file as raw bytes, so its layout is part of the file format.
// The per-field notes describe the values stored by GenericIO::write().
struct GlobalHeader {
  char Magic[MagicSize];     // MagicBE or MagicLE
  uint64_t HeaderSize;       // Size of the whole header, excluding its trailing CRC
  uint64_t NElems; // The global total
  uint64_t Dims[3];          // Dimensions of the rank decomposition
  uint64_t NVars;            // Number of VariableHeader records
  uint64_t VarsSize;         // Size of one VariableHeader record
  uint64_t VarsStart;        // Byte offset of the first VariableHeader
  uint64_t NRanks;           // Number of RankHeader records (ranks in this file)
  uint64_t RanksSize;        // Size of one RankHeader record
  uint64_t RanksStart;       // Byte offset of the first RankHeader
  uint64_t GlobalHeaderSize; // sizeof(GlobalHeader) as written; readers compare
                             // field offsets against it to detect missing fields
  double   PhysOrigin[3];
  double   PhysScale[3];
  uint64_t BlocksSize;       // Size of one BlockHeader record, or 0 if none
  uint64_t BlocksStart;      // Byte offset of the first BlockHeader, or 0 if none
} __attribute__((packed));
222
// Bit flags stored in VariableHeader::Flags. GenericIO::write() sets each bit
// from the correspondingly named property of the variable (IsFloat, IsSigned,
// IsPhysCoordX/Y/Z, MaybePhysGhost).
enum {
  FloatValue          = (1 << 0),
  SignedValue         = (1 << 1),
  ValueIsPhysCoordX   = (1 << 2),
  ValueIsPhysCoordY   = (1 << 3),
  ValueIsPhysCoordZ   = (1 << 4),
  ValueMaybePhysGhost = (1 << 5)
};
231
// Fixed width of the variable-name field; names are resized to this many
// bytes when written.
static const size_t NameSize = 256;

// Per-variable record in the file header (packed; part of the file format).
struct VariableHeader {
  char Name[NameSize]; // Variable name, padded to NameSize bytes
  uint64_t Flags;      // Combination of the FloatValue/SignedValue/... bits
  uint64_t Size;       // Size in bytes of one element of this variable
} __attribute__((packed));
238
// Per-rank record in the file header (packed; part of the file format).
struct RankHeader {
  uint64_t Coords[3];  // Rank coordinates in the decomposition
  uint64_t NElems;     // Number of elements written by this rank
  uint64_t Start;      // Byte offset in the file of this rank's first data
  uint64_t GlobalRank; // Rank in the full communicator; readers check
                       // RanksSize before using it, as it may be absent
} __attribute__((packed));
245
// Width of one filter-name slot and number of slots per block.
static const size_t FilterNameSize = 8;
static const size_t MaxFilters = 4;

// Per-rank, per-variable record describing one stored data block (packed;
// part of the file format). Written only when block headers are in use.
struct BlockHeader {
  char Filters[MaxFilters][FilterNameSize]; // Filter names; all zero if none.
                                            // Slot 0 holds CompressName when
                                            // the block is compressed
  uint64_t Start; // Byte offset of the block in the file
  uint64_t Size;  // Stored size of the block in bytes, excluding its CRC
} __attribute__((packed));
253
// Prefix placed in front of the compressed bytes of a block (packed; part of
// the file format).
struct CompressHeader {
  uint64_t OrigCRC; // CRC of the variable's data before compression
} __attribute__((packed));
// Filter name recorded in BlockHeader::Filters[0] for compressed blocks.
const char *CompressName = "BLOSC";
258
// Definitions of GenericIO's static data members with their initial values.
unsigned GenericIO::DefaultFileIOType = FileIOPOSIX;
int GenericIO::DefaultPartition = 0;
// Used by write() as the compression setting unless the GENERICIO_COMPRESS
// environment variable overrides it.
bool GenericIO::DefaultShouldCompress = false;

#ifndef GENERICIO_NO_MPI
std::size_t GenericIO::CollectiveMPIIOThreshold = 0;
#endif

// Set once blosc_init() has been called by write().
static bool blosc_initialized = false;
268
269#ifndef GENERICIO_NO_MPI
// Writes every registered variable (Vars) for this rank's NElems elements.
// Must be called by all ranks of Comm (it uses collective MPI operations).
//
// Outline:
//  1. Comm is split by Partition into SplitComm; each partition produces one
//     file. If that yields more than one partition, the file named FileName
//     becomes a "rank map" written by rank 0, and the data go to
//     "<FileName>#<Partition>".
//  2. Optionally (GENERICIO_COMPRESS / GENERICIO_FORCE_BLOCKS environment
//     variables, or DefaultShouldCompress) per-variable block headers are
//     prepared and the data are compressed with blosc.
//  3. Rank 0 of each SplitComm gathers the per-rank headers, computes every
//     rank's file offsets, scatters them back, and writes the header followed
//     by its CRC.
//  4. Every rank writes each variable's data followed by a CRC.
//  5. Rank 0 of Comm prints the elapsed time and transfer rate.
//
// Note: writing errors are not currently recoverable (one rank may fail
// while the others don't).
void GenericIO::write() {
  const char *Magic = isBigEndian() ? MagicBE : MagicLE;

  uint64_t FileSize = 0;

  int NRanks, Rank;
  MPI_Comm_rank(Comm, &Rank);
  MPI_Comm_size(Comm, &NRanks);

#ifdef __bgq__
  MPI_Barrier(Comm);
#endif
  // Ranks with the same Partition value share one output file.
  MPI_Comm_split(Comm, Partition, Rank, &SplitComm);

  int SplitNRanks, SplitRank;
  MPI_Comm_rank(SplitComm, &SplitRank);
  MPI_Comm_size(SplitComm, &SplitNRanks);

  string LocalFileName;
  if (SplitNRanks != NRanks) {
    if (Rank == 0) {
      // In split mode, the specified file becomes the rank map, and the real
      // data is partitioned.

      vector<int> MapRank, MapPartition;
      MapRank.resize(NRanks);
      for (int i = 0; i < NRanks; ++i) MapRank[i] = i;

      MapPartition.resize(NRanks);
      MPI_Gather(&Partition, 1, MPI_INT, &MapPartition[0], 1, MPI_INT, 0, Comm);

      // The rank map is itself written as a single-rank GenericIO file.
      GenericIO GIO(MPI_COMM_SELF, FileName, FileIOType);
      GIO.setNumElems(NRanks);
      GIO.addVariable("$rank", MapRank); /* this is for use by humans; the reading
                                            code assumes that the partitions are in
                                            rank order */
      GIO.addVariable("$partition", MapPartition);

      // With a Cartesian communicator, also record each rank's coordinates.
      vector<int> CX, CY, CZ;
      int TopoStatus;
      MPI_Topo_test(Comm, &TopoStatus);
      if (TopoStatus == MPI_CART) {
        CX.resize(NRanks);
        CY.resize(NRanks);
        CZ.resize(NRanks);

        for (int i = 0; i < NRanks; ++i) {
          int C[3];
          MPI_Cart_coords(Comm, i, 3, C);

          CX[i] = C[0];
          CY[i] = C[1];
          CZ[i] = C[2];
        }

        GIO.addVariable("$x", CX);
        GIO.addVariable("$y", CY);
        GIO.addVariable("$z", CZ);
      }

      GIO.write();
    } else {
      // Non-root side of the gather performed by rank 0 above.
      MPI_Gather(&Partition, 1, MPI_INT, 0, 0, MPI_INT, 0, Comm);
    }

    stringstream ss;
    ss << FileName << "#" << Partition;
    LocalFileName = ss.str();
  } else {
    LocalFileName = FileName;
  }

  // Describe this rank for the header. Without a Cartesian topology the ranks
  // are treated as an NRanks x 1 x 1 decomposition.
  RankHeader RHLocal;
  int Dims[3], Periods[3], Coords[3];

  int TopoStatus;
  MPI_Topo_test(Comm, &TopoStatus);
  if (TopoStatus == MPI_CART) {
    MPI_Cart_get(Comm, 3, Dims, Periods, Coords);
  } else {
    Dims[0] = NRanks;
    std::fill(Dims + 1, Dims + 3, 1);
    std::fill(Periods, Periods + 3, 0);
    Coords[0] = Rank;
    std::fill(Coords + 1, Coords + 3, 0);
  }

  std::copy(Coords, Coords + 3, RHLocal.Coords);
  RHLocal.NElems = NElems;
  RHLocal.Start = 0; // Filled in by the header-writing rank.
  RHLocal.GlobalRank = Rank;

  // GENERICIO_COMPRESS, when set, overrides the default: a positive integer
  // enables compression, anything else disables it.
  bool ShouldCompress = DefaultShouldCompress;
  const char *EnvStr = getenv("GENERICIO_COMPRESS");
  if (EnvStr) {
    int Mod = atoi(EnvStr);
    ShouldCompress = (Mod > 0);
  }

  // Block headers are always needed when compressing; otherwise they can be
  // requested with a positive GENERICIO_FORCE_BLOCKS.
  bool NeedsBlockHeaders = ShouldCompress;
  EnvStr = getenv("GENERICIO_FORCE_BLOCKS");
  if (!NeedsBlockHeaders && EnvStr) {
    int Mod = atoi(EnvStr);
    NeedsBlockHeaders = (Mod > 0);
  }

  // Per-variable state used only in block mode:
  //   LocalData[i]          - the bytes to write (original or compressed)
  //   LocalHasExtraSpace[i] - whether CRCSize spare bytes follow those bytes
  //   LocalCData[i]         - storage for the compressed form
  vector<BlockHeader> LocalBlockHeaders;
  vector<void *> LocalData;
  vector<bool> LocalHasExtraSpace;
  vector<vector<unsigned char> > LocalCData;
  if (NeedsBlockHeaders) {
    LocalBlockHeaders.resize(Vars.size());
    LocalData.resize(Vars.size());
    LocalHasExtraSpace.resize(Vars.size());
    if (ShouldCompress)
      LocalCData.resize(Vars.size());

    for (size_t i = 0; i < Vars.size(); ++i) {
      // Filters null by default, leave null starting address (needs to be
      // calculated by the header-writing rank).
      memset(&LocalBlockHeaders[i], 0, sizeof(BlockHeader));
      if (ShouldCompress) {
        // The compressed block starts with a CompressHeader holding the CRC
        // of the uncompressed data.
        LocalCData[i].resize(sizeof(CompressHeader));

        CompressHeader *CH = (CompressHeader*) &LocalCData[i][0];
        CH->OrigCRC = crc64_omp(Vars[i].Data, Vars[i].Size*NElems);

#ifdef _OPENMP
#pragma omp master
  {
#endif

       if (!blosc_initialized) {
         blosc_init();
         blosc_initialized = true;
       }

#ifdef _OPENMP
       blosc_set_nthreads(omp_get_max_threads());
  }
#endif

        // Reserve room for output as large as the input. A non-positive
        // result from blosc_compress means the data are stored uncompressed.
        LocalCData[i].resize(LocalCData[i].size() + NElems*Vars[i].Size);
        if (blosc_compress(9, 1, Vars[i].Size, NElems*Vars[i].Size, Vars[i].Data,
                           &LocalCData[i][0] + sizeof(CompressHeader),
                           NElems*Vars[i].Size) <= 0)
          goto nocomp;

        strncpy(LocalBlockHeaders[i].Filters[0], CompressName, FilterNameSize);
        size_t CNBytes, CCBytes, CBlockSize;
        blosc_cbuffer_sizes(&LocalCData[i][0] + sizeof(CompressHeader),
                            &CNBytes, &CCBytes, &CBlockSize);
        // Shrink to the actual compressed size (plus the CompressHeader).
        LocalCData[i].resize(CCBytes + sizeof(CompressHeader));

        LocalBlockHeaders[i].Size = LocalCData[i].size();
        // Append spare bytes so the CRC can be written in the same call.
        LocalCData[i].resize(LocalCData[i].size() + CRCSize);
        LocalData[i] = &LocalCData[i][0];
        LocalHasExtraSpace[i] = true;
      } else {
        // Also reached via goto when compression did not succeed.
nocomp:
        LocalBlockHeaders[i].Size = NElems*Vars[i].Size;
        LocalData[i] = Vars[i].Data;
        LocalHasExtraSpace[i] = Vars[i].HasExtraSpace;
      }
    }
  }

  double StartTime = MPI_Wtime();

  if (SplitRank == 0) {
    // Header layout: GlobalHeader, VariableHeaders, RankHeaders, optional
    // BlockHeaders, then the header CRC.
    uint64_t HeaderSize = sizeof(GlobalHeader) + Vars.size()*sizeof(VariableHeader) +
                          SplitNRanks*sizeof(RankHeader) + CRCSize;
    if (NeedsBlockHeaders)
      HeaderSize += SplitNRanks*Vars.size()*sizeof(BlockHeader);

    vector<char> Header(HeaderSize, 0);
    GlobalHeader *GH = (GlobalHeader *) &Header[0];
    std::copy(Magic, Magic + MagicSize, GH->Magic);
    GH->HeaderSize = HeaderSize - CRCSize;
    GH->NElems = NElems; // This will be updated later
    std::copy(Dims, Dims + 3, GH->Dims);
    GH->NVars = Vars.size();
    GH->VarsSize = sizeof(VariableHeader);
    GH->VarsStart = sizeof(GlobalHeader);
    GH->NRanks = SplitNRanks;
    GH->RanksSize = sizeof(RankHeader);
    GH->RanksStart = GH->VarsStart + Vars.size()*sizeof(VariableHeader);
    GH->GlobalHeaderSize = sizeof(GlobalHeader);
    std::copy(PhysOrigin, PhysOrigin + 3, GH->PhysOrigin);
    std::copy(PhysScale,  PhysScale  + 3, GH->PhysScale);
    if (!NeedsBlockHeaders) {
      GH->BlocksSize = GH->BlocksStart = 0;
    } else {
      GH->BlocksSize = sizeof(BlockHeader);
      GH->BlocksStart = GH->RanksStart + SplitNRanks*sizeof(RankHeader);
    }

    // Fill in the variable headers; RecordSize accumulates the bytes of one
    // element across all variables. Flags start at zero because Header was
    // zero-initialized.
    uint64_t RecordSize = 0;
    VariableHeader *VH = (VariableHeader *) &Header[GH->VarsStart];
    for (size_t i = 0; i < Vars.size(); ++i, ++VH) {
      string VName(Vars[i].Name);
      VName.resize(NameSize);

      std::copy(VName.begin(), VName.end(), VH->Name);
      if (Vars[i].IsFloat)  VH->Flags |= FloatValue;
      if (Vars[i].IsSigned) VH->Flags |= SignedValue;
      if (Vars[i].IsPhysCoordX) VH->Flags |= ValueIsPhysCoordX;
      if (Vars[i].IsPhysCoordY) VH->Flags |= ValueIsPhysCoordY;
      if (Vars[i].IsPhysCoordZ) VH->Flags |= ValueIsPhysCoordZ;
      if (Vars[i].MaybePhysGhost) VH->Flags |= ValueMaybePhysGhost;
      RecordSize += VH->Size = Vars[i].Size;
    }

    // Collect every rank's RankHeader directly into the header buffer.
    MPI_Gather(&RHLocal, sizeof(RHLocal), MPI_BYTE,
               &Header[GH->RanksStart], sizeof(RHLocal),
               MPI_BYTE, 0, SplitComm);

    if (NeedsBlockHeaders) {
      MPI_Gather(&LocalBlockHeaders[0],
                 Vars.size()*sizeof(BlockHeader), MPI_BYTE,
                 &Header[GH->BlocksStart],
                 Vars.size()*sizeof(BlockHeader), MPI_BYTE,
                 0, SplitComm);

      // Blocks are laid out back to back in (rank, variable) order, each
      // followed by its CRC; the first one starts right after the header.
      BlockHeader *BH = (BlockHeader *) &Header[GH->BlocksStart];
      for (int i = 0; i < SplitNRanks; ++i)
      for (size_t j = 0; j < Vars.size(); ++j, ++BH) {
        if (i == 0 && j == 0)
          BH->Start = HeaderSize;
        else
          BH->Start = BH[-1].Start + BH[-1].Size + CRCSize;
      }

      // A rank's data start where its first block starts. Also accumulate the
      // element counts of ranks 1..N-1 into the global total.
      RankHeader *RH = (RankHeader *) &Header[GH->RanksStart];
      RH->Start = HeaderSize; ++RH;
      for (int i = 1; i < SplitNRanks; ++i, ++RH) {
        RH->Start =
          ((BlockHeader *) &Header[GH->BlocksStart])[i*Vars.size()].Start;
        GH->NElems += RH->NElems;
      }

      // Compute the total file size.
      uint64_t LastData = BH[-1].Size + CRCSize;
      FileSize = BH[-1].Start + LastData;
    } else {
      // Without block headers each rank occupies NElems*RecordSize bytes plus
      // one CRC per variable, placed consecutively after the header.
      RankHeader *RH = (RankHeader *) &Header[GH->RanksStart];
      RH->Start = HeaderSize; ++RH;
      for (int i = 1; i < SplitNRanks; ++i, ++RH) {
        uint64_t PrevNElems = RH[-1].NElems;
        uint64_t PrevData = PrevNElems*RecordSize + CRCSize*Vars.size();
        RH->Start = RH[-1].Start + PrevData;
        GH->NElems += RH->NElems;
      }

      // Compute the total file size.
      uint64_t LastNElems = RH[-1].NElems;
      uint64_t LastData = LastNElems*RecordSize + CRCSize*Vars.size();
      FileSize = RH[-1].Start + LastData;
    }

    // Now that the starting offset has been computed, send it back to each rank.
    MPI_Scatter(&Header[GH->RanksStart], sizeof(RHLocal),
                MPI_BYTE, &RHLocal, sizeof(RHLocal),
                MPI_BYTE, 0, SplitComm);

    if (NeedsBlockHeaders)
      MPI_Scatter(&Header[GH->BlocksStart],
                  sizeof(BlockHeader)*Vars.size(), MPI_BYTE,
                  &LocalBlockHeaders[0],
                  sizeof(BlockHeader)*Vars.size(), MPI_BYTE,
                  0, SplitComm);

    // Store the header CRC in the last CRCSize bytes of the header buffer.
    uint64_t HeaderCRC = crc64_omp(&Header[0], HeaderSize - CRCSize);
    crc64_invert(HeaderCRC, &Header[HeaderSize - CRCSize]);

    // The header is written by this rank alone, hence MPI_COMM_SELF.
    if (FileIOType == FileIOMPI)
      FH.get() = new GenericFileIO_MPI(MPI_COMM_SELF);
    else if (FileIOType == FileIOMPICollective)
      FH.get() = new GenericFileIO_MPICollective(MPI_COMM_SELF);
    else
      FH.get() = new GenericFileIO_POSIX();

    FH.get()->open(LocalFileName);
    FH.get()->setSize(FileSize);
    FH.get()->write(&Header[0], HeaderSize, 0, "header");

    close();
  } else {
    // Non-root side of the gathers/scatters issued by SplitRank 0 above; the
    // order must match exactly.
    MPI_Gather(&RHLocal, sizeof(RHLocal), MPI_BYTE, 0, 0, MPI_BYTE, 0, SplitComm);
    if (NeedsBlockHeaders)
      MPI_Gather(&LocalBlockHeaders[0], Vars.size()*sizeof(BlockHeader),
                 MPI_BYTE, 0, 0, MPI_BYTE, 0, SplitComm);
    MPI_Scatter(0, 0, MPI_BYTE, &RHLocal, sizeof(RHLocal), MPI_BYTE, 0, SplitComm);
    if (NeedsBlockHeaders)
      MPI_Scatter(0, 0, MPI_BYTE, &LocalBlockHeaders[0], sizeof(BlockHeader)*Vars.size(),
                  MPI_BYTE, 0, SplitComm);
  }

  // Wait until the header-writing rank has created and sized the file.
  MPI_Barrier(SplitComm);

  // Every rank (including SplitRank 0) now opens the file for its own data.
  if (FileIOType == FileIOMPI)
    FH.get() = new GenericFileIO_MPI(SplitComm);
  else if (FileIOType == FileIOMPICollective)
    FH.get() = new GenericFileIO_MPICollective(SplitComm);
  else
    FH.get() = new GenericFileIO_POSIX();

  FH.get()->open(LocalFileName);

  uint64_t Offset = RHLocal.Start;
  for (size_t i = 0; i < Vars.size(); ++i) {
    uint64_t WriteSize = NeedsBlockHeaders ?
                         LocalBlockHeaders[i].Size : NElems*Vars[i].Size;
    void *Data = NeedsBlockHeaders ? LocalData[i] : Vars[i].Data;
    uint64_t CRC = crc64_omp(Data, WriteSize);
    bool HasExtraSpace = NeedsBlockHeaders ?
                         LocalHasExtraSpace[i] : Vars[i].HasExtraSpace;
    // With spare bytes after the data, the CRC is placed there so data and
    // CRC go out in a single write; otherwise it is written separately from
    // the local CRC variable.
    char *CRCLoc = HasExtraSpace ?  ((char *) Data) + WriteSize : (char *) &CRC;

    if (NeedsBlockHeaders)
      Offset = LocalBlockHeaders[i].Start;

    // When using extra space for the CRC write, preserve the original contents.
    char CRCSave[CRCSize];
    if (HasExtraSpace)
      std::copy(CRCLoc, CRCLoc + CRCSize, CRCSave);

    crc64_invert(CRC, CRCLoc);

    if (HasExtraSpace) {
      FH.get()->write(Data, WriteSize + CRCSize, Offset, Vars[i].Name + " with CRC");
    } else {
      FH.get()->write(Data, WriteSize, Offset, Vars[i].Name);
      FH.get()->write(CRCLoc, CRCSize, Offset + WriteSize, Vars[i].Name + " CRC");
    }

    if (HasExtraSpace)
       std::copy(CRCSave, CRCSave + CRCSize, CRCLoc);

    Offset += WriteSize + CRCSize;
  }

  close();
  MPI_Barrier(Comm);

  // Report the slowest rank's elapsed time and the resulting rate.
  double EndTime = MPI_Wtime();
  double TotalTime = EndTime - StartTime;
  double MaxTotalTime;
  MPI_Reduce(&TotalTime, &MaxTotalTime, 1, MPI_DOUBLE, MPI_MAX, 0, Comm);

  // In split mode, sum the sizes of all partition files (each contributed by
  // its header-writing rank).
  if (SplitNRanks != NRanks) {
    uint64_t ContribFileSize = (SplitRank == 0) ? FileSize : 0;
    MPI_Reduce(&ContribFileSize, &FileSize, 1, MPI_UINT64_T, MPI_SUM, 0, Comm);
  }

  if (Rank == 0) {
    double Rate = ((double) FileSize) / MaxTotalTime / (1024.*1024.);
    cout << "Wrote " << Vars.size() << " variables to " << FileName <<
            " (" << FileSize << " bytes) in " << MaxTotalTime << "s: " <<
            Rate << " MB/s" << endl;
  }

  MPI_Comm_free(&SplitComm);
  SplitComm = MPI_COMM_NULL;
}
637#endif // GENERICIO_NO_MPI
638
// Opens the file that holds the data for EffRank and loads its header into
// the header cache (FH.getHeaderCache()).
//
//   MustMatch    - when true, the number of ranks in SplitComm must equal the
//                  rank count stored in the file and, for a Cartesian Comm,
//                  the decomposition dimensions must match as well
//   EffRank      - rank whose data are wanted; -1 means this process's rank
//                  in Comm. Used to index RankMap in partitioned mode
//   CheckPartMap - when true and no RankMap is loaded yet, first try to read
//                  FileName as a rank map (variable "$partition")
//
// Returns early, leaving the cache untouched, when the file that would be
// opened is the one already recorded in OpenFileName.
//
// Throws runtime_error on a bad magic value, a MustMatch mismatch, a header
// CRC failure, or a failure to open/read the file.
//
// Note: Errors from this function should be recoverable. This means that if
// one rank throws an exception, then all ranks should.
void GenericIO::openAndReadHeader(bool MustMatch, int EffRank, bool CheckPartMap) {
  const char *Magic = isBigEndian() ? MagicBE : MagicLE;
  const char *MagicInv = isBigEndian() ? MagicLE : MagicBE;

  int NRanks, Rank;
#ifndef GENERICIO_NO_MPI
  MPI_Comm_rank(Comm, &Rank);
  MPI_Comm_size(Comm, &NRanks);
#else
  Rank = 0;
  NRanks = 1;
#endif

  if (EffRank == -1)
    EffRank = Rank;

  if (RankMap.empty() && CheckPartMap) {
    // First, check to see if the file is a rank map.
    unsigned long RanksInMap = 0;
    if (Rank == 0) {
      // Any failure here simply means "not a rank map": the exception is
      // swallowed and RankMap stays empty.
      try {
#ifndef GENERICIO_NO_MPI
        GenericIO GIO(MPI_COMM_SELF, FileName, FileIOType);
#else
        GenericIO GIO(FileName, FileIOType);
#endif
        GIO.openAndReadHeader(true, 0, false);
        RanksInMap = GIO.readNumElems();

        // Temporarily over-allocate by the requested extra space, then trim
        // back to the real element count after reading.
        RankMap.resize(RanksInMap + GIO.requestedExtraSpace()/sizeof(int));
        GIO.addVariable("$partition", RankMap, true);

        GIO.readData(0, false);
        RankMap.resize(RanksInMap);
      } catch (...) {
        RankMap.clear();
        RanksInMap = 0;
      }
    }

#ifndef GENERICIO_NO_MPI
    // Share the rank map (if any) with all other ranks.
    MPI_Bcast(&RanksInMap, 1, MPI_UNSIGNED_LONG, 0, Comm);
    if (RanksInMap > 0) {
      RankMap.resize(RanksInMap);
      MPI_Bcast(&RankMap[0], RanksInMap, MPI_INT, 0, Comm);
    }
#endif
  }

#ifndef GENERICIO_NO_MPI
  // Release the communicator left over from a previous call.
  if (SplitComm != MPI_COMM_NULL)
    MPI_Comm_free(&SplitComm);
#endif

  // Without a rank map all ranks read FileName; with one, each rank reads
  // "<FileName>#<partition>" and shares a communicator with the other ranks
  // of that partition.
  string LocalFileName;
  if (RankMap.empty()) {
    LocalFileName = FileName;
#ifndef GENERICIO_NO_MPI
    MPI_Comm_dup(Comm, &SplitComm);
#endif
  } else {
    stringstream ss;
    ss << FileName << "#" << RankMap[EffRank];
    LocalFileName = ss.str();
#ifndef GENERICIO_NO_MPI
#ifdef __bgq__
    MPI_Barrier(Comm);
#endif
    MPI_Comm_split(Comm, RankMap[EffRank], Rank, &SplitComm);
#endif
  }

  // Nothing to do if this file's header is already loaded.
  if (LocalFileName == OpenFileName)
    return;
  FH.close();

  int SplitNRanks, SplitRank;
#ifndef GENERICIO_NO_MPI
  MPI_Comm_rank(SplitComm, &SplitRank);
  MPI_Comm_size(SplitComm, &SplitNRanks);
#else
  SplitRank = 0;
  SplitNRanks = 1;
#endif

  uint64_t HeaderSize;
  vector<char> Header;

  // Only rank 0 of SplitComm reads the header from disk; it then tells the
  // others whether that succeeded and broadcasts the bytes.
  if (SplitRank == 0) {
#ifndef GENERICIO_NO_MPI
    if (FileIOType == FileIOMPI)
      FH.get() = new GenericFileIO_MPI(MPI_COMM_SELF);
    else if (FileIOType == FileIOMPICollective)
      FH.get() = new GenericFileIO_MPICollective(MPI_COMM_SELF);
    else
#endif
      FH.get() = new GenericFileIO_POSIX();

#ifndef GENERICIO_NO_MPI
    char True = 1, False = 0;
#endif

    try {
      FH.get()->open(LocalFileName, true);

      GlobalHeader GH;
      FH.get()->read(&GH, sizeof(GlobalHeader), 0, "global header");

      // Compare the first MagicSize - 1 bytes (everything but the NUL).
      if (string(GH.Magic, GH.Magic + MagicSize - 1) != Magic) {
        string Error;
        if (string(GH.Magic, GH.Magic + MagicSize - 1) == MagicInv) {
          Error = "wrong endianness";
        } else {
          Error = "invalid file-type identifier";
        }
        throw runtime_error("Won't read " + LocalFileName + ": " + Error);
      }

      if (MustMatch) {
        if (SplitNRanks != (int) GH.NRanks) {
          stringstream ss;
          ss << "Won't read " << LocalFileName << ": communicator-size mismatch: " <<
                "current: " << SplitNRanks << ", file: " << GH.NRanks;
          throw runtime_error(ss.str());
        }

#ifndef GENERICIO_NO_MPI
        int TopoStatus;
        MPI_Topo_test(Comm, &TopoStatus);
        if (TopoStatus == MPI_CART) {
          int Dims[3], Periods[3], Coords[3];
          MPI_Cart_get(Comm, 3, Dims, Periods, Coords);

          bool DimsMatch = true;
          for (int i = 0; i < 3; ++i) {
            if ((uint64_t) Dims[i] != GH.Dims[i]) {
              DimsMatch = false;
              break;
            }
          }

          if (!DimsMatch) {
            stringstream ss;
            ss << "Won't read " << LocalFileName <<
                  ": communicator-decomposition mismatch: " <<
                  "current: " << Dims[0] << "x" << Dims[1] << "x" << Dims[2] <<
                  ", file: " << GH.Dims[0] << "x" << GH.Dims[1] << "x" <<
                  GH.Dims[2];
            throw runtime_error(ss.str());
          }
        }
#endif
      }

      // Read the complete header plus its trailing CRC.
      HeaderSize = GH.HeaderSize;
      Header.resize(HeaderSize + CRCSize, 0xFE /* poison */);
      FH.get()->read(&Header[0], HeaderSize + CRCSize, 0, "header");

      // The CRC computed over the header together with its stored CRC bytes
      // is expected to be all ones.
      uint64_t CRC = crc64_omp(&Header[0], HeaderSize + CRCSize);
      if (CRC != (uint64_t) -1) {
        throw runtime_error("Header CRC check failed: " + LocalFileName);
      }

#ifndef GENERICIO_NO_MPI
      // In MPI builds the file is reopened on SplitComm further below.
      close();
      MPI_Bcast(&True, 1, MPI_BYTE, 0, SplitComm);
#endif
    } catch (...) {
#ifndef GENERICIO_NO_MPI
      // Tell the waiting ranks to throw as well before rethrowing here.
      MPI_Bcast(&False, 1, MPI_BYTE, 0, SplitComm);
#endif
      close();
      throw;
    }
  } else {
#ifndef GENERICIO_NO_MPI
    // Receive the success flag from rank 0 of SplitComm.
    char Okay;
    MPI_Bcast(&Okay, 1, MPI_BYTE, 0, SplitComm);
    if (!Okay)
      throw runtime_error("Failure broadcast from rank 0");
#endif
  }

#ifndef GENERICIO_NO_MPI
  MPI_Bcast(&HeaderSize, 1, MPI_UINT64_T, 0, SplitComm);
#endif

  // On rank 0 this drops the trailing CRC bytes; on the other ranks it
  // allocates the receive buffer.
  Header.resize(HeaderSize, 0xFD /* poison */);
#ifndef GENERICIO_NO_MPI
  MPI_Bcast(&Header[0], HeaderSize, MPI_BYTE, 0, SplitComm);
#endif

  FH.getHeaderCache().clear();
  FH.getHeaderCache().swap(Header);
  OpenFileName = LocalFileName;

#ifndef GENERICIO_NO_MPI
  MPI_Barrier(Comm);

  // Open the file for the subsequent data reads on every rank.
  if (FileIOType == FileIOMPI)
    FH.get() = new GenericFileIO_MPI(SplitComm);
  else if (FileIOType == FileIOMPICollective)
    FH.get() = new GenericFileIO_MPICollective(SplitComm);
  else
    FH.get() = new GenericFileIO_POSIX();

  // Count open failures across all of Comm so that every rank throws if any
  // rank failed. The failing rank rethrows its own exception after taking
  // part in the reduction.
  int OpenErr = 0, TotOpenErr;
  try {
    FH.get()->open(LocalFileName, true);
    MPI_Allreduce(&OpenErr, &TotOpenErr, 1, MPI_INT, MPI_SUM, Comm);
  } catch (...) {
    OpenErr = 1;
    MPI_Allreduce(&OpenErr, &TotOpenErr, 1, MPI_INT, MPI_SUM, Comm);
    throw;
  }

  if (TotOpenErr > 0) {
    stringstream ss;
    ss << TotOpenErr << " ranks failed to open file: " << LocalFileName;
    throw runtime_error(ss.str());
  }
#endif
}
864
865int GenericIO::readNRanks() {
866  if (RankMap.size())
867    return RankMap.size();
868
869  assert(FH.getHeaderCache().size() && "HeaderCache must not be empty");
870  GlobalHeader *GH = (GlobalHeader *) &FH.getHeaderCache()[0];
871  return (int) GH->NRanks;
872}
873
874void GenericIO::readDims(int Dims[3]) {
875  assert(FH.getHeaderCache().size() && "HeaderCache must not be empty");
876  GlobalHeader *GH = (GlobalHeader *) &FH.getHeaderCache()[0];
877  std::copy(GH->Dims, GH->Dims + 3, Dims);
878}
879
880uint64_t GenericIO::readTotalNumElems() {
881  if (RankMap.size())
882    return (uint64_t) -1;
883
884  assert(FH.getHeaderCache().size() && "HeaderCache must not be empty");
885  GlobalHeader *GH = (GlobalHeader *) &FH.getHeaderCache()[0];
886  return GH->NElems;
887}
888
889void GenericIO::readPhysOrigin(double Origin[3]) {
890  assert(FH.getHeaderCache().size() && "HeaderCache must not be empty");
891  GlobalHeader *GH = (GlobalHeader *) &FH.getHeaderCache()[0];
892  if (offsetof(GlobalHeader, PhysOrigin) >= GH->GlobalHeaderSize) {
893    std::fill(Origin, Origin + 3, 0.0);
894    return;
895  }
896
897  std::copy(GH->PhysOrigin, GH->PhysOrigin + 3, Origin);
898}
899
900void GenericIO::readPhysScale(double Scale[3]) {
901  assert(FH.getHeaderCache().size() && "HeaderCache must not be empty");
902  GlobalHeader *GH = (GlobalHeader *) &FH.getHeaderCache()[0];
903  if (offsetof(GlobalHeader, PhysScale) >= GH->GlobalHeaderSize) {
904    std::fill(Scale, Scale + 3, 0.0);
905    return;
906  }
907
908  std::copy(GH->PhysScale, GH->PhysScale + 3, Scale);
909}
910
911static size_t getRankIndex(int EffRank, GlobalHeader *GH,
912                           vector<int> &RankMap, vector<char> &HeaderCache) {
913  if (offsetof(RankHeader, GlobalRank) >= GH->RanksSize || RankMap.empty())
914    return EffRank;
915
916  for (size_t i = 0; i < GH->NRanks; ++i) {
917    RankHeader *RH = (RankHeader *) &HeaderCache[GH->RanksStart +
918                                                 i*GH->RanksSize];
919    if ((int) RH->GlobalRank == EffRank)
920      return i;
921  }
922
923  assert(false && "Index requested of an invalid rank");
924  return (size_t) -1;
925}
926
927int GenericIO::readGlobalRankNumber(int EffRank) {
928  if (EffRank == -1) {
929#ifndef GENERICIO_NO_MPI
930    MPI_Comm_rank(Comm, &EffRank);
931#else
932    EffRank = 0;
933#endif
934  }
935
936  openAndReadHeader(false, EffRank, false);
937
938  assert(FH.getHeaderCache().size() && "HeaderCache must not be empty");
939
940  GlobalHeader *GH = (GlobalHeader *) &FH.getHeaderCache()[0];
941  size_t RankIndex = getRankIndex(EffRank, GH, RankMap, FH.getHeaderCache());
942
943  assert(RankIndex < GH->NRanks && "Invalid rank specified");
944
945  if (offsetof(RankHeader, GlobalRank) >= GH->RanksSize)
946    return EffRank;
947
948  RankHeader *RH = (RankHeader *) &FH.getHeaderCache()[GH->RanksStart +
949                                               RankIndex*GH->RanksSize];
950
951  return (int) RH->GlobalRank;
952}
953
954size_t GenericIO::readNumElems(int EffRank) {
955  if (EffRank == -1) {
956#ifndef GENERICIO_NO_MPI
957    MPI_Comm_rank(Comm, &EffRank);
958#else
959    EffRank = 0;
960#endif
961  }
962
963  openAndReadHeader(false, EffRank, false);
964
965  assert(FH.getHeaderCache().size() && "HeaderCache must not be empty");
966
967  GlobalHeader *GH = (GlobalHeader *) &FH.getHeaderCache()[0];
968  size_t RankIndex = getRankIndex(EffRank, GH, RankMap, FH.getHeaderCache());
969
970  assert(RankIndex < GH->NRanks && "Invalid rank specified");
971
972  RankHeader *RH = (RankHeader *) &FH.getHeaderCache()[GH->RanksStart +
973                                               RankIndex*GH->RanksSize];
974  return (size_t) RH->NElems;
975}
976
977void GenericIO::readCoords(int Coords[3], int EffRank) {
978  if (EffRank == -1) {
979#ifndef GENERICIO_NO_MPI
980    MPI_Comm_rank(Comm, &EffRank);
981#else
982    EffRank = 0;
983#endif
984  }
985
986  openAndReadHeader(false, EffRank, false);
987
988  assert(FH.getHeaderCache().size() && "HeaderCache must not be empty");
989
990  GlobalHeader *GH = (GlobalHeader *) &FH.getHeaderCache()[0];
991  size_t RankIndex = getRankIndex(EffRank, GH, RankMap, FH.getHeaderCache());
992
993  assert(RankIndex < GH->NRanks && "Invalid rank specified");
994
995  RankHeader *RH = (RankHeader *) &FH.getHeaderCache()[GH->RanksStart +
996                                               RankIndex*GH->RanksSize];
997
998  std::copy(RH->Coords, RH->Coords + 3, Coords);
999}
1000
1001// Note: Errors from this function should be recoverable. This means that if
1002// one rank throws an exception, then all ranks should.
1003void GenericIO::readData(int EffRank, bool PrintStats, bool CollStats) {
1004  int Rank;
1005#ifndef GENERICIO_NO_MPI
1006  MPI_Comm_rank(Comm, &Rank);
1007#else
1008  Rank = 0;
1009#endif
1010
1011  openAndReadHeader(false, EffRank, false);
1012
1013  assert(FH.getHeaderCache().size() && "HeaderCache must not be empty");
1014
1015  if (EffRank == -1)
1016    EffRank = Rank;
1017
1018  GlobalHeader *GH = (GlobalHeader *) &FH.getHeaderCache()[0];
1019  size_t RankIndex = getRankIndex(EffRank, GH, RankMap, FH.getHeaderCache());
1020
1021  assert(RankIndex < GH->NRanks && "Invalid rank specified");
1022
1023  RankHeader *RH = (RankHeader *) &FH.getHeaderCache()[GH->RanksStart +
1024                                               RankIndex*GH->RanksSize];
1025
1026  uint64_t TotalReadSize = 0;
1027#ifndef GENERICIO_NO_MPI
1028  double StartTime = MPI_Wtime();
1029#else
1030  double StartTime = double(clock())/CLOCKS_PER_SEC;
1031#endif
1032
1033  int NErrs[3] = { 0, 0, 0 };
1034  for (size_t i = 0; i < Vars.size(); ++i) {
1035    uint64_t Offset = RH->Start;
1036    bool VarFound = false;
1037    for (uint64_t j = 0; j < GH->NVars; ++j) {
1038      VariableHeader *VH = (VariableHeader *) &FH.getHeaderCache()[GH->VarsStart +
1039                                                           j*GH->VarsSize];
1040
1041      string VName(VH->Name, VH->Name + NameSize);
1042      size_t VNameNull = VName.find('\0');
1043      if (VNameNull < NameSize)
1044        VName.resize(VNameNull);
1045
1046      uint64_t ReadSize = RH->NElems*VH->Size + CRCSize;
1047      if (VName != Vars[i].Name) {
1048        Offset += ReadSize;
1049        continue;
1050      }
1051
1052      VarFound = true;
1053      bool IsFloat = (bool) (VH->Flags & FloatValue),
1054           IsSigned = (bool) (VH->Flags & SignedValue);
1055      if (VH->Size != Vars[i].Size) {
1056        stringstream ss;
1057        ss << "Size mismatch for variable " << Vars[i].Name <<
1058              " in: " << OpenFileName << ": current: " << Vars[i].Size <<
1059              ", file: " << VH->Size;
1060        throw runtime_error(ss.str());
1061      } else if (IsFloat != Vars[i].IsFloat) {
1062        string Float("float"), Int("integer");
1063        stringstream ss;
1064        ss << "Type mismatch for variable " << Vars[i].Name <<
1065              " in: " << OpenFileName << ": current: " <<
1066              (Vars[i].IsFloat ? Float : Int) <<
1067              ", file: " << (IsFloat ? Float : Int);
1068        throw runtime_error(ss.str());
1069      } else if (IsSigned != Vars[i].IsSigned) {
1070        string Signed("signed"), Uns("unsigned");
1071        stringstream ss;
1072        ss << "Type mismatch for variable " << Vars[i].Name <<
1073              " in: " << OpenFileName << ": current: " <<
1074              (Vars[i].IsSigned ? Signed : Uns) <<
1075              ", file: " << (IsSigned ? Signed : Uns);
1076        throw runtime_error(ss.str());
1077      }
1078
1079      vector<unsigned char> LData;
1080      void *Data = Vars[i].Data;
1081      bool HasExtraSpace = Vars[i].HasExtraSpace;
1082      if (offsetof(GlobalHeader, BlocksStart) < GH->GlobalHeaderSize &&
1083          GH->BlocksSize > 0) {
1084        BlockHeader *BH = (BlockHeader *)
1085          &FH.getHeaderCache()[GH->BlocksStart +
1086                               (RankIndex*GH->NVars + j)*GH->BlocksSize];
1087        ReadSize = BH->Size + CRCSize;
1088        Offset = BH->Start;
1089
1090        if (strncmp(BH->Filters[0], CompressName, FilterNameSize) == 0) {
1091          LData.resize(ReadSize);
1092          Data = &LData[0];
1093          HasExtraSpace = true;
1094        } else if (BH->Filters[0][0] != '\0') {
1095          stringstream ss;
1096          ss << "Unknown filter \"" << BH->Filters[0] << "\" on variable " << Vars[i].Name;
1097          throw runtime_error(ss.str());
1098        }
1099      }
1100
1101      assert(HasExtraSpace && "Extra space required for reading");
1102
1103      char CRCSave[CRCSize];
1104      char *CRCLoc = ((char *) Data) + ReadSize - CRCSize;
1105      if (HasExtraSpace)
1106        std::copy(CRCLoc, CRCLoc + CRCSize, CRCSave);
1107
1108      int Retry = 0;
1109      {
1110        int RetryCount = 300;
1111        const char *EnvStr = getenv("GENERICIO_RETRY_COUNT");
1112        if (EnvStr)
1113          RetryCount = atoi(EnvStr);
1114
1115        int RetrySleep = 100; // ms
1116        EnvStr = getenv("GENERICIO_RETRY_SLEEP");
1117        if (EnvStr)
1118          RetrySleep = atoi(EnvStr);
1119
1120        for (; Retry < RetryCount; ++Retry) {
1121          try {
1122            FH.get()->read(Data, ReadSize, Offset, Vars[i].Name);
1123            break;
1124          } catch (...) { }
1125
1126          usleep(1000*RetrySleep);
1127        }
1128
1129        if (Retry == RetryCount) {
1130          ++NErrs[0];
1131          break;
1132        } else if (Retry > 0) {
1133          EnvStr = getenv("GENERICIO_VERBOSE");
1134          if (EnvStr) {
1135            int Mod = atoi(EnvStr);
1136            if (Mod > 0) {
1137              int Rank;
1138#ifndef GENERICIO_NO_MPI
1139              MPI_Comm_rank(MPI_COMM_WORLD, &Rank);
1140#else
1141              Rank = 0;
1142#endif
1143
1144              std::cerr << "Rank " << Rank << ": " << Retry <<
1145                           " I/O retries were necessary for reading " <<
1146                           Vars[i].Name << " from: " << OpenFileName << "\n";
1147
1148              std::cerr.flush();
1149            }
1150          }
1151        }
1152      }
1153
1154      TotalReadSize += ReadSize;
1155
1156      uint64_t CRC = crc64_omp(Data, ReadSize);
1157      if (CRC != (uint64_t) -1) {
1158        ++NErrs[1];
1159
1160        int Rank;
1161#ifndef GENERICIO_NO_MPI
1162        MPI_Comm_rank(MPI_COMM_WORLD, &Rank);
1163#else
1164        Rank = 0;
1165#endif
1166
1167        // All ranks will do this and have a good time!
1168        string dn = "gio_crc_errors";
1169        mkdir(dn.c_str(), 0777);
1170
1171        srand(time(0));
1172        int DumpNum = rand();
1173        stringstream ssd;
1174        ssd << dn << "/gio_crc_error_dump." << Rank << "." << DumpNum << ".bin";
1175
1176        stringstream ss;
1177        ss << dn << "/gio_crc_error_log." << Rank << ".txt";
1178
1179        ofstream ofs(ss.str().c_str(), ofstream::out | ofstream::app);
1180        ofs << "On-Disk CRC Error Report:\n";
1181        ofs << "Variable: " << Vars[i].Name << "\n";
1182        ofs << "File: " << OpenFileName << "\n";
1183        ofs << "I/O Retries: " << Retry << "\n";
1184        ofs << "Size: " << ReadSize << " bytes\n";
1185        ofs << "Offset: " << Offset << " bytes\n";
1186        ofs << "CRC: " << CRC << " (expected is -1)\n";
1187        ofs << "Dump file: " << ssd.str() << "\n";
1188        ofs << "\n";
1189        ofs.close();
1190
1191        ofstream dofs(ssd.str().c_str(), ofstream::out);
1192        dofs.write((const char *) Data, ReadSize);
1193        dofs.close();
1194        break;
1195      }
1196
1197      if (HasExtraSpace)
1198        std::copy(CRCSave, CRCSave + CRCSize, CRCLoc);
1199
1200      if (LData.size()) {
1201        CompressHeader *CH = (CompressHeader*) &LData[0];
1202
1203#ifdef _OPENMP
1204#pragma omp master
1205  {
1206#endif
1207
1208       if (!blosc_initialized) {
1209         blosc_init();
1210         blosc_initialized = true;
1211       }
1212
1213#ifdef _OPENMP
1214       blosc_set_nthreads(omp_get_max_threads());
1215  }
1216#endif
1217
1218        blosc_decompress(&LData[0] + sizeof(CompressHeader),
1219                         Vars[i].Data, Vars[i].Size*RH->NElems);
1220
1221        if (CH->OrigCRC != crc64_omp(Vars[i].Data, Vars[i].Size*RH->NElems)) {
1222          ++NErrs[2];
1223          break;
1224        }
1225      }
1226
1227      break;
1228    }
1229
1230    if (!VarFound)
1231      throw runtime_error("Variable " + Vars[i].Name +
1232                          " not found in: " + OpenFileName);
1233
1234    // This is for debugging.
1235    if (NErrs[0] || NErrs[1] || NErrs[2]) {
1236      const char *EnvStr = getenv("GENERICIO_VERBOSE");
1237      if (EnvStr) {
1238        int Mod = atoi(EnvStr);
1239        if (Mod > 0) {
1240          int Rank;
1241#ifndef GENERICIO_NO_MPI
1242          MPI_Comm_rank(MPI_COMM_WORLD, &Rank);
1243#else
1244          Rank = 0;
1245#endif
1246
1247          std::cerr << "Rank " << Rank << ": " << NErrs[0] << " I/O error(s), " <<
1248          NErrs[1] << " CRC error(s) and " << NErrs[2] <<
1249          " decompression CRC error(s) reading: " << Vars[i].Name <<
1250          " from: " << OpenFileName << "\n";
1251
1252          std::cerr.flush();
1253        }
1254      }
1255    }
1256
1257    if (NErrs[0] || NErrs[1] || NErrs[2])
1258      break;
1259  }
1260
1261  int AllNErrs[3];
1262#ifndef GENERICIO_NO_MPI
1263  MPI_Allreduce(NErrs, AllNErrs, 3, MPI_INT, MPI_SUM, Comm);
1264#else
1265  AllNErrs[0] = NErrs[0]; AllNErrs[1] = NErrs[1]; AllNErrs[2] = NErrs[2];
1266#endif
1267
1268  if (AllNErrs[0] > 0 || AllNErrs[1] > 0 || AllNErrs[2] > 0) {
1269    stringstream ss;
1270    ss << "Experienced " << AllNErrs[0] << " I/O error(s), " <<
1271          AllNErrs[1] << " CRC error(s) and " << AllNErrs[2] <<
1272          " decompression CRC error(s) reading: " << OpenFileName;
1273    throw runtime_error(ss.str());
1274  }
1275
1276#ifndef GENERICIO_NO_MPI
1277  MPI_Barrier(Comm);
1278#endif
1279
1280#ifndef GENERICIO_NO_MPI
1281  double EndTime = MPI_Wtime();
1282#else
1283  double EndTime = double(clock())/CLOCKS_PER_SEC;
1284#endif
1285
1286  double TotalTime = EndTime - StartTime;
1287  double MaxTotalTime;
1288#ifndef GENERICIO_NO_MPI
1289  if (CollStats)
1290    MPI_Reduce(&TotalTime, &MaxTotalTime, 1, MPI_DOUBLE, MPI_MAX, 0, Comm);
1291  else
1292#endif
1293  MaxTotalTime = TotalTime;
1294
1295  uint64_t AllTotalReadSize;
1296#ifndef GENERICIO_NO_MPI
1297  if (CollStats)
1298    MPI_Reduce(&TotalReadSize, &AllTotalReadSize, 1, MPI_UINT64_T, MPI_SUM, 0, Comm);
1299  else
1300#endif
1301  AllTotalReadSize = TotalReadSize;
1302
1303  if (Rank == 0 && PrintStats) {
1304    double Rate = ((double) AllTotalReadSize) / MaxTotalTime / (1024.*1024.);
1305    cout << "Read " << Vars.size() << " variables from " << FileName <<
1306            " (" << AllTotalReadSize << " bytes) in " << MaxTotalTime << "s: " <<
1307            Rate << " MB/s [excluding header read]" << endl;
1308  }
1309
1310}
1311
1312void GenericIO::getVariableInfo(vector<VariableInfo> &VI) {
1313  assert(FH.getHeaderCache().size() && "HeaderCache must not be empty");
1314
1315  GlobalHeader *GH = (GlobalHeader *) &FH.getHeaderCache()[0];
1316  for (uint64_t j = 0; j < GH->NVars; ++j) {
1317    VariableHeader *VH = (VariableHeader *) &FH.getHeaderCache()[GH->VarsStart +
1318                                                         j*GH->VarsSize];
1319
1320    string VName(VH->Name, VH->Name + NameSize);
1321    size_t VNameNull = VName.find('\0');
1322    if (VNameNull < NameSize)
1323      VName.resize(VNameNull);
1324
1325    bool IsFloat = (bool) (VH->Flags & FloatValue),
1326         IsSigned = (bool) (VH->Flags & SignedValue),
1327         IsPhysCoordX = (bool) (VH->Flags & ValueIsPhysCoordX),
1328         IsPhysCoordY = (bool) (VH->Flags & ValueIsPhysCoordY),
1329         IsPhysCoordZ = (bool) (VH->Flags & ValueIsPhysCoordZ),
1330         MaybePhysGhost = (bool) (VH->Flags & ValueMaybePhysGhost);
1331    VI.push_back(VariableInfo(VName, (size_t) VH->Size, IsFloat, IsSigned,
1332                              IsPhysCoordX, IsPhysCoordY, IsPhysCoordZ,
1333                              MaybePhysGhost));
1334  }
1335}
1336
1337void GenericIO::setNaturalDefaultPartition() {
1338#ifdef __bgq__
1339  Personality_t pers;
1340  Kernel_GetPersonality(&pers, sizeof(pers));
1341
1342  // Nodes in an ION Partition
1343  int SPLIT_A = 2;
1344  int SPLIT_B = 2;
1345  int SPLIT_C = 4;
1346  int SPLIT_D = 4;
1347  int SPLIT_E = 2;
1348
1349  int Anodes, Bnodes, Cnodes, Dnodes, Enodes;
1350  int Acoord, Bcoord, Ccoord, Dcoord, Ecoord;
1351  int A_color, B_color, C_color, D_color, E_color;
1352  int A_blocks, B_blocks, C_blocks, D_blocks, E_blocks;
1353  uint32_t id_on_node;
1354  int ranks_per_node, color;
1355
1356  Anodes = pers.Network_Config.Anodes;
1357  Acoord = pers.Network_Config.Acoord;
1358
1359  Bnodes = pers.Network_Config.Bnodes;
1360  Bcoord = pers.Network_Config.Bcoord;
1361
1362  Cnodes = pers.Network_Config.Cnodes;
1363  Ccoord = pers.Network_Config.Ccoord;
1364
1365  Dnodes = pers.Network_Config.Dnodes;
1366  Dcoord = pers.Network_Config.Dcoord;
1367
1368  Enodes = pers.Network_Config.Enodes;
1369  Ecoord = pers.Network_Config.Ecoord;
1370
1371  A_color  = Acoord /  SPLIT_A;
1372  B_color  = Bcoord /  SPLIT_B;
1373  C_color  = Ccoord /  SPLIT_C;
1374  D_color  = Dcoord /  SPLIT_D;
1375  E_color  = Ecoord /  SPLIT_E;
1376
1377  // Number of blocks
1378  A_blocks = Anodes / SPLIT_A;
1379  B_blocks = Bnodes / SPLIT_B;
1380  C_blocks = Cnodes / SPLIT_C;
1381  D_blocks = Dnodes / SPLIT_D;
1382  E_blocks = Enodes / SPLIT_E;
1383
1384  color = (A_color * (B_blocks * C_blocks * D_blocks * E_blocks))
1385    + (B_color * (C_blocks * D_blocks * E_blocks))
1386    + (C_color * ( D_blocks * E_blocks))
1387    + (D_color * ( E_blocks))
1388    + E_color;
1389
1390  DefaultPartition = color;
1391#else
1392#ifndef GENERICIO_NO_MPI
1393  bool UseName = true;
1394  const char *EnvStr = getenv("GENERICIO_PARTITIONS_USE_NAME");
1395  if (EnvStr) {
1396    int Mod = atoi(EnvStr);
1397    UseName = (Mod != 0);
1398  }
1399
1400  if (UseName) {
1401    // This is a heuristic to generate ~256 partitions based on the
1402    // names of the nodes.
1403    char Name[MPI_MAX_PROCESSOR_NAME];
1404    int Len = 0;
1405
1406    MPI_Get_processor_name(Name, &Len);
1407    unsigned char color = 0;
1408    for (int i = 0; i < Len; ++i)
1409      color += (unsigned char) Name[i];
1410
1411    DefaultPartition = color;
1412  }
1413
1414  // This is for debugging.
1415  EnvStr = getenv("GENERICIO_RANK_PARTITIONS");
1416  if (EnvStr) {
1417    int Mod = atoi(EnvStr);
1418    if (Mod > 0) {
1419      int Rank;
1420      MPI_Comm_rank(MPI_COMM_WORLD, &Rank);
1421      DefaultPartition += Rank % Mod;
1422    }
1423  }
1424#endif
1425#endif
1426}
1427
} /* END namespace gio */
Note: See TracBrowser for help on using the repository browser.