source: GenericIO.cxx @ 22c1c0a

Revision 22c1c0a, 56.0 KB checked in by Hal Finkel <hfinkel@…>, 6 years ago (diff)

Allow Blosc to increase the size over the sz output (only a little bit, hopefully)

  • Property mode set to 100644
RevLine 
[da65757]1/*
2 *                    Copyright (C) 2015, UChicago Argonne, LLC
3 *                               All Rights Reserved
4 *
5 *                               Generic IO (ANL-15-066)
6 *                     Hal Finkel, Argonne National Laboratory
7 *
8 *                              OPEN SOURCE LICENSE
9 *
10 * Under the terms of Contract No. DE-AC02-06CH11357 with UChicago Argonne,
11 * LLC, the U.S. Government retains certain rights in this software.
12 *
13 * Redistribution and use in source and binary forms, with or without
14 * modification, are permitted provided that the following conditions are met:
15 *
16 *   1. Redistributions of source code must retain the above copyright notice,
17 *      this list of conditions and the following disclaimer.
18 *
19 *   2. Redistributions in binary form must reproduce the above copyright
20 *      notice, this list of conditions and the following disclaimer in the
21 *      documentation and/or other materials provided with the distribution.
22 *
23 *   3. Neither the names of UChicago Argonne, LLC or the Department of Energy
24 *      nor the names of its contributors may be used to endorse or promote
25 *      products derived from this software without specific prior written
26 *      permission.
27 *
28 * *****************************************************************************
29 *
30 *                                  DISCLAIMER
31 * THE SOFTWARE IS SUPPLIED “AS IS” WITHOUT WARRANTY OF ANY KIND.  NEITHER THE
32 * UNTED STATES GOVERNMENT, NOR THE UNITED STATES DEPARTMENT OF ENERGY, NOR
33 * UCHICAGO ARGONNE, LLC, NOR ANY OF THEIR EMPLOYEES, MAKES ANY WARRANTY,
34 * EXPRESS OR IMPLIED, OR ASSUMES ANY LEGAL LIABILITY OR RESPONSIBILITY FOR THE
35 * ACCURACY, COMPLETENESS, OR USEFULNESS OF ANY INFORMATION, DATA, APPARATUS,
36 * PRODUCT, OR PROCESS DISCLOSED, OR REPRESENTS THAT ITS USE WOULD NOT INFRINGE
37 * PRIVATELY OWNED RIGHTS.
38 *
39 * *****************************************************************************
40 */
41
[00587dc]42#define _XOPEN_SOURCE 600
43#include "CRC64.h"
44#include "GenericIO.h"
45
46extern "C" {
47#include "blosc.h"
48}
[6461f57]49#include "sz.h"
[00587dc]50
51#include <sstream>
52#include <fstream>
53#include <stdexcept>
[14e73bb]54#include <iterator>
[00587dc]55#include <algorithm>
56#include <cassert>
57#include <cstddef>
58#include <cstring>
59
60#ifndef GENERICIO_NO_MPI
61#include <ctime>
62#endif
63
64#include <sys/types.h>
65#include <sys/stat.h>
66#include <fcntl.h>
67#include <errno.h>
68
69#ifdef __bgq__
[14e73bb]70#include <mpix.h>
[00587dc]71#endif
72
73#ifndef MPI_UINT64_T
74#define MPI_UINT64_T (sizeof(long) == 8 ? MPI_LONG : MPI_LONG_LONG)
75#endif
76
77using namespace std;
78
79namespace gio {
80
81
82#ifndef GENERICIO_NO_MPI
83GenericFileIO_MPI::~GenericFileIO_MPI() {
84  (void) MPI_File_close(&FH);
85}
86
87void GenericFileIO_MPI::open(const std::string &FN, bool ForReading) {
88  FileName = FN;
89
90  int amode = ForReading ? MPI_MODE_RDONLY : (MPI_MODE_WRONLY | MPI_MODE_CREATE);
91  if (MPI_File_open(Comm, const_cast<char *>(FileName.c_str()), amode,
92                    MPI_INFO_NULL, &FH) != MPI_SUCCESS)
93    throw runtime_error((!ForReading ? "Unable to create the file: " :
94                                       "Unable to open the file: ") +
95                        FileName);
96}
97
98void GenericFileIO_MPI::setSize(size_t sz) {
99  if (MPI_File_set_size(FH, sz) != MPI_SUCCESS)
100    throw runtime_error("Unable to set size for file: " + FileName);
101}
102
103void GenericFileIO_MPI::read(void *buf, size_t count, off_t offset,
104                             const std::string &D) {
105  while (count > 0) {
106    MPI_Status status;
107    if (MPI_File_read_at(FH, offset, buf, count, MPI_BYTE, &status) != MPI_SUCCESS)
108      throw runtime_error("Unable to read " + D + " from file: " + FileName);
109
110    int scount;
111    (void) MPI_Get_count(&status, MPI_BYTE, &scount);
112
113    count -= scount;
114    buf = ((char *) buf) + scount;
115    offset += scount;
116  }
117}
118
119void GenericFileIO_MPI::write(const void *buf, size_t count, off_t offset,
120                              const std::string &D) {
121  while (count > 0) {
122    MPI_Status status;
123    if (MPI_File_write_at(FH, offset, (void *) buf, count, MPI_BYTE, &status) != MPI_SUCCESS)
124      throw runtime_error("Unable to write " + D + " to file: " + FileName);
125
[56b997e]126    int scount = 0;
127    // On some systems, MPI_Get_count will not return zero even when count is zero.
128    if (count > 0)
129      (void) MPI_Get_count(&status, MPI_BYTE, &scount);
[00587dc]130
131    count -= scount;
132    buf = ((char *) buf) + scount;
133    offset += scount;
134  }
135}
136
137void GenericFileIO_MPICollective::read(void *buf, size_t count, off_t offset,
138                             const std::string &D) {
139  int Continue = 0;
140
141  do {
142    MPI_Status status;
143    if (MPI_File_read_at_all(FH, offset, buf, count, MPI_BYTE, &status) != MPI_SUCCESS)
144      throw runtime_error("Unable to read " + D + " from file: " + FileName);
145
[56b997e]146    int scount = 0;
147    // On some systems, MPI_Get_count will not return zero even when count is zero.
148    if (count > 0)
149      (void) MPI_Get_count(&status, MPI_BYTE, &scount);
[00587dc]150
151    count -= scount;
152    buf = ((char *) buf) + scount;
153    offset += scount;
154
155    int NeedContinue = (count > 0);
156    MPI_Allreduce(&NeedContinue, &Continue, 1, MPI_INT, MPI_SUM, Comm);
157  } while (Continue);
158}
159
160void GenericFileIO_MPICollective::write(const void *buf, size_t count, off_t offset,
161                              const std::string &D) {
162  int Continue = 0;
163
164  do {
165    MPI_Status status;
166    if (MPI_File_write_at_all(FH, offset, (void *) buf, count, MPI_BYTE, &status) != MPI_SUCCESS)
167      throw runtime_error("Unable to write " + D + " to file: " + FileName);
168
169    int scount;
170    (void) MPI_Get_count(&status, MPI_BYTE, &scount);
171
172    count -= scount;
173    buf = ((char *) buf) + scount;
174    offset += scount;
175
176    int NeedContinue = (count > 0);
177    MPI_Allreduce(&NeedContinue, &Continue, 1, MPI_INT, MPI_SUM, Comm);
178  } while (Continue);
179}
180#endif
181
182GenericFileIO_POSIX::~GenericFileIO_POSIX() {
183  if (FH != -1) close(FH);
184}
185
186void GenericFileIO_POSIX::open(const std::string &FN, bool ForReading) {
187  FileName = FN;
188
189  int flags = ForReading ? O_RDONLY : (O_WRONLY | O_CREAT);
190  int mode = S_IRUSR | S_IWUSR | S_IRGRP;
191  errno = 0;
192  if ((FH = ::open(FileName.c_str(), flags, mode)) == -1)
193    throw runtime_error((!ForReading ? "Unable to create the file: " :
194                                       "Unable to open the file: ") +
195                        FileName + ": " + strerror(errno));
196}
197
198void GenericFileIO_POSIX::setSize(size_t sz) {
199  if (ftruncate(FH, sz) == -1)
200    throw runtime_error("Unable to set size for file: " + FileName);
201}
202
203void GenericFileIO_POSIX::read(void *buf, size_t count, off_t offset,
204                               const std::string &D) {
205  while (count > 0) {
206    ssize_t scount;
207    errno = 0;
208    if ((scount = pread(FH, buf, count, offset)) == -1) {
209      if (errno == EINTR)
210        continue;
211
[14e73bb]212      throw runtime_error("Unable to read " + D + " from file: " + FileName +
213                          ": " + strerror(errno));
[00587dc]214    }
215
216    count -= scount;
217    buf = ((char *) buf) + scount;
218    offset += scount;
219  }
220}
221
222void GenericFileIO_POSIX::write(const void *buf, size_t count, off_t offset,
223                                const std::string &D) {
224  while (count > 0) {
225    ssize_t scount;
226    errno = 0;
227    if ((scount = pwrite(FH, buf, count, offset)) == -1) {
228      if (errno == EINTR)
229        continue;
230
[14e73bb]231      throw runtime_error("Unable to write " + D + " to file: " + FileName +
232                          ": " + strerror(errno));
[00587dc]233    }
234
235    count -= scount;
236    buf = ((char *) buf) + scount;
237    offset += scount;
238  }
239}
240
241static bool isBigEndian() {
242  const uint32_t one = 1;
243  return !(*((char *)(&one)));
244}
245
[a4fee13]246static void bswap(void *v, size_t s) {
247  char *p = (char *) v;
248  for (size_t i = 0; i < s/2; ++i)
249    std::swap(p[i], p[s - (i+1)]);
250}
251
252// Using #pragma pack here, instead of __attribute__((packed)) because xlc, at
253// least as of v12.1, won't take __attribute__((packed)) on non-POD and/or
254// templated types.
255#pragma pack(1)
256
257template <typename T, bool IsBigEndian>
258struct endian_specific_value {
259  operator T() const {
260    T rvalue = value;
261    if (IsBigEndian != isBigEndian())
262      bswap(&rvalue, sizeof(T));
263
264    return rvalue;
265  };
266
267  endian_specific_value &operator = (T nvalue) {
268    if (IsBigEndian != isBigEndian())
269      bswap(&nvalue, sizeof(T));
270
271    value = nvalue;
272    return *this;
273  }
274
275  endian_specific_value &operator += (T nvalue) {
276    *this = *this + nvalue;
277    return *this;
278  }
279
280  endian_specific_value &operator -= (T nvalue) {
281    *this = *this - nvalue;
282    return *this;
283  }
284
285private:
286  T value;
287};
288
[00587dc]289static const size_t CRCSize = 8;
290
291static const size_t MagicSize = 8;
292static const char *MagicBE = "HACC01B";
293static const char *MagicLE = "HACC01L";
294
[a4fee13]295template <bool IsBigEndian>
[00587dc]296struct GlobalHeader {
297  char Magic[MagicSize];
[a4fee13]298  endian_specific_value<uint64_t, IsBigEndian> HeaderSize;
299  endian_specific_value<uint64_t, IsBigEndian> NElems; // The global total
300  endian_specific_value<uint64_t, IsBigEndian> Dims[3];
301  endian_specific_value<uint64_t, IsBigEndian> NVars;
302  endian_specific_value<uint64_t, IsBigEndian> VarsSize;
303  endian_specific_value<uint64_t, IsBigEndian> VarsStart;
304  endian_specific_value<uint64_t, IsBigEndian> NRanks;
305  endian_specific_value<uint64_t, IsBigEndian> RanksSize;
306  endian_specific_value<uint64_t, IsBigEndian> RanksStart;
307  endian_specific_value<uint64_t, IsBigEndian> GlobalHeaderSize;
308  endian_specific_value<double,   IsBigEndian> PhysOrigin[3];
309  endian_specific_value<double,   IsBigEndian> PhysScale[3];
310  endian_specific_value<uint64_t, IsBigEndian> BlocksSize;
311  endian_specific_value<uint64_t, IsBigEndian> BlocksStart;
312};
[00587dc]313
314enum {
315  FloatValue          = (1 << 0),
316  SignedValue         = (1 << 1),
317  ValueIsPhysCoordX   = (1 << 2),
318  ValueIsPhysCoordY   = (1 << 3),
319  ValueIsPhysCoordZ   = (1 << 4),
320  ValueMaybePhysGhost = (1 << 5)
321};
322
323static const size_t NameSize = 256;
[a4fee13]324template <bool IsBigEndian>
[00587dc]325struct VariableHeader {
326  char Name[NameSize];
[a4fee13]327  endian_specific_value<uint64_t, IsBigEndian> Flags;
328  endian_specific_value<uint64_t, IsBigEndian> Size;
[5d57155]329  endian_specific_value<uint64_t, IsBigEndian> ElementSize;
[a4fee13]330};
[00587dc]331
[a4fee13]332template <bool IsBigEndian>
[00587dc]333struct RankHeader {
[a4fee13]334  endian_specific_value<uint64_t, IsBigEndian> Coords[3];
335  endian_specific_value<uint64_t, IsBigEndian> NElems;
336  endian_specific_value<uint64_t, IsBigEndian> Start;
337  endian_specific_value<uint64_t, IsBigEndian> GlobalRank;
338};
[00587dc]339
340static const size_t FilterNameSize = 8;
341static const size_t MaxFilters = 4;
[a4fee13]342template <bool IsBigEndian>
[00587dc]343struct BlockHeader {
344  char Filters[MaxFilters][FilterNameSize];
[a4fee13]345  endian_specific_value<uint64_t, IsBigEndian> Start;
346  endian_specific_value<uint64_t, IsBigEndian> Size;
347};
[00587dc]348
[a4fee13]349template <bool IsBigEndian>
[00587dc]350struct CompressHeader {
[a4fee13]351  endian_specific_value<uint64_t, IsBigEndian> OrigCRC;
352};
[00587dc]353const char *CompressName = "BLOSC";
354
[6461f57]355const char *LossyCompressName = "SZ";
356
[a4fee13]357#pragma pack()
358
[00587dc]359unsigned GenericIO::DefaultFileIOType = FileIOPOSIX;
360int GenericIO::DefaultPartition = 0;
361bool GenericIO::DefaultShouldCompress = false;
362
363#ifndef GENERICIO_NO_MPI
364std::size_t GenericIO::CollectiveMPIIOThreshold = 0;
365#endif
366
367static bool blosc_initialized = false;
[6461f57]368static bool sz_initialized = false;
[00587dc]369
[eeacdad]370static int GetSZDT(GenericIO::Variable &Var) {
371  if (Var.hasElementType<float>())
372    return SZ_FLOAT;
373  else if (Var.hasElementType<double>())
374    return SZ_DOUBLE;
375  else if (Var.hasElementType<uint8_t>())
376    return SZ_UINT8;
377  else if (Var.hasElementType<int8_t>())
378    return SZ_INT8;
379  else if (Var.hasElementType<uint16_t>())
380    return SZ_UINT16;
381  else if (Var.hasElementType<int16_t>())
382    return SZ_INT16;
383  else if (Var.hasElementType<uint32_t>())
384    return SZ_UINT32;
385  else if (Var.hasElementType<int32_t>())
386    return SZ_INT32;
387  else if (Var.hasElementType<uint64_t>())
388    return SZ_UINT64;
389  else if (Var.hasElementType<int64_t>())
390    return SZ_INT64;
391  else
392    return -1;
393}
394
[00587dc]395#ifndef GENERICIO_NO_MPI
[a4fee13]396void GenericIO::write() {
397  if (isBigEndian())
398    write<true>();
399  else
400    write<false>();
401}
402
[00587dc]403// Note: writing errors are not currently recoverable (one rank may fail
404// while the others don't).
[a4fee13]405template <bool IsBigEndian>
[00587dc]406void GenericIO::write() {
[a4fee13]407  const char *Magic = IsBigEndian ? MagicBE : MagicLE;
[00587dc]408
409  uint64_t FileSize = 0;
410
411  int NRanks, Rank;
412  MPI_Comm_rank(Comm, &Rank);
413  MPI_Comm_size(Comm, &NRanks);
414
415#ifdef __bgq__
416  MPI_Barrier(Comm);
417#endif
418  MPI_Comm_split(Comm, Partition, Rank, &SplitComm);
419
420  int SplitNRanks, SplitRank;
421  MPI_Comm_rank(SplitComm, &SplitRank);
422  MPI_Comm_size(SplitComm, &SplitNRanks);
423
424  string LocalFileName;
425  if (SplitNRanks != NRanks) {
426    if (Rank == 0) {
427      // In split mode, the specified file becomes the rank map, and the real
428      // data is partitioned.
429
430      vector<int> MapRank, MapPartition;
431      MapRank.resize(NRanks);
432      for (int i = 0; i < NRanks; ++i) MapRank[i] = i;
433
434      MapPartition.resize(NRanks);
435      MPI_Gather(&Partition, 1, MPI_INT, &MapPartition[0], 1, MPI_INT, 0, Comm);
436
437      GenericIO GIO(MPI_COMM_SELF, FileName, FileIOType);
438      GIO.setNumElems(NRanks);
439      GIO.addVariable("$rank", MapRank); /* this is for use by humans; the reading
440                                            code assumes that the partitions are in
441                                            rank order */
442      GIO.addVariable("$partition", MapPartition);
443
444      vector<int> CX, CY, CZ;
445      int TopoStatus;
446      MPI_Topo_test(Comm, &TopoStatus);
447      if (TopoStatus == MPI_CART) {
448        CX.resize(NRanks);
449        CY.resize(NRanks);
450        CZ.resize(NRanks);
451
452        for (int i = 0; i < NRanks; ++i) {
453          int C[3];
454          MPI_Cart_coords(Comm, i, 3, C);
455
456          CX[i] = C[0];
457          CY[i] = C[1];
458          CZ[i] = C[2];
459        }
460
461        GIO.addVariable("$x", CX);
462        GIO.addVariable("$y", CY);
463        GIO.addVariable("$z", CZ);
464      }
465
466      GIO.write();
467    } else {
468      MPI_Gather(&Partition, 1, MPI_INT, 0, 0, MPI_INT, 0, Comm);
469    }
470
471    stringstream ss;
472    ss << FileName << "#" << Partition;
473    LocalFileName = ss.str();
474  } else {
475    LocalFileName = FileName;
476  }
477
[a4fee13]478  RankHeader<IsBigEndian> RHLocal;
[00587dc]479  int Dims[3], Periods[3], Coords[3];
480
481  int TopoStatus;
482  MPI_Topo_test(Comm, &TopoStatus);
483  if (TopoStatus == MPI_CART) {
484    MPI_Cart_get(Comm, 3, Dims, Periods, Coords);
485  } else {
486    Dims[0] = NRanks;
487    std::fill(Dims + 1, Dims + 3, 1);
488    std::fill(Periods, Periods + 3, 0);
489    Coords[0] = Rank;
490    std::fill(Coords + 1, Coords + 3, 0);
491  }
492
493  std::copy(Coords, Coords + 3, RHLocal.Coords);
494  RHLocal.NElems = NElems;
495  RHLocal.Start = 0;
496  RHLocal.GlobalRank = Rank;
497
498  bool ShouldCompress = DefaultShouldCompress;
499  const char *EnvStr = getenv("GENERICIO_COMPRESS");
500  if (EnvStr) {
501    int Mod = atoi(EnvStr);
502    ShouldCompress = (Mod > 0);
503  }
504
505  bool NeedsBlockHeaders = ShouldCompress;
506  EnvStr = getenv("GENERICIO_FORCE_BLOCKS");
507  if (!NeedsBlockHeaders && EnvStr) {
508    int Mod = atoi(EnvStr);
509    NeedsBlockHeaders = (Mod > 0);
510  }
511
[a4fee13]512  vector<BlockHeader<IsBigEndian> > LocalBlockHeaders;
[00587dc]513  vector<void *> LocalData;
514  vector<bool> LocalHasExtraSpace;
515  vector<vector<unsigned char> > LocalCData;
516  if (NeedsBlockHeaders) {
517    LocalBlockHeaders.resize(Vars.size());
518    LocalData.resize(Vars.size());
519    LocalHasExtraSpace.resize(Vars.size());
520    if (ShouldCompress)
521      LocalCData.resize(Vars.size());
522
523    for (size_t i = 0; i < Vars.size(); ++i) {
524      // Filters null by default, leave null starting address (needs to be
525      // calculated by the header-writing rank).
[a4fee13]526      memset(&LocalBlockHeaders[i], 0, sizeof(BlockHeader<IsBigEndian>));
[00587dc]527      if (ShouldCompress) {
[eeacdad]528        void *OrigData = Vars[i].Data;
529        bool FreeOrigData = false;
530        size_t OrigUnitSize = Vars[i].Size;
531        size_t OrigDataSize = NElems*Vars[i].Size;
532
533        int FilterIdx = 0;
534        if (Vars[i].LCI.Mode != LossyCompressionInfo::LCModeNone) {
[0654f4c]535#ifdef _OPENMP
536#pragma omp master
537  {
538#endif
539         if (!sz_initialized) {
540           SZ_Init(NULL);
[1aa7cbe]541           confparams_cpr->szMode = 0; // Best-speed mode.
[0654f4c]542           sz_initialized = true;
543         }
544
545#ifdef _OPENMP
546  }
547#endif
[eeacdad]548          int SZDT = GetSZDT(Vars[i]);
549          if (SZDT == -1)
550            goto nosz;
551
552          int EBM;
553          switch (Vars[i].LCI.Mode) {
554          case LossyCompressionInfo::LCModeAbs:
555            EBM = ABS;
556            break;
557          case LossyCompressionInfo::LCModeRel:
558            EBM = REL;
559            break;
560          case LossyCompressionInfo::LCModeAbsAndRel:
561            EBM = ABS_AND_REL;
562            break;
563          case LossyCompressionInfo::LCModeAbsOrRel:
564            EBM = ABS_OR_REL;
565            break;
566          case LossyCompressionInfo::LCModePSNR:
567            EBM = PSNR;
568            break;
569          }
570
571          size_t LOutSize;
572          unsigned char *LCompressedData = SZ_compress_args(SZDT, Vars[i].Data, &LOutSize, EBM,
573                                                            Vars[i].LCI.AbsErrThreshold, Vars[i].LCI.RelErrThreshold,
574                                                            Vars[i].LCI.PSNRThreshold, 0, 0, 0, 0, NElems);
575          if (!LCompressedData)
576            goto nosz;
577          if (LOutSize >= NElems*Vars[i].Size) {
578            free(LCompressedData);
579            goto nosz;
580          }
581
582          OrigData = LCompressedData;
583          FreeOrigData = true;
584          OrigUnitSize = 1;
585          OrigDataSize = LOutSize;
586
587          strncpy(LocalBlockHeaders[i].Filters[FilterIdx++], LossyCompressName, FilterNameSize);
588        }
589nosz:
590
[a4fee13]591        LocalCData[i].resize(sizeof(CompressHeader<IsBigEndian>));
[00587dc]592
[a4fee13]593        CompressHeader<IsBigEndian> *CH = (CompressHeader<IsBigEndian>*) &LocalCData[i][0];
[eeacdad]594        CH->OrigCRC = crc64_omp(OrigData, OrigDataSize);
[00587dc]595
596#ifdef _OPENMP
597#pragma omp master
598  {
599#endif
600
601       if (!blosc_initialized) {
602         blosc_init();
603         blosc_initialized = true;
604       }
605
606#ifdef _OPENMP
607       blosc_set_nthreads(omp_get_max_threads());
608  }
609#endif
610
[22c1c0a]611        size_t RealOrigDataSize = NElems*Vars[i].Size;
612        LocalCData[i].resize(LocalCData[i].size() + RealOrigDataSize);
[eeacdad]613        if (blosc_compress(9, 1, OrigUnitSize, OrigDataSize, OrigData,
[a4fee13]614                           &LocalCData[i][0] + sizeof(CompressHeader<IsBigEndian>),
[22c1c0a]615                           RealOrigDataSize) <= 0) {
[eeacdad]616          if (FreeOrigData)
617            free(OrigData);
618
[00587dc]619          goto nocomp;
[eeacdad]620        }
[00587dc]621
[eeacdad]622        if (FreeOrigData)
623          free(OrigData);
624
625        strncpy(LocalBlockHeaders[i].Filters[FilterIdx++], CompressName, FilterNameSize);
[00587dc]626        size_t CNBytes, CCBytes, CBlockSize;
[a4fee13]627        blosc_cbuffer_sizes(&LocalCData[i][0] + sizeof(CompressHeader<IsBigEndian>),
[00587dc]628                            &CNBytes, &CCBytes, &CBlockSize);
[a4fee13]629        LocalCData[i].resize(CCBytes + sizeof(CompressHeader<IsBigEndian>));
[00587dc]630
631        LocalBlockHeaders[i].Size = LocalCData[i].size();
632        LocalCData[i].resize(LocalCData[i].size() + CRCSize);
633        LocalData[i] = &LocalCData[i][0];
634        LocalHasExtraSpace[i] = true;
635      } else {
636nocomp:
637        LocalBlockHeaders[i].Size = NElems*Vars[i].Size;
638        LocalData[i] = Vars[i].Data;
639        LocalHasExtraSpace[i] = Vars[i].HasExtraSpace;
640      }
641    }
642  }
643
644  double StartTime = MPI_Wtime();
645
646  if (SplitRank == 0) {
[a4fee13]647    uint64_t HeaderSize = sizeof(GlobalHeader<IsBigEndian>) + Vars.size()*sizeof(VariableHeader<IsBigEndian>) +
648                          SplitNRanks*sizeof(RankHeader<IsBigEndian>) + CRCSize;
[00587dc]649    if (NeedsBlockHeaders)
[a4fee13]650      HeaderSize += SplitNRanks*Vars.size()*sizeof(BlockHeader<IsBigEndian>);
[00587dc]651
652    vector<char> Header(HeaderSize, 0);
[a4fee13]653    GlobalHeader<IsBigEndian> *GH = (GlobalHeader<IsBigEndian> *) &Header[0];
[00587dc]654    std::copy(Magic, Magic + MagicSize, GH->Magic);
655    GH->HeaderSize = HeaderSize - CRCSize;
656    GH->NElems = NElems; // This will be updated later
657    std::copy(Dims, Dims + 3, GH->Dims);
658    GH->NVars = Vars.size();
[a4fee13]659    GH->VarsSize = sizeof(VariableHeader<IsBigEndian>);
660    GH->VarsStart = sizeof(GlobalHeader<IsBigEndian>);
[00587dc]661    GH->NRanks = SplitNRanks;
[a4fee13]662    GH->RanksSize = sizeof(RankHeader<IsBigEndian>);
663    GH->RanksStart = GH->VarsStart + Vars.size()*sizeof(VariableHeader<IsBigEndian>);
664    GH->GlobalHeaderSize = sizeof(GlobalHeader<IsBigEndian>);
[00587dc]665    std::copy(PhysOrigin, PhysOrigin + 3, GH->PhysOrigin);
666    std::copy(PhysScale,  PhysScale  + 3, GH->PhysScale);
667    if (!NeedsBlockHeaders) {
668      GH->BlocksSize = GH->BlocksStart = 0;
669    } else {
[a4fee13]670      GH->BlocksSize = sizeof(BlockHeader<IsBigEndian>);
671      GH->BlocksStart = GH->RanksStart + SplitNRanks*sizeof(RankHeader<IsBigEndian>);
[00587dc]672    }
673
674    uint64_t RecordSize = 0;
[a4fee13]675    VariableHeader<IsBigEndian> *VH = (VariableHeader<IsBigEndian> *) &Header[GH->VarsStart];
[00587dc]676    for (size_t i = 0; i < Vars.size(); ++i, ++VH) {
677      string VName(Vars[i].Name);
678      VName.resize(NameSize);
679
680      std::copy(VName.begin(), VName.end(), VH->Name);
[a4fee13]681      uint64_t VFlags = 0;
682      if (Vars[i].IsFloat)  VFlags |= FloatValue;
683      if (Vars[i].IsSigned) VFlags |= SignedValue;
684      if (Vars[i].IsPhysCoordX) VFlags |= ValueIsPhysCoordX;
685      if (Vars[i].IsPhysCoordY) VFlags |= ValueIsPhysCoordY;
686      if (Vars[i].IsPhysCoordZ) VFlags |= ValueIsPhysCoordZ;
687      if (Vars[i].MaybePhysGhost) VFlags |= ValueMaybePhysGhost;
688      VH->Flags = VFlags;
[00587dc]689      RecordSize += VH->Size = Vars[i].Size;
[5d57155]690      VH->ElementSize = Vars[i].ElementSize;
[00587dc]691    }
692
693    MPI_Gather(&RHLocal, sizeof(RHLocal), MPI_BYTE,
694               &Header[GH->RanksStart], sizeof(RHLocal),
695               MPI_BYTE, 0, SplitComm);
696
697    if (NeedsBlockHeaders) {
698      MPI_Gather(&LocalBlockHeaders[0],
[a4fee13]699                 Vars.size()*sizeof(BlockHeader<IsBigEndian>), MPI_BYTE,
[00587dc]700                 &Header[GH->BlocksStart],
[a4fee13]701                 Vars.size()*sizeof(BlockHeader<IsBigEndian>), MPI_BYTE,
[00587dc]702                 0, SplitComm);
703
[a4fee13]704      BlockHeader<IsBigEndian> *BH = (BlockHeader<IsBigEndian> *) &Header[GH->BlocksStart];
[00587dc]705      for (int i = 0; i < SplitNRanks; ++i)
706      for (size_t j = 0; j < Vars.size(); ++j, ++BH) {
707        if (i == 0 && j == 0)
708          BH->Start = HeaderSize;
709        else
710          BH->Start = BH[-1].Start + BH[-1].Size + CRCSize;
711      }
712
[a4fee13]713      RankHeader<IsBigEndian> *RH = (RankHeader<IsBigEndian> *) &Header[GH->RanksStart];
[00587dc]714      RH->Start = HeaderSize; ++RH;
715      for (int i = 1; i < SplitNRanks; ++i, ++RH) {
716        RH->Start =
[a4fee13]717          ((BlockHeader<IsBigEndian> *) &Header[GH->BlocksStart])[i*Vars.size()].Start;
[00587dc]718        GH->NElems += RH->NElems;
719      }
720
721      // Compute the total file size.
722      uint64_t LastData = BH[-1].Size + CRCSize;
723      FileSize = BH[-1].Start + LastData;
724    } else {
[a4fee13]725      RankHeader<IsBigEndian> *RH = (RankHeader<IsBigEndian> *) &Header[GH->RanksStart];
[00587dc]726      RH->Start = HeaderSize; ++RH;
727      for (int i = 1; i < SplitNRanks; ++i, ++RH) {
728        uint64_t PrevNElems = RH[-1].NElems;
729        uint64_t PrevData = PrevNElems*RecordSize + CRCSize*Vars.size();
730        RH->Start = RH[-1].Start + PrevData;
731        GH->NElems += RH->NElems;
732      }
733
734      // Compute the total file size.
735      uint64_t LastNElems = RH[-1].NElems;
736      uint64_t LastData = LastNElems*RecordSize + CRCSize*Vars.size();
737      FileSize = RH[-1].Start + LastData;
738    }
739
740    // Now that the starting offset has been computed, send it back to each rank.
741    MPI_Scatter(&Header[GH->RanksStart], sizeof(RHLocal),
742                MPI_BYTE, &RHLocal, sizeof(RHLocal),
743                MPI_BYTE, 0, SplitComm);
744
745    if (NeedsBlockHeaders)
746      MPI_Scatter(&Header[GH->BlocksStart],
[a4fee13]747                  sizeof(BlockHeader<IsBigEndian>)*Vars.size(), MPI_BYTE,
[00587dc]748                  &LocalBlockHeaders[0],
[a4fee13]749                  sizeof(BlockHeader<IsBigEndian>)*Vars.size(), MPI_BYTE,
[00587dc]750                  0, SplitComm);
751
752    uint64_t HeaderCRC = crc64_omp(&Header[0], HeaderSize - CRCSize);
753    crc64_invert(HeaderCRC, &Header[HeaderSize - CRCSize]);
754
755    if (FileIOType == FileIOMPI)
756      FH.get() = new GenericFileIO_MPI(MPI_COMM_SELF);
757    else if (FileIOType == FileIOMPICollective)
758      FH.get() = new GenericFileIO_MPICollective(MPI_COMM_SELF);
759    else
760      FH.get() = new GenericFileIO_POSIX();
761
762    FH.get()->open(LocalFileName);
763    FH.get()->setSize(FileSize);
764    FH.get()->write(&Header[0], HeaderSize, 0, "header");
765
766    close();
767  } else {
768    MPI_Gather(&RHLocal, sizeof(RHLocal), MPI_BYTE, 0, 0, MPI_BYTE, 0, SplitComm);
769    if (NeedsBlockHeaders)
[a4fee13]770      MPI_Gather(&LocalBlockHeaders[0], Vars.size()*sizeof(BlockHeader<IsBigEndian>),
[00587dc]771                 MPI_BYTE, 0, 0, MPI_BYTE, 0, SplitComm);
772    MPI_Scatter(0, 0, MPI_BYTE, &RHLocal, sizeof(RHLocal), MPI_BYTE, 0, SplitComm);
773    if (NeedsBlockHeaders)
[a4fee13]774      MPI_Scatter(0, 0, MPI_BYTE, &LocalBlockHeaders[0], sizeof(BlockHeader<IsBigEndian>)*Vars.size(),
[00587dc]775                  MPI_BYTE, 0, SplitComm);
776  }
777
778  MPI_Barrier(SplitComm);
779
780  if (FileIOType == FileIOMPI)
781    FH.get() = new GenericFileIO_MPI(SplitComm);
782  else if (FileIOType == FileIOMPICollective)
783    FH.get() = new GenericFileIO_MPICollective(SplitComm);
784  else
785    FH.get() = new GenericFileIO_POSIX();
786
787  FH.get()->open(LocalFileName);
788
789  uint64_t Offset = RHLocal.Start;
790  for (size_t i = 0; i < Vars.size(); ++i) {
791    uint64_t WriteSize = NeedsBlockHeaders ?
792                         LocalBlockHeaders[i].Size : NElems*Vars[i].Size;
793    void *Data = NeedsBlockHeaders ? LocalData[i] : Vars[i].Data;
794    uint64_t CRC = crc64_omp(Data, WriteSize);
795    bool HasExtraSpace = NeedsBlockHeaders ?
796                         LocalHasExtraSpace[i] : Vars[i].HasExtraSpace;
797    char *CRCLoc = HasExtraSpace ?  ((char *) Data) + WriteSize : (char *) &CRC;
798
799    if (NeedsBlockHeaders)
800      Offset = LocalBlockHeaders[i].Start;
801
802    // When using extra space for the CRC write, preserve the original contents.
803    char CRCSave[CRCSize];
804    if (HasExtraSpace)
805      std::copy(CRCLoc, CRCLoc + CRCSize, CRCSave);
806
807    crc64_invert(CRC, CRCLoc);
808
809    if (HasExtraSpace) {
810      FH.get()->write(Data, WriteSize + CRCSize, Offset, Vars[i].Name + " with CRC");
811    } else {
812      FH.get()->write(Data, WriteSize, Offset, Vars[i].Name);
813      FH.get()->write(CRCLoc, CRCSize, Offset + WriteSize, Vars[i].Name + " CRC");
814    }
815
816    if (HasExtraSpace)
817       std::copy(CRCSave, CRCSave + CRCSize, CRCLoc);
818
819    Offset += WriteSize + CRCSize;
820  }
821
822  close();
823  MPI_Barrier(Comm);
824
825  double EndTime = MPI_Wtime();
826  double TotalTime = EndTime - StartTime;
827  double MaxTotalTime;
828  MPI_Reduce(&TotalTime, &MaxTotalTime, 1, MPI_DOUBLE, MPI_MAX, 0, Comm);
829
830  if (SplitNRanks != NRanks) {
831    uint64_t ContribFileSize = (SplitRank == 0) ? FileSize : 0;
832    MPI_Reduce(&ContribFileSize, &FileSize, 1, MPI_UINT64_T, MPI_SUM, 0, Comm);
833  }
834
835  if (Rank == 0) {
836    double Rate = ((double) FileSize) / MaxTotalTime / (1024.*1024.);
[d6628a6]837    std::cout << "Wrote " << Vars.size() << " variables to " << FileName <<
838                  " (" << FileSize << " bytes) in " << MaxTotalTime << "s: " <<
839                  Rate << " MB/s" << std::endl;
[00587dc]840  }
841
842  MPI_Comm_free(&SplitComm);
843  SplitComm = MPI_COMM_NULL;
844}
845#endif // GENERICIO_NO_MPI
846
[a4fee13]847template <bool IsBigEndian>
[8f0a211]848void GenericIO::readHeaderLeader(void *GHPtr, MismatchBehavior MB, int NRanks,
849                                 int Rank, int SplitNRanks,
850                                 string &LocalFileName, uint64_t &HeaderSize,
851                                 vector<char> &Header) {
[a4fee13]852  GlobalHeader<IsBigEndian> &GH = *(GlobalHeader<IsBigEndian> *) GHPtr;
853
[8f0a211]854  if (MB == MismatchDisallowed) {
[a4fee13]855    if (SplitNRanks != (int) GH.NRanks) {
856      stringstream ss;
857      ss << "Won't read " << LocalFileName << ": communicator-size mismatch: " <<
858            "current: " << SplitNRanks << ", file: " << GH.NRanks;
859      throw runtime_error(ss.str());
860    }
861
862#ifndef GENERICIO_NO_MPI
863    int TopoStatus;
864    MPI_Topo_test(Comm, &TopoStatus);
865    if (TopoStatus == MPI_CART) {
866      int Dims[3], Periods[3], Coords[3];
867      MPI_Cart_get(Comm, 3, Dims, Periods, Coords);
868
869      bool DimsMatch = true;
870      for (int i = 0; i < 3; ++i) {
871        if ((uint64_t) Dims[i] != GH.Dims[i]) {
872          DimsMatch = false;
873          break;
874        }
875      }
876
877      if (!DimsMatch) {
878        stringstream ss;
879        ss << "Won't read " << LocalFileName <<
880              ": communicator-decomposition mismatch: " <<
881              "current: " << Dims[0] << "x" << Dims[1] << "x" << Dims[2] <<
882              ", file: " << GH.Dims[0] << "x" << GH.Dims[1] << "x" <<
883              GH.Dims[2];
884        throw runtime_error(ss.str());
885      }
886    }
887#endif
[8f0a211]888  } else if (MB == MismatchRedistribute && !Redistributing) {
889    Redistributing = true;
890
891    int NFileRanks = RankMap.empty() ? (int) GH.NRanks : (int) RankMap.size();
892    int NFileRanksPerRank = NFileRanks/NRanks;
893    int NRemFileRank = NFileRanks % NRanks;
894
895    if (!NFileRanksPerRank) {
896      // We have only the remainder, so the last NRemFileRank ranks get one
897      // file rank, and the others don't.
898      if (NRemFileRank && NRanks - Rank <= NRemFileRank)
899        SourceRanks.push_back(NRanks - (Rank + 1));
900    } else {
901      // Since NRemFileRank < NRanks, and we don't want to put any extra memory
902      // load on rank 0 (because rank 0's memory load is normally higher than
903      // the other ranks anyway), the last NRemFileRank will each take
904      // (NFileRanksPerRank+1) file ranks.
905
906      int FirstFileRank = 0, LastFileRank = NFileRanksPerRank - 1;
907      for (int i = 1; i <= Rank; ++i) {
908        FirstFileRank = LastFileRank + 1;
909        LastFileRank  = FirstFileRank + NFileRanksPerRank - 1;
910
911        if (NRemFileRank && NRanks - i <= NRemFileRank)
912          ++LastFileRank;
913      }
914
915      for (int i = FirstFileRank; i <= LastFileRank; ++i)
916        SourceRanks.push_back(i);
917    }
[a4fee13]918  }
919
920  HeaderSize = GH.HeaderSize;
921  Header.resize(HeaderSize + CRCSize, 0xFE /* poison */);
922  FH.get()->read(&Header[0], HeaderSize + CRCSize, 0, "header");
923
924  uint64_t CRC = crc64_omp(&Header[0], HeaderSize + CRCSize);
925  if (CRC != (uint64_t) -1) {
926    throw runtime_error("Header CRC check failed: " + LocalFileName);
927  }
928}
929
[00587dc]930// Note: Errors from this function should be recoverable. This means that if
931// one rank throws an exception, then all ranks should.
[8f0a211]932void GenericIO::openAndReadHeader(MismatchBehavior MB, int EffRank, bool CheckPartMap) {
[00587dc]933  int NRanks, Rank;
934#ifndef GENERICIO_NO_MPI
935  MPI_Comm_rank(Comm, &Rank);
936  MPI_Comm_size(Comm, &NRanks);
937#else
938  Rank = 0;
939  NRanks = 1;
940#endif
941
942  if (EffRank == -1)
[8f0a211]943    EffRank = MB == MismatchRedistribute ? 0 : Rank;
[00587dc]944
945  if (RankMap.empty() && CheckPartMap) {
946    // First, check to see if the file is a rank map.
947    unsigned long RanksInMap = 0;
948    if (Rank == 0) {
949      try {
950#ifndef GENERICIO_NO_MPI
951        GenericIO GIO(MPI_COMM_SELF, FileName, FileIOType);
952#else
953        GenericIO GIO(FileName, FileIOType);
954#endif
[8f0a211]955        GIO.openAndReadHeader(MismatchDisallowed, 0, false);
[00587dc]956        RanksInMap = GIO.readNumElems();
957
958        RankMap.resize(RanksInMap + GIO.requestedExtraSpace()/sizeof(int));
959        GIO.addVariable("$partition", RankMap, true);
960
961        GIO.readData(0, false);
962        RankMap.resize(RanksInMap);
963      } catch (...) {
964        RankMap.clear();
965        RanksInMap = 0;
966      }
967    }
968
969#ifndef GENERICIO_NO_MPI
970    MPI_Bcast(&RanksInMap, 1, MPI_UNSIGNED_LONG, 0, Comm);
971    if (RanksInMap > 0) {
972      RankMap.resize(RanksInMap);
973      MPI_Bcast(&RankMap[0], RanksInMap, MPI_INT, 0, Comm);
974    }
975#endif
976  }
977
978#ifndef GENERICIO_NO_MPI
979  if (SplitComm != MPI_COMM_NULL)
980    MPI_Comm_free(&SplitComm);
981#endif
982
983  string LocalFileName;
984  if (RankMap.empty()) {
985    LocalFileName = FileName;
986#ifndef GENERICIO_NO_MPI
[8f0a211]987    MPI_Comm_dup(MB == MismatchRedistribute ? MPI_COMM_SELF : Comm, &SplitComm);
[00587dc]988#endif
989  } else {
990    stringstream ss;
991    ss << FileName << "#" << RankMap[EffRank];
992    LocalFileName = ss.str();
993#ifndef GENERICIO_NO_MPI
[8f0a211]994    if (MB == MismatchRedistribute) {
995      MPI_Comm_dup(MPI_COMM_SELF, &SplitComm);
996    } else {
[00587dc]997#ifdef __bgq__
[8f0a211]998      MPI_Barrier(Comm);
[00587dc]999#endif
[8f0a211]1000      MPI_Comm_split(Comm, RankMap[EffRank], Rank, &SplitComm);
1001    }
[00587dc]1002#endif
1003  }
1004
1005  if (LocalFileName == OpenFileName)
1006    return;
1007  FH.close();
1008
1009  int SplitNRanks, SplitRank;
1010#ifndef GENERICIO_NO_MPI
1011  MPI_Comm_rank(SplitComm, &SplitRank);
1012  MPI_Comm_size(SplitComm, &SplitNRanks);
1013#else
1014  SplitRank = 0;
1015  SplitNRanks = 1;
1016#endif
1017
1018  uint64_t HeaderSize;
1019  vector<char> Header;
1020
1021  if (SplitRank == 0) {
1022#ifndef GENERICIO_NO_MPI
1023    if (FileIOType == FileIOMPI)
1024      FH.get() = new GenericFileIO_MPI(MPI_COMM_SELF);
1025    else if (FileIOType == FileIOMPICollective)
1026      FH.get() = new GenericFileIO_MPICollective(MPI_COMM_SELF);
1027    else
1028#endif
1029      FH.get() = new GenericFileIO_POSIX();
1030
1031#ifndef GENERICIO_NO_MPI
1032    char True = 1, False = 0;
1033#endif
1034
1035    try {
1036      FH.get()->open(LocalFileName, true);
1037
[a4fee13]1038      GlobalHeader<false> GH; // endianness does not matter yet...
1039      FH.get()->read(&GH, sizeof(GlobalHeader<false>), 0, "global header");
[00587dc]1040
[a4fee13]1041      if (string(GH.Magic, GH.Magic + MagicSize - 1) == MagicLE) {
[8f0a211]1042        readHeaderLeader<false>(&GH, MB, NRanks, Rank, SplitNRanks, LocalFileName,
[a4fee13]1043                                HeaderSize, Header);
1044      } else if (string(GH.Magic, GH.Magic + MagicSize - 1) == MagicBE) {
[8f0a211]1045        readHeaderLeader<true>(&GH, MB, NRanks, Rank, SplitNRanks, LocalFileName,
[a4fee13]1046                               HeaderSize, Header);
1047      } else {
1048        string Error = "invalid file-type identifier";
[00587dc]1049        throw runtime_error("Won't read " + LocalFileName + ": " + Error);
1050      }
1051
1052#ifndef GENERICIO_NO_MPI
1053      close();
1054      MPI_Bcast(&True, 1, MPI_BYTE, 0, SplitComm);
1055#endif
1056    } catch (...) {
1057#ifndef GENERICIO_NO_MPI
1058      MPI_Bcast(&False, 1, MPI_BYTE, 0, SplitComm);
1059#endif
1060      close();
1061      throw;
1062    }
1063  } else {
1064#ifndef GENERICIO_NO_MPI
1065    char Okay;
1066    MPI_Bcast(&Okay, 1, MPI_BYTE, 0, SplitComm);
1067    if (!Okay)
1068      throw runtime_error("Failure broadcast from rank 0");
1069#endif
1070  }
1071
1072#ifndef GENERICIO_NO_MPI
1073  MPI_Bcast(&HeaderSize, 1, MPI_UINT64_T, 0, SplitComm);
1074#endif
1075
1076  Header.resize(HeaderSize, 0xFD /* poison */);
1077#ifndef GENERICIO_NO_MPI
1078  MPI_Bcast(&Header[0], HeaderSize, MPI_BYTE, 0, SplitComm);
1079#endif
1080
1081  FH.getHeaderCache().clear();
[a4fee13]1082
1083  GlobalHeader<false> *GH = (GlobalHeader<false> *) &Header[0];
1084  FH.setIsBigEndian(string(GH->Magic, GH->Magic + MagicSize - 1) == MagicBE);
1085
[00587dc]1086  FH.getHeaderCache().swap(Header);
1087  OpenFileName = LocalFileName;
1088
1089#ifndef GENERICIO_NO_MPI
[8f0a211]1090  if (!DisableCollErrChecking)
1091    MPI_Barrier(Comm);
[00587dc]1092
1093  if (FileIOType == FileIOMPI)
1094    FH.get() = new GenericFileIO_MPI(SplitComm);
1095  else if (FileIOType == FileIOMPICollective)
1096    FH.get() = new GenericFileIO_MPICollective(SplitComm);
1097  else
1098    FH.get() = new GenericFileIO_POSIX();
1099
1100  int OpenErr = 0, TotOpenErr;
1101  try {
1102    FH.get()->open(LocalFileName, true);
[8f0a211]1103    MPI_Allreduce(&OpenErr, &TotOpenErr, 1, MPI_INT, MPI_SUM,
1104                  DisableCollErrChecking ? MPI_COMM_SELF : Comm);
[00587dc]1105  } catch (...) {
1106    OpenErr = 1;
[8f0a211]1107    MPI_Allreduce(&OpenErr, &TotOpenErr, 1, MPI_INT, MPI_SUM,
1108                  DisableCollErrChecking ? MPI_COMM_SELF : Comm);
[00587dc]1109    throw;
1110  }
1111
1112  if (TotOpenErr > 0) {
1113    stringstream ss;
1114    ss << TotOpenErr << " ranks failed to open file: " << LocalFileName;
1115    throw runtime_error(ss.str());
1116  }
1117#endif
1118}
1119
1120int GenericIO::readNRanks() {
[a4fee13]1121  if (FH.isBigEndian())
1122    return readNRanks<true>();
1123  return readNRanks<false>();
1124}
1125
1126template <bool IsBigEndian>
1127int GenericIO::readNRanks() {
[00587dc]1128  if (RankMap.size())
1129    return RankMap.size();
1130
1131  assert(FH.getHeaderCache().size() && "HeaderCache must not be empty");
[a4fee13]1132  GlobalHeader<IsBigEndian> *GH = (GlobalHeader<IsBigEndian> *) &FH.getHeaderCache()[0];
[00587dc]1133  return (int) GH->NRanks;
1134}
1135
1136void GenericIO::readDims(int Dims[3]) {
[a4fee13]1137  if (FH.isBigEndian())
1138    readDims<true>(Dims);
1139  else
1140    readDims<false>(Dims);
1141}
1142
1143template <bool IsBigEndian>
1144void GenericIO::readDims(int Dims[3]) {
[00587dc]1145  assert(FH.getHeaderCache().size() && "HeaderCache must not be empty");
[a4fee13]1146  GlobalHeader<IsBigEndian> *GH = (GlobalHeader<IsBigEndian> *) &FH.getHeaderCache()[0];
[00587dc]1147  std::copy(GH->Dims, GH->Dims + 3, Dims);
1148}
1149
1150uint64_t GenericIO::readTotalNumElems() {
[a4fee13]1151  if (FH.isBigEndian())
1152    return readTotalNumElems<true>();
1153  return readTotalNumElems<false>();
1154}
1155
1156template <bool IsBigEndian>
1157uint64_t GenericIO::readTotalNumElems() {
[00587dc]1158  if (RankMap.size())
1159    return (uint64_t) -1;
1160
1161  assert(FH.getHeaderCache().size() && "HeaderCache must not be empty");
[a4fee13]1162  GlobalHeader<IsBigEndian> *GH = (GlobalHeader<IsBigEndian> *) &FH.getHeaderCache()[0];
[00587dc]1163  return GH->NElems;
1164}
1165
1166void GenericIO::readPhysOrigin(double Origin[3]) {
[a4fee13]1167  if (FH.isBigEndian())
1168    readPhysOrigin<true>(Origin);
1169  else
1170    readPhysOrigin<false>(Origin);
1171}
1172
1173// Define a "safe" version of offsetof (offsetof itself might not work for
1174// non-POD types, and at least xlC v12.1 will complain about this if you try).
1175#define offsetof_safe(S, F) (size_t(&(S)->F) - size_t(S))
1176
1177template <bool IsBigEndian>
1178void GenericIO::readPhysOrigin(double Origin[3]) {
[00587dc]1179  assert(FH.getHeaderCache().size() && "HeaderCache must not be empty");
[a4fee13]1180  GlobalHeader<IsBigEndian> *GH = (GlobalHeader<IsBigEndian> *) &FH.getHeaderCache()[0];
1181  if (offsetof_safe(GH, PhysOrigin) >= GH->GlobalHeaderSize) {
[00587dc]1182    std::fill(Origin, Origin + 3, 0.0);
1183    return;
1184  }
1185
1186  std::copy(GH->PhysOrigin, GH->PhysOrigin + 3, Origin);
1187}
1188
1189void GenericIO::readPhysScale(double Scale[3]) {
[a4fee13]1190  if (FH.isBigEndian())
1191    readPhysScale<true>(Scale);
1192  else
1193    readPhysScale<false>(Scale);
1194}
1195
1196template <bool IsBigEndian>
1197void GenericIO::readPhysScale(double Scale[3]) {
[00587dc]1198  assert(FH.getHeaderCache().size() && "HeaderCache must not be empty");
[a4fee13]1199  GlobalHeader<IsBigEndian> *GH = (GlobalHeader<IsBigEndian> *) &FH.getHeaderCache()[0];
1200  if (offsetof_safe(GH, PhysScale) >= GH->GlobalHeaderSize) {
[00587dc]1201    std::fill(Scale, Scale + 3, 0.0);
1202    return;
1203  }
1204
1205  std::copy(GH->PhysScale, GH->PhysScale + 3, Scale);
1206}
1207
[a4fee13]1208template <bool IsBigEndian>
1209static size_t getRankIndex(int EffRank, GlobalHeader<IsBigEndian> *GH,
[00587dc]1210                           vector<int> &RankMap, vector<char> &HeaderCache) {
[a4fee13]1211  if (RankMap.empty())
[00587dc]1212    return EffRank;
1213
1214  for (size_t i = 0; i < GH->NRanks; ++i) {
[a4fee13]1215    RankHeader<IsBigEndian> *RH = (RankHeader<IsBigEndian> *) &HeaderCache[GH->RanksStart +
[00587dc]1216                                                 i*GH->RanksSize];
[a4fee13]1217    if (offsetof_safe(RH, GlobalRank) >= GH->RanksSize)
1218      return EffRank;
1219
[00587dc]1220    if ((int) RH->GlobalRank == EffRank)
1221      return i;
1222  }
1223
1224  assert(false && "Index requested of an invalid rank");
1225  return (size_t) -1;
1226}
1227
1228int GenericIO::readGlobalRankNumber(int EffRank) {
[a4fee13]1229  if (FH.isBigEndian())
1230    return readGlobalRankNumber<true>(EffRank);
1231  return readGlobalRankNumber<false>(EffRank);
1232}
1233
1234template <bool IsBigEndian>
1235int GenericIO::readGlobalRankNumber(int EffRank) {
[00587dc]1236  if (EffRank == -1) {
1237#ifndef GENERICIO_NO_MPI
1238    MPI_Comm_rank(Comm, &EffRank);
1239#else
1240    EffRank = 0;
1241#endif
1242  }
1243
[8f0a211]1244  openAndReadHeader(MismatchAllowed, EffRank, false);
[00587dc]1245
1246  assert(FH.getHeaderCache().size() && "HeaderCache must not be empty");
1247
[a4fee13]1248  GlobalHeader<IsBigEndian> *GH = (GlobalHeader<IsBigEndian> *) &FH.getHeaderCache()[0];
1249  size_t RankIndex = getRankIndex<IsBigEndian>(EffRank, GH, RankMap, FH.getHeaderCache());
[00587dc]1250
1251  assert(RankIndex < GH->NRanks && "Invalid rank specified");
1252
[a4fee13]1253  RankHeader<IsBigEndian> *RH = (RankHeader<IsBigEndian> *) &FH.getHeaderCache()[GH->RanksStart +
[00587dc]1254                                               RankIndex*GH->RanksSize];
1255
[a4fee13]1256  if (offsetof_safe(RH, GlobalRank) >= GH->RanksSize)
1257    return EffRank;
1258
[00587dc]1259  return (int) RH->GlobalRank;
1260}
1261
[14e73bb]1262void GenericIO::getSourceRanks(vector<int> &SR) {
1263  SR.clear();
1264
1265  if (Redistributing) {
1266    std::copy(SourceRanks.begin(), SourceRanks.end(), std::back_inserter(SR));
1267    return;
1268  }
1269
1270  int Rank;
1271#ifndef GENERICIO_NO_MPI
1272  MPI_Comm_rank(Comm, &Rank);
1273#else
1274  Rank = 0;
1275#endif
1276
1277  SR.push_back(Rank);
1278}
1279
[00587dc]1280size_t GenericIO::readNumElems(int EffRank) {
[8f0a211]1281  if (EffRank == -1 && Redistributing) {
1282    DisableCollErrChecking = true;
1283
1284    size_t TotalSize = 0;
1285    for (int i = 0, ie = SourceRanks.size(); i != ie; ++i)
1286      TotalSize += readNumElems(SourceRanks[i]);
1287
1288    DisableCollErrChecking = false;
1289    return TotalSize;
1290  }
1291
[a4fee13]1292  if (FH.isBigEndian())
1293    return readNumElems<true>(EffRank);
1294  return readNumElems<false>(EffRank);
1295}
1296
1297template <bool IsBigEndian>
1298size_t GenericIO::readNumElems(int EffRank) {
[00587dc]1299  if (EffRank == -1) {
1300#ifndef GENERICIO_NO_MPI
1301    MPI_Comm_rank(Comm, &EffRank);
1302#else
1303    EffRank = 0;
1304#endif
1305  }
1306
[8f0a211]1307  openAndReadHeader(Redistributing ? MismatchRedistribute : MismatchAllowed,
1308                    EffRank, false);
[00587dc]1309
1310  assert(FH.getHeaderCache().size() && "HeaderCache must not be empty");
1311
[a4fee13]1312  GlobalHeader<IsBigEndian> *GH = (GlobalHeader<IsBigEndian> *) &FH.getHeaderCache()[0];
1313  size_t RankIndex = getRankIndex<IsBigEndian>(EffRank, GH, RankMap, FH.getHeaderCache());
[00587dc]1314
1315  assert(RankIndex < GH->NRanks && "Invalid rank specified");
1316
[a4fee13]1317  RankHeader<IsBigEndian> *RH = (RankHeader<IsBigEndian> *) &FH.getHeaderCache()[GH->RanksStart +
[00587dc]1318                                               RankIndex*GH->RanksSize];
1319  return (size_t) RH->NElems;
1320}
1321
1322void GenericIO::readCoords(int Coords[3], int EffRank) {
[8f0a211]1323  if (EffRank == -1 && Redistributing) {
1324    std::fill(Coords, Coords + 3, 0);
1325    return;
1326  }
1327
[a4fee13]1328  if (FH.isBigEndian())
1329    readCoords<true>(Coords, EffRank);
1330  else
1331    readCoords<false>(Coords, EffRank);
1332}
1333
1334template <bool IsBigEndian>
1335void GenericIO::readCoords(int Coords[3], int EffRank) {
[00587dc]1336  if (EffRank == -1) {
1337#ifndef GENERICIO_NO_MPI
1338    MPI_Comm_rank(Comm, &EffRank);
1339#else
1340    EffRank = 0;
1341#endif
1342  }
1343
[8f0a211]1344  openAndReadHeader(MismatchAllowed, EffRank, false);
[00587dc]1345
1346  assert(FH.getHeaderCache().size() && "HeaderCache must not be empty");
1347
[a4fee13]1348  GlobalHeader<IsBigEndian> *GH = (GlobalHeader<IsBigEndian> *) &FH.getHeaderCache()[0];
1349  size_t RankIndex = getRankIndex<IsBigEndian>(EffRank, GH, RankMap, FH.getHeaderCache());
[00587dc]1350
1351  assert(RankIndex < GH->NRanks && "Invalid rank specified");
1352
[a4fee13]1353  RankHeader<IsBigEndian> *RH = (RankHeader<IsBigEndian> *) &FH.getHeaderCache()[GH->RanksStart +
[00587dc]1354                                               RankIndex*GH->RanksSize];
1355
1356  std::copy(RH->Coords, RH->Coords + 3, Coords);
1357}
1358
[a4fee13]1359void GenericIO::readData(int EffRank, bool PrintStats, bool CollStats) {
[00587dc]1360  int Rank;
1361#ifndef GENERICIO_NO_MPI
1362  MPI_Comm_rank(Comm, &Rank);
1363#else
1364  Rank = 0;
1365#endif
1366
[8f0a211]1367  uint64_t TotalReadSize = 0;
1368#ifndef GENERICIO_NO_MPI
1369  double StartTime = MPI_Wtime();
1370#else
1371  double StartTime = double(clock())/CLOCKS_PER_SEC;
1372#endif
1373
1374  int NErrs[3] = { 0, 0, 0 };
1375
1376  if (EffRank == -1 && Redistributing) {
1377    DisableCollErrChecking = true;
1378
1379    size_t RowOffset = 0;
1380    for (int i = 0, ie = SourceRanks.size(); i != ie; ++i) {
1381      readData(SourceRanks[i], RowOffset, Rank, TotalReadSize, NErrs);
1382      RowOffset += readNumElems(SourceRanks[i]);
1383    }
1384
1385    DisableCollErrChecking = false;
1386  } else {
1387    readData(EffRank, 0, Rank, TotalReadSize, NErrs);
1388  }
1389
1390  int AllNErrs[3];
1391#ifndef GENERICIO_NO_MPI
1392  MPI_Allreduce(NErrs, AllNErrs, 3, MPI_INT, MPI_SUM, Comm);
1393#else
1394  AllNErrs[0] = NErrs[0]; AllNErrs[1] = NErrs[1]; AllNErrs[2] = NErrs[2];
1395#endif
1396
1397  if (AllNErrs[0] > 0 || AllNErrs[1] > 0 || AllNErrs[2] > 0) {
1398    stringstream ss;
1399    ss << "Experienced " << AllNErrs[0] << " I/O error(s), " <<
1400          AllNErrs[1] << " CRC error(s) and " << AllNErrs[2] <<
1401          " decompression CRC error(s) reading: " << OpenFileName;
1402    throw runtime_error(ss.str());
1403  }
1404
1405#ifndef GENERICIO_NO_MPI
1406  MPI_Barrier(Comm);
1407#endif
1408
1409#ifndef GENERICIO_NO_MPI
1410  double EndTime = MPI_Wtime();
1411#else
1412  double EndTime = double(clock())/CLOCKS_PER_SEC;
1413#endif
1414
1415  double TotalTime = EndTime - StartTime;
1416  double MaxTotalTime;
1417#ifndef GENERICIO_NO_MPI
1418  if (CollStats)
1419    MPI_Reduce(&TotalTime, &MaxTotalTime, 1, MPI_DOUBLE, MPI_MAX, 0, Comm);
1420  else
1421#endif
1422  MaxTotalTime = TotalTime;
1423
1424  uint64_t AllTotalReadSize;
1425#ifndef GENERICIO_NO_MPI
1426  if (CollStats)
1427    MPI_Reduce(&TotalReadSize, &AllTotalReadSize, 1, MPI_UINT64_T, MPI_SUM, 0, Comm);
1428  else
1429#endif
1430  AllTotalReadSize = TotalReadSize;
1431
1432  if (Rank == 0 && PrintStats) {
1433    double Rate = ((double) AllTotalReadSize) / MaxTotalTime / (1024.*1024.);
[d6628a6]1434    std::cout << "Read " << Vars.size() << " variables from " << FileName <<
1435                 " (" << AllTotalReadSize << " bytes) in " << MaxTotalTime << "s: " <<
1436                 Rate << " MB/s [excluding header read]" << std::endl;
[8f0a211]1437  }
1438}
1439
1440void GenericIO::readData(int EffRank, size_t RowOffset, int Rank,
1441                         uint64_t &TotalReadSize, int NErrs[3]) {
1442  if (FH.isBigEndian())
1443    readData<true>(EffRank, RowOffset, Rank, TotalReadSize, NErrs);
1444  else
1445    readData<false>(EffRank, RowOffset, Rank, TotalReadSize, NErrs);
1446}
1447
1448// Note: Errors from this function should be recoverable. This means that if
1449// one rank throws an exception, then all ranks should.
1450template <bool IsBigEndian>
1451void GenericIO::readData(int EffRank, size_t RowOffset, int Rank,
1452                         uint64_t &TotalReadSize, int NErrs[3]) {
1453  openAndReadHeader(Redistributing ? MismatchRedistribute : MismatchAllowed,
1454                    EffRank, false);
[00587dc]1455
1456  assert(FH.getHeaderCache().size() && "HeaderCache must not be empty");
1457
1458  if (EffRank == -1)
1459    EffRank = Rank;
1460
[a4fee13]1461  GlobalHeader<IsBigEndian> *GH = (GlobalHeader<IsBigEndian> *) &FH.getHeaderCache()[0];
1462  size_t RankIndex = getRankIndex<IsBigEndian>(EffRank, GH, RankMap, FH.getHeaderCache());
[00587dc]1463
1464  assert(RankIndex < GH->NRanks && "Invalid rank specified");
1465
[a4fee13]1466  RankHeader<IsBigEndian> *RH = (RankHeader<IsBigEndian> *) &FH.getHeaderCache()[GH->RanksStart +
[00587dc]1467                                               RankIndex*GH->RanksSize];
1468
1469  for (size_t i = 0; i < Vars.size(); ++i) {
1470    uint64_t Offset = RH->Start;
1471    bool VarFound = false;
1472    for (uint64_t j = 0; j < GH->NVars; ++j) {
[a4fee13]1473      VariableHeader<IsBigEndian> *VH = (VariableHeader<IsBigEndian> *) &FH.getHeaderCache()[GH->VarsStart +
[00587dc]1474                                                           j*GH->VarsSize];
1475
1476      string VName(VH->Name, VH->Name + NameSize);
1477      size_t VNameNull = VName.find('\0');
1478      if (VNameNull < NameSize)
1479        VName.resize(VNameNull);
1480
1481      uint64_t ReadSize = RH->NElems*VH->Size + CRCSize;
1482      if (VName != Vars[i].Name) {
1483        Offset += ReadSize;
1484        continue;
1485      }
1486
[5d57155]1487      size_t ElementSize = VH->Size;
1488      if (offsetof_safe(VH, ElementSize) < GH->VarsSize)
1489        ElementSize = VH->ElementSize;
1490
[00587dc]1491      VarFound = true;
1492      bool IsFloat = (bool) (VH->Flags & FloatValue),
1493           IsSigned = (bool) (VH->Flags & SignedValue);
1494      if (VH->Size != Vars[i].Size) {
1495        stringstream ss;
1496        ss << "Size mismatch for variable " << Vars[i].Name <<
1497              " in: " << OpenFileName << ": current: " << Vars[i].Size <<
1498              ", file: " << VH->Size;
1499        throw runtime_error(ss.str());
[5d57155]1500      } else if (ElementSize != Vars[i].ElementSize) {
1501        stringstream ss;
1502        ss << "Element size mismatch for variable " << Vars[i].Name <<
1503              " in: " << OpenFileName << ": current: " << Vars[i].ElementSize <<
1504              ", file: " << ElementSize;
1505        throw runtime_error(ss.str());
[00587dc]1506      } else if (IsFloat != Vars[i].IsFloat) {
1507        string Float("float"), Int("integer");
1508        stringstream ss;
1509        ss << "Type mismatch for variable " << Vars[i].Name <<
1510              " in: " << OpenFileName << ": current: " <<
1511              (Vars[i].IsFloat ? Float : Int) <<
1512              ", file: " << (IsFloat ? Float : Int);
1513        throw runtime_error(ss.str());
1514      } else if (IsSigned != Vars[i].IsSigned) {
1515        string Signed("signed"), Uns("unsigned");
1516        stringstream ss;
1517        ss << "Type mismatch for variable " << Vars[i].Name <<
1518              " in: " << OpenFileName << ": current: " <<
1519              (Vars[i].IsSigned ? Signed : Uns) <<
1520              ", file: " << (IsSigned ? Signed : Uns);
1521        throw runtime_error(ss.str());
1522      }
1523
[8f0a211]1524      size_t VarOffset = RowOffset*Vars[i].Size;
1525      void *VarData = ((char *) Vars[i].Data) + VarOffset;
1526
[00587dc]1527      vector<unsigned char> LData;
[eeacdad]1528      bool HasSZ = false;
[8f0a211]1529      void *Data = VarData;
[00587dc]1530      bool HasExtraSpace = Vars[i].HasExtraSpace;
[a4fee13]1531      if (offsetof_safe(GH, BlocksStart) < GH->GlobalHeaderSize &&
[00587dc]1532          GH->BlocksSize > 0) {
[a4fee13]1533        BlockHeader<IsBigEndian> *BH = (BlockHeader<IsBigEndian> *)
[00587dc]1534          &FH.getHeaderCache()[GH->BlocksStart +
1535                               (RankIndex*GH->NVars + j)*GH->BlocksSize];
1536        ReadSize = BH->Size + CRCSize;
1537        Offset = BH->Start;
1538
[eeacdad]1539        int FilterIdx = 0;
1540
1541        if (strncmp(BH->Filters[FilterIdx], LossyCompressName, FilterNameSize) == 0) {
1542          ++FilterIdx;
1543          HasSZ = true;
1544        }
1545
1546        if (strncmp(BH->Filters[FilterIdx], CompressName, FilterNameSize) == 0) {
[00587dc]1547          LData.resize(ReadSize);
1548          Data = &LData[0];
1549          HasExtraSpace = true;
[eeacdad]1550        } else if (BH->Filters[FilterIdx][0] != '\0') {
[00587dc]1551          stringstream ss;
1552          ss << "Unknown filter \"" << BH->Filters[0] << "\" on variable " << Vars[i].Name;
1553          throw runtime_error(ss.str());
1554        }
1555      }
1556
1557      assert(HasExtraSpace && "Extra space required for reading");
1558
1559      char CRCSave[CRCSize];
1560      char *CRCLoc = ((char *) Data) + ReadSize - CRCSize;
1561      if (HasExtraSpace)
1562        std::copy(CRCLoc, CRCLoc + CRCSize, CRCSave);
1563
1564      int Retry = 0;
1565      {
1566        int RetryCount = 300;
1567        const char *EnvStr = getenv("GENERICIO_RETRY_COUNT");
1568        if (EnvStr)
1569          RetryCount = atoi(EnvStr);
1570
1571        int RetrySleep = 100; // ms
1572        EnvStr = getenv("GENERICIO_RETRY_SLEEP");
1573        if (EnvStr)
1574          RetrySleep = atoi(EnvStr);
1575
1576        for (; Retry < RetryCount; ++Retry) {
1577          try {
1578            FH.get()->read(Data, ReadSize, Offset, Vars[i].Name);
1579            break;
1580          } catch (...) { }
1581
1582          usleep(1000*RetrySleep);
1583        }
1584
1585        if (Retry == RetryCount) {
1586          ++NErrs[0];
1587          break;
1588        } else if (Retry > 0) {
1589          EnvStr = getenv("GENERICIO_VERBOSE");
1590          if (EnvStr) {
1591            int Mod = atoi(EnvStr);
1592            if (Mod > 0) {
1593              int Rank;
1594#ifndef GENERICIO_NO_MPI
1595              MPI_Comm_rank(MPI_COMM_WORLD, &Rank);
1596#else
1597              Rank = 0;
1598#endif
1599
1600              std::cerr << "Rank " << Rank << ": " << Retry <<
1601                           " I/O retries were necessary for reading " <<
1602                           Vars[i].Name << " from: " << OpenFileName << "\n";
1603
1604              std::cerr.flush();
1605            }
1606          }
1607        }
1608      }
1609
1610      TotalReadSize += ReadSize;
1611
1612      uint64_t CRC = crc64_omp(Data, ReadSize);
1613      if (CRC != (uint64_t) -1) {
1614        ++NErrs[1];
1615
1616        int Rank;
1617#ifndef GENERICIO_NO_MPI
1618        MPI_Comm_rank(MPI_COMM_WORLD, &Rank);
1619#else
1620        Rank = 0;
1621#endif
1622
1623        // All ranks will do this and have a good time!
1624        string dn = "gio_crc_errors";
1625        mkdir(dn.c_str(), 0777);
1626
1627        srand(time(0));
1628        int DumpNum = rand();
1629        stringstream ssd;
1630        ssd << dn << "/gio_crc_error_dump." << Rank << "." << DumpNum << ".bin";
1631
1632        stringstream ss;
1633        ss << dn << "/gio_crc_error_log." << Rank << ".txt";
1634
1635        ofstream ofs(ss.str().c_str(), ofstream::out | ofstream::app);
1636        ofs << "On-Disk CRC Error Report:\n";
1637        ofs << "Variable: " << Vars[i].Name << "\n";
1638        ofs << "File: " << OpenFileName << "\n";
1639        ofs << "I/O Retries: " << Retry << "\n";
1640        ofs << "Size: " << ReadSize << " bytes\n";
1641        ofs << "Offset: " << Offset << " bytes\n";
1642        ofs << "CRC: " << CRC << " (expected is -1)\n";
1643        ofs << "Dump file: " << ssd.str() << "\n";
1644        ofs << "\n";
1645        ofs.close();
1646
1647        ofstream dofs(ssd.str().c_str(), ofstream::out);
1648        dofs.write((const char *) Data, ReadSize);
1649        dofs.close();
[14e73bb]1650
1651        uint64_t RawCRC = crc64_omp(Data, ReadSize - CRCSize);
1652        unsigned char *UData = (unsigned char *) Data;
1653        crc64_invert(RawCRC, &UData[ReadSize - CRCSize]);
1654        uint64_t NewCRC = crc64_omp(Data, ReadSize);
1655        std::cerr << "Recalulated CRC: " << NewCRC << ((NewCRC == -1) ? "ok" : "bad") << "\n";
[00587dc]1656        break;
1657      }
1658
1659      if (HasExtraSpace)
1660        std::copy(CRCSave, CRCSave + CRCSize, CRCLoc);
1661
1662      if (LData.size()) {
[a4fee13]1663        CompressHeader<IsBigEndian> *CH = (CompressHeader<IsBigEndian>*) &LData[0];
[00587dc]1664
1665#ifdef _OPENMP
1666#pragma omp master
1667  {
1668#endif
1669
1670       if (!blosc_initialized) {
1671         blosc_init();
1672         blosc_initialized = true;
1673       }
1674
[eeacdad]1675       if (!sz_initialized) {
1676         SZ_Init(NULL);
1677         sz_initialized = true;
1678       }
1679
[00587dc]1680#ifdef _OPENMP
1681       blosc_set_nthreads(omp_get_max_threads());
1682  }
1683#endif
1684
[eeacdad]1685        void *OrigData = VarData;
1686        size_t OrigDataSize = Vars[i].Size*RH->NElems;
1687
1688        if (HasSZ) {
1689          size_t CNBytes, CCBytes, CBlockSize;
1690          blosc_cbuffer_sizes(&LData[0] + sizeof(CompressHeader<IsBigEndian>),
1691                              &CNBytes, &CCBytes, &CBlockSize);
1692
1693          OrigData = malloc(CNBytes);
1694          OrigDataSize = CNBytes;
1695        }
1696
[a4fee13]1697        blosc_decompress(&LData[0] + sizeof(CompressHeader<IsBigEndian>),
[eeacdad]1698                         OrigData, OrigDataSize);
[00587dc]1699
[eeacdad]1700        if (CH->OrigCRC != crc64_omp(OrigData, OrigDataSize)) {
[00587dc]1701          ++NErrs[2];
1702          break;
1703        }
[eeacdad]1704
1705        if (HasSZ) {
1706          int SZDT = GetSZDT(Vars[i]);
1707          size_t LDSz = SZ_decompress_args(SZDT, (unsigned char *)OrigData, OrigDataSize,
1708                                           VarData, 0, 0, 0, 0, RH->NElems);
1709          free(OrigData);
1710
[abca157]1711          if (LDSz != RH->NElems)
[eeacdad]1712            throw runtime_error("Variable " + Vars[i].Name +
1713                                ": SZ decompression yielded the wrong amount of data");
1714        }
[00587dc]1715      }
1716
[a4fee13]1717      // Byte swap the data if necessary.
[eeacdad]1718      if (IsBigEndian != isBigEndian() && !HasSZ)
[5d57155]1719        for (size_t j = 0;
1720             j < RH->NElems*(Vars[i].Size/Vars[i].ElementSize); ++j) {
1721          char *Offset = ((char *) VarData) + j*Vars[i].ElementSize;
1722          bswap(Offset, Vars[i].ElementSize);
[a4fee13]1723        }
1724
[00587dc]1725      break;
1726    }
1727
1728    if (!VarFound)
1729      throw runtime_error("Variable " + Vars[i].Name +
1730                          " not found in: " + OpenFileName);
1731
1732    // This is for debugging.
1733    if (NErrs[0] || NErrs[1] || NErrs[2]) {
1734      const char *EnvStr = getenv("GENERICIO_VERBOSE");
1735      if (EnvStr) {
1736        int Mod = atoi(EnvStr);
1737        if (Mod > 0) {
1738          int Rank;
1739#ifndef GENERICIO_NO_MPI
1740          MPI_Comm_rank(MPI_COMM_WORLD, &Rank);
1741#else
1742          Rank = 0;
1743#endif
1744
1745          std::cerr << "Rank " << Rank << ": " << NErrs[0] << " I/O error(s), " <<
1746          NErrs[1] << " CRC error(s) and " << NErrs[2] <<
1747          " decompression CRC error(s) reading: " << Vars[i].Name <<
1748          " from: " << OpenFileName << "\n";
1749
1750          std::cerr.flush();
1751        }
1752      }
1753    }
1754
1755    if (NErrs[0] || NErrs[1] || NErrs[2])
1756      break;
1757  }
1758}
1759
1760void GenericIO::getVariableInfo(vector<VariableInfo> &VI) {
[a4fee13]1761  if (FH.isBigEndian())
1762    getVariableInfo<true>(VI);
1763  else
1764    getVariableInfo<false>(VI);
1765}
1766
1767template <bool IsBigEndian>
1768void GenericIO::getVariableInfo(vector<VariableInfo> &VI) {
[00587dc]1769  assert(FH.getHeaderCache().size() && "HeaderCache must not be empty");
1770
[a4fee13]1771  GlobalHeader<IsBigEndian> *GH = (GlobalHeader<IsBigEndian> *) &FH.getHeaderCache()[0];
[00587dc]1772  for (uint64_t j = 0; j < GH->NVars; ++j) {
[a4fee13]1773    VariableHeader<IsBigEndian> *VH = (VariableHeader<IsBigEndian> *) &FH.getHeaderCache()[GH->VarsStart +
[00587dc]1774                                                         j*GH->VarsSize];
1775
1776    string VName(VH->Name, VH->Name + NameSize);
1777    size_t VNameNull = VName.find('\0');
1778    if (VNameNull < NameSize)
1779      VName.resize(VNameNull);
1780
[5d57155]1781    size_t ElementSize = VH->Size;
1782    if (offsetof_safe(VH, ElementSize) < GH->VarsSize)
1783      ElementSize = VH->ElementSize;
1784
[00587dc]1785    bool IsFloat = (bool) (VH->Flags & FloatValue),
1786         IsSigned = (bool) (VH->Flags & SignedValue),
1787         IsPhysCoordX = (bool) (VH->Flags & ValueIsPhysCoordX),
1788         IsPhysCoordY = (bool) (VH->Flags & ValueIsPhysCoordY),
1789         IsPhysCoordZ = (bool) (VH->Flags & ValueIsPhysCoordZ),
1790         MaybePhysGhost = (bool) (VH->Flags & ValueMaybePhysGhost);
1791    VI.push_back(VariableInfo(VName, (size_t) VH->Size, IsFloat, IsSigned,
1792                              IsPhysCoordX, IsPhysCoordY, IsPhysCoordZ,
[5d57155]1793                              MaybePhysGhost, ElementSize));
[00587dc]1794  }
1795}
1796
1797void GenericIO::setNaturalDefaultPartition() {
1798#ifdef __bgq__
[14e73bb]1799  DefaultPartition = MPIX_IO_link_id();
[00587dc]1800#else
1801#ifndef GENERICIO_NO_MPI
1802  bool UseName = true;
1803  const char *EnvStr = getenv("GENERICIO_PARTITIONS_USE_NAME");
1804  if (EnvStr) {
1805    int Mod = atoi(EnvStr);
1806    UseName = (Mod != 0);
1807  }
1808
1809  if (UseName) {
1810    // This is a heuristic to generate ~256 partitions based on the
1811    // names of the nodes.
1812    char Name[MPI_MAX_PROCESSOR_NAME];
1813    int Len = 0;
1814
1815    MPI_Get_processor_name(Name, &Len);
1816    unsigned char color = 0;
1817    for (int i = 0; i < Len; ++i)
1818      color += (unsigned char) Name[i];
1819
1820    DefaultPartition = color;
1821  }
1822
1823  // This is for debugging.
1824  EnvStr = getenv("GENERICIO_RANK_PARTITIONS");
1825  if (EnvStr) {
1826    int Mod = atoi(EnvStr);
1827    if (Mod > 0) {
1828      int Rank;
1829      MPI_Comm_rank(MPI_COMM_WORLD, &Rank);
1830      DefaultPartition += Rank % Mod;
1831    }
1832  }
1833#endif
1834#endif
1835}
1836
1837} /* END namespace cosmotk */
Note: See TracBrowser for help on using the repository browser.