source: GenericIO.cxx @ d6628a6

Revision d6628a6, 52.1 KB checked in by Hal Finkel <hfinkel@…>, 7 years ago (diff)

Fully qualify uses of cout

  • Property mode set to 100644
Line 
1/*
2 *                    Copyright (C) 2015, UChicago Argonne, LLC
3 *                               All Rights Reserved
4 *
5 *                               Generic IO (ANL-15-066)
6 *                     Hal Finkel, Argonne National Laboratory
7 *
8 *                              OPEN SOURCE LICENSE
9 *
10 * Under the terms of Contract No. DE-AC02-06CH11357 with UChicago Argonne,
11 * LLC, the U.S. Government retains certain rights in this software.
12 *
13 * Redistribution and use in source and binary forms, with or without
14 * modification, are permitted provided that the following conditions are met:
15 *
16 *   1. Redistributions of source code must retain the above copyright notice,
17 *      this list of conditions and the following disclaimer.
18 *
19 *   2. Redistributions in binary form must reproduce the above copyright
20 *      notice, this list of conditions and the following disclaimer in the
21 *      documentation and/or other materials provided with the distribution.
22 *
23 *   3. Neither the names of UChicago Argonne, LLC or the Department of Energy
24 *      nor the names of its contributors may be used to endorse or promote
25 *      products derived from this software without specific prior written
26 *      permission.
27 *
28 * *****************************************************************************
29 *
30 *                                  DISCLAIMER
31 * THE SOFTWARE IS SUPPLIED “AS IS” WITHOUT WARRANTY OF ANY KIND.  NEITHER THE
32 * UNTED STATES GOVERNMENT, NOR THE UNITED STATES DEPARTMENT OF ENERGY, NOR
33 * UCHICAGO ARGONNE, LLC, NOR ANY OF THEIR EMPLOYEES, MAKES ANY WARRANTY,
34 * EXPRESS OR IMPLIED, OR ASSUMES ANY LEGAL LIABILITY OR RESPONSIBILITY FOR THE
35 * ACCURACY, COMPLETENESS, OR USEFULNESS OF ANY INFORMATION, DATA, APPARATUS,
36 * PRODUCT, OR PROCESS DISCLOSED, OR REPRESENTS THAT ITS USE WOULD NOT INFRINGE
37 * PRIVATELY OWNED RIGHTS.
38 *
39 * *****************************************************************************
40 */
41
42#define _XOPEN_SOURCE 600
43#include "CRC64.h"
44#include "GenericIO.h"
45
46extern "C" {
47#include "blosc.h"
48}
49
50#include <sstream>
51#include <fstream>
52#include <stdexcept>
53#include <iterator>
54#include <algorithm>
55#include <cassert>
56#include <cstddef>
57#include <cstring>
58
59#ifndef GENERICIO_NO_MPI
60#include <ctime>
61#endif
62
63#include <sys/types.h>
64#include <sys/stat.h>
65#include <fcntl.h>
66#include <errno.h>
67
68#ifdef __bgq__
69#include <mpix.h>
70#endif
71
72#ifndef MPI_UINT64_T
73#define MPI_UINT64_T (sizeof(long) == 8 ? MPI_LONG : MPI_LONG_LONG)
74#endif
75
76using namespace std;
77
78namespace gio {
79
80
81#ifndef GENERICIO_NO_MPI
82GenericFileIO_MPI::~GenericFileIO_MPI() {
83  (void) MPI_File_close(&FH);
84}
85
86void GenericFileIO_MPI::open(const std::string &FN, bool ForReading) {
87  FileName = FN;
88
89  int amode = ForReading ? MPI_MODE_RDONLY : (MPI_MODE_WRONLY | MPI_MODE_CREATE);
90  if (MPI_File_open(Comm, const_cast<char *>(FileName.c_str()), amode,
91                    MPI_INFO_NULL, &FH) != MPI_SUCCESS)
92    throw runtime_error((!ForReading ? "Unable to create the file: " :
93                                       "Unable to open the file: ") +
94                        FileName);
95}
96
97void GenericFileIO_MPI::setSize(size_t sz) {
98  if (MPI_File_set_size(FH, sz) != MPI_SUCCESS)
99    throw runtime_error("Unable to set size for file: " + FileName);
100}
101
102void GenericFileIO_MPI::read(void *buf, size_t count, off_t offset,
103                             const std::string &D) {
104  while (count > 0) {
105    MPI_Status status;
106    if (MPI_File_read_at(FH, offset, buf, count, MPI_BYTE, &status) != MPI_SUCCESS)
107      throw runtime_error("Unable to read " + D + " from file: " + FileName);
108
109    int scount;
110    (void) MPI_Get_count(&status, MPI_BYTE, &scount);
111
112    count -= scount;
113    buf = ((char *) buf) + scount;
114    offset += scount;
115  }
116}
117
118void GenericFileIO_MPI::write(const void *buf, size_t count, off_t offset,
119                              const std::string &D) {
120  while (count > 0) {
121    MPI_Status status;
122    if (MPI_File_write_at(FH, offset, (void *) buf, count, MPI_BYTE, &status) != MPI_SUCCESS)
123      throw runtime_error("Unable to write " + D + " to file: " + FileName);
124
125    int scount = 0;
126    // On some systems, MPI_Get_count will not return zero even when count is zero.
127    if (count > 0)
128      (void) MPI_Get_count(&status, MPI_BYTE, &scount);
129
130    count -= scount;
131    buf = ((char *) buf) + scount;
132    offset += scount;
133  }
134}
135
136void GenericFileIO_MPICollective::read(void *buf, size_t count, off_t offset,
137                             const std::string &D) {
138  int Continue = 0;
139
140  do {
141    MPI_Status status;
142    if (MPI_File_read_at_all(FH, offset, buf, count, MPI_BYTE, &status) != MPI_SUCCESS)
143      throw runtime_error("Unable to read " + D + " from file: " + FileName);
144
145    int scount = 0;
146    // On some systems, MPI_Get_count will not return zero even when count is zero.
147    if (count > 0)
148      (void) MPI_Get_count(&status, MPI_BYTE, &scount);
149
150    count -= scount;
151    buf = ((char *) buf) + scount;
152    offset += scount;
153
154    int NeedContinue = (count > 0);
155    MPI_Allreduce(&NeedContinue, &Continue, 1, MPI_INT, MPI_SUM, Comm);
156  } while (Continue);
157}
158
159void GenericFileIO_MPICollective::write(const void *buf, size_t count, off_t offset,
160                              const std::string &D) {
161  int Continue = 0;
162
163  do {
164    MPI_Status status;
165    if (MPI_File_write_at_all(FH, offset, (void *) buf, count, MPI_BYTE, &status) != MPI_SUCCESS)
166      throw runtime_error("Unable to write " + D + " to file: " + FileName);
167
168    int scount;
169    (void) MPI_Get_count(&status, MPI_BYTE, &scount);
170
171    count -= scount;
172    buf = ((char *) buf) + scount;
173    offset += scount;
174
175    int NeedContinue = (count > 0);
176    MPI_Allreduce(&NeedContinue, &Continue, 1, MPI_INT, MPI_SUM, Comm);
177  } while (Continue);
178}
179#endif
180
181GenericFileIO_POSIX::~GenericFileIO_POSIX() {
182  if (FH != -1) close(FH);
183}
184
185void GenericFileIO_POSIX::open(const std::string &FN, bool ForReading) {
186  FileName = FN;
187
188  int flags = ForReading ? O_RDONLY : (O_WRONLY | O_CREAT);
189  int mode = S_IRUSR | S_IWUSR | S_IRGRP;
190  errno = 0;
191  if ((FH = ::open(FileName.c_str(), flags, mode)) == -1)
192    throw runtime_error((!ForReading ? "Unable to create the file: " :
193                                       "Unable to open the file: ") +
194                        FileName + ": " + strerror(errno));
195}
196
197void GenericFileIO_POSIX::setSize(size_t sz) {
198  if (ftruncate(FH, sz) == -1)
199    throw runtime_error("Unable to set size for file: " + FileName);
200}
201
202void GenericFileIO_POSIX::read(void *buf, size_t count, off_t offset,
203                               const std::string &D) {
204  while (count > 0) {
205    ssize_t scount;
206    errno = 0;
207    if ((scount = pread(FH, buf, count, offset)) == -1) {
208      if (errno == EINTR)
209        continue;
210
211      throw runtime_error("Unable to read " + D + " from file: " + FileName +
212                          ": " + strerror(errno));
213    }
214
215    count -= scount;
216    buf = ((char *) buf) + scount;
217    offset += scount;
218  }
219}
220
221void GenericFileIO_POSIX::write(const void *buf, size_t count, off_t offset,
222                                const std::string &D) {
223  while (count > 0) {
224    ssize_t scount;
225    errno = 0;
226    if ((scount = pwrite(FH, buf, count, offset)) == -1) {
227      if (errno == EINTR)
228        continue;
229
230      throw runtime_error("Unable to write " + D + " to file: " + FileName +
231                          ": " + strerror(errno));
232    }
233
234    count -= scount;
235    buf = ((char *) buf) + scount;
236    offset += scount;
237  }
238}
239
240static bool isBigEndian() {
241  const uint32_t one = 1;
242  return !(*((char *)(&one)));
243}
244
245static void bswap(void *v, size_t s) {
246  char *p = (char *) v;
247  for (size_t i = 0; i < s/2; ++i)
248    std::swap(p[i], p[s - (i+1)]);
249}
250
251// Using #pragma pack here, instead of __attribute__((packed)) because xlc, at
252// least as of v12.1, won't take __attribute__((packed)) on non-POD and/or
253// templated types.
254#pragma pack(1)
255
256template <typename T, bool IsBigEndian>
257struct endian_specific_value {
258  operator T() const {
259    T rvalue = value;
260    if (IsBigEndian != isBigEndian())
261      bswap(&rvalue, sizeof(T));
262
263    return rvalue;
264  };
265
266  endian_specific_value &operator = (T nvalue) {
267    if (IsBigEndian != isBigEndian())
268      bswap(&nvalue, sizeof(T));
269
270    value = nvalue;
271    return *this;
272  }
273
274  endian_specific_value &operator += (T nvalue) {
275    *this = *this + nvalue;
276    return *this;
277  }
278
279  endian_specific_value &operator -= (T nvalue) {
280    *this = *this - nvalue;
281    return *this;
282  }
283
284private:
285  T value;
286};
287
288static const size_t CRCSize = 8;
289
290static const size_t MagicSize = 8;
291static const char *MagicBE = "HACC01B";
292static const char *MagicLE = "HACC01L";
293
294template <bool IsBigEndian>
295struct GlobalHeader {
296  char Magic[MagicSize];
297  endian_specific_value<uint64_t, IsBigEndian> HeaderSize;
298  endian_specific_value<uint64_t, IsBigEndian> NElems; // The global total
299  endian_specific_value<uint64_t, IsBigEndian> Dims[3];
300  endian_specific_value<uint64_t, IsBigEndian> NVars;
301  endian_specific_value<uint64_t, IsBigEndian> VarsSize;
302  endian_specific_value<uint64_t, IsBigEndian> VarsStart;
303  endian_specific_value<uint64_t, IsBigEndian> NRanks;
304  endian_specific_value<uint64_t, IsBigEndian> RanksSize;
305  endian_specific_value<uint64_t, IsBigEndian> RanksStart;
306  endian_specific_value<uint64_t, IsBigEndian> GlobalHeaderSize;
307  endian_specific_value<double,   IsBigEndian> PhysOrigin[3];
308  endian_specific_value<double,   IsBigEndian> PhysScale[3];
309  endian_specific_value<uint64_t, IsBigEndian> BlocksSize;
310  endian_specific_value<uint64_t, IsBigEndian> BlocksStart;
311};
312
313enum {
314  FloatValue          = (1 << 0),
315  SignedValue         = (1 << 1),
316  ValueIsPhysCoordX   = (1 << 2),
317  ValueIsPhysCoordY   = (1 << 3),
318  ValueIsPhysCoordZ   = (1 << 4),
319  ValueMaybePhysGhost = (1 << 5)
320};
321
322static const size_t NameSize = 256;
323template <bool IsBigEndian>
324struct VariableHeader {
325  char Name[NameSize];
326  endian_specific_value<uint64_t, IsBigEndian> Flags;
327  endian_specific_value<uint64_t, IsBigEndian> Size;
328  endian_specific_value<uint64_t, IsBigEndian> ElementSize;
329};
330
331template <bool IsBigEndian>
332struct RankHeader {
333  endian_specific_value<uint64_t, IsBigEndian> Coords[3];
334  endian_specific_value<uint64_t, IsBigEndian> NElems;
335  endian_specific_value<uint64_t, IsBigEndian> Start;
336  endian_specific_value<uint64_t, IsBigEndian> GlobalRank;
337};
338
339static const size_t FilterNameSize = 8;
340static const size_t MaxFilters = 4;
341template <bool IsBigEndian>
342struct BlockHeader {
343  char Filters[MaxFilters][FilterNameSize];
344  endian_specific_value<uint64_t, IsBigEndian> Start;
345  endian_specific_value<uint64_t, IsBigEndian> Size;
346};
347
348template <bool IsBigEndian>
349struct CompressHeader {
350  endian_specific_value<uint64_t, IsBigEndian> OrigCRC;
351};
352const char *CompressName = "BLOSC";
353
354#pragma pack()
355
356unsigned GenericIO::DefaultFileIOType = FileIOPOSIX;
357int GenericIO::DefaultPartition = 0;
358bool GenericIO::DefaultShouldCompress = false;
359
360#ifndef GENERICIO_NO_MPI
361std::size_t GenericIO::CollectiveMPIIOThreshold = 0;
362#endif
363
364static bool blosc_initialized = false;
365
366#ifndef GENERICIO_NO_MPI
367void GenericIO::write() {
368  if (isBigEndian())
369    write<true>();
370  else
371    write<false>();
372}
373
374// Note: writing errors are not currently recoverable (one rank may fail
375// while the others don't).
376template <bool IsBigEndian>
377void GenericIO::write() {
378  const char *Magic = IsBigEndian ? MagicBE : MagicLE;
379
380  uint64_t FileSize = 0;
381
382  int NRanks, Rank;
383  MPI_Comm_rank(Comm, &Rank);
384  MPI_Comm_size(Comm, &NRanks);
385
386#ifdef __bgq__
387  MPI_Barrier(Comm);
388#endif
389  MPI_Comm_split(Comm, Partition, Rank, &SplitComm);
390
391  int SplitNRanks, SplitRank;
392  MPI_Comm_rank(SplitComm, &SplitRank);
393  MPI_Comm_size(SplitComm, &SplitNRanks);
394
395  string LocalFileName;
396  if (SplitNRanks != NRanks) {
397    if (Rank == 0) {
398      // In split mode, the specified file becomes the rank map, and the real
399      // data is partitioned.
400
401      vector<int> MapRank, MapPartition;
402      MapRank.resize(NRanks);
403      for (int i = 0; i < NRanks; ++i) MapRank[i] = i;
404
405      MapPartition.resize(NRanks);
406      MPI_Gather(&Partition, 1, MPI_INT, &MapPartition[0], 1, MPI_INT, 0, Comm);
407
408      GenericIO GIO(MPI_COMM_SELF, FileName, FileIOType);
409      GIO.setNumElems(NRanks);
410      GIO.addVariable("$rank", MapRank); /* this is for use by humans; the reading
411                                            code assumes that the partitions are in
412                                            rank order */
413      GIO.addVariable("$partition", MapPartition);
414
415      vector<int> CX, CY, CZ;
416      int TopoStatus;
417      MPI_Topo_test(Comm, &TopoStatus);
418      if (TopoStatus == MPI_CART) {
419        CX.resize(NRanks);
420        CY.resize(NRanks);
421        CZ.resize(NRanks);
422
423        for (int i = 0; i < NRanks; ++i) {
424          int C[3];
425          MPI_Cart_coords(Comm, i, 3, C);
426
427          CX[i] = C[0];
428          CY[i] = C[1];
429          CZ[i] = C[2];
430        }
431
432        GIO.addVariable("$x", CX);
433        GIO.addVariable("$y", CY);
434        GIO.addVariable("$z", CZ);
435      }
436
437      GIO.write();
438    } else {
439      MPI_Gather(&Partition, 1, MPI_INT, 0, 0, MPI_INT, 0, Comm);
440    }
441
442    stringstream ss;
443    ss << FileName << "#" << Partition;
444    LocalFileName = ss.str();
445  } else {
446    LocalFileName = FileName;
447  }
448
449  RankHeader<IsBigEndian> RHLocal;
450  int Dims[3], Periods[3], Coords[3];
451
452  int TopoStatus;
453  MPI_Topo_test(Comm, &TopoStatus);
454  if (TopoStatus == MPI_CART) {
455    MPI_Cart_get(Comm, 3, Dims, Periods, Coords);
456  } else {
457    Dims[0] = NRanks;
458    std::fill(Dims + 1, Dims + 3, 1);
459    std::fill(Periods, Periods + 3, 0);
460    Coords[0] = Rank;
461    std::fill(Coords + 1, Coords + 3, 0);
462  }
463
464  std::copy(Coords, Coords + 3, RHLocal.Coords);
465  RHLocal.NElems = NElems;
466  RHLocal.Start = 0;
467  RHLocal.GlobalRank = Rank;
468
469  bool ShouldCompress = DefaultShouldCompress;
470  const char *EnvStr = getenv("GENERICIO_COMPRESS");
471  if (EnvStr) {
472    int Mod = atoi(EnvStr);
473    ShouldCompress = (Mod > 0);
474  }
475
476  bool NeedsBlockHeaders = ShouldCompress;
477  EnvStr = getenv("GENERICIO_FORCE_BLOCKS");
478  if (!NeedsBlockHeaders && EnvStr) {
479    int Mod = atoi(EnvStr);
480    NeedsBlockHeaders = (Mod > 0);
481  }
482
483  vector<BlockHeader<IsBigEndian> > LocalBlockHeaders;
484  vector<void *> LocalData;
485  vector<bool> LocalHasExtraSpace;
486  vector<vector<unsigned char> > LocalCData;
487  if (NeedsBlockHeaders) {
488    LocalBlockHeaders.resize(Vars.size());
489    LocalData.resize(Vars.size());
490    LocalHasExtraSpace.resize(Vars.size());
491    if (ShouldCompress)
492      LocalCData.resize(Vars.size());
493
494    for (size_t i = 0; i < Vars.size(); ++i) {
495      // Filters null by default, leave null starting address (needs to be
496      // calculated by the header-writing rank).
497      memset(&LocalBlockHeaders[i], 0, sizeof(BlockHeader<IsBigEndian>));
498      if (ShouldCompress) {
499        LocalCData[i].resize(sizeof(CompressHeader<IsBigEndian>));
500
501        CompressHeader<IsBigEndian> *CH = (CompressHeader<IsBigEndian>*) &LocalCData[i][0];
502        CH->OrigCRC = crc64_omp(Vars[i].Data, Vars[i].Size*NElems);
503
504#ifdef _OPENMP
505#pragma omp master
506  {
507#endif
508
509       if (!blosc_initialized) {
510         blosc_init();
511         blosc_initialized = true;
512       }
513
514#ifdef _OPENMP
515       blosc_set_nthreads(omp_get_max_threads());
516  }
517#endif
518
519        LocalCData[i].resize(LocalCData[i].size() + NElems*Vars[i].Size);
520        if (blosc_compress(9, 1, Vars[i].Size, NElems*Vars[i].Size, Vars[i].Data,
521                           &LocalCData[i][0] + sizeof(CompressHeader<IsBigEndian>),
522                           NElems*Vars[i].Size) <= 0)
523          goto nocomp;
524
525        strncpy(LocalBlockHeaders[i].Filters[0], CompressName, FilterNameSize);
526        size_t CNBytes, CCBytes, CBlockSize;
527        blosc_cbuffer_sizes(&LocalCData[i][0] + sizeof(CompressHeader<IsBigEndian>),
528                            &CNBytes, &CCBytes, &CBlockSize);
529        LocalCData[i].resize(CCBytes + sizeof(CompressHeader<IsBigEndian>));
530
531        LocalBlockHeaders[i].Size = LocalCData[i].size();
532        LocalCData[i].resize(LocalCData[i].size() + CRCSize);
533        LocalData[i] = &LocalCData[i][0];
534        LocalHasExtraSpace[i] = true;
535      } else {
536nocomp:
537        LocalBlockHeaders[i].Size = NElems*Vars[i].Size;
538        LocalData[i] = Vars[i].Data;
539        LocalHasExtraSpace[i] = Vars[i].HasExtraSpace;
540      }
541    }
542  }
543
544  double StartTime = MPI_Wtime();
545
546  if (SplitRank == 0) {
547    uint64_t HeaderSize = sizeof(GlobalHeader<IsBigEndian>) + Vars.size()*sizeof(VariableHeader<IsBigEndian>) +
548                          SplitNRanks*sizeof(RankHeader<IsBigEndian>) + CRCSize;
549    if (NeedsBlockHeaders)
550      HeaderSize += SplitNRanks*Vars.size()*sizeof(BlockHeader<IsBigEndian>);
551
552    vector<char> Header(HeaderSize, 0);
553    GlobalHeader<IsBigEndian> *GH = (GlobalHeader<IsBigEndian> *) &Header[0];
554    std::copy(Magic, Magic + MagicSize, GH->Magic);
555    GH->HeaderSize = HeaderSize - CRCSize;
556    GH->NElems = NElems; // This will be updated later
557    std::copy(Dims, Dims + 3, GH->Dims);
558    GH->NVars = Vars.size();
559    GH->VarsSize = sizeof(VariableHeader<IsBigEndian>);
560    GH->VarsStart = sizeof(GlobalHeader<IsBigEndian>);
561    GH->NRanks = SplitNRanks;
562    GH->RanksSize = sizeof(RankHeader<IsBigEndian>);
563    GH->RanksStart = GH->VarsStart + Vars.size()*sizeof(VariableHeader<IsBigEndian>);
564    GH->GlobalHeaderSize = sizeof(GlobalHeader<IsBigEndian>);
565    std::copy(PhysOrigin, PhysOrigin + 3, GH->PhysOrigin);
566    std::copy(PhysScale,  PhysScale  + 3, GH->PhysScale);
567    if (!NeedsBlockHeaders) {
568      GH->BlocksSize = GH->BlocksStart = 0;
569    } else {
570      GH->BlocksSize = sizeof(BlockHeader<IsBigEndian>);
571      GH->BlocksStart = GH->RanksStart + SplitNRanks*sizeof(RankHeader<IsBigEndian>);
572    }
573
574    uint64_t RecordSize = 0;
575    VariableHeader<IsBigEndian> *VH = (VariableHeader<IsBigEndian> *) &Header[GH->VarsStart];
576    for (size_t i = 0; i < Vars.size(); ++i, ++VH) {
577      string VName(Vars[i].Name);
578      VName.resize(NameSize);
579
580      std::copy(VName.begin(), VName.end(), VH->Name);
581      uint64_t VFlags = 0;
582      if (Vars[i].IsFloat)  VFlags |= FloatValue;
583      if (Vars[i].IsSigned) VFlags |= SignedValue;
584      if (Vars[i].IsPhysCoordX) VFlags |= ValueIsPhysCoordX;
585      if (Vars[i].IsPhysCoordY) VFlags |= ValueIsPhysCoordY;
586      if (Vars[i].IsPhysCoordZ) VFlags |= ValueIsPhysCoordZ;
587      if (Vars[i].MaybePhysGhost) VFlags |= ValueMaybePhysGhost;
588      VH->Flags = VFlags;
589      RecordSize += VH->Size = Vars[i].Size;
590      VH->ElementSize = Vars[i].ElementSize;
591    }
592
593    MPI_Gather(&RHLocal, sizeof(RHLocal), MPI_BYTE,
594               &Header[GH->RanksStart], sizeof(RHLocal),
595               MPI_BYTE, 0, SplitComm);
596
597    if (NeedsBlockHeaders) {
598      MPI_Gather(&LocalBlockHeaders[0],
599                 Vars.size()*sizeof(BlockHeader<IsBigEndian>), MPI_BYTE,
600                 &Header[GH->BlocksStart],
601                 Vars.size()*sizeof(BlockHeader<IsBigEndian>), MPI_BYTE,
602                 0, SplitComm);
603
604      BlockHeader<IsBigEndian> *BH = (BlockHeader<IsBigEndian> *) &Header[GH->BlocksStart];
605      for (int i = 0; i < SplitNRanks; ++i)
606      for (size_t j = 0; j < Vars.size(); ++j, ++BH) {
607        if (i == 0 && j == 0)
608          BH->Start = HeaderSize;
609        else
610          BH->Start = BH[-1].Start + BH[-1].Size + CRCSize;
611      }
612
613      RankHeader<IsBigEndian> *RH = (RankHeader<IsBigEndian> *) &Header[GH->RanksStart];
614      RH->Start = HeaderSize; ++RH;
615      for (int i = 1; i < SplitNRanks; ++i, ++RH) {
616        RH->Start =
617          ((BlockHeader<IsBigEndian> *) &Header[GH->BlocksStart])[i*Vars.size()].Start;
618        GH->NElems += RH->NElems;
619      }
620
621      // Compute the total file size.
622      uint64_t LastData = BH[-1].Size + CRCSize;
623      FileSize = BH[-1].Start + LastData;
624    } else {
625      RankHeader<IsBigEndian> *RH = (RankHeader<IsBigEndian> *) &Header[GH->RanksStart];
626      RH->Start = HeaderSize; ++RH;
627      for (int i = 1; i < SplitNRanks; ++i, ++RH) {
628        uint64_t PrevNElems = RH[-1].NElems;
629        uint64_t PrevData = PrevNElems*RecordSize + CRCSize*Vars.size();
630        RH->Start = RH[-1].Start + PrevData;
631        GH->NElems += RH->NElems;
632      }
633
634      // Compute the total file size.
635      uint64_t LastNElems = RH[-1].NElems;
636      uint64_t LastData = LastNElems*RecordSize + CRCSize*Vars.size();
637      FileSize = RH[-1].Start + LastData;
638    }
639
640    // Now that the starting offset has been computed, send it back to each rank.
641    MPI_Scatter(&Header[GH->RanksStart], sizeof(RHLocal),
642                MPI_BYTE, &RHLocal, sizeof(RHLocal),
643                MPI_BYTE, 0, SplitComm);
644
645    if (NeedsBlockHeaders)
646      MPI_Scatter(&Header[GH->BlocksStart],
647                  sizeof(BlockHeader<IsBigEndian>)*Vars.size(), MPI_BYTE,
648                  &LocalBlockHeaders[0],
649                  sizeof(BlockHeader<IsBigEndian>)*Vars.size(), MPI_BYTE,
650                  0, SplitComm);
651
652    uint64_t HeaderCRC = crc64_omp(&Header[0], HeaderSize - CRCSize);
653    crc64_invert(HeaderCRC, &Header[HeaderSize - CRCSize]);
654
655    if (FileIOType == FileIOMPI)
656      FH.get() = new GenericFileIO_MPI(MPI_COMM_SELF);
657    else if (FileIOType == FileIOMPICollective)
658      FH.get() = new GenericFileIO_MPICollective(MPI_COMM_SELF);
659    else
660      FH.get() = new GenericFileIO_POSIX();
661
662    FH.get()->open(LocalFileName);
663    FH.get()->setSize(FileSize);
664    FH.get()->write(&Header[0], HeaderSize, 0, "header");
665
666    close();
667  } else {
668    MPI_Gather(&RHLocal, sizeof(RHLocal), MPI_BYTE, 0, 0, MPI_BYTE, 0, SplitComm);
669    if (NeedsBlockHeaders)
670      MPI_Gather(&LocalBlockHeaders[0], Vars.size()*sizeof(BlockHeader<IsBigEndian>),
671                 MPI_BYTE, 0, 0, MPI_BYTE, 0, SplitComm);
672    MPI_Scatter(0, 0, MPI_BYTE, &RHLocal, sizeof(RHLocal), MPI_BYTE, 0, SplitComm);
673    if (NeedsBlockHeaders)
674      MPI_Scatter(0, 0, MPI_BYTE, &LocalBlockHeaders[0], sizeof(BlockHeader<IsBigEndian>)*Vars.size(),
675                  MPI_BYTE, 0, SplitComm);
676  }
677
678  MPI_Barrier(SplitComm);
679
680  if (FileIOType == FileIOMPI)
681    FH.get() = new GenericFileIO_MPI(SplitComm);
682  else if (FileIOType == FileIOMPICollective)
683    FH.get() = new GenericFileIO_MPICollective(SplitComm);
684  else
685    FH.get() = new GenericFileIO_POSIX();
686
687  FH.get()->open(LocalFileName);
688
689  uint64_t Offset = RHLocal.Start;
690  for (size_t i = 0; i < Vars.size(); ++i) {
691    uint64_t WriteSize = NeedsBlockHeaders ?
692                         LocalBlockHeaders[i].Size : NElems*Vars[i].Size;
693    void *Data = NeedsBlockHeaders ? LocalData[i] : Vars[i].Data;
694    uint64_t CRC = crc64_omp(Data, WriteSize);
695    bool HasExtraSpace = NeedsBlockHeaders ?
696                         LocalHasExtraSpace[i] : Vars[i].HasExtraSpace;
697    char *CRCLoc = HasExtraSpace ?  ((char *) Data) + WriteSize : (char *) &CRC;
698
699    if (NeedsBlockHeaders)
700      Offset = LocalBlockHeaders[i].Start;
701
702    // When using extra space for the CRC write, preserve the original contents.
703    char CRCSave[CRCSize];
704    if (HasExtraSpace)
705      std::copy(CRCLoc, CRCLoc + CRCSize, CRCSave);
706
707    crc64_invert(CRC, CRCLoc);
708
709    if (HasExtraSpace) {
710      FH.get()->write(Data, WriteSize + CRCSize, Offset, Vars[i].Name + " with CRC");
711    } else {
712      FH.get()->write(Data, WriteSize, Offset, Vars[i].Name);
713      FH.get()->write(CRCLoc, CRCSize, Offset + WriteSize, Vars[i].Name + " CRC");
714    }
715
716    if (HasExtraSpace)
717       std::copy(CRCSave, CRCSave + CRCSize, CRCLoc);
718
719    Offset += WriteSize + CRCSize;
720  }
721
722  close();
723  MPI_Barrier(Comm);
724
725  double EndTime = MPI_Wtime();
726  double TotalTime = EndTime - StartTime;
727  double MaxTotalTime;
728  MPI_Reduce(&TotalTime, &MaxTotalTime, 1, MPI_DOUBLE, MPI_MAX, 0, Comm);
729
730  if (SplitNRanks != NRanks) {
731    uint64_t ContribFileSize = (SplitRank == 0) ? FileSize : 0;
732    MPI_Reduce(&ContribFileSize, &FileSize, 1, MPI_UINT64_T, MPI_SUM, 0, Comm);
733  }
734
735  if (Rank == 0) {
736    double Rate = ((double) FileSize) / MaxTotalTime / (1024.*1024.);
737    std::cout << "Wrote " << Vars.size() << " variables to " << FileName <<
738                  " (" << FileSize << " bytes) in " << MaxTotalTime << "s: " <<
739                  Rate << " MB/s" << std::endl;
740  }
741
742  MPI_Comm_free(&SplitComm);
743  SplitComm = MPI_COMM_NULL;
744}
745#endif // GENERICIO_NO_MPI
746
747template <bool IsBigEndian>
748void GenericIO::readHeaderLeader(void *GHPtr, MismatchBehavior MB, int NRanks,
749                                 int Rank, int SplitNRanks,
750                                 string &LocalFileName, uint64_t &HeaderSize,
751                                 vector<char> &Header) {
752  GlobalHeader<IsBigEndian> &GH = *(GlobalHeader<IsBigEndian> *) GHPtr;
753
754  if (MB == MismatchDisallowed) {
755    if (SplitNRanks != (int) GH.NRanks) {
756      stringstream ss;
757      ss << "Won't read " << LocalFileName << ": communicator-size mismatch: " <<
758            "current: " << SplitNRanks << ", file: " << GH.NRanks;
759      throw runtime_error(ss.str());
760    }
761
762#ifndef GENERICIO_NO_MPI
763    int TopoStatus;
764    MPI_Topo_test(Comm, &TopoStatus);
765    if (TopoStatus == MPI_CART) {
766      int Dims[3], Periods[3], Coords[3];
767      MPI_Cart_get(Comm, 3, Dims, Periods, Coords);
768
769      bool DimsMatch = true;
770      for (int i = 0; i < 3; ++i) {
771        if ((uint64_t) Dims[i] != GH.Dims[i]) {
772          DimsMatch = false;
773          break;
774        }
775      }
776
777      if (!DimsMatch) {
778        stringstream ss;
779        ss << "Won't read " << LocalFileName <<
780              ": communicator-decomposition mismatch: " <<
781              "current: " << Dims[0] << "x" << Dims[1] << "x" << Dims[2] <<
782              ", file: " << GH.Dims[0] << "x" << GH.Dims[1] << "x" <<
783              GH.Dims[2];
784        throw runtime_error(ss.str());
785      }
786    }
787#endif
788  } else if (MB == MismatchRedistribute && !Redistributing) {
789    Redistributing = true;
790
791    int NFileRanks = RankMap.empty() ? (int) GH.NRanks : (int) RankMap.size();
792    int NFileRanksPerRank = NFileRanks/NRanks;
793    int NRemFileRank = NFileRanks % NRanks;
794
795    if (!NFileRanksPerRank) {
796      // We have only the remainder, so the last NRemFileRank ranks get one
797      // file rank, and the others don't.
798      if (NRemFileRank && NRanks - Rank <= NRemFileRank)
799        SourceRanks.push_back(NRanks - (Rank + 1));
800    } else {
801      // Since NRemFileRank < NRanks, and we don't want to put any extra memory
802      // load on rank 0 (because rank 0's memory load is normally higher than
803      // the other ranks anyway), the last NRemFileRank will each take
804      // (NFileRanksPerRank+1) file ranks.
805
806      int FirstFileRank = 0, LastFileRank = NFileRanksPerRank - 1;
807      for (int i = 1; i <= Rank; ++i) {
808        FirstFileRank = LastFileRank + 1;
809        LastFileRank  = FirstFileRank + NFileRanksPerRank - 1;
810
811        if (NRemFileRank && NRanks - i <= NRemFileRank)
812          ++LastFileRank;
813      }
814
815      for (int i = FirstFileRank; i <= LastFileRank; ++i)
816        SourceRanks.push_back(i);
817    }
818  }
819
820  HeaderSize = GH.HeaderSize;
821  Header.resize(HeaderSize + CRCSize, 0xFE /* poison */);
822  FH.get()->read(&Header[0], HeaderSize + CRCSize, 0, "header");
823
824  uint64_t CRC = crc64_omp(&Header[0], HeaderSize + CRCSize);
825  if (CRC != (uint64_t) -1) {
826    throw runtime_error("Header CRC check failed: " + LocalFileName);
827  }
828}
829
830// Note: Errors from this function should be recoverable. This means that if
831// one rank throws an exception, then all ranks should.
832void GenericIO::openAndReadHeader(MismatchBehavior MB, int EffRank, bool CheckPartMap) {
833  int NRanks, Rank;
834#ifndef GENERICIO_NO_MPI
835  MPI_Comm_rank(Comm, &Rank);
836  MPI_Comm_size(Comm, &NRanks);
837#else
838  Rank = 0;
839  NRanks = 1;
840#endif
841
842  if (EffRank == -1)
843    EffRank = MB == MismatchRedistribute ? 0 : Rank;
844
845  if (RankMap.empty() && CheckPartMap) {
846    // First, check to see if the file is a rank map.
847    unsigned long RanksInMap = 0;
848    if (Rank == 0) {
849      try {
850#ifndef GENERICIO_NO_MPI
851        GenericIO GIO(MPI_COMM_SELF, FileName, FileIOType);
852#else
853        GenericIO GIO(FileName, FileIOType);
854#endif
855        GIO.openAndReadHeader(MismatchDisallowed, 0, false);
856        RanksInMap = GIO.readNumElems();
857
858        RankMap.resize(RanksInMap + GIO.requestedExtraSpace()/sizeof(int));
859        GIO.addVariable("$partition", RankMap, true);
860
861        GIO.readData(0, false);
862        RankMap.resize(RanksInMap);
863      } catch (...) {
864        RankMap.clear();
865        RanksInMap = 0;
866      }
867    }
868
869#ifndef GENERICIO_NO_MPI
870    MPI_Bcast(&RanksInMap, 1, MPI_UNSIGNED_LONG, 0, Comm);
871    if (RanksInMap > 0) {
872      RankMap.resize(RanksInMap);
873      MPI_Bcast(&RankMap[0], RanksInMap, MPI_INT, 0, Comm);
874    }
875#endif
876  }
877
878#ifndef GENERICIO_NO_MPI
879  if (SplitComm != MPI_COMM_NULL)
880    MPI_Comm_free(&SplitComm);
881#endif
882
883  string LocalFileName;
884  if (RankMap.empty()) {
885    LocalFileName = FileName;
886#ifndef GENERICIO_NO_MPI
887    MPI_Comm_dup(MB == MismatchRedistribute ? MPI_COMM_SELF : Comm, &SplitComm);
888#endif
889  } else {
890    stringstream ss;
891    ss << FileName << "#" << RankMap[EffRank];
892    LocalFileName = ss.str();
893#ifndef GENERICIO_NO_MPI
894    if (MB == MismatchRedistribute) {
895      MPI_Comm_dup(MPI_COMM_SELF, &SplitComm);
896    } else {
897#ifdef __bgq__
898      MPI_Barrier(Comm);
899#endif
900      MPI_Comm_split(Comm, RankMap[EffRank], Rank, &SplitComm);
901    }
902#endif
903  }
904
905  if (LocalFileName == OpenFileName)
906    return;
907  FH.close();
908
909  int SplitNRanks, SplitRank;
910#ifndef GENERICIO_NO_MPI
911  MPI_Comm_rank(SplitComm, &SplitRank);
912  MPI_Comm_size(SplitComm, &SplitNRanks);
913#else
914  SplitRank = 0;
915  SplitNRanks = 1;
916#endif
917
918  uint64_t HeaderSize;
919  vector<char> Header;
920
921  if (SplitRank == 0) {
922#ifndef GENERICIO_NO_MPI
923    if (FileIOType == FileIOMPI)
924      FH.get() = new GenericFileIO_MPI(MPI_COMM_SELF);
925    else if (FileIOType == FileIOMPICollective)
926      FH.get() = new GenericFileIO_MPICollective(MPI_COMM_SELF);
927    else
928#endif
929      FH.get() = new GenericFileIO_POSIX();
930
931#ifndef GENERICIO_NO_MPI
932    char True = 1, False = 0;
933#endif
934
935    try {
936      FH.get()->open(LocalFileName, true);
937
938      GlobalHeader<false> GH; // endianness does not matter yet...
939      FH.get()->read(&GH, sizeof(GlobalHeader<false>), 0, "global header");
940
941      if (string(GH.Magic, GH.Magic + MagicSize - 1) == MagicLE) {
942        readHeaderLeader<false>(&GH, MB, NRanks, Rank, SplitNRanks, LocalFileName,
943                                HeaderSize, Header);
944      } else if (string(GH.Magic, GH.Magic + MagicSize - 1) == MagicBE) {
945        readHeaderLeader<true>(&GH, MB, NRanks, Rank, SplitNRanks, LocalFileName,
946                               HeaderSize, Header);
947      } else {
948        string Error = "invalid file-type identifier";
949        throw runtime_error("Won't read " + LocalFileName + ": " + Error);
950      }
951
952#ifndef GENERICIO_NO_MPI
953      close();
954      MPI_Bcast(&True, 1, MPI_BYTE, 0, SplitComm);
955#endif
956    } catch (...) {
957#ifndef GENERICIO_NO_MPI
958      MPI_Bcast(&False, 1, MPI_BYTE, 0, SplitComm);
959#endif
960      close();
961      throw;
962    }
963  } else {
964#ifndef GENERICIO_NO_MPI
965    char Okay;
966    MPI_Bcast(&Okay, 1, MPI_BYTE, 0, SplitComm);
967    if (!Okay)
968      throw runtime_error("Failure broadcast from rank 0");
969#endif
970  }
971
972#ifndef GENERICIO_NO_MPI
973  MPI_Bcast(&HeaderSize, 1, MPI_UINT64_T, 0, SplitComm);
974#endif
975
976  Header.resize(HeaderSize, 0xFD /* poison */);
977#ifndef GENERICIO_NO_MPI
978  MPI_Bcast(&Header[0], HeaderSize, MPI_BYTE, 0, SplitComm);
979#endif
980
981  FH.getHeaderCache().clear();
982
983  GlobalHeader<false> *GH = (GlobalHeader<false> *) &Header[0];
984  FH.setIsBigEndian(string(GH->Magic, GH->Magic + MagicSize - 1) == MagicBE);
985
986  FH.getHeaderCache().swap(Header);
987  OpenFileName = LocalFileName;
988
989#ifndef GENERICIO_NO_MPI
990  if (!DisableCollErrChecking)
991    MPI_Barrier(Comm);
992
993  if (FileIOType == FileIOMPI)
994    FH.get() = new GenericFileIO_MPI(SplitComm);
995  else if (FileIOType == FileIOMPICollective)
996    FH.get() = new GenericFileIO_MPICollective(SplitComm);
997  else
998    FH.get() = new GenericFileIO_POSIX();
999
1000  int OpenErr = 0, TotOpenErr;
1001  try {
1002    FH.get()->open(LocalFileName, true);
1003    MPI_Allreduce(&OpenErr, &TotOpenErr, 1, MPI_INT, MPI_SUM,
1004                  DisableCollErrChecking ? MPI_COMM_SELF : Comm);
1005  } catch (...) {
1006    OpenErr = 1;
1007    MPI_Allreduce(&OpenErr, &TotOpenErr, 1, MPI_INT, MPI_SUM,
1008                  DisableCollErrChecking ? MPI_COMM_SELF : Comm);
1009    throw;
1010  }
1011
1012  if (TotOpenErr > 0) {
1013    stringstream ss;
1014    ss << TotOpenErr << " ranks failed to open file: " << LocalFileName;
1015    throw runtime_error(ss.str());
1016  }
1017#endif
1018}
1019
1020int GenericIO::readNRanks() {
1021  if (FH.isBigEndian())
1022    return readNRanks<true>();
1023  return readNRanks<false>();
1024}
1025
1026template <bool IsBigEndian>
1027int GenericIO::readNRanks() {
1028  if (RankMap.size())
1029    return RankMap.size();
1030
1031  assert(FH.getHeaderCache().size() && "HeaderCache must not be empty");
1032  GlobalHeader<IsBigEndian> *GH = (GlobalHeader<IsBigEndian> *) &FH.getHeaderCache()[0];
1033  return (int) GH->NRanks;
1034}
1035
1036void GenericIO::readDims(int Dims[3]) {
1037  if (FH.isBigEndian())
1038    readDims<true>(Dims);
1039  else
1040    readDims<false>(Dims);
1041}
1042
1043template <bool IsBigEndian>
1044void GenericIO::readDims(int Dims[3]) {
1045  assert(FH.getHeaderCache().size() && "HeaderCache must not be empty");
1046  GlobalHeader<IsBigEndian> *GH = (GlobalHeader<IsBigEndian> *) &FH.getHeaderCache()[0];
1047  std::copy(GH->Dims, GH->Dims + 3, Dims);
1048}
1049
1050uint64_t GenericIO::readTotalNumElems() {
1051  if (FH.isBigEndian())
1052    return readTotalNumElems<true>();
1053  return readTotalNumElems<false>();
1054}
1055
1056template <bool IsBigEndian>
1057uint64_t GenericIO::readTotalNumElems() {
1058  if (RankMap.size())
1059    return (uint64_t) -1;
1060
1061  assert(FH.getHeaderCache().size() && "HeaderCache must not be empty");
1062  GlobalHeader<IsBigEndian> *GH = (GlobalHeader<IsBigEndian> *) &FH.getHeaderCache()[0];
1063  return GH->NElems;
1064}
1065
1066void GenericIO::readPhysOrigin(double Origin[3]) {
1067  if (FH.isBigEndian())
1068    readPhysOrigin<true>(Origin);
1069  else
1070    readPhysOrigin<false>(Origin);
1071}
1072
1073// Define a "safe" version of offsetof (offsetof itself might not work for
1074// non-POD types, and at least xlC v12.1 will complain about this if you try).
1075#define offsetof_safe(S, F) (size_t(&(S)->F) - size_t(S))
1076
1077template <bool IsBigEndian>
1078void GenericIO::readPhysOrigin(double Origin[3]) {
1079  assert(FH.getHeaderCache().size() && "HeaderCache must not be empty");
1080  GlobalHeader<IsBigEndian> *GH = (GlobalHeader<IsBigEndian> *) &FH.getHeaderCache()[0];
1081  if (offsetof_safe(GH, PhysOrigin) >= GH->GlobalHeaderSize) {
1082    std::fill(Origin, Origin + 3, 0.0);
1083    return;
1084  }
1085
1086  std::copy(GH->PhysOrigin, GH->PhysOrigin + 3, Origin);
1087}
1088
1089void GenericIO::readPhysScale(double Scale[3]) {
1090  if (FH.isBigEndian())
1091    readPhysScale<true>(Scale);
1092  else
1093    readPhysScale<false>(Scale);
1094}
1095
1096template <bool IsBigEndian>
1097void GenericIO::readPhysScale(double Scale[3]) {
1098  assert(FH.getHeaderCache().size() && "HeaderCache must not be empty");
1099  GlobalHeader<IsBigEndian> *GH = (GlobalHeader<IsBigEndian> *) &FH.getHeaderCache()[0];
1100  if (offsetof_safe(GH, PhysScale) >= GH->GlobalHeaderSize) {
1101    std::fill(Scale, Scale + 3, 0.0);
1102    return;
1103  }
1104
1105  std::copy(GH->PhysScale, GH->PhysScale + 3, Scale);
1106}
1107
1108template <bool IsBigEndian>
1109static size_t getRankIndex(int EffRank, GlobalHeader<IsBigEndian> *GH,
1110                           vector<int> &RankMap, vector<char> &HeaderCache) {
1111  if (RankMap.empty())
1112    return EffRank;
1113
1114  for (size_t i = 0; i < GH->NRanks; ++i) {
1115    RankHeader<IsBigEndian> *RH = (RankHeader<IsBigEndian> *) &HeaderCache[GH->RanksStart +
1116                                                 i*GH->RanksSize];
1117    if (offsetof_safe(RH, GlobalRank) >= GH->RanksSize)
1118      return EffRank;
1119
1120    if ((int) RH->GlobalRank == EffRank)
1121      return i;
1122  }
1123
1124  assert(false && "Index requested of an invalid rank");
1125  return (size_t) -1;
1126}
1127
1128int GenericIO::readGlobalRankNumber(int EffRank) {
1129  if (FH.isBigEndian())
1130    return readGlobalRankNumber<true>(EffRank);
1131  return readGlobalRankNumber<false>(EffRank);
1132}
1133
1134template <bool IsBigEndian>
1135int GenericIO::readGlobalRankNumber(int EffRank) {
1136  if (EffRank == -1) {
1137#ifndef GENERICIO_NO_MPI
1138    MPI_Comm_rank(Comm, &EffRank);
1139#else
1140    EffRank = 0;
1141#endif
1142  }
1143
1144  openAndReadHeader(MismatchAllowed, EffRank, false);
1145
1146  assert(FH.getHeaderCache().size() && "HeaderCache must not be empty");
1147
1148  GlobalHeader<IsBigEndian> *GH = (GlobalHeader<IsBigEndian> *) &FH.getHeaderCache()[0];
1149  size_t RankIndex = getRankIndex<IsBigEndian>(EffRank, GH, RankMap, FH.getHeaderCache());
1150
1151  assert(RankIndex < GH->NRanks && "Invalid rank specified");
1152
1153  RankHeader<IsBigEndian> *RH = (RankHeader<IsBigEndian> *) &FH.getHeaderCache()[GH->RanksStart +
1154                                               RankIndex*GH->RanksSize];
1155
1156  if (offsetof_safe(RH, GlobalRank) >= GH->RanksSize)
1157    return EffRank;
1158
1159  return (int) RH->GlobalRank;
1160}
1161
1162void GenericIO::getSourceRanks(vector<int> &SR) {
1163  SR.clear();
1164
1165  if (Redistributing) {
1166    std::copy(SourceRanks.begin(), SourceRanks.end(), std::back_inserter(SR));
1167    return;
1168  }
1169
1170  int Rank;
1171#ifndef GENERICIO_NO_MPI
1172  MPI_Comm_rank(Comm, &Rank);
1173#else
1174  Rank = 0;
1175#endif
1176
1177  SR.push_back(Rank);
1178}
1179
1180size_t GenericIO::readNumElems(int EffRank) {
1181  if (EffRank == -1 && Redistributing) {
1182    DisableCollErrChecking = true;
1183
1184    size_t TotalSize = 0;
1185    for (int i = 0, ie = SourceRanks.size(); i != ie; ++i)
1186      TotalSize += readNumElems(SourceRanks[i]);
1187
1188    DisableCollErrChecking = false;
1189    return TotalSize;
1190  }
1191
1192  if (FH.isBigEndian())
1193    return readNumElems<true>(EffRank);
1194  return readNumElems<false>(EffRank);
1195}
1196
1197template <bool IsBigEndian>
1198size_t GenericIO::readNumElems(int EffRank) {
1199  if (EffRank == -1) {
1200#ifndef GENERICIO_NO_MPI
1201    MPI_Comm_rank(Comm, &EffRank);
1202#else
1203    EffRank = 0;
1204#endif
1205  }
1206
1207  openAndReadHeader(Redistributing ? MismatchRedistribute : MismatchAllowed,
1208                    EffRank, false);
1209
1210  assert(FH.getHeaderCache().size() && "HeaderCache must not be empty");
1211
1212  GlobalHeader<IsBigEndian> *GH = (GlobalHeader<IsBigEndian> *) &FH.getHeaderCache()[0];
1213  size_t RankIndex = getRankIndex<IsBigEndian>(EffRank, GH, RankMap, FH.getHeaderCache());
1214
1215  assert(RankIndex < GH->NRanks && "Invalid rank specified");
1216
1217  RankHeader<IsBigEndian> *RH = (RankHeader<IsBigEndian> *) &FH.getHeaderCache()[GH->RanksStart +
1218                                               RankIndex*GH->RanksSize];
1219  return (size_t) RH->NElems;
1220}
1221
1222void GenericIO::readCoords(int Coords[3], int EffRank) {
1223  if (EffRank == -1 && Redistributing) {
1224    std::fill(Coords, Coords + 3, 0);
1225    return;
1226  }
1227
1228  if (FH.isBigEndian())
1229    readCoords<true>(Coords, EffRank);
1230  else
1231    readCoords<false>(Coords, EffRank);
1232}
1233
1234template <bool IsBigEndian>
1235void GenericIO::readCoords(int Coords[3], int EffRank) {
1236  if (EffRank == -1) {
1237#ifndef GENERICIO_NO_MPI
1238    MPI_Comm_rank(Comm, &EffRank);
1239#else
1240    EffRank = 0;
1241#endif
1242  }
1243
1244  openAndReadHeader(MismatchAllowed, EffRank, false);
1245
1246  assert(FH.getHeaderCache().size() && "HeaderCache must not be empty");
1247
1248  GlobalHeader<IsBigEndian> *GH = (GlobalHeader<IsBigEndian> *) &FH.getHeaderCache()[0];
1249  size_t RankIndex = getRankIndex<IsBigEndian>(EffRank, GH, RankMap, FH.getHeaderCache());
1250
1251  assert(RankIndex < GH->NRanks && "Invalid rank specified");
1252
1253  RankHeader<IsBigEndian> *RH = (RankHeader<IsBigEndian> *) &FH.getHeaderCache()[GH->RanksStart +
1254                                               RankIndex*GH->RanksSize];
1255
1256  std::copy(RH->Coords, RH->Coords + 3, Coords);
1257}
1258
1259void GenericIO::readData(int EffRank, bool PrintStats, bool CollStats) {
1260  int Rank;
1261#ifndef GENERICIO_NO_MPI
1262  MPI_Comm_rank(Comm, &Rank);
1263#else
1264  Rank = 0;
1265#endif
1266
1267  uint64_t TotalReadSize = 0;
1268#ifndef GENERICIO_NO_MPI
1269  double StartTime = MPI_Wtime();
1270#else
1271  double StartTime = double(clock())/CLOCKS_PER_SEC;
1272#endif
1273
1274  int NErrs[3] = { 0, 0, 0 };
1275
1276  if (EffRank == -1 && Redistributing) {
1277    DisableCollErrChecking = true;
1278
1279    size_t RowOffset = 0;
1280    for (int i = 0, ie = SourceRanks.size(); i != ie; ++i) {
1281      readData(SourceRanks[i], RowOffset, Rank, TotalReadSize, NErrs);
1282      RowOffset += readNumElems(SourceRanks[i]);
1283    }
1284
1285    DisableCollErrChecking = false;
1286  } else {
1287    readData(EffRank, 0, Rank, TotalReadSize, NErrs);
1288  }
1289
1290  int AllNErrs[3];
1291#ifndef GENERICIO_NO_MPI
1292  MPI_Allreduce(NErrs, AllNErrs, 3, MPI_INT, MPI_SUM, Comm);
1293#else
1294  AllNErrs[0] = NErrs[0]; AllNErrs[1] = NErrs[1]; AllNErrs[2] = NErrs[2];
1295#endif
1296
1297  if (AllNErrs[0] > 0 || AllNErrs[1] > 0 || AllNErrs[2] > 0) {
1298    stringstream ss;
1299    ss << "Experienced " << AllNErrs[0] << " I/O error(s), " <<
1300          AllNErrs[1] << " CRC error(s) and " << AllNErrs[2] <<
1301          " decompression CRC error(s) reading: " << OpenFileName;
1302    throw runtime_error(ss.str());
1303  }
1304
1305#ifndef GENERICIO_NO_MPI
1306  MPI_Barrier(Comm);
1307#endif
1308
1309#ifndef GENERICIO_NO_MPI
1310  double EndTime = MPI_Wtime();
1311#else
1312  double EndTime = double(clock())/CLOCKS_PER_SEC;
1313#endif
1314
1315  double TotalTime = EndTime - StartTime;
1316  double MaxTotalTime;
1317#ifndef GENERICIO_NO_MPI
1318  if (CollStats)
1319    MPI_Reduce(&TotalTime, &MaxTotalTime, 1, MPI_DOUBLE, MPI_MAX, 0, Comm);
1320  else
1321#endif
1322  MaxTotalTime = TotalTime;
1323
1324  uint64_t AllTotalReadSize;
1325#ifndef GENERICIO_NO_MPI
1326  if (CollStats)
1327    MPI_Reduce(&TotalReadSize, &AllTotalReadSize, 1, MPI_UINT64_T, MPI_SUM, 0, Comm);
1328  else
1329#endif
1330  AllTotalReadSize = TotalReadSize;
1331
1332  if (Rank == 0 && PrintStats) {
1333    double Rate = ((double) AllTotalReadSize) / MaxTotalTime / (1024.*1024.);
1334    std::cout << "Read " << Vars.size() << " variables from " << FileName <<
1335                 " (" << AllTotalReadSize << " bytes) in " << MaxTotalTime << "s: " <<
1336                 Rate << " MB/s [excluding header read]" << std::endl;
1337  }
1338}
1339
1340void GenericIO::readData(int EffRank, size_t RowOffset, int Rank,
1341                         uint64_t &TotalReadSize, int NErrs[3]) {
1342  if (FH.isBigEndian())
1343    readData<true>(EffRank, RowOffset, Rank, TotalReadSize, NErrs);
1344  else
1345    readData<false>(EffRank, RowOffset, Rank, TotalReadSize, NErrs);
1346}
1347
1348// Note: Errors from this function should be recoverable. This means that if
1349// one rank throws an exception, then all ranks should.
1350template <bool IsBigEndian>
1351void GenericIO::readData(int EffRank, size_t RowOffset, int Rank,
1352                         uint64_t &TotalReadSize, int NErrs[3]) {
1353  openAndReadHeader(Redistributing ? MismatchRedistribute : MismatchAllowed,
1354                    EffRank, false);
1355
1356  assert(FH.getHeaderCache().size() && "HeaderCache must not be empty");
1357
1358  if (EffRank == -1)
1359    EffRank = Rank;
1360
1361  GlobalHeader<IsBigEndian> *GH = (GlobalHeader<IsBigEndian> *) &FH.getHeaderCache()[0];
1362  size_t RankIndex = getRankIndex<IsBigEndian>(EffRank, GH, RankMap, FH.getHeaderCache());
1363
1364  assert(RankIndex < GH->NRanks && "Invalid rank specified");
1365
1366  RankHeader<IsBigEndian> *RH = (RankHeader<IsBigEndian> *) &FH.getHeaderCache()[GH->RanksStart +
1367                                               RankIndex*GH->RanksSize];
1368
1369  for (size_t i = 0; i < Vars.size(); ++i) {
1370    uint64_t Offset = RH->Start;
1371    bool VarFound = false;
1372    for (uint64_t j = 0; j < GH->NVars; ++j) {
1373      VariableHeader<IsBigEndian> *VH = (VariableHeader<IsBigEndian> *) &FH.getHeaderCache()[GH->VarsStart +
1374                                                           j*GH->VarsSize];
1375
1376      string VName(VH->Name, VH->Name + NameSize);
1377      size_t VNameNull = VName.find('\0');
1378      if (VNameNull < NameSize)
1379        VName.resize(VNameNull);
1380
1381      uint64_t ReadSize = RH->NElems*VH->Size + CRCSize;
1382      if (VName != Vars[i].Name) {
1383        Offset += ReadSize;
1384        continue;
1385      }
1386
1387      size_t ElementSize = VH->Size;
1388      if (offsetof_safe(VH, ElementSize) < GH->VarsSize)
1389        ElementSize = VH->ElementSize;
1390
1391      VarFound = true;
1392      bool IsFloat = (bool) (VH->Flags & FloatValue),
1393           IsSigned = (bool) (VH->Flags & SignedValue);
1394      if (VH->Size != Vars[i].Size) {
1395        stringstream ss;
1396        ss << "Size mismatch for variable " << Vars[i].Name <<
1397              " in: " << OpenFileName << ": current: " << Vars[i].Size <<
1398              ", file: " << VH->Size;
1399        throw runtime_error(ss.str());
1400      } else if (ElementSize != Vars[i].ElementSize) {
1401        stringstream ss;
1402        ss << "Element size mismatch for variable " << Vars[i].Name <<
1403              " in: " << OpenFileName << ": current: " << Vars[i].ElementSize <<
1404              ", file: " << ElementSize;
1405        throw runtime_error(ss.str());
1406      } else if (IsFloat != Vars[i].IsFloat) {
1407        string Float("float"), Int("integer");
1408        stringstream ss;
1409        ss << "Type mismatch for variable " << Vars[i].Name <<
1410              " in: " << OpenFileName << ": current: " <<
1411              (Vars[i].IsFloat ? Float : Int) <<
1412              ", file: " << (IsFloat ? Float : Int);
1413        throw runtime_error(ss.str());
1414      } else if (IsSigned != Vars[i].IsSigned) {
1415        string Signed("signed"), Uns("unsigned");
1416        stringstream ss;
1417        ss << "Type mismatch for variable " << Vars[i].Name <<
1418              " in: " << OpenFileName << ": current: " <<
1419              (Vars[i].IsSigned ? Signed : Uns) <<
1420              ", file: " << (IsSigned ? Signed : Uns);
1421        throw runtime_error(ss.str());
1422      }
1423
1424      size_t VarOffset = RowOffset*Vars[i].Size;
1425      void *VarData = ((char *) Vars[i].Data) + VarOffset;
1426
1427      vector<unsigned char> LData;
1428      void *Data = VarData;
1429      bool HasExtraSpace = Vars[i].HasExtraSpace;
1430      if (offsetof_safe(GH, BlocksStart) < GH->GlobalHeaderSize &&
1431          GH->BlocksSize > 0) {
1432        BlockHeader<IsBigEndian> *BH = (BlockHeader<IsBigEndian> *)
1433          &FH.getHeaderCache()[GH->BlocksStart +
1434                               (RankIndex*GH->NVars + j)*GH->BlocksSize];
1435        ReadSize = BH->Size + CRCSize;
1436        Offset = BH->Start;
1437
1438        if (strncmp(BH->Filters[0], CompressName, FilterNameSize) == 0) {
1439          LData.resize(ReadSize);
1440          Data = &LData[0];
1441          HasExtraSpace = true;
1442        } else if (BH->Filters[0][0] != '\0') {
1443          stringstream ss;
1444          ss << "Unknown filter \"" << BH->Filters[0] << "\" on variable " << Vars[i].Name;
1445          throw runtime_error(ss.str());
1446        }
1447      }
1448
1449      assert(HasExtraSpace && "Extra space required for reading");
1450
1451      char CRCSave[CRCSize];
1452      char *CRCLoc = ((char *) Data) + ReadSize - CRCSize;
1453      if (HasExtraSpace)
1454        std::copy(CRCLoc, CRCLoc + CRCSize, CRCSave);
1455
1456      int Retry = 0;
1457      {
1458        int RetryCount = 300;
1459        const char *EnvStr = getenv("GENERICIO_RETRY_COUNT");
1460        if (EnvStr)
1461          RetryCount = atoi(EnvStr);
1462
1463        int RetrySleep = 100; // ms
1464        EnvStr = getenv("GENERICIO_RETRY_SLEEP");
1465        if (EnvStr)
1466          RetrySleep = atoi(EnvStr);
1467
1468        for (; Retry < RetryCount; ++Retry) {
1469          try {
1470            FH.get()->read(Data, ReadSize, Offset, Vars[i].Name);
1471            break;
1472          } catch (...) { }
1473
1474          usleep(1000*RetrySleep);
1475        }
1476
1477        if (Retry == RetryCount) {
1478          ++NErrs[0];
1479          break;
1480        } else if (Retry > 0) {
1481          EnvStr = getenv("GENERICIO_VERBOSE");
1482          if (EnvStr) {
1483            int Mod = atoi(EnvStr);
1484            if (Mod > 0) {
1485              int Rank;
1486#ifndef GENERICIO_NO_MPI
1487              MPI_Comm_rank(MPI_COMM_WORLD, &Rank);
1488#else
1489              Rank = 0;
1490#endif
1491
1492              std::cerr << "Rank " << Rank << ": " << Retry <<
1493                           " I/O retries were necessary for reading " <<
1494                           Vars[i].Name << " from: " << OpenFileName << "\n";
1495
1496              std::cerr.flush();
1497            }
1498          }
1499        }
1500      }
1501
1502      TotalReadSize += ReadSize;
1503
1504      uint64_t CRC = crc64_omp(Data, ReadSize);
1505      if (CRC != (uint64_t) -1) {
1506        ++NErrs[1];
1507
1508        int Rank;
1509#ifndef GENERICIO_NO_MPI
1510        MPI_Comm_rank(MPI_COMM_WORLD, &Rank);
1511#else
1512        Rank = 0;
1513#endif
1514
1515        // All ranks will do this and have a good time!
1516        string dn = "gio_crc_errors";
1517        mkdir(dn.c_str(), 0777);
1518
1519        srand(time(0));
1520        int DumpNum = rand();
1521        stringstream ssd;
1522        ssd << dn << "/gio_crc_error_dump." << Rank << "." << DumpNum << ".bin";
1523
1524        stringstream ss;
1525        ss << dn << "/gio_crc_error_log." << Rank << ".txt";
1526
1527        ofstream ofs(ss.str().c_str(), ofstream::out | ofstream::app);
1528        ofs << "On-Disk CRC Error Report:\n";
1529        ofs << "Variable: " << Vars[i].Name << "\n";
1530        ofs << "File: " << OpenFileName << "\n";
1531        ofs << "I/O Retries: " << Retry << "\n";
1532        ofs << "Size: " << ReadSize << " bytes\n";
1533        ofs << "Offset: " << Offset << " bytes\n";
1534        ofs << "CRC: " << CRC << " (expected is -1)\n";
1535        ofs << "Dump file: " << ssd.str() << "\n";
1536        ofs << "\n";
1537        ofs.close();
1538
1539        ofstream dofs(ssd.str().c_str(), ofstream::out);
1540        dofs.write((const char *) Data, ReadSize);
1541        dofs.close();
1542
1543        uint64_t RawCRC = crc64_omp(Data, ReadSize - CRCSize);
1544        unsigned char *UData = (unsigned char *) Data;
1545        crc64_invert(RawCRC, &UData[ReadSize - CRCSize]);
1546        uint64_t NewCRC = crc64_omp(Data, ReadSize);
1547        std::cerr << "Recalulated CRC: " << NewCRC << ((NewCRC == -1) ? "ok" : "bad") << "\n";
1548        break;
1549      }
1550
1551      if (HasExtraSpace)
1552        std::copy(CRCSave, CRCSave + CRCSize, CRCLoc);
1553
1554      if (LData.size()) {
1555        CompressHeader<IsBigEndian> *CH = (CompressHeader<IsBigEndian>*) &LData[0];
1556
1557#ifdef _OPENMP
1558#pragma omp master
1559  {
1560#endif
1561
1562       if (!blosc_initialized) {
1563         blosc_init();
1564         blosc_initialized = true;
1565       }
1566
1567#ifdef _OPENMP
1568       blosc_set_nthreads(omp_get_max_threads());
1569  }
1570#endif
1571
1572        blosc_decompress(&LData[0] + sizeof(CompressHeader<IsBigEndian>),
1573                         VarData, Vars[i].Size*RH->NElems);
1574
1575        if (CH->OrigCRC != crc64_omp(VarData, Vars[i].Size*RH->NElems)) {
1576          ++NErrs[2];
1577          break;
1578        }
1579      }
1580
1581      // Byte swap the data if necessary.
1582      if (IsBigEndian != isBigEndian())
1583        for (size_t j = 0;
1584             j < RH->NElems*(Vars[i].Size/Vars[i].ElementSize); ++j) {
1585          char *Offset = ((char *) VarData) + j*Vars[i].ElementSize;
1586          bswap(Offset, Vars[i].ElementSize);
1587        }
1588
1589      break;
1590    }
1591
1592    if (!VarFound)
1593      throw runtime_error("Variable " + Vars[i].Name +
1594                          " not found in: " + OpenFileName);
1595
1596    // This is for debugging.
1597    if (NErrs[0] || NErrs[1] || NErrs[2]) {
1598      const char *EnvStr = getenv("GENERICIO_VERBOSE");
1599      if (EnvStr) {
1600        int Mod = atoi(EnvStr);
1601        if (Mod > 0) {
1602          int Rank;
1603#ifndef GENERICIO_NO_MPI
1604          MPI_Comm_rank(MPI_COMM_WORLD, &Rank);
1605#else
1606          Rank = 0;
1607#endif
1608
1609          std::cerr << "Rank " << Rank << ": " << NErrs[0] << " I/O error(s), " <<
1610          NErrs[1] << " CRC error(s) and " << NErrs[2] <<
1611          " decompression CRC error(s) reading: " << Vars[i].Name <<
1612          " from: " << OpenFileName << "\n";
1613
1614          std::cerr.flush();
1615        }
1616      }
1617    }
1618
1619    if (NErrs[0] || NErrs[1] || NErrs[2])
1620      break;
1621  }
1622}
1623
1624void GenericIO::getVariableInfo(vector<VariableInfo> &VI) {
1625  if (FH.isBigEndian())
1626    getVariableInfo<true>(VI);
1627  else
1628    getVariableInfo<false>(VI);
1629}
1630
1631template <bool IsBigEndian>
1632void GenericIO::getVariableInfo(vector<VariableInfo> &VI) {
1633  assert(FH.getHeaderCache().size() && "HeaderCache must not be empty");
1634
1635  GlobalHeader<IsBigEndian> *GH = (GlobalHeader<IsBigEndian> *) &FH.getHeaderCache()[0];
1636  for (uint64_t j = 0; j < GH->NVars; ++j) {
1637    VariableHeader<IsBigEndian> *VH = (VariableHeader<IsBigEndian> *) &FH.getHeaderCache()[GH->VarsStart +
1638                                                         j*GH->VarsSize];
1639
1640    string VName(VH->Name, VH->Name + NameSize);
1641    size_t VNameNull = VName.find('\0');
1642    if (VNameNull < NameSize)
1643      VName.resize(VNameNull);
1644
1645    size_t ElementSize = VH->Size;
1646    if (offsetof_safe(VH, ElementSize) < GH->VarsSize)
1647      ElementSize = VH->ElementSize;
1648
1649    bool IsFloat = (bool) (VH->Flags & FloatValue),
1650         IsSigned = (bool) (VH->Flags & SignedValue),
1651         IsPhysCoordX = (bool) (VH->Flags & ValueIsPhysCoordX),
1652         IsPhysCoordY = (bool) (VH->Flags & ValueIsPhysCoordY),
1653         IsPhysCoordZ = (bool) (VH->Flags & ValueIsPhysCoordZ),
1654         MaybePhysGhost = (bool) (VH->Flags & ValueMaybePhysGhost);
1655    VI.push_back(VariableInfo(VName, (size_t) VH->Size, IsFloat, IsSigned,
1656                              IsPhysCoordX, IsPhysCoordY, IsPhysCoordZ,
1657                              MaybePhysGhost, ElementSize));
1658  }
1659}
1660
1661void GenericIO::setNaturalDefaultPartition() {
1662#ifdef __bgq__
1663  DefaultPartition = MPIX_IO_link_id();
1664#else
1665#ifndef GENERICIO_NO_MPI
1666  bool UseName = true;
1667  const char *EnvStr = getenv("GENERICIO_PARTITIONS_USE_NAME");
1668  if (EnvStr) {
1669    int Mod = atoi(EnvStr);
1670    UseName = (Mod != 0);
1671  }
1672
1673  if (UseName) {
1674    // This is a heuristic to generate ~256 partitions based on the
1675    // names of the nodes.
1676    char Name[MPI_MAX_PROCESSOR_NAME];
1677    int Len = 0;
1678
1679    MPI_Get_processor_name(Name, &Len);
1680    unsigned char color = 0;
1681    for (int i = 0; i < Len; ++i)
1682      color += (unsigned char) Name[i];
1683
1684    DefaultPartition = color;
1685  }
1686
1687  // This is for debugging.
1688  EnvStr = getenv("GENERICIO_RANK_PARTITIONS");
1689  if (EnvStr) {
1690    int Mod = atoi(EnvStr);
1691    if (Mod > 0) {
1692      int Rank;
1693      MPI_Comm_rank(MPI_COMM_WORLD, &Rank);
1694      DefaultPartition += Rank % Mod;
1695    }
1696  }
1697#endif
1698#endif
1699}
1700
1701} /* END namespace cosmotk */
Note: See TracBrowser for help on using the repository browser.