source: GenericIO.cxx @ 14e73bb

Revision 14e73bb, 51.1 KB checked in by Hal Finkel <hfinkel@…>, 8 years ago (diff)

Misc. improvements to splitting and error handling

  • Property mode set to 100644
Line 
1/*
2 *                    Copyright (C) 2015, UChicago Argonne, LLC
3 *                               All Rights Reserved
4 *
5 *                               Generic IO (ANL-15-066)
6 *                     Hal Finkel, Argonne National Laboratory
7 *
8 *                              OPEN SOURCE LICENSE
9 *
10 * Under the terms of Contract No. DE-AC02-06CH11357 with UChicago Argonne,
11 * LLC, the U.S. Government retains certain rights in this software.
12 *
13 * Redistribution and use in source and binary forms, with or without
14 * modification, are permitted provided that the following conditions are met:
15 *
16 *   1. Redistributions of source code must retain the above copyright notice,
17 *      this list of conditions and the following disclaimer.
18 *
19 *   2. Redistributions in binary form must reproduce the above copyright
20 *      notice, this list of conditions and the following disclaimer in the
21 *      documentation and/or other materials provided with the distribution.
22 *
23 *   3. Neither the names of UChicago Argonne, LLC or the Department of Energy
24 *      nor the names of its contributors may be used to endorse or promote
25 *      products derived from this software without specific prior written
26 *      permission.
27 *
28 * *****************************************************************************
29 *
30 *                                  DISCLAIMER
31 * THE SOFTWARE IS SUPPLIED “AS IS” WITHOUT WARRANTY OF ANY KIND.  NEITHER THE
32 * UNTED STATES GOVERNMENT, NOR THE UNITED STATES DEPARTMENT OF ENERGY, NOR
33 * UCHICAGO ARGONNE, LLC, NOR ANY OF THEIR EMPLOYEES, MAKES ANY WARRANTY,
34 * EXPRESS OR IMPLIED, OR ASSUMES ANY LEGAL LIABILITY OR RESPONSIBILITY FOR THE
35 * ACCURACY, COMPLETENESS, OR USEFULNESS OF ANY INFORMATION, DATA, APPARATUS,
36 * PRODUCT, OR PROCESS DISCLOSED, OR REPRESENTS THAT ITS USE WOULD NOT INFRINGE
37 * PRIVATELY OWNED RIGHTS.
38 *
39 * *****************************************************************************
40 */
41
42#define _XOPEN_SOURCE 600
43#include "CRC64.h"
44#include "GenericIO.h"
45
46extern "C" {
47#include "blosc.h"
48}
49
50#include <sstream>
51#include <fstream>
52#include <stdexcept>
53#include <iterator>
54#include <algorithm>
55#include <cassert>
56#include <cstddef>
57#include <cstring>
58
59#ifndef GENERICIO_NO_MPI
60#include <ctime>
61#endif
62
63#include <sys/types.h>
64#include <sys/stat.h>
65#include <fcntl.h>
66#include <errno.h>
67
68#ifdef __bgq__
69#include <mpix.h>
70#endif
71
72#ifndef MPI_UINT64_T
73#define MPI_UINT64_T (sizeof(long) == 8 ? MPI_LONG : MPI_LONG_LONG)
74#endif
75
76using namespace std;
77
78namespace gio {
79
80
81#ifndef GENERICIO_NO_MPI
82GenericFileIO_MPI::~GenericFileIO_MPI() {
83  (void) MPI_File_close(&FH);
84}
85
86void GenericFileIO_MPI::open(const std::string &FN, bool ForReading) {
87  FileName = FN;
88
89  int amode = ForReading ? MPI_MODE_RDONLY : (MPI_MODE_WRONLY | MPI_MODE_CREATE);
90  if (MPI_File_open(Comm, const_cast<char *>(FileName.c_str()), amode,
91                    MPI_INFO_NULL, &FH) != MPI_SUCCESS)
92    throw runtime_error((!ForReading ? "Unable to create the file: " :
93                                       "Unable to open the file: ") +
94                        FileName);
95}
96
97void GenericFileIO_MPI::setSize(size_t sz) {
98  if (MPI_File_set_size(FH, sz) != MPI_SUCCESS)
99    throw runtime_error("Unable to set size for file: " + FileName);
100}
101
102void GenericFileIO_MPI::read(void *buf, size_t count, off_t offset,
103                             const std::string &D) {
104  while (count > 0) {
105    MPI_Status status;
106    if (MPI_File_read_at(FH, offset, buf, count, MPI_BYTE, &status) != MPI_SUCCESS)
107      throw runtime_error("Unable to read " + D + " from file: " + FileName);
108
109    int scount;
110    (void) MPI_Get_count(&status, MPI_BYTE, &scount);
111
112    count -= scount;
113    buf = ((char *) buf) + scount;
114    offset += scount;
115  }
116}
117
118void GenericFileIO_MPI::write(const void *buf, size_t count, off_t offset,
119                              const std::string &D) {
120  while (count > 0) {
121    MPI_Status status;
122    if (MPI_File_write_at(FH, offset, (void *) buf, count, MPI_BYTE, &status) != MPI_SUCCESS)
123      throw runtime_error("Unable to write " + D + " to file: " + FileName);
124
125    int scount;
126    (void) MPI_Get_count(&status, MPI_BYTE, &scount);
127
128    count -= scount;
129    buf = ((char *) buf) + scount;
130    offset += scount;
131  }
132}
133
134void GenericFileIO_MPICollective::read(void *buf, size_t count, off_t offset,
135                             const std::string &D) {
136  int Continue = 0;
137
138  do {
139    MPI_Status status;
140    if (MPI_File_read_at_all(FH, offset, buf, count, MPI_BYTE, &status) != MPI_SUCCESS)
141      throw runtime_error("Unable to read " + D + " from file: " + FileName);
142
143    int scount;
144    (void) MPI_Get_count(&status, MPI_BYTE, &scount);
145
146    count -= scount;
147    buf = ((char *) buf) + scount;
148    offset += scount;
149
150    int NeedContinue = (count > 0);
151    MPI_Allreduce(&NeedContinue, &Continue, 1, MPI_INT, MPI_SUM, Comm);
152  } while (Continue);
153}
154
155void GenericFileIO_MPICollective::write(const void *buf, size_t count, off_t offset,
156                              const std::string &D) {
157  int Continue = 0;
158
159  do {
160    MPI_Status status;
161    if (MPI_File_write_at_all(FH, offset, (void *) buf, count, MPI_BYTE, &status) != MPI_SUCCESS)
162      throw runtime_error("Unable to write " + D + " to file: " + FileName);
163
164    int scount;
165    (void) MPI_Get_count(&status, MPI_BYTE, &scount);
166
167    count -= scount;
168    buf = ((char *) buf) + scount;
169    offset += scount;
170
171    int NeedContinue = (count > 0);
172    MPI_Allreduce(&NeedContinue, &Continue, 1, MPI_INT, MPI_SUM, Comm);
173  } while (Continue);
174}
175#endif
176
177GenericFileIO_POSIX::~GenericFileIO_POSIX() {
178  if (FH != -1) close(FH);
179}
180
181void GenericFileIO_POSIX::open(const std::string &FN, bool ForReading) {
182  FileName = FN;
183
184  int flags = ForReading ? O_RDONLY : (O_WRONLY | O_CREAT);
185  int mode = S_IRUSR | S_IWUSR | S_IRGRP;
186  errno = 0;
187  if ((FH = ::open(FileName.c_str(), flags, mode)) == -1)
188    throw runtime_error((!ForReading ? "Unable to create the file: " :
189                                       "Unable to open the file: ") +
190                        FileName + ": " + strerror(errno));
191}
192
193void GenericFileIO_POSIX::setSize(size_t sz) {
194  if (ftruncate(FH, sz) == -1)
195    throw runtime_error("Unable to set size for file: " + FileName);
196}
197
198void GenericFileIO_POSIX::read(void *buf, size_t count, off_t offset,
199                               const std::string &D) {
200  while (count > 0) {
201    ssize_t scount;
202    errno = 0;
203    if ((scount = pread(FH, buf, count, offset)) == -1) {
204      if (errno == EINTR)
205        continue;
206
207      throw runtime_error("Unable to read " + D + " from file: " + FileName +
208                          ": " + strerror(errno));
209    }
210
211    count -= scount;
212    buf = ((char *) buf) + scount;
213    offset += scount;
214  }
215}
216
217void GenericFileIO_POSIX::write(const void *buf, size_t count, off_t offset,
218                                const std::string &D) {
219  while (count > 0) {
220    ssize_t scount;
221    errno = 0;
222    if ((scount = pwrite(FH, buf, count, offset)) == -1) {
223      if (errno == EINTR)
224        continue;
225
226      throw runtime_error("Unable to write " + D + " to file: " + FileName +
227                          ": " + strerror(errno));
228    }
229
230    count -= scount;
231    buf = ((char *) buf) + scount;
232    offset += scount;
233  }
234}
235
236static bool isBigEndian() {
237  const uint32_t one = 1;
238  return !(*((char *)(&one)));
239}
240
241static void bswap(void *v, size_t s) {
242  char *p = (char *) v;
243  for (size_t i = 0; i < s/2; ++i)
244    std::swap(p[i], p[s - (i+1)]);
245}
246
247// Using #pragma pack here, instead of __attribute__((packed)) because xlc, at
248// least as of v12.1, won't take __attribute__((packed)) on non-POD and/or
249// templated types.
250#pragma pack(1)
251
252template <typename T, bool IsBigEndian>
253struct endian_specific_value {
254  operator T() const {
255    T rvalue = value;
256    if (IsBigEndian != isBigEndian())
257      bswap(&rvalue, sizeof(T));
258
259    return rvalue;
260  };
261
262  endian_specific_value &operator = (T nvalue) {
263    if (IsBigEndian != isBigEndian())
264      bswap(&nvalue, sizeof(T));
265
266    value = nvalue;
267    return *this;
268  }
269
270  endian_specific_value &operator += (T nvalue) {
271    *this = *this + nvalue;
272    return *this;
273  }
274
275  endian_specific_value &operator -= (T nvalue) {
276    *this = *this - nvalue;
277    return *this;
278  }
279
280private:
281  T value;
282};
283
284static const size_t CRCSize = 8;
285
286static const size_t MagicSize = 8;
287static const char *MagicBE = "HACC01B";
288static const char *MagicLE = "HACC01L";
289
290template <bool IsBigEndian>
291struct GlobalHeader {
292  char Magic[MagicSize];
293  endian_specific_value<uint64_t, IsBigEndian> HeaderSize;
294  endian_specific_value<uint64_t, IsBigEndian> NElems; // The global total
295  endian_specific_value<uint64_t, IsBigEndian> Dims[3];
296  endian_specific_value<uint64_t, IsBigEndian> NVars;
297  endian_specific_value<uint64_t, IsBigEndian> VarsSize;
298  endian_specific_value<uint64_t, IsBigEndian> VarsStart;
299  endian_specific_value<uint64_t, IsBigEndian> NRanks;
300  endian_specific_value<uint64_t, IsBigEndian> RanksSize;
301  endian_specific_value<uint64_t, IsBigEndian> RanksStart;
302  endian_specific_value<uint64_t, IsBigEndian> GlobalHeaderSize;
303  endian_specific_value<double,   IsBigEndian> PhysOrigin[3];
304  endian_specific_value<double,   IsBigEndian> PhysScale[3];
305  endian_specific_value<uint64_t, IsBigEndian> BlocksSize;
306  endian_specific_value<uint64_t, IsBigEndian> BlocksStart;
307};
308
309enum {
310  FloatValue          = (1 << 0),
311  SignedValue         = (1 << 1),
312  ValueIsPhysCoordX   = (1 << 2),
313  ValueIsPhysCoordY   = (1 << 3),
314  ValueIsPhysCoordZ   = (1 << 4),
315  ValueMaybePhysGhost = (1 << 5)
316};
317
318static const size_t NameSize = 256;
319template <bool IsBigEndian>
320struct VariableHeader {
321  char Name[NameSize];
322  endian_specific_value<uint64_t, IsBigEndian> Flags;
323  endian_specific_value<uint64_t, IsBigEndian> Size;
324};
325
326template <bool IsBigEndian>
327struct RankHeader {
328  endian_specific_value<uint64_t, IsBigEndian> Coords[3];
329  endian_specific_value<uint64_t, IsBigEndian> NElems;
330  endian_specific_value<uint64_t, IsBigEndian> Start;
331  endian_specific_value<uint64_t, IsBigEndian> GlobalRank;
332};
333
334static const size_t FilterNameSize = 8;
335static const size_t MaxFilters = 4;
336template <bool IsBigEndian>
337struct BlockHeader {
338  char Filters[MaxFilters][FilterNameSize];
339  endian_specific_value<uint64_t, IsBigEndian> Start;
340  endian_specific_value<uint64_t, IsBigEndian> Size;
341};
342
343template <bool IsBigEndian>
344struct CompressHeader {
345  endian_specific_value<uint64_t, IsBigEndian> OrigCRC;
346};
347const char *CompressName = "BLOSC";
348
349#pragma pack()
350
351unsigned GenericIO::DefaultFileIOType = FileIOPOSIX;
352int GenericIO::DefaultPartition = 0;
353bool GenericIO::DefaultShouldCompress = false;
354
355#ifndef GENERICIO_NO_MPI
356std::size_t GenericIO::CollectiveMPIIOThreshold = 0;
357#endif
358
359static bool blosc_initialized = false;
360
361#ifndef GENERICIO_NO_MPI
362void GenericIO::write() {
363  if (isBigEndian())
364    write<true>();
365  else
366    write<false>();
367}
368
369// Note: writing errors are not currently recoverable (one rank may fail
370// while the others don't).
371template <bool IsBigEndian>
372void GenericIO::write() {
373  const char *Magic = IsBigEndian ? MagicBE : MagicLE;
374
375  uint64_t FileSize = 0;
376
377  int NRanks, Rank;
378  MPI_Comm_rank(Comm, &Rank);
379  MPI_Comm_size(Comm, &NRanks);
380
381#ifdef __bgq__
382  MPI_Barrier(Comm);
383#endif
384  MPI_Comm_split(Comm, Partition, Rank, &SplitComm);
385
386  int SplitNRanks, SplitRank;
387  MPI_Comm_rank(SplitComm, &SplitRank);
388  MPI_Comm_size(SplitComm, &SplitNRanks);
389
390  string LocalFileName;
391  if (SplitNRanks != NRanks) {
392    if (Rank == 0) {
393      // In split mode, the specified file becomes the rank map, and the real
394      // data is partitioned.
395
396      vector<int> MapRank, MapPartition;
397      MapRank.resize(NRanks);
398      for (int i = 0; i < NRanks; ++i) MapRank[i] = i;
399
400      MapPartition.resize(NRanks);
401      MPI_Gather(&Partition, 1, MPI_INT, &MapPartition[0], 1, MPI_INT, 0, Comm);
402
403      GenericIO GIO(MPI_COMM_SELF, FileName, FileIOType);
404      GIO.setNumElems(NRanks);
405      GIO.addVariable("$rank", MapRank); /* this is for use by humans; the reading
406                                            code assumes that the partitions are in
407                                            rank order */
408      GIO.addVariable("$partition", MapPartition);
409
410      vector<int> CX, CY, CZ;
411      int TopoStatus;
412      MPI_Topo_test(Comm, &TopoStatus);
413      if (TopoStatus == MPI_CART) {
414        CX.resize(NRanks);
415        CY.resize(NRanks);
416        CZ.resize(NRanks);
417
418        for (int i = 0; i < NRanks; ++i) {
419          int C[3];
420          MPI_Cart_coords(Comm, i, 3, C);
421
422          CX[i] = C[0];
423          CY[i] = C[1];
424          CZ[i] = C[2];
425        }
426
427        GIO.addVariable("$x", CX);
428        GIO.addVariable("$y", CY);
429        GIO.addVariable("$z", CZ);
430      }
431
432      GIO.write();
433    } else {
434      MPI_Gather(&Partition, 1, MPI_INT, 0, 0, MPI_INT, 0, Comm);
435    }
436
437    stringstream ss;
438    ss << FileName << "#" << Partition;
439    LocalFileName = ss.str();
440  } else {
441    LocalFileName = FileName;
442  }
443
444  RankHeader<IsBigEndian> RHLocal;
445  int Dims[3], Periods[3], Coords[3];
446
447  int TopoStatus;
448  MPI_Topo_test(Comm, &TopoStatus);
449  if (TopoStatus == MPI_CART) {
450    MPI_Cart_get(Comm, 3, Dims, Periods, Coords);
451  } else {
452    Dims[0] = NRanks;
453    std::fill(Dims + 1, Dims + 3, 1);
454    std::fill(Periods, Periods + 3, 0);
455    Coords[0] = Rank;
456    std::fill(Coords + 1, Coords + 3, 0);
457  }
458
459  std::copy(Coords, Coords + 3, RHLocal.Coords);
460  RHLocal.NElems = NElems;
461  RHLocal.Start = 0;
462  RHLocal.GlobalRank = Rank;
463
464  bool ShouldCompress = DefaultShouldCompress;
465  const char *EnvStr = getenv("GENERICIO_COMPRESS");
466  if (EnvStr) {
467    int Mod = atoi(EnvStr);
468    ShouldCompress = (Mod > 0);
469  }
470
471  bool NeedsBlockHeaders = ShouldCompress;
472  EnvStr = getenv("GENERICIO_FORCE_BLOCKS");
473  if (!NeedsBlockHeaders && EnvStr) {
474    int Mod = atoi(EnvStr);
475    NeedsBlockHeaders = (Mod > 0);
476  }
477
478  vector<BlockHeader<IsBigEndian> > LocalBlockHeaders;
479  vector<void *> LocalData;
480  vector<bool> LocalHasExtraSpace;
481  vector<vector<unsigned char> > LocalCData;
482  if (NeedsBlockHeaders) {
483    LocalBlockHeaders.resize(Vars.size());
484    LocalData.resize(Vars.size());
485    LocalHasExtraSpace.resize(Vars.size());
486    if (ShouldCompress)
487      LocalCData.resize(Vars.size());
488
489    for (size_t i = 0; i < Vars.size(); ++i) {
490      // Filters null by default, leave null starting address (needs to be
491      // calculated by the header-writing rank).
492      memset(&LocalBlockHeaders[i], 0, sizeof(BlockHeader<IsBigEndian>));
493      if (ShouldCompress) {
494        LocalCData[i].resize(sizeof(CompressHeader<IsBigEndian>));
495
496        CompressHeader<IsBigEndian> *CH = (CompressHeader<IsBigEndian>*) &LocalCData[i][0];
497        CH->OrigCRC = crc64_omp(Vars[i].Data, Vars[i].Size*NElems);
498
499#ifdef _OPENMP
500#pragma omp master
501  {
502#endif
503
504       if (!blosc_initialized) {
505         blosc_init();
506         blosc_initialized = true;
507       }
508
509#ifdef _OPENMP
510       blosc_set_nthreads(omp_get_max_threads());
511  }
512#endif
513
514        LocalCData[i].resize(LocalCData[i].size() + NElems*Vars[i].Size);
515        if (blosc_compress(9, 1, Vars[i].Size, NElems*Vars[i].Size, Vars[i].Data,
516                           &LocalCData[i][0] + sizeof(CompressHeader<IsBigEndian>),
517                           NElems*Vars[i].Size) <= 0)
518          goto nocomp;
519
520        strncpy(LocalBlockHeaders[i].Filters[0], CompressName, FilterNameSize);
521        size_t CNBytes, CCBytes, CBlockSize;
522        blosc_cbuffer_sizes(&LocalCData[i][0] + sizeof(CompressHeader<IsBigEndian>),
523                            &CNBytes, &CCBytes, &CBlockSize);
524        LocalCData[i].resize(CCBytes + sizeof(CompressHeader<IsBigEndian>));
525
526        LocalBlockHeaders[i].Size = LocalCData[i].size();
527        LocalCData[i].resize(LocalCData[i].size() + CRCSize);
528        LocalData[i] = &LocalCData[i][0];
529        LocalHasExtraSpace[i] = true;
530      } else {
531nocomp:
532        LocalBlockHeaders[i].Size = NElems*Vars[i].Size;
533        LocalData[i] = Vars[i].Data;
534        LocalHasExtraSpace[i] = Vars[i].HasExtraSpace;
535      }
536    }
537  }
538
539  double StartTime = MPI_Wtime();
540
541  if (SplitRank == 0) {
542    uint64_t HeaderSize = sizeof(GlobalHeader<IsBigEndian>) + Vars.size()*sizeof(VariableHeader<IsBigEndian>) +
543                          SplitNRanks*sizeof(RankHeader<IsBigEndian>) + CRCSize;
544    if (NeedsBlockHeaders)
545      HeaderSize += SplitNRanks*Vars.size()*sizeof(BlockHeader<IsBigEndian>);
546
547    vector<char> Header(HeaderSize, 0);
548    GlobalHeader<IsBigEndian> *GH = (GlobalHeader<IsBigEndian> *) &Header[0];
549    std::copy(Magic, Magic + MagicSize, GH->Magic);
550    GH->HeaderSize = HeaderSize - CRCSize;
551    GH->NElems = NElems; // This will be updated later
552    std::copy(Dims, Dims + 3, GH->Dims);
553    GH->NVars = Vars.size();
554    GH->VarsSize = sizeof(VariableHeader<IsBigEndian>);
555    GH->VarsStart = sizeof(GlobalHeader<IsBigEndian>);
556    GH->NRanks = SplitNRanks;
557    GH->RanksSize = sizeof(RankHeader<IsBigEndian>);
558    GH->RanksStart = GH->VarsStart + Vars.size()*sizeof(VariableHeader<IsBigEndian>);
559    GH->GlobalHeaderSize = sizeof(GlobalHeader<IsBigEndian>);
560    std::copy(PhysOrigin, PhysOrigin + 3, GH->PhysOrigin);
561    std::copy(PhysScale,  PhysScale  + 3, GH->PhysScale);
562    if (!NeedsBlockHeaders) {
563      GH->BlocksSize = GH->BlocksStart = 0;
564    } else {
565      GH->BlocksSize = sizeof(BlockHeader<IsBigEndian>);
566      GH->BlocksStart = GH->RanksStart + SplitNRanks*sizeof(RankHeader<IsBigEndian>);
567    }
568
569    uint64_t RecordSize = 0;
570    VariableHeader<IsBigEndian> *VH = (VariableHeader<IsBigEndian> *) &Header[GH->VarsStart];
571    for (size_t i = 0; i < Vars.size(); ++i, ++VH) {
572      string VName(Vars[i].Name);
573      VName.resize(NameSize);
574
575      std::copy(VName.begin(), VName.end(), VH->Name);
576      uint64_t VFlags = 0;
577      if (Vars[i].IsFloat)  VFlags |= FloatValue;
578      if (Vars[i].IsSigned) VFlags |= SignedValue;
579      if (Vars[i].IsPhysCoordX) VFlags |= ValueIsPhysCoordX;
580      if (Vars[i].IsPhysCoordY) VFlags |= ValueIsPhysCoordY;
581      if (Vars[i].IsPhysCoordZ) VFlags |= ValueIsPhysCoordZ;
582      if (Vars[i].MaybePhysGhost) VFlags |= ValueMaybePhysGhost;
583      VH->Flags = VFlags;
584      RecordSize += VH->Size = Vars[i].Size;
585    }
586
587    MPI_Gather(&RHLocal, sizeof(RHLocal), MPI_BYTE,
588               &Header[GH->RanksStart], sizeof(RHLocal),
589               MPI_BYTE, 0, SplitComm);
590
591    if (NeedsBlockHeaders) {
592      MPI_Gather(&LocalBlockHeaders[0],
593                 Vars.size()*sizeof(BlockHeader<IsBigEndian>), MPI_BYTE,
594                 &Header[GH->BlocksStart],
595                 Vars.size()*sizeof(BlockHeader<IsBigEndian>), MPI_BYTE,
596                 0, SplitComm);
597
598      BlockHeader<IsBigEndian> *BH = (BlockHeader<IsBigEndian> *) &Header[GH->BlocksStart];
599      for (int i = 0; i < SplitNRanks; ++i)
600      for (size_t j = 0; j < Vars.size(); ++j, ++BH) {
601        if (i == 0 && j == 0)
602          BH->Start = HeaderSize;
603        else
604          BH->Start = BH[-1].Start + BH[-1].Size + CRCSize;
605      }
606
607      RankHeader<IsBigEndian> *RH = (RankHeader<IsBigEndian> *) &Header[GH->RanksStart];
608      RH->Start = HeaderSize; ++RH;
609      for (int i = 1; i < SplitNRanks; ++i, ++RH) {
610        RH->Start =
611          ((BlockHeader<IsBigEndian> *) &Header[GH->BlocksStart])[i*Vars.size()].Start;
612        GH->NElems += RH->NElems;
613      }
614
615      // Compute the total file size.
616      uint64_t LastData = BH[-1].Size + CRCSize;
617      FileSize = BH[-1].Start + LastData;
618    } else {
619      RankHeader<IsBigEndian> *RH = (RankHeader<IsBigEndian> *) &Header[GH->RanksStart];
620      RH->Start = HeaderSize; ++RH;
621      for (int i = 1; i < SplitNRanks; ++i, ++RH) {
622        uint64_t PrevNElems = RH[-1].NElems;
623        uint64_t PrevData = PrevNElems*RecordSize + CRCSize*Vars.size();
624        RH->Start = RH[-1].Start + PrevData;
625        GH->NElems += RH->NElems;
626      }
627
628      // Compute the total file size.
629      uint64_t LastNElems = RH[-1].NElems;
630      uint64_t LastData = LastNElems*RecordSize + CRCSize*Vars.size();
631      FileSize = RH[-1].Start + LastData;
632    }
633
634    // Now that the starting offset has been computed, send it back to each rank.
635    MPI_Scatter(&Header[GH->RanksStart], sizeof(RHLocal),
636                MPI_BYTE, &RHLocal, sizeof(RHLocal),
637                MPI_BYTE, 0, SplitComm);
638
639    if (NeedsBlockHeaders)
640      MPI_Scatter(&Header[GH->BlocksStart],
641                  sizeof(BlockHeader<IsBigEndian>)*Vars.size(), MPI_BYTE,
642                  &LocalBlockHeaders[0],
643                  sizeof(BlockHeader<IsBigEndian>)*Vars.size(), MPI_BYTE,
644                  0, SplitComm);
645
646    uint64_t HeaderCRC = crc64_omp(&Header[0], HeaderSize - CRCSize);
647    crc64_invert(HeaderCRC, &Header[HeaderSize - CRCSize]);
648
649    if (FileIOType == FileIOMPI)
650      FH.get() = new GenericFileIO_MPI(MPI_COMM_SELF);
651    else if (FileIOType == FileIOMPICollective)
652      FH.get() = new GenericFileIO_MPICollective(MPI_COMM_SELF);
653    else
654      FH.get() = new GenericFileIO_POSIX();
655
656    FH.get()->open(LocalFileName);
657    FH.get()->setSize(FileSize);
658    FH.get()->write(&Header[0], HeaderSize, 0, "header");
659
660    close();
661  } else {
662    MPI_Gather(&RHLocal, sizeof(RHLocal), MPI_BYTE, 0, 0, MPI_BYTE, 0, SplitComm);
663    if (NeedsBlockHeaders)
664      MPI_Gather(&LocalBlockHeaders[0], Vars.size()*sizeof(BlockHeader<IsBigEndian>),
665                 MPI_BYTE, 0, 0, MPI_BYTE, 0, SplitComm);
666    MPI_Scatter(0, 0, MPI_BYTE, &RHLocal, sizeof(RHLocal), MPI_BYTE, 0, SplitComm);
667    if (NeedsBlockHeaders)
668      MPI_Scatter(0, 0, MPI_BYTE, &LocalBlockHeaders[0], sizeof(BlockHeader<IsBigEndian>)*Vars.size(),
669                  MPI_BYTE, 0, SplitComm);
670  }
671
672  MPI_Barrier(SplitComm);
673
674  if (FileIOType == FileIOMPI)
675    FH.get() = new GenericFileIO_MPI(SplitComm);
676  else if (FileIOType == FileIOMPICollective)
677    FH.get() = new GenericFileIO_MPICollective(SplitComm);
678  else
679    FH.get() = new GenericFileIO_POSIX();
680
681  FH.get()->open(LocalFileName);
682
683  uint64_t Offset = RHLocal.Start;
684  for (size_t i = 0; i < Vars.size(); ++i) {
685    uint64_t WriteSize = NeedsBlockHeaders ?
686                         LocalBlockHeaders[i].Size : NElems*Vars[i].Size;
687    void *Data = NeedsBlockHeaders ? LocalData[i] : Vars[i].Data;
688    uint64_t CRC = crc64_omp(Data, WriteSize);
689    bool HasExtraSpace = NeedsBlockHeaders ?
690                         LocalHasExtraSpace[i] : Vars[i].HasExtraSpace;
691    char *CRCLoc = HasExtraSpace ?  ((char *) Data) + WriteSize : (char *) &CRC;
692
693    if (NeedsBlockHeaders)
694      Offset = LocalBlockHeaders[i].Start;
695
696    // When using extra space for the CRC write, preserve the original contents.
697    char CRCSave[CRCSize];
698    if (HasExtraSpace)
699      std::copy(CRCLoc, CRCLoc + CRCSize, CRCSave);
700
701    crc64_invert(CRC, CRCLoc);
702
703    if (HasExtraSpace) {
704      FH.get()->write(Data, WriteSize + CRCSize, Offset, Vars[i].Name + " with CRC");
705    } else {
706      FH.get()->write(Data, WriteSize, Offset, Vars[i].Name);
707      FH.get()->write(CRCLoc, CRCSize, Offset + WriteSize, Vars[i].Name + " CRC");
708    }
709
710    if (HasExtraSpace)
711       std::copy(CRCSave, CRCSave + CRCSize, CRCLoc);
712
713    Offset += WriteSize + CRCSize;
714  }
715
716  close();
717  MPI_Barrier(Comm);
718
719  double EndTime = MPI_Wtime();
720  double TotalTime = EndTime - StartTime;
721  double MaxTotalTime;
722  MPI_Reduce(&TotalTime, &MaxTotalTime, 1, MPI_DOUBLE, MPI_MAX, 0, Comm);
723
724  if (SplitNRanks != NRanks) {
725    uint64_t ContribFileSize = (SplitRank == 0) ? FileSize : 0;
726    MPI_Reduce(&ContribFileSize, &FileSize, 1, MPI_UINT64_T, MPI_SUM, 0, Comm);
727  }
728
729  if (Rank == 0) {
730    double Rate = ((double) FileSize) / MaxTotalTime / (1024.*1024.);
731    cout << "Wrote " << Vars.size() << " variables to " << FileName <<
732            " (" << FileSize << " bytes) in " << MaxTotalTime << "s: " <<
733            Rate << " MB/s" << endl;
734  }
735
736  MPI_Comm_free(&SplitComm);
737  SplitComm = MPI_COMM_NULL;
738}
739#endif // GENERICIO_NO_MPI
740
741template <bool IsBigEndian>
742void GenericIO::readHeaderLeader(void *GHPtr, MismatchBehavior MB, int NRanks,
743                                 int Rank, int SplitNRanks,
744                                 string &LocalFileName, uint64_t &HeaderSize,
745                                 vector<char> &Header) {
746  GlobalHeader<IsBigEndian> &GH = *(GlobalHeader<IsBigEndian> *) GHPtr;
747
748  if (MB == MismatchDisallowed) {
749    if (SplitNRanks != (int) GH.NRanks) {
750      stringstream ss;
751      ss << "Won't read " << LocalFileName << ": communicator-size mismatch: " <<
752            "current: " << SplitNRanks << ", file: " << GH.NRanks;
753      throw runtime_error(ss.str());
754    }
755
756#ifndef GENERICIO_NO_MPI
757    int TopoStatus;
758    MPI_Topo_test(Comm, &TopoStatus);
759    if (TopoStatus == MPI_CART) {
760      int Dims[3], Periods[3], Coords[3];
761      MPI_Cart_get(Comm, 3, Dims, Periods, Coords);
762
763      bool DimsMatch = true;
764      for (int i = 0; i < 3; ++i) {
765        if ((uint64_t) Dims[i] != GH.Dims[i]) {
766          DimsMatch = false;
767          break;
768        }
769      }
770
771      if (!DimsMatch) {
772        stringstream ss;
773        ss << "Won't read " << LocalFileName <<
774              ": communicator-decomposition mismatch: " <<
775              "current: " << Dims[0] << "x" << Dims[1] << "x" << Dims[2] <<
776              ", file: " << GH.Dims[0] << "x" << GH.Dims[1] << "x" <<
777              GH.Dims[2];
778        throw runtime_error(ss.str());
779      }
780    }
781#endif
782  } else if (MB == MismatchRedistribute && !Redistributing) {
783    Redistributing = true;
784
785    int NFileRanks = RankMap.empty() ? (int) GH.NRanks : (int) RankMap.size();
786    int NFileRanksPerRank = NFileRanks/NRanks;
787    int NRemFileRank = NFileRanks % NRanks;
788
789    if (!NFileRanksPerRank) {
790      // We have only the remainder, so the last NRemFileRank ranks get one
791      // file rank, and the others don't.
792      if (NRemFileRank && NRanks - Rank <= NRemFileRank)
793        SourceRanks.push_back(NRanks - (Rank + 1));
794    } else {
795      // Since NRemFileRank < NRanks, and we don't want to put any extra memory
796      // load on rank 0 (because rank 0's memory load is normally higher than
797      // the other ranks anyway), the last NRemFileRank will each take
798      // (NFileRanksPerRank+1) file ranks.
799
800      int FirstFileRank = 0, LastFileRank = NFileRanksPerRank - 1;
801      for (int i = 1; i <= Rank; ++i) {
802        FirstFileRank = LastFileRank + 1;
803        LastFileRank  = FirstFileRank + NFileRanksPerRank - 1;
804
805        if (NRemFileRank && NRanks - i <= NRemFileRank)
806          ++LastFileRank;
807      }
808
809      for (int i = FirstFileRank; i <= LastFileRank; ++i)
810        SourceRanks.push_back(i);
811    }
812  }
813
814  HeaderSize = GH.HeaderSize;
815  Header.resize(HeaderSize + CRCSize, 0xFE /* poison */);
816  FH.get()->read(&Header[0], HeaderSize + CRCSize, 0, "header");
817
818  uint64_t CRC = crc64_omp(&Header[0], HeaderSize + CRCSize);
819  if (CRC != (uint64_t) -1) {
820    throw runtime_error("Header CRC check failed: " + LocalFileName);
821  }
822}
823
824// Note: Errors from this function should be recoverable. This means that if
825// one rank throws an exception, then all ranks should.
826void GenericIO::openAndReadHeader(MismatchBehavior MB, int EffRank, bool CheckPartMap) {
827  int NRanks, Rank;
828#ifndef GENERICIO_NO_MPI
829  MPI_Comm_rank(Comm, &Rank);
830  MPI_Comm_size(Comm, &NRanks);
831#else
832  Rank = 0;
833  NRanks = 1;
834#endif
835
836  if (EffRank == -1)
837    EffRank = MB == MismatchRedistribute ? 0 : Rank;
838
839  if (RankMap.empty() && CheckPartMap) {
840    // First, check to see if the file is a rank map.
841    unsigned long RanksInMap = 0;
842    if (Rank == 0) {
843      try {
844#ifndef GENERICIO_NO_MPI
845        GenericIO GIO(MPI_COMM_SELF, FileName, FileIOType);
846#else
847        GenericIO GIO(FileName, FileIOType);
848#endif
849        GIO.openAndReadHeader(MismatchDisallowed, 0, false);
850        RanksInMap = GIO.readNumElems();
851
852        RankMap.resize(RanksInMap + GIO.requestedExtraSpace()/sizeof(int));
853        GIO.addVariable("$partition", RankMap, true);
854
855        GIO.readData(0, false);
856        RankMap.resize(RanksInMap);
857      } catch (...) {
858        RankMap.clear();
859        RanksInMap = 0;
860      }
861    }
862
863#ifndef GENERICIO_NO_MPI
864    MPI_Bcast(&RanksInMap, 1, MPI_UNSIGNED_LONG, 0, Comm);
865    if (RanksInMap > 0) {
866      RankMap.resize(RanksInMap);
867      MPI_Bcast(&RankMap[0], RanksInMap, MPI_INT, 0, Comm);
868    }
869#endif
870  }
871
872#ifndef GENERICIO_NO_MPI
873  if (SplitComm != MPI_COMM_NULL)
874    MPI_Comm_free(&SplitComm);
875#endif
876
877  string LocalFileName;
878  if (RankMap.empty()) {
879    LocalFileName = FileName;
880#ifndef GENERICIO_NO_MPI
881    MPI_Comm_dup(MB == MismatchRedistribute ? MPI_COMM_SELF : Comm, &SplitComm);
882#endif
883  } else {
884    stringstream ss;
885    ss << FileName << "#" << RankMap[EffRank];
886    LocalFileName = ss.str();
887#ifndef GENERICIO_NO_MPI
888    if (MB == MismatchRedistribute) {
889      MPI_Comm_dup(MPI_COMM_SELF, &SplitComm);
890    } else {
891#ifdef __bgq__
892      MPI_Barrier(Comm);
893#endif
894      MPI_Comm_split(Comm, RankMap[EffRank], Rank, &SplitComm);
895    }
896#endif
897  }
898
899  if (LocalFileName == OpenFileName)
900    return;
901  FH.close();
902
903  int SplitNRanks, SplitRank;
904#ifndef GENERICIO_NO_MPI
905  MPI_Comm_rank(SplitComm, &SplitRank);
906  MPI_Comm_size(SplitComm, &SplitNRanks);
907#else
908  SplitRank = 0;
909  SplitNRanks = 1;
910#endif
911
912  uint64_t HeaderSize;
913  vector<char> Header;
914
915  if (SplitRank == 0) {
916#ifndef GENERICIO_NO_MPI
917    if (FileIOType == FileIOMPI)
918      FH.get() = new GenericFileIO_MPI(MPI_COMM_SELF);
919    else if (FileIOType == FileIOMPICollective)
920      FH.get() = new GenericFileIO_MPICollective(MPI_COMM_SELF);
921    else
922#endif
923      FH.get() = new GenericFileIO_POSIX();
924
925#ifndef GENERICIO_NO_MPI
926    char True = 1, False = 0;
927#endif
928
929    try {
930      FH.get()->open(LocalFileName, true);
931
932      GlobalHeader<false> GH; // endianness does not matter yet...
933      FH.get()->read(&GH, sizeof(GlobalHeader<false>), 0, "global header");
934
935      if (string(GH.Magic, GH.Magic + MagicSize - 1) == MagicLE) {
936        readHeaderLeader<false>(&GH, MB, NRanks, Rank, SplitNRanks, LocalFileName,
937                                HeaderSize, Header);
938      } else if (string(GH.Magic, GH.Magic + MagicSize - 1) == MagicBE) {
939        readHeaderLeader<true>(&GH, MB, NRanks, Rank, SplitNRanks, LocalFileName,
940                               HeaderSize, Header);
941      } else {
942        string Error = "invalid file-type identifier";
943        throw runtime_error("Won't read " + LocalFileName + ": " + Error);
944      }
945
946#ifndef GENERICIO_NO_MPI
947      close();
948      MPI_Bcast(&True, 1, MPI_BYTE, 0, SplitComm);
949#endif
950    } catch (...) {
951#ifndef GENERICIO_NO_MPI
952      MPI_Bcast(&False, 1, MPI_BYTE, 0, SplitComm);
953#endif
954      close();
955      throw;
956    }
957  } else {
958#ifndef GENERICIO_NO_MPI
959    char Okay;
960    MPI_Bcast(&Okay, 1, MPI_BYTE, 0, SplitComm);
961    if (!Okay)
962      throw runtime_error("Failure broadcast from rank 0");
963#endif
964  }
965
966#ifndef GENERICIO_NO_MPI
967  MPI_Bcast(&HeaderSize, 1, MPI_UINT64_T, 0, SplitComm);
968#endif
969
970  Header.resize(HeaderSize, 0xFD /* poison */);
971#ifndef GENERICIO_NO_MPI
972  MPI_Bcast(&Header[0], HeaderSize, MPI_BYTE, 0, SplitComm);
973#endif
974
975  FH.getHeaderCache().clear();
976
977  GlobalHeader<false> *GH = (GlobalHeader<false> *) &Header[0];
978  FH.setIsBigEndian(string(GH->Magic, GH->Magic + MagicSize - 1) == MagicBE);
979
980  FH.getHeaderCache().swap(Header);
981  OpenFileName = LocalFileName;
982
983#ifndef GENERICIO_NO_MPI
984  if (!DisableCollErrChecking)
985    MPI_Barrier(Comm);
986
987  if (FileIOType == FileIOMPI)
988    FH.get() = new GenericFileIO_MPI(SplitComm);
989  else if (FileIOType == FileIOMPICollective)
990    FH.get() = new GenericFileIO_MPICollective(SplitComm);
991  else
992    FH.get() = new GenericFileIO_POSIX();
993
994  int OpenErr = 0, TotOpenErr;
995  try {
996    FH.get()->open(LocalFileName, true);
997    MPI_Allreduce(&OpenErr, &TotOpenErr, 1, MPI_INT, MPI_SUM,
998                  DisableCollErrChecking ? MPI_COMM_SELF : Comm);
999  } catch (...) {
1000    OpenErr = 1;
1001    MPI_Allreduce(&OpenErr, &TotOpenErr, 1, MPI_INT, MPI_SUM,
1002                  DisableCollErrChecking ? MPI_COMM_SELF : Comm);
1003    throw;
1004  }
1005
1006  if (TotOpenErr > 0) {
1007    stringstream ss;
1008    ss << TotOpenErr << " ranks failed to open file: " << LocalFileName;
1009    throw runtime_error(ss.str());
1010  }
1011#endif
1012}
1013
1014int GenericIO::readNRanks() {
1015  if (FH.isBigEndian())
1016    return readNRanks<true>();
1017  return readNRanks<false>();
1018}
1019
1020template <bool IsBigEndian>
1021int GenericIO::readNRanks() {
1022  if (RankMap.size())
1023    return RankMap.size();
1024
1025  assert(FH.getHeaderCache().size() && "HeaderCache must not be empty");
1026  GlobalHeader<IsBigEndian> *GH = (GlobalHeader<IsBigEndian> *) &FH.getHeaderCache()[0];
1027  return (int) GH->NRanks;
1028}
1029
1030void GenericIO::readDims(int Dims[3]) {
1031  if (FH.isBigEndian())
1032    readDims<true>(Dims);
1033  else
1034    readDims<false>(Dims);
1035}
1036
1037template <bool IsBigEndian>
1038void GenericIO::readDims(int Dims[3]) {
1039  assert(FH.getHeaderCache().size() && "HeaderCache must not be empty");
1040  GlobalHeader<IsBigEndian> *GH = (GlobalHeader<IsBigEndian> *) &FH.getHeaderCache()[0];
1041  std::copy(GH->Dims, GH->Dims + 3, Dims);
1042}
1043
1044uint64_t GenericIO::readTotalNumElems() {
1045  if (FH.isBigEndian())
1046    return readTotalNumElems<true>();
1047  return readTotalNumElems<false>();
1048}
1049
1050template <bool IsBigEndian>
1051uint64_t GenericIO::readTotalNumElems() {
1052  if (RankMap.size())
1053    return (uint64_t) -1;
1054
1055  assert(FH.getHeaderCache().size() && "HeaderCache must not be empty");
1056  GlobalHeader<IsBigEndian> *GH = (GlobalHeader<IsBigEndian> *) &FH.getHeaderCache()[0];
1057  return GH->NElems;
1058}
1059
1060void GenericIO::readPhysOrigin(double Origin[3]) {
1061  if (FH.isBigEndian())
1062    readPhysOrigin<true>(Origin);
1063  else
1064    readPhysOrigin<false>(Origin);
1065}
1066
1067// Define a "safe" version of offsetof (offsetof itself might not work for
1068// non-POD types, and at least xlC v12.1 will complain about this if you try).
1069#define offsetof_safe(S, F) (size_t(&(S)->F) - size_t(S))
1070
1071template <bool IsBigEndian>
1072void GenericIO::readPhysOrigin(double Origin[3]) {
1073  assert(FH.getHeaderCache().size() && "HeaderCache must not be empty");
1074  GlobalHeader<IsBigEndian> *GH = (GlobalHeader<IsBigEndian> *) &FH.getHeaderCache()[0];
1075  if (offsetof_safe(GH, PhysOrigin) >= GH->GlobalHeaderSize) {
1076    std::fill(Origin, Origin + 3, 0.0);
1077    return;
1078  }
1079
1080  std::copy(GH->PhysOrigin, GH->PhysOrigin + 3, Origin);
1081}
1082
1083void GenericIO::readPhysScale(double Scale[3]) {
1084  if (FH.isBigEndian())
1085    readPhysScale<true>(Scale);
1086  else
1087    readPhysScale<false>(Scale);
1088}
1089
1090template <bool IsBigEndian>
1091void GenericIO::readPhysScale(double Scale[3]) {
1092  assert(FH.getHeaderCache().size() && "HeaderCache must not be empty");
1093  GlobalHeader<IsBigEndian> *GH = (GlobalHeader<IsBigEndian> *) &FH.getHeaderCache()[0];
1094  if (offsetof_safe(GH, PhysScale) >= GH->GlobalHeaderSize) {
1095    std::fill(Scale, Scale + 3, 0.0);
1096    return;
1097  }
1098
1099  std::copy(GH->PhysScale, GH->PhysScale + 3, Scale);
1100}
1101
1102template <bool IsBigEndian>
1103static size_t getRankIndex(int EffRank, GlobalHeader<IsBigEndian> *GH,
1104                           vector<int> &RankMap, vector<char> &HeaderCache) {
1105  if (RankMap.empty())
1106    return EffRank;
1107
1108  for (size_t i = 0; i < GH->NRanks; ++i) {
1109    RankHeader<IsBigEndian> *RH = (RankHeader<IsBigEndian> *) &HeaderCache[GH->RanksStart +
1110                                                 i*GH->RanksSize];
1111    if (offsetof_safe(RH, GlobalRank) >= GH->RanksSize)
1112      return EffRank;
1113
1114    if ((int) RH->GlobalRank == EffRank)
1115      return i;
1116  }
1117
1118  assert(false && "Index requested of an invalid rank");
1119  return (size_t) -1;
1120}
1121
1122int GenericIO::readGlobalRankNumber(int EffRank) {
1123  if (FH.isBigEndian())
1124    return readGlobalRankNumber<true>(EffRank);
1125  return readGlobalRankNumber<false>(EffRank);
1126}
1127
1128template <bool IsBigEndian>
1129int GenericIO::readGlobalRankNumber(int EffRank) {
1130  if (EffRank == -1) {
1131#ifndef GENERICIO_NO_MPI
1132    MPI_Comm_rank(Comm, &EffRank);
1133#else
1134    EffRank = 0;
1135#endif
1136  }
1137
1138  openAndReadHeader(MismatchAllowed, EffRank, false);
1139
1140  assert(FH.getHeaderCache().size() && "HeaderCache must not be empty");
1141
1142  GlobalHeader<IsBigEndian> *GH = (GlobalHeader<IsBigEndian> *) &FH.getHeaderCache()[0];
1143  size_t RankIndex = getRankIndex<IsBigEndian>(EffRank, GH, RankMap, FH.getHeaderCache());
1144
1145  assert(RankIndex < GH->NRanks && "Invalid rank specified");
1146
1147  RankHeader<IsBigEndian> *RH = (RankHeader<IsBigEndian> *) &FH.getHeaderCache()[GH->RanksStart +
1148                                               RankIndex*GH->RanksSize];
1149
1150  if (offsetof_safe(RH, GlobalRank) >= GH->RanksSize)
1151    return EffRank;
1152
1153  return (int) RH->GlobalRank;
1154}
1155
1156void GenericIO::getSourceRanks(vector<int> &SR) {
1157  SR.clear();
1158
1159  if (Redistributing) {
1160    std::copy(SourceRanks.begin(), SourceRanks.end(), std::back_inserter(SR));
1161    return;
1162  }
1163
1164  int Rank;
1165#ifndef GENERICIO_NO_MPI
1166  MPI_Comm_rank(Comm, &Rank);
1167#else
1168  Rank = 0;
1169#endif
1170
1171  SR.push_back(Rank);
1172}
1173
1174size_t GenericIO::readNumElems(int EffRank) {
1175  if (EffRank == -1 && Redistributing) {
1176    DisableCollErrChecking = true;
1177
1178    size_t TotalSize = 0;
1179    for (int i = 0, ie = SourceRanks.size(); i != ie; ++i)
1180      TotalSize += readNumElems(SourceRanks[i]);
1181
1182    DisableCollErrChecking = false;
1183    return TotalSize;
1184  }
1185
1186  if (FH.isBigEndian())
1187    return readNumElems<true>(EffRank);
1188  return readNumElems<false>(EffRank);
1189}
1190
1191template <bool IsBigEndian>
1192size_t GenericIO::readNumElems(int EffRank) {
1193  if (EffRank == -1) {
1194#ifndef GENERICIO_NO_MPI
1195    MPI_Comm_rank(Comm, &EffRank);
1196#else
1197    EffRank = 0;
1198#endif
1199  }
1200
1201  openAndReadHeader(Redistributing ? MismatchRedistribute : MismatchAllowed,
1202                    EffRank, false);
1203
1204  assert(FH.getHeaderCache().size() && "HeaderCache must not be empty");
1205
1206  GlobalHeader<IsBigEndian> *GH = (GlobalHeader<IsBigEndian> *) &FH.getHeaderCache()[0];
1207  size_t RankIndex = getRankIndex<IsBigEndian>(EffRank, GH, RankMap, FH.getHeaderCache());
1208
1209  assert(RankIndex < GH->NRanks && "Invalid rank specified");
1210
1211  RankHeader<IsBigEndian> *RH = (RankHeader<IsBigEndian> *) &FH.getHeaderCache()[GH->RanksStart +
1212                                               RankIndex*GH->RanksSize];
1213  return (size_t) RH->NElems;
1214}
1215
1216void GenericIO::readCoords(int Coords[3], int EffRank) {
1217  if (EffRank == -1 && Redistributing) {
1218    std::fill(Coords, Coords + 3, 0);
1219    return;
1220  }
1221
1222  if (FH.isBigEndian())
1223    readCoords<true>(Coords, EffRank);
1224  else
1225    readCoords<false>(Coords, EffRank);
1226}
1227
1228template <bool IsBigEndian>
1229void GenericIO::readCoords(int Coords[3], int EffRank) {
1230  if (EffRank == -1) {
1231#ifndef GENERICIO_NO_MPI
1232    MPI_Comm_rank(Comm, &EffRank);
1233#else
1234    EffRank = 0;
1235#endif
1236  }
1237
1238  openAndReadHeader(MismatchAllowed, EffRank, false);
1239
1240  assert(FH.getHeaderCache().size() && "HeaderCache must not be empty");
1241
1242  GlobalHeader<IsBigEndian> *GH = (GlobalHeader<IsBigEndian> *) &FH.getHeaderCache()[0];
1243  size_t RankIndex = getRankIndex<IsBigEndian>(EffRank, GH, RankMap, FH.getHeaderCache());
1244
1245  assert(RankIndex < GH->NRanks && "Invalid rank specified");
1246
1247  RankHeader<IsBigEndian> *RH = (RankHeader<IsBigEndian> *) &FH.getHeaderCache()[GH->RanksStart +
1248                                               RankIndex*GH->RanksSize];
1249
1250  std::copy(RH->Coords, RH->Coords + 3, Coords);
1251}
1252
1253void GenericIO::readData(int EffRank, bool PrintStats, bool CollStats) {
1254  int Rank;
1255#ifndef GENERICIO_NO_MPI
1256  MPI_Comm_rank(Comm, &Rank);
1257#else
1258  Rank = 0;
1259#endif
1260
1261  uint64_t TotalReadSize = 0;
1262#ifndef GENERICIO_NO_MPI
1263  double StartTime = MPI_Wtime();
1264#else
1265  double StartTime = double(clock())/CLOCKS_PER_SEC;
1266#endif
1267
1268  int NErrs[3] = { 0, 0, 0 };
1269
1270  if (EffRank == -1 && Redistributing) {
1271    DisableCollErrChecking = true;
1272
1273    size_t RowOffset = 0;
1274    for (int i = 0, ie = SourceRanks.size(); i != ie; ++i) {
1275      readData(SourceRanks[i], RowOffset, Rank, TotalReadSize, NErrs);
1276      RowOffset += readNumElems(SourceRanks[i]);
1277    }
1278
1279    DisableCollErrChecking = false;
1280  } else {
1281    readData(EffRank, 0, Rank, TotalReadSize, NErrs);
1282  }
1283
1284  int AllNErrs[3];
1285#ifndef GENERICIO_NO_MPI
1286  MPI_Allreduce(NErrs, AllNErrs, 3, MPI_INT, MPI_SUM, Comm);
1287#else
1288  AllNErrs[0] = NErrs[0]; AllNErrs[1] = NErrs[1]; AllNErrs[2] = NErrs[2];
1289#endif
1290
1291  if (AllNErrs[0] > 0 || AllNErrs[1] > 0 || AllNErrs[2] > 0) {
1292    stringstream ss;
1293    ss << "Experienced " << AllNErrs[0] << " I/O error(s), " <<
1294          AllNErrs[1] << " CRC error(s) and " << AllNErrs[2] <<
1295          " decompression CRC error(s) reading: " << OpenFileName;
1296    throw runtime_error(ss.str());
1297  }
1298
1299#ifndef GENERICIO_NO_MPI
1300  MPI_Barrier(Comm);
1301#endif
1302
1303#ifndef GENERICIO_NO_MPI
1304  double EndTime = MPI_Wtime();
1305#else
1306  double EndTime = double(clock())/CLOCKS_PER_SEC;
1307#endif
1308
1309  double TotalTime = EndTime - StartTime;
1310  double MaxTotalTime;
1311#ifndef GENERICIO_NO_MPI
1312  if (CollStats)
1313    MPI_Reduce(&TotalTime, &MaxTotalTime, 1, MPI_DOUBLE, MPI_MAX, 0, Comm);
1314  else
1315#endif
1316  MaxTotalTime = TotalTime;
1317
1318  uint64_t AllTotalReadSize;
1319#ifndef GENERICIO_NO_MPI
1320  if (CollStats)
1321    MPI_Reduce(&TotalReadSize, &AllTotalReadSize, 1, MPI_UINT64_T, MPI_SUM, 0, Comm);
1322  else
1323#endif
1324  AllTotalReadSize = TotalReadSize;
1325
1326  if (Rank == 0 && PrintStats) {
1327    double Rate = ((double) AllTotalReadSize) / MaxTotalTime / (1024.*1024.);
1328    cout << "Read " << Vars.size() << " variables from " << FileName <<
1329            " (" << AllTotalReadSize << " bytes) in " << MaxTotalTime << "s: " <<
1330            Rate << " MB/s [excluding header read]" << endl;
1331  }
1332}
1333
1334void GenericIO::readData(int EffRank, size_t RowOffset, int Rank,
1335                         uint64_t &TotalReadSize, int NErrs[3]) {
1336  if (FH.isBigEndian())
1337    readData<true>(EffRank, RowOffset, Rank, TotalReadSize, NErrs);
1338  else
1339    readData<false>(EffRank, RowOffset, Rank, TotalReadSize, NErrs);
1340}
1341
1342// Note: Errors from this function should be recoverable. This means that if
1343// one rank throws an exception, then all ranks should.
1344template <bool IsBigEndian>
1345void GenericIO::readData(int EffRank, size_t RowOffset, int Rank,
1346                         uint64_t &TotalReadSize, int NErrs[3]) {
1347  openAndReadHeader(Redistributing ? MismatchRedistribute : MismatchAllowed,
1348                    EffRank, false);
1349
1350  assert(FH.getHeaderCache().size() && "HeaderCache must not be empty");
1351
1352  if (EffRank == -1)
1353    EffRank = Rank;
1354
1355  GlobalHeader<IsBigEndian> *GH = (GlobalHeader<IsBigEndian> *) &FH.getHeaderCache()[0];
1356  size_t RankIndex = getRankIndex<IsBigEndian>(EffRank, GH, RankMap, FH.getHeaderCache());
1357
1358  assert(RankIndex < GH->NRanks && "Invalid rank specified");
1359
1360  RankHeader<IsBigEndian> *RH = (RankHeader<IsBigEndian> *) &FH.getHeaderCache()[GH->RanksStart +
1361                                               RankIndex*GH->RanksSize];
1362
1363  for (size_t i = 0; i < Vars.size(); ++i) {
1364    uint64_t Offset = RH->Start;
1365    bool VarFound = false;
1366    for (uint64_t j = 0; j < GH->NVars; ++j) {
1367      VariableHeader<IsBigEndian> *VH = (VariableHeader<IsBigEndian> *) &FH.getHeaderCache()[GH->VarsStart +
1368                                                           j*GH->VarsSize];
1369
1370      string VName(VH->Name, VH->Name + NameSize);
1371      size_t VNameNull = VName.find('\0');
1372      if (VNameNull < NameSize)
1373        VName.resize(VNameNull);
1374
1375      uint64_t ReadSize = RH->NElems*VH->Size + CRCSize;
1376      if (VName != Vars[i].Name) {
1377        Offset += ReadSize;
1378        continue;
1379      }
1380
1381      VarFound = true;
1382      bool IsFloat = (bool) (VH->Flags & FloatValue),
1383           IsSigned = (bool) (VH->Flags & SignedValue);
1384      if (VH->Size != Vars[i].Size) {
1385        stringstream ss;
1386        ss << "Size mismatch for variable " << Vars[i].Name <<
1387              " in: " << OpenFileName << ": current: " << Vars[i].Size <<
1388              ", file: " << VH->Size;
1389        throw runtime_error(ss.str());
1390      } else if (IsFloat != Vars[i].IsFloat) {
1391        string Float("float"), Int("integer");
1392        stringstream ss;
1393        ss << "Type mismatch for variable " << Vars[i].Name <<
1394              " in: " << OpenFileName << ": current: " <<
1395              (Vars[i].IsFloat ? Float : Int) <<
1396              ", file: " << (IsFloat ? Float : Int);
1397        throw runtime_error(ss.str());
1398      } else if (IsSigned != Vars[i].IsSigned) {
1399        string Signed("signed"), Uns("unsigned");
1400        stringstream ss;
1401        ss << "Type mismatch for variable " << Vars[i].Name <<
1402              " in: " << OpenFileName << ": current: " <<
1403              (Vars[i].IsSigned ? Signed : Uns) <<
1404              ", file: " << (IsSigned ? Signed : Uns);
1405        throw runtime_error(ss.str());
1406      }
1407
1408      size_t VarOffset = RowOffset*Vars[i].Size;
1409      void *VarData = ((char *) Vars[i].Data) + VarOffset;
1410
1411      vector<unsigned char> LData;
1412      void *Data = VarData;
1413      bool HasExtraSpace = Vars[i].HasExtraSpace;
1414      if (offsetof_safe(GH, BlocksStart) < GH->GlobalHeaderSize &&
1415          GH->BlocksSize > 0) {
1416        BlockHeader<IsBigEndian> *BH = (BlockHeader<IsBigEndian> *)
1417          &FH.getHeaderCache()[GH->BlocksStart +
1418                               (RankIndex*GH->NVars + j)*GH->BlocksSize];
1419        ReadSize = BH->Size + CRCSize;
1420        Offset = BH->Start;
1421
1422        if (strncmp(BH->Filters[0], CompressName, FilterNameSize) == 0) {
1423          LData.resize(ReadSize);
1424          Data = &LData[0];
1425          HasExtraSpace = true;
1426        } else if (BH->Filters[0][0] != '\0') {
1427          stringstream ss;
1428          ss << "Unknown filter \"" << BH->Filters[0] << "\" on variable " << Vars[i].Name;
1429          throw runtime_error(ss.str());
1430        }
1431      }
1432
1433      assert(HasExtraSpace && "Extra space required for reading");
1434
1435      char CRCSave[CRCSize];
1436      char *CRCLoc = ((char *) Data) + ReadSize - CRCSize;
1437      if (HasExtraSpace)
1438        std::copy(CRCLoc, CRCLoc + CRCSize, CRCSave);
1439
1440      int Retry = 0;
1441      {
1442        int RetryCount = 300;
1443        const char *EnvStr = getenv("GENERICIO_RETRY_COUNT");
1444        if (EnvStr)
1445          RetryCount = atoi(EnvStr);
1446
1447        int RetrySleep = 100; // ms
1448        EnvStr = getenv("GENERICIO_RETRY_SLEEP");
1449        if (EnvStr)
1450          RetrySleep = atoi(EnvStr);
1451
1452        for (; Retry < RetryCount; ++Retry) {
1453          try {
1454            FH.get()->read(Data, ReadSize, Offset, Vars[i].Name);
1455            break;
1456          } catch (...) { }
1457
1458          usleep(1000*RetrySleep);
1459        }
1460
1461        if (Retry == RetryCount) {
1462          ++NErrs[0];
1463          break;
1464        } else if (Retry > 0) {
1465          EnvStr = getenv("GENERICIO_VERBOSE");
1466          if (EnvStr) {
1467            int Mod = atoi(EnvStr);
1468            if (Mod > 0) {
1469              int Rank;
1470#ifndef GENERICIO_NO_MPI
1471              MPI_Comm_rank(MPI_COMM_WORLD, &Rank);
1472#else
1473              Rank = 0;
1474#endif
1475
1476              std::cerr << "Rank " << Rank << ": " << Retry <<
1477                           " I/O retries were necessary for reading " <<
1478                           Vars[i].Name << " from: " << OpenFileName << "\n";
1479
1480              std::cerr.flush();
1481            }
1482          }
1483        }
1484      }
1485
1486      TotalReadSize += ReadSize;
1487
1488      uint64_t CRC = crc64_omp(Data, ReadSize);
1489      if (CRC != (uint64_t) -1) {
1490        ++NErrs[1];
1491
1492        int Rank;
1493#ifndef GENERICIO_NO_MPI
1494        MPI_Comm_rank(MPI_COMM_WORLD, &Rank);
1495#else
1496        Rank = 0;
1497#endif
1498
1499        // All ranks will do this and have a good time!
1500        string dn = "gio_crc_errors";
1501        mkdir(dn.c_str(), 0777);
1502
1503        srand(time(0));
1504        int DumpNum = rand();
1505        stringstream ssd;
1506        ssd << dn << "/gio_crc_error_dump." << Rank << "." << DumpNum << ".bin";
1507
1508        stringstream ss;
1509        ss << dn << "/gio_crc_error_log." << Rank << ".txt";
1510
1511        ofstream ofs(ss.str().c_str(), ofstream::out | ofstream::app);
1512        ofs << "On-Disk CRC Error Report:\n";
1513        ofs << "Variable: " << Vars[i].Name << "\n";
1514        ofs << "File: " << OpenFileName << "\n";
1515        ofs << "I/O Retries: " << Retry << "\n";
1516        ofs << "Size: " << ReadSize << " bytes\n";
1517        ofs << "Offset: " << Offset << " bytes\n";
1518        ofs << "CRC: " << CRC << " (expected is -1)\n";
1519        ofs << "Dump file: " << ssd.str() << "\n";
1520        ofs << "\n";
1521        ofs.close();
1522
1523        ofstream dofs(ssd.str().c_str(), ofstream::out);
1524        dofs.write((const char *) Data, ReadSize);
1525        dofs.close();
1526
1527        uint64_t RawCRC = crc64_omp(Data, ReadSize - CRCSize);
1528        unsigned char *UData = (unsigned char *) Data;
1529        crc64_invert(RawCRC, &UData[ReadSize - CRCSize]);
1530        uint64_t NewCRC = crc64_omp(Data, ReadSize);
1531        std::cerr << "Recalulated CRC: " << NewCRC << ((NewCRC == -1) ? "ok" : "bad") << "\n";
1532        break;
1533      }
1534
1535      if (HasExtraSpace)
1536        std::copy(CRCSave, CRCSave + CRCSize, CRCLoc);
1537
1538      if (LData.size()) {
1539        CompressHeader<IsBigEndian> *CH = (CompressHeader<IsBigEndian>*) &LData[0];
1540
1541#ifdef _OPENMP
1542#pragma omp master
1543  {
1544#endif
1545
1546       if (!blosc_initialized) {
1547         blosc_init();
1548         blosc_initialized = true;
1549       }
1550
1551#ifdef _OPENMP
1552       blosc_set_nthreads(omp_get_max_threads());
1553  }
1554#endif
1555
1556        blosc_decompress(&LData[0] + sizeof(CompressHeader<IsBigEndian>),
1557                         VarData, Vars[i].Size*RH->NElems);
1558
1559        if (CH->OrigCRC != crc64_omp(VarData, Vars[i].Size*RH->NElems)) {
1560          ++NErrs[2];
1561          break;
1562        }
1563      }
1564
1565      // Byte swap the data if necessary.
1566      if (IsBigEndian != isBigEndian())
1567        for (size_t j = 0; j < RH->NElems; ++j) {
1568          char *Offset = ((char *) VarData) + j*Vars[i].Size;
1569          bswap(Offset, Vars[i].Size);
1570        }
1571
1572      break;
1573    }
1574
1575    if (!VarFound)
1576      throw runtime_error("Variable " + Vars[i].Name +
1577                          " not found in: " + OpenFileName);
1578
1579    // This is for debugging.
1580    if (NErrs[0] || NErrs[1] || NErrs[2]) {
1581      const char *EnvStr = getenv("GENERICIO_VERBOSE");
1582      if (EnvStr) {
1583        int Mod = atoi(EnvStr);
1584        if (Mod > 0) {
1585          int Rank;
1586#ifndef GENERICIO_NO_MPI
1587          MPI_Comm_rank(MPI_COMM_WORLD, &Rank);
1588#else
1589          Rank = 0;
1590#endif
1591
1592          std::cerr << "Rank " << Rank << ": " << NErrs[0] << " I/O error(s), " <<
1593          NErrs[1] << " CRC error(s) and " << NErrs[2] <<
1594          " decompression CRC error(s) reading: " << Vars[i].Name <<
1595          " from: " << OpenFileName << "\n";
1596
1597          std::cerr.flush();
1598        }
1599      }
1600    }
1601
1602    if (NErrs[0] || NErrs[1] || NErrs[2])
1603      break;
1604  }
1605}
1606
1607void GenericIO::getVariableInfo(vector<VariableInfo> &VI) {
1608  if (FH.isBigEndian())
1609    getVariableInfo<true>(VI);
1610  else
1611    getVariableInfo<false>(VI);
1612}
1613
1614template <bool IsBigEndian>
1615void GenericIO::getVariableInfo(vector<VariableInfo> &VI) {
1616  assert(FH.getHeaderCache().size() && "HeaderCache must not be empty");
1617
1618  GlobalHeader<IsBigEndian> *GH = (GlobalHeader<IsBigEndian> *) &FH.getHeaderCache()[0];
1619  for (uint64_t j = 0; j < GH->NVars; ++j) {
1620    VariableHeader<IsBigEndian> *VH = (VariableHeader<IsBigEndian> *) &FH.getHeaderCache()[GH->VarsStart +
1621                                                         j*GH->VarsSize];
1622
1623    string VName(VH->Name, VH->Name + NameSize);
1624    size_t VNameNull = VName.find('\0');
1625    if (VNameNull < NameSize)
1626      VName.resize(VNameNull);
1627
1628    bool IsFloat = (bool) (VH->Flags & FloatValue),
1629         IsSigned = (bool) (VH->Flags & SignedValue),
1630         IsPhysCoordX = (bool) (VH->Flags & ValueIsPhysCoordX),
1631         IsPhysCoordY = (bool) (VH->Flags & ValueIsPhysCoordY),
1632         IsPhysCoordZ = (bool) (VH->Flags & ValueIsPhysCoordZ),
1633         MaybePhysGhost = (bool) (VH->Flags & ValueMaybePhysGhost);
1634    VI.push_back(VariableInfo(VName, (size_t) VH->Size, IsFloat, IsSigned,
1635                              IsPhysCoordX, IsPhysCoordY, IsPhysCoordZ,
1636                              MaybePhysGhost));
1637  }
1638}
1639
1640void GenericIO::setNaturalDefaultPartition() {
1641#ifdef __bgq__
1642  DefaultPartition = MPIX_IO_link_id();
1643#else
1644#ifndef GENERICIO_NO_MPI
1645  bool UseName = true;
1646  const char *EnvStr = getenv("GENERICIO_PARTITIONS_USE_NAME");
1647  if (EnvStr) {
1648    int Mod = atoi(EnvStr);
1649    UseName = (Mod != 0);
1650  }
1651
1652  if (UseName) {
1653    // This is a heuristic to generate ~256 partitions based on the
1654    // names of the nodes.
1655    char Name[MPI_MAX_PROCESSOR_NAME];
1656    int Len = 0;
1657
1658    MPI_Get_processor_name(Name, &Len);
1659    unsigned char color = 0;
1660    for (int i = 0; i < Len; ++i)
1661      color += (unsigned char) Name[i];
1662
1663    DefaultPartition = color;
1664  }
1665
1666  // This is for debugging.
1667  EnvStr = getenv("GENERICIO_RANK_PARTITIONS");
1668  if (EnvStr) {
1669    int Mod = atoi(EnvStr);
1670    if (Mod > 0) {
1671      int Rank;
1672      MPI_Comm_rank(MPI_COMM_WORLD, &Rank);
1673      DefaultPartition += Rank % Mod;
1674    }
1675  }
1676#endif
1677#endif
1678}
1679
1680} /* END namespace cosmotk */
Note: See TracBrowser for help on using the repository browser.