source: GenericIO.cxx @ 2c47b73

Revision 2c47b73, 52.3 KB checked in by Hal Finkel <hfinkel@…>, 6 years ago (diff)

more work on adding SZ (latest version)

  • Property mode set to 100644
Line 
1/*
2 *                    Copyright (C) 2015, UChicago Argonne, LLC
3 *                               All Rights Reserved
4 *
5 *                               Generic IO (ANL-15-066)
6 *                     Hal Finkel, Argonne National Laboratory
7 *
8 *                              OPEN SOURCE LICENSE
9 *
10 * Under the terms of Contract No. DE-AC02-06CH11357 with UChicago Argonne,
11 * LLC, the U.S. Government retains certain rights in this software.
12 *
13 * Redistribution and use in source and binary forms, with or without
14 * modification, are permitted provided that the following conditions are met:
15 *
16 *   1. Redistributions of source code must retain the above copyright notice,
17 *      this list of conditions and the following disclaimer.
18 *
19 *   2. Redistributions in binary form must reproduce the above copyright
20 *      notice, this list of conditions and the following disclaimer in the
21 *      documentation and/or other materials provided with the distribution.
22 *
23 *   3. Neither the names of UChicago Argonne, LLC or the Department of Energy
24 *      nor the names of its contributors may be used to endorse or promote
25 *      products derived from this software without specific prior written
26 *      permission.
27 *
28 * *****************************************************************************
29 *
30 *                                  DISCLAIMER
31 * THE SOFTWARE IS SUPPLIED “AS IS” WITHOUT WARRANTY OF ANY KIND.  NEITHER THE
32 * UNTED STATES GOVERNMENT, NOR THE UNITED STATES DEPARTMENT OF ENERGY, NOR
33 * UCHICAGO ARGONNE, LLC, NOR ANY OF THEIR EMPLOYEES, MAKES ANY WARRANTY,
34 * EXPRESS OR IMPLIED, OR ASSUMES ANY LEGAL LIABILITY OR RESPONSIBILITY FOR THE
35 * ACCURACY, COMPLETENESS, OR USEFULNESS OF ANY INFORMATION, DATA, APPARATUS,
36 * PRODUCT, OR PROCESS DISCLOSED, OR REPRESENTS THAT ITS USE WOULD NOT INFRINGE
37 * PRIVATELY OWNED RIGHTS.
38 *
39 * *****************************************************************************
40 */
41
42#define _XOPEN_SOURCE 600
43#include "CRC64.h"
44#include "GenericIO.h"
45
46extern "C" {
47#include "blosc.h"
48}
49#include "sz.h"
50
51#include <sstream>
52#include <fstream>
53#include <stdexcept>
54#include <iterator>
55#include <algorithm>
56#include <cassert>
57#include <cstddef>
58#include <cstring>
59
60#ifndef GENERICIO_NO_MPI
61#include <ctime>
62#endif
63
64#include <sys/types.h>
65#include <sys/stat.h>
66#include <fcntl.h>
67#include <errno.h>
68
69#ifdef __bgq__
70#include <mpix.h>
71#endif
72
73#ifndef MPI_UINT64_T
74#define MPI_UINT64_T (sizeof(long) == 8 ? MPI_LONG : MPI_LONG_LONG)
75#endif
76
77using namespace std;
78
79namespace gio {
80
81
82#ifndef GENERICIO_NO_MPI
83GenericFileIO_MPI::~GenericFileIO_MPI() {
84  (void) MPI_File_close(&FH);
85}
86
87void GenericFileIO_MPI::open(const std::string &FN, bool ForReading) {
88  FileName = FN;
89
90  int amode = ForReading ? MPI_MODE_RDONLY : (MPI_MODE_WRONLY | MPI_MODE_CREATE);
91  if (MPI_File_open(Comm, const_cast<char *>(FileName.c_str()), amode,
92                    MPI_INFO_NULL, &FH) != MPI_SUCCESS)
93    throw runtime_error((!ForReading ? "Unable to create the file: " :
94                                       "Unable to open the file: ") +
95                        FileName);
96}
97
98void GenericFileIO_MPI::setSize(size_t sz) {
99  if (MPI_File_set_size(FH, sz) != MPI_SUCCESS)
100    throw runtime_error("Unable to set size for file: " + FileName);
101}
102
103void GenericFileIO_MPI::read(void *buf, size_t count, off_t offset,
104                             const std::string &D) {
105  while (count > 0) {
106    MPI_Status status;
107    if (MPI_File_read_at(FH, offset, buf, count, MPI_BYTE, &status) != MPI_SUCCESS)
108      throw runtime_error("Unable to read " + D + " from file: " + FileName);
109
110    int scount;
111    (void) MPI_Get_count(&status, MPI_BYTE, &scount);
112
113    count -= scount;
114    buf = ((char *) buf) + scount;
115    offset += scount;
116  }
117}
118
119void GenericFileIO_MPI::write(const void *buf, size_t count, off_t offset,
120                              const std::string &D) {
121  while (count > 0) {
122    MPI_Status status;
123    if (MPI_File_write_at(FH, offset, (void *) buf, count, MPI_BYTE, &status) != MPI_SUCCESS)
124      throw runtime_error("Unable to write " + D + " to file: " + FileName);
125
126    int scount = 0;
127    // On some systems, MPI_Get_count will not return zero even when count is zero.
128    if (count > 0)
129      (void) MPI_Get_count(&status, MPI_BYTE, &scount);
130
131    count -= scount;
132    buf = ((char *) buf) + scount;
133    offset += scount;
134  }
135}
136
137void GenericFileIO_MPICollective::read(void *buf, size_t count, off_t offset,
138                             const std::string &D) {
139  int Continue = 0;
140
141  do {
142    MPI_Status status;
143    if (MPI_File_read_at_all(FH, offset, buf, count, MPI_BYTE, &status) != MPI_SUCCESS)
144      throw runtime_error("Unable to read " + D + " from file: " + FileName);
145
146    int scount = 0;
147    // On some systems, MPI_Get_count will not return zero even when count is zero.
148    if (count > 0)
149      (void) MPI_Get_count(&status, MPI_BYTE, &scount);
150
151    count -= scount;
152    buf = ((char *) buf) + scount;
153    offset += scount;
154
155    int NeedContinue = (count > 0);
156    MPI_Allreduce(&NeedContinue, &Continue, 1, MPI_INT, MPI_SUM, Comm);
157  } while (Continue);
158}
159
160void GenericFileIO_MPICollective::write(const void *buf, size_t count, off_t offset,
161                              const std::string &D) {
162  int Continue = 0;
163
164  do {
165    MPI_Status status;
166    if (MPI_File_write_at_all(FH, offset, (void *) buf, count, MPI_BYTE, &status) != MPI_SUCCESS)
167      throw runtime_error("Unable to write " + D + " to file: " + FileName);
168
169    int scount;
170    (void) MPI_Get_count(&status, MPI_BYTE, &scount);
171
172    count -= scount;
173    buf = ((char *) buf) + scount;
174    offset += scount;
175
176    int NeedContinue = (count > 0);
177    MPI_Allreduce(&NeedContinue, &Continue, 1, MPI_INT, MPI_SUM, Comm);
178  } while (Continue);
179}
180#endif
181
182GenericFileIO_POSIX::~GenericFileIO_POSIX() {
183  if (FH != -1) close(FH);
184}
185
186void GenericFileIO_POSIX::open(const std::string &FN, bool ForReading) {
187  FileName = FN;
188
189  int flags = ForReading ? O_RDONLY : (O_WRONLY | O_CREAT);
190  int mode = S_IRUSR | S_IWUSR | S_IRGRP;
191  errno = 0;
192  if ((FH = ::open(FileName.c_str(), flags, mode)) == -1)
193    throw runtime_error((!ForReading ? "Unable to create the file: " :
194                                       "Unable to open the file: ") +
195                        FileName + ": " + strerror(errno));
196}
197
198void GenericFileIO_POSIX::setSize(size_t sz) {
199  if (ftruncate(FH, sz) == -1)
200    throw runtime_error("Unable to set size for file: " + FileName);
201}
202
203void GenericFileIO_POSIX::read(void *buf, size_t count, off_t offset,
204                               const std::string &D) {
205  while (count > 0) {
206    ssize_t scount;
207    errno = 0;
208    if ((scount = pread(FH, buf, count, offset)) == -1) {
209      if (errno == EINTR)
210        continue;
211
212      throw runtime_error("Unable to read " + D + " from file: " + FileName +
213                          ": " + strerror(errno));
214    }
215
216    count -= scount;
217    buf = ((char *) buf) + scount;
218    offset += scount;
219  }
220}
221
222void GenericFileIO_POSIX::write(const void *buf, size_t count, off_t offset,
223                                const std::string &D) {
224  while (count > 0) {
225    ssize_t scount;
226    errno = 0;
227    if ((scount = pwrite(FH, buf, count, offset)) == -1) {
228      if (errno == EINTR)
229        continue;
230
231      throw runtime_error("Unable to write " + D + " to file: " + FileName +
232                          ": " + strerror(errno));
233    }
234
235    count -= scount;
236    buf = ((char *) buf) + scount;
237    offset += scount;
238  }
239}
240
241static bool isBigEndian() {
242  const uint32_t one = 1;
243  return !(*((char *)(&one)));
244}
245
246static void bswap(void *v, size_t s) {
247  char *p = (char *) v;
248  for (size_t i = 0; i < s/2; ++i)
249    std::swap(p[i], p[s - (i+1)]);
250}
251
252// Using #pragma pack here, instead of __attribute__((packed)) because xlc, at
253// least as of v12.1, won't take __attribute__((packed)) on non-POD and/or
254// templated types.
255#pragma pack(1)
256
257template <typename T, bool IsBigEndian>
258struct endian_specific_value {
259  operator T() const {
260    T rvalue = value;
261    if (IsBigEndian != isBigEndian())
262      bswap(&rvalue, sizeof(T));
263
264    return rvalue;
265  };
266
267  endian_specific_value &operator = (T nvalue) {
268    if (IsBigEndian != isBigEndian())
269      bswap(&nvalue, sizeof(T));
270
271    value = nvalue;
272    return *this;
273  }
274
275  endian_specific_value &operator += (T nvalue) {
276    *this = *this + nvalue;
277    return *this;
278  }
279
280  endian_specific_value &operator -= (T nvalue) {
281    *this = *this - nvalue;
282    return *this;
283  }
284
285private:
286  T value;
287};
288
289static const size_t CRCSize = 8;
290
291static const size_t MagicSize = 8;
292static const char *MagicBE = "HACC01B";
293static const char *MagicLE = "HACC01L";
294
295template <bool IsBigEndian>
296struct GlobalHeader {
297  char Magic[MagicSize];
298  endian_specific_value<uint64_t, IsBigEndian> HeaderSize;
299  endian_specific_value<uint64_t, IsBigEndian> NElems; // The global total
300  endian_specific_value<uint64_t, IsBigEndian> Dims[3];
301  endian_specific_value<uint64_t, IsBigEndian> NVars;
302  endian_specific_value<uint64_t, IsBigEndian> VarsSize;
303  endian_specific_value<uint64_t, IsBigEndian> VarsStart;
304  endian_specific_value<uint64_t, IsBigEndian> NRanks;
305  endian_specific_value<uint64_t, IsBigEndian> RanksSize;
306  endian_specific_value<uint64_t, IsBigEndian> RanksStart;
307  endian_specific_value<uint64_t, IsBigEndian> GlobalHeaderSize;
308  endian_specific_value<double,   IsBigEndian> PhysOrigin[3];
309  endian_specific_value<double,   IsBigEndian> PhysScale[3];
310  endian_specific_value<uint64_t, IsBigEndian> BlocksSize;
311  endian_specific_value<uint64_t, IsBigEndian> BlocksStart;
312};
313
314enum {
315  FloatValue          = (1 << 0),
316  SignedValue         = (1 << 1),
317  ValueIsPhysCoordX   = (1 << 2),
318  ValueIsPhysCoordY   = (1 << 3),
319  ValueIsPhysCoordZ   = (1 << 4),
320  ValueMaybePhysGhost = (1 << 5)
321};
322
323static const size_t NameSize = 256;
324template <bool IsBigEndian>
325struct VariableHeader {
326  char Name[NameSize];
327  endian_specific_value<uint64_t, IsBigEndian> Flags;
328  endian_specific_value<uint64_t, IsBigEndian> Size;
329  endian_specific_value<uint64_t, IsBigEndian> ElementSize;
330};
331
332template <bool IsBigEndian>
333struct RankHeader {
334  endian_specific_value<uint64_t, IsBigEndian> Coords[3];
335  endian_specific_value<uint64_t, IsBigEndian> NElems;
336  endian_specific_value<uint64_t, IsBigEndian> Start;
337  endian_specific_value<uint64_t, IsBigEndian> GlobalRank;
338};
339
340static const size_t FilterNameSize = 8;
341static const size_t MaxFilters = 4;
342template <bool IsBigEndian>
343struct BlockHeader {
344  char Filters[MaxFilters][FilterNameSize];
345  endian_specific_value<uint64_t, IsBigEndian> Start;
346  endian_specific_value<uint64_t, IsBigEndian> Size;
347};
348
349template <bool IsBigEndian>
350struct CompressHeader {
351  endian_specific_value<uint64_t, IsBigEndian> OrigCRC;
352};
353const char *CompressName = "BLOSC";
354
355const char *LossyCompressName = "SZ";
356
357#pragma pack()
358
359unsigned GenericIO::DefaultFileIOType = FileIOPOSIX;
360int GenericIO::DefaultPartition = 0;
361bool GenericIO::DefaultShouldCompress = false;
362
363#ifndef GENERICIO_NO_MPI
364std::size_t GenericIO::CollectiveMPIIOThreshold = 0;
365#endif
366
367static bool blosc_initialized = false;
368static bool sz_initialized = false;
369
370#ifndef GENERICIO_NO_MPI
371void GenericIO::write() {
372  if (isBigEndian())
373    write<true>();
374  else
375    write<false>();
376}
377
378// Note: writing errors are not currently recoverable (one rank may fail
379// while the others don't).
380template <bool IsBigEndian>
381void GenericIO::write() {
382  const char *Magic = IsBigEndian ? MagicBE : MagicLE;
383
384  uint64_t FileSize = 0;
385
386  int NRanks, Rank;
387  MPI_Comm_rank(Comm, &Rank);
388  MPI_Comm_size(Comm, &NRanks);
389
390#ifdef __bgq__
391  MPI_Barrier(Comm);
392#endif
393  MPI_Comm_split(Comm, Partition, Rank, &SplitComm);
394
395  int SplitNRanks, SplitRank;
396  MPI_Comm_rank(SplitComm, &SplitRank);
397  MPI_Comm_size(SplitComm, &SplitNRanks);
398
399  string LocalFileName;
400  if (SplitNRanks != NRanks) {
401    if (Rank == 0) {
402      // In split mode, the specified file becomes the rank map, and the real
403      // data is partitioned.
404
405      vector<int> MapRank, MapPartition;
406      MapRank.resize(NRanks);
407      for (int i = 0; i < NRanks; ++i) MapRank[i] = i;
408
409      MapPartition.resize(NRanks);
410      MPI_Gather(&Partition, 1, MPI_INT, &MapPartition[0], 1, MPI_INT, 0, Comm);
411
412      GenericIO GIO(MPI_COMM_SELF, FileName, FileIOType);
413      GIO.setNumElems(NRanks);
414      GIO.addVariable("$rank", MapRank); /* this is for use by humans; the reading
415                                            code assumes that the partitions are in
416                                            rank order */
417      GIO.addVariable("$partition", MapPartition);
418
419      vector<int> CX, CY, CZ;
420      int TopoStatus;
421      MPI_Topo_test(Comm, &TopoStatus);
422      if (TopoStatus == MPI_CART) {
423        CX.resize(NRanks);
424        CY.resize(NRanks);
425        CZ.resize(NRanks);
426
427        for (int i = 0; i < NRanks; ++i) {
428          int C[3];
429          MPI_Cart_coords(Comm, i, 3, C);
430
431          CX[i] = C[0];
432          CY[i] = C[1];
433          CZ[i] = C[2];
434        }
435
436        GIO.addVariable("$x", CX);
437        GIO.addVariable("$y", CY);
438        GIO.addVariable("$z", CZ);
439      }
440
441      GIO.write();
442    } else {
443      MPI_Gather(&Partition, 1, MPI_INT, 0, 0, MPI_INT, 0, Comm);
444    }
445
446    stringstream ss;
447    ss << FileName << "#" << Partition;
448    LocalFileName = ss.str();
449  } else {
450    LocalFileName = FileName;
451  }
452
453  RankHeader<IsBigEndian> RHLocal;
454  int Dims[3], Periods[3], Coords[3];
455
456  int TopoStatus;
457  MPI_Topo_test(Comm, &TopoStatus);
458  if (TopoStatus == MPI_CART) {
459    MPI_Cart_get(Comm, 3, Dims, Periods, Coords);
460  } else {
461    Dims[0] = NRanks;
462    std::fill(Dims + 1, Dims + 3, 1);
463    std::fill(Periods, Periods + 3, 0);
464    Coords[0] = Rank;
465    std::fill(Coords + 1, Coords + 3, 0);
466  }
467
468  std::copy(Coords, Coords + 3, RHLocal.Coords);
469  RHLocal.NElems = NElems;
470  RHLocal.Start = 0;
471  RHLocal.GlobalRank = Rank;
472
473  bool ShouldCompress = DefaultShouldCompress;
474  const char *EnvStr = getenv("GENERICIO_COMPRESS");
475  if (EnvStr) {
476    int Mod = atoi(EnvStr);
477    ShouldCompress = (Mod > 0);
478  }
479
480  bool NeedsBlockHeaders = ShouldCompress;
481  EnvStr = getenv("GENERICIO_FORCE_BLOCKS");
482  if (!NeedsBlockHeaders && EnvStr) {
483    int Mod = atoi(EnvStr);
484    NeedsBlockHeaders = (Mod > 0);
485  }
486
487  vector<BlockHeader<IsBigEndian> > LocalBlockHeaders;
488  vector<void *> LocalData;
489  vector<bool> LocalHasExtraSpace;
490  vector<vector<unsigned char> > LocalCData;
491  if (NeedsBlockHeaders) {
492    LocalBlockHeaders.resize(Vars.size());
493    LocalData.resize(Vars.size());
494    LocalHasExtraSpace.resize(Vars.size());
495    if (ShouldCompress)
496      LocalCData.resize(Vars.size());
497
498    for (size_t i = 0; i < Vars.size(); ++i) {
499      // Filters null by default, leave null starting address (needs to be
500      // calculated by the header-writing rank).
501      memset(&LocalBlockHeaders[i], 0, sizeof(BlockHeader<IsBigEndian>));
502      if (ShouldCompress) {
503        LocalCData[i].resize(sizeof(CompressHeader<IsBigEndian>));
504
505        CompressHeader<IsBigEndian> *CH = (CompressHeader<IsBigEndian>*) &LocalCData[i][0];
506        CH->OrigCRC = crc64_omp(Vars[i].Data, Vars[i].Size*NElems);
507
508#ifdef _OPENMP
509#pragma omp master
510  {
511#endif
512
513       if (!blosc_initialized) {
514         blosc_init();
515         blosc_initialized = true;
516       }
517
518       if (!sz_initialized) {
519         SZ_Init(NULL);
520         sz_initialized = true;
521       }
522
523#ifdef _OPENMP
524       blosc_set_nthreads(omp_get_max_threads());
525  }
526#endif
527
528        LocalCData[i].resize(LocalCData[i].size() + NElems*Vars[i].Size);
529        if (blosc_compress(9, 1, Vars[i].Size, NElems*Vars[i].Size, Vars[i].Data,
530                           &LocalCData[i][0] + sizeof(CompressHeader<IsBigEndian>),
531                           NElems*Vars[i].Size) <= 0)
532          goto nocomp;
533
534        strncpy(LocalBlockHeaders[i].Filters[0], CompressName, FilterNameSize);
535        size_t CNBytes, CCBytes, CBlockSize;
536        blosc_cbuffer_sizes(&LocalCData[i][0] + sizeof(CompressHeader<IsBigEndian>),
537                            &CNBytes, &CCBytes, &CBlockSize);
538        LocalCData[i].resize(CCBytes + sizeof(CompressHeader<IsBigEndian>));
539
540        LocalBlockHeaders[i].Size = LocalCData[i].size();
541        LocalCData[i].resize(LocalCData[i].size() + CRCSize);
542        LocalData[i] = &LocalCData[i][0];
543        LocalHasExtraSpace[i] = true;
544      } else {
545nocomp:
546        LocalBlockHeaders[i].Size = NElems*Vars[i].Size;
547        LocalData[i] = Vars[i].Data;
548        LocalHasExtraSpace[i] = Vars[i].HasExtraSpace;
549      }
550    }
551  }
552
553  double StartTime = MPI_Wtime();
554
555  if (SplitRank == 0) {
556    uint64_t HeaderSize = sizeof(GlobalHeader<IsBigEndian>) + Vars.size()*sizeof(VariableHeader<IsBigEndian>) +
557                          SplitNRanks*sizeof(RankHeader<IsBigEndian>) + CRCSize;
558    if (NeedsBlockHeaders)
559      HeaderSize += SplitNRanks*Vars.size()*sizeof(BlockHeader<IsBigEndian>);
560
561    vector<char> Header(HeaderSize, 0);
562    GlobalHeader<IsBigEndian> *GH = (GlobalHeader<IsBigEndian> *) &Header[0];
563    std::copy(Magic, Magic + MagicSize, GH->Magic);
564    GH->HeaderSize = HeaderSize - CRCSize;
565    GH->NElems = NElems; // This will be updated later
566    std::copy(Dims, Dims + 3, GH->Dims);
567    GH->NVars = Vars.size();
568    GH->VarsSize = sizeof(VariableHeader<IsBigEndian>);
569    GH->VarsStart = sizeof(GlobalHeader<IsBigEndian>);
570    GH->NRanks = SplitNRanks;
571    GH->RanksSize = sizeof(RankHeader<IsBigEndian>);
572    GH->RanksStart = GH->VarsStart + Vars.size()*sizeof(VariableHeader<IsBigEndian>);
573    GH->GlobalHeaderSize = sizeof(GlobalHeader<IsBigEndian>);
574    std::copy(PhysOrigin, PhysOrigin + 3, GH->PhysOrigin);
575    std::copy(PhysScale,  PhysScale  + 3, GH->PhysScale);
576    if (!NeedsBlockHeaders) {
577      GH->BlocksSize = GH->BlocksStart = 0;
578    } else {
579      GH->BlocksSize = sizeof(BlockHeader<IsBigEndian>);
580      GH->BlocksStart = GH->RanksStart + SplitNRanks*sizeof(RankHeader<IsBigEndian>);
581    }
582
583    uint64_t RecordSize = 0;
584    VariableHeader<IsBigEndian> *VH = (VariableHeader<IsBigEndian> *) &Header[GH->VarsStart];
585    for (size_t i = 0; i < Vars.size(); ++i, ++VH) {
586      string VName(Vars[i].Name);
587      VName.resize(NameSize);
588
589      std::copy(VName.begin(), VName.end(), VH->Name);
590      uint64_t VFlags = 0;
591      if (Vars[i].IsFloat)  VFlags |= FloatValue;
592      if (Vars[i].IsSigned) VFlags |= SignedValue;
593      if (Vars[i].IsPhysCoordX) VFlags |= ValueIsPhysCoordX;
594      if (Vars[i].IsPhysCoordY) VFlags |= ValueIsPhysCoordY;
595      if (Vars[i].IsPhysCoordZ) VFlags |= ValueIsPhysCoordZ;
596      if (Vars[i].MaybePhysGhost) VFlags |= ValueMaybePhysGhost;
597      VH->Flags = VFlags;
598      RecordSize += VH->Size = Vars[i].Size;
599      VH->ElementSize = Vars[i].ElementSize;
600    }
601
602    MPI_Gather(&RHLocal, sizeof(RHLocal), MPI_BYTE,
603               &Header[GH->RanksStart], sizeof(RHLocal),
604               MPI_BYTE, 0, SplitComm);
605
606    if (NeedsBlockHeaders) {
607      MPI_Gather(&LocalBlockHeaders[0],
608                 Vars.size()*sizeof(BlockHeader<IsBigEndian>), MPI_BYTE,
609                 &Header[GH->BlocksStart],
610                 Vars.size()*sizeof(BlockHeader<IsBigEndian>), MPI_BYTE,
611                 0, SplitComm);
612
613      BlockHeader<IsBigEndian> *BH = (BlockHeader<IsBigEndian> *) &Header[GH->BlocksStart];
614      for (int i = 0; i < SplitNRanks; ++i)
615      for (size_t j = 0; j < Vars.size(); ++j, ++BH) {
616        if (i == 0 && j == 0)
617          BH->Start = HeaderSize;
618        else
619          BH->Start = BH[-1].Start + BH[-1].Size + CRCSize;
620      }
621
622      RankHeader<IsBigEndian> *RH = (RankHeader<IsBigEndian> *) &Header[GH->RanksStart];
623      RH->Start = HeaderSize; ++RH;
624      for (int i = 1; i < SplitNRanks; ++i, ++RH) {
625        RH->Start =
626          ((BlockHeader<IsBigEndian> *) &Header[GH->BlocksStart])[i*Vars.size()].Start;
627        GH->NElems += RH->NElems;
628      }
629
630      // Compute the total file size.
631      uint64_t LastData = BH[-1].Size + CRCSize;
632      FileSize = BH[-1].Start + LastData;
633    } else {
634      RankHeader<IsBigEndian> *RH = (RankHeader<IsBigEndian> *) &Header[GH->RanksStart];
635      RH->Start = HeaderSize; ++RH;
636      for (int i = 1; i < SplitNRanks; ++i, ++RH) {
637        uint64_t PrevNElems = RH[-1].NElems;
638        uint64_t PrevData = PrevNElems*RecordSize + CRCSize*Vars.size();
639        RH->Start = RH[-1].Start + PrevData;
640        GH->NElems += RH->NElems;
641      }
642
643      // Compute the total file size.
644      uint64_t LastNElems = RH[-1].NElems;
645      uint64_t LastData = LastNElems*RecordSize + CRCSize*Vars.size();
646      FileSize = RH[-1].Start + LastData;
647    }
648
649    // Now that the starting offset has been computed, send it back to each rank.
650    MPI_Scatter(&Header[GH->RanksStart], sizeof(RHLocal),
651                MPI_BYTE, &RHLocal, sizeof(RHLocal),
652                MPI_BYTE, 0, SplitComm);
653
654    if (NeedsBlockHeaders)
655      MPI_Scatter(&Header[GH->BlocksStart],
656                  sizeof(BlockHeader<IsBigEndian>)*Vars.size(), MPI_BYTE,
657                  &LocalBlockHeaders[0],
658                  sizeof(BlockHeader<IsBigEndian>)*Vars.size(), MPI_BYTE,
659                  0, SplitComm);
660
661    uint64_t HeaderCRC = crc64_omp(&Header[0], HeaderSize - CRCSize);
662    crc64_invert(HeaderCRC, &Header[HeaderSize - CRCSize]);
663
664    if (FileIOType == FileIOMPI)
665      FH.get() = new GenericFileIO_MPI(MPI_COMM_SELF);
666    else if (FileIOType == FileIOMPICollective)
667      FH.get() = new GenericFileIO_MPICollective(MPI_COMM_SELF);
668    else
669      FH.get() = new GenericFileIO_POSIX();
670
671    FH.get()->open(LocalFileName);
672    FH.get()->setSize(FileSize);
673    FH.get()->write(&Header[0], HeaderSize, 0, "header");
674
675    close();
676  } else {
677    MPI_Gather(&RHLocal, sizeof(RHLocal), MPI_BYTE, 0, 0, MPI_BYTE, 0, SplitComm);
678    if (NeedsBlockHeaders)
679      MPI_Gather(&LocalBlockHeaders[0], Vars.size()*sizeof(BlockHeader<IsBigEndian>),
680                 MPI_BYTE, 0, 0, MPI_BYTE, 0, SplitComm);
681    MPI_Scatter(0, 0, MPI_BYTE, &RHLocal, sizeof(RHLocal), MPI_BYTE, 0, SplitComm);
682    if (NeedsBlockHeaders)
683      MPI_Scatter(0, 0, MPI_BYTE, &LocalBlockHeaders[0], sizeof(BlockHeader<IsBigEndian>)*Vars.size(),
684                  MPI_BYTE, 0, SplitComm);
685  }
686
687  MPI_Barrier(SplitComm);
688
689  if (FileIOType == FileIOMPI)
690    FH.get() = new GenericFileIO_MPI(SplitComm);
691  else if (FileIOType == FileIOMPICollective)
692    FH.get() = new GenericFileIO_MPICollective(SplitComm);
693  else
694    FH.get() = new GenericFileIO_POSIX();
695
696  FH.get()->open(LocalFileName);
697
698  uint64_t Offset = RHLocal.Start;
699  for (size_t i = 0; i < Vars.size(); ++i) {
700    uint64_t WriteSize = NeedsBlockHeaders ?
701                         LocalBlockHeaders[i].Size : NElems*Vars[i].Size;
702    void *Data = NeedsBlockHeaders ? LocalData[i] : Vars[i].Data;
703    uint64_t CRC = crc64_omp(Data, WriteSize);
704    bool HasExtraSpace = NeedsBlockHeaders ?
705                         LocalHasExtraSpace[i] : Vars[i].HasExtraSpace;
706    char *CRCLoc = HasExtraSpace ?  ((char *) Data) + WriteSize : (char *) &CRC;
707
708    if (NeedsBlockHeaders)
709      Offset = LocalBlockHeaders[i].Start;
710
711    // When using extra space for the CRC write, preserve the original contents.
712    char CRCSave[CRCSize];
713    if (HasExtraSpace)
714      std::copy(CRCLoc, CRCLoc + CRCSize, CRCSave);
715
716    crc64_invert(CRC, CRCLoc);
717
718    if (HasExtraSpace) {
719      FH.get()->write(Data, WriteSize + CRCSize, Offset, Vars[i].Name + " with CRC");
720    } else {
721      FH.get()->write(Data, WriteSize, Offset, Vars[i].Name);
722      FH.get()->write(CRCLoc, CRCSize, Offset + WriteSize, Vars[i].Name + " CRC");
723    }
724
725    if (HasExtraSpace)
726       std::copy(CRCSave, CRCSave + CRCSize, CRCLoc);
727
728    Offset += WriteSize + CRCSize;
729  }
730
731  close();
732  MPI_Barrier(Comm);
733
734  double EndTime = MPI_Wtime();
735  double TotalTime = EndTime - StartTime;
736  double MaxTotalTime;
737  MPI_Reduce(&TotalTime, &MaxTotalTime, 1, MPI_DOUBLE, MPI_MAX, 0, Comm);
738
739  if (SplitNRanks != NRanks) {
740    uint64_t ContribFileSize = (SplitRank == 0) ? FileSize : 0;
741    MPI_Reduce(&ContribFileSize, &FileSize, 1, MPI_UINT64_T, MPI_SUM, 0, Comm);
742  }
743
744  if (Rank == 0) {
745    double Rate = ((double) FileSize) / MaxTotalTime / (1024.*1024.);
746    std::cout << "Wrote " << Vars.size() << " variables to " << FileName <<
747                  " (" << FileSize << " bytes) in " << MaxTotalTime << "s: " <<
748                  Rate << " MB/s" << std::endl;
749  }
750
751  MPI_Comm_free(&SplitComm);
752  SplitComm = MPI_COMM_NULL;
753}
754#endif // GENERICIO_NO_MPI
755
756template <bool IsBigEndian>
757void GenericIO::readHeaderLeader(void *GHPtr, MismatchBehavior MB, int NRanks,
758                                 int Rank, int SplitNRanks,
759                                 string &LocalFileName, uint64_t &HeaderSize,
760                                 vector<char> &Header) {
761  GlobalHeader<IsBigEndian> &GH = *(GlobalHeader<IsBigEndian> *) GHPtr;
762
763  if (MB == MismatchDisallowed) {
764    if (SplitNRanks != (int) GH.NRanks) {
765      stringstream ss;
766      ss << "Won't read " << LocalFileName << ": communicator-size mismatch: " <<
767            "current: " << SplitNRanks << ", file: " << GH.NRanks;
768      throw runtime_error(ss.str());
769    }
770
771#ifndef GENERICIO_NO_MPI
772    int TopoStatus;
773    MPI_Topo_test(Comm, &TopoStatus);
774    if (TopoStatus == MPI_CART) {
775      int Dims[3], Periods[3], Coords[3];
776      MPI_Cart_get(Comm, 3, Dims, Periods, Coords);
777
778      bool DimsMatch = true;
779      for (int i = 0; i < 3; ++i) {
780        if ((uint64_t) Dims[i] != GH.Dims[i]) {
781          DimsMatch = false;
782          break;
783        }
784      }
785
786      if (!DimsMatch) {
787        stringstream ss;
788        ss << "Won't read " << LocalFileName <<
789              ": communicator-decomposition mismatch: " <<
790              "current: " << Dims[0] << "x" << Dims[1] << "x" << Dims[2] <<
791              ", file: " << GH.Dims[0] << "x" << GH.Dims[1] << "x" <<
792              GH.Dims[2];
793        throw runtime_error(ss.str());
794      }
795    }
796#endif
797  } else if (MB == MismatchRedistribute && !Redistributing) {
798    Redistributing = true;
799
800    int NFileRanks = RankMap.empty() ? (int) GH.NRanks : (int) RankMap.size();
801    int NFileRanksPerRank = NFileRanks/NRanks;
802    int NRemFileRank = NFileRanks % NRanks;
803
804    if (!NFileRanksPerRank) {
805      // We have only the remainder, so the last NRemFileRank ranks get one
806      // file rank, and the others don't.
807      if (NRemFileRank && NRanks - Rank <= NRemFileRank)
808        SourceRanks.push_back(NRanks - (Rank + 1));
809    } else {
810      // Since NRemFileRank < NRanks, and we don't want to put any extra memory
811      // load on rank 0 (because rank 0's memory load is normally higher than
812      // the other ranks anyway), the last NRemFileRank will each take
813      // (NFileRanksPerRank+1) file ranks.
814
815      int FirstFileRank = 0, LastFileRank = NFileRanksPerRank - 1;
816      for (int i = 1; i <= Rank; ++i) {
817        FirstFileRank = LastFileRank + 1;
818        LastFileRank  = FirstFileRank + NFileRanksPerRank - 1;
819
820        if (NRemFileRank && NRanks - i <= NRemFileRank)
821          ++LastFileRank;
822      }
823
824      for (int i = FirstFileRank; i <= LastFileRank; ++i)
825        SourceRanks.push_back(i);
826    }
827  }
828
829  HeaderSize = GH.HeaderSize;
830  Header.resize(HeaderSize + CRCSize, 0xFE /* poison */);
831  FH.get()->read(&Header[0], HeaderSize + CRCSize, 0, "header");
832
833  uint64_t CRC = crc64_omp(&Header[0], HeaderSize + CRCSize);
834  if (CRC != (uint64_t) -1) {
835    throw runtime_error("Header CRC check failed: " + LocalFileName);
836  }
837}
838
839// Note: Errors from this function should be recoverable. This means that if
840// one rank throws an exception, then all ranks should.
841void GenericIO::openAndReadHeader(MismatchBehavior MB, int EffRank, bool CheckPartMap) {
842  int NRanks, Rank;
843#ifndef GENERICIO_NO_MPI
844  MPI_Comm_rank(Comm, &Rank);
845  MPI_Comm_size(Comm, &NRanks);
846#else
847  Rank = 0;
848  NRanks = 1;
849#endif
850
851  if (EffRank == -1)
852    EffRank = MB == MismatchRedistribute ? 0 : Rank;
853
854  if (RankMap.empty() && CheckPartMap) {
855    // First, check to see if the file is a rank map.
856    unsigned long RanksInMap = 0;
857    if (Rank == 0) {
858      try {
859#ifndef GENERICIO_NO_MPI
860        GenericIO GIO(MPI_COMM_SELF, FileName, FileIOType);
861#else
862        GenericIO GIO(FileName, FileIOType);
863#endif
864        GIO.openAndReadHeader(MismatchDisallowed, 0, false);
865        RanksInMap = GIO.readNumElems();
866
867        RankMap.resize(RanksInMap + GIO.requestedExtraSpace()/sizeof(int));
868        GIO.addVariable("$partition", RankMap, true);
869
870        GIO.readData(0, false);
871        RankMap.resize(RanksInMap);
872      } catch (...) {
873        RankMap.clear();
874        RanksInMap = 0;
875      }
876    }
877
878#ifndef GENERICIO_NO_MPI
879    MPI_Bcast(&RanksInMap, 1, MPI_UNSIGNED_LONG, 0, Comm);
880    if (RanksInMap > 0) {
881      RankMap.resize(RanksInMap);
882      MPI_Bcast(&RankMap[0], RanksInMap, MPI_INT, 0, Comm);
883    }
884#endif
885  }
886
887#ifndef GENERICIO_NO_MPI
888  if (SplitComm != MPI_COMM_NULL)
889    MPI_Comm_free(&SplitComm);
890#endif
891
892  string LocalFileName;
893  if (RankMap.empty()) {
894    LocalFileName = FileName;
895#ifndef GENERICIO_NO_MPI
896    MPI_Comm_dup(MB == MismatchRedistribute ? MPI_COMM_SELF : Comm, &SplitComm);
897#endif
898  } else {
899    stringstream ss;
900    ss << FileName << "#" << RankMap[EffRank];
901    LocalFileName = ss.str();
902#ifndef GENERICIO_NO_MPI
903    if (MB == MismatchRedistribute) {
904      MPI_Comm_dup(MPI_COMM_SELF, &SplitComm);
905    } else {
906#ifdef __bgq__
907      MPI_Barrier(Comm);
908#endif
909      MPI_Comm_split(Comm, RankMap[EffRank], Rank, &SplitComm);
910    }
911#endif
912  }
913
914  if (LocalFileName == OpenFileName)
915    return;
916  FH.close();
917
918  int SplitNRanks, SplitRank;
919#ifndef GENERICIO_NO_MPI
920  MPI_Comm_rank(SplitComm, &SplitRank);
921  MPI_Comm_size(SplitComm, &SplitNRanks);
922#else
923  SplitRank = 0;
924  SplitNRanks = 1;
925#endif
926
927  uint64_t HeaderSize;
928  vector<char> Header;
929
930  if (SplitRank == 0) {
931#ifndef GENERICIO_NO_MPI
932    if (FileIOType == FileIOMPI)
933      FH.get() = new GenericFileIO_MPI(MPI_COMM_SELF);
934    else if (FileIOType == FileIOMPICollective)
935      FH.get() = new GenericFileIO_MPICollective(MPI_COMM_SELF);
936    else
937#endif
938      FH.get() = new GenericFileIO_POSIX();
939
940#ifndef GENERICIO_NO_MPI
941    char True = 1, False = 0;
942#endif
943
944    try {
945      FH.get()->open(LocalFileName, true);
946
947      GlobalHeader<false> GH; // endianness does not matter yet...
948      FH.get()->read(&GH, sizeof(GlobalHeader<false>), 0, "global header");
949
950      if (string(GH.Magic, GH.Magic + MagicSize - 1) == MagicLE) {
951        readHeaderLeader<false>(&GH, MB, NRanks, Rank, SplitNRanks, LocalFileName,
952                                HeaderSize, Header);
953      } else if (string(GH.Magic, GH.Magic + MagicSize - 1) == MagicBE) {
954        readHeaderLeader<true>(&GH, MB, NRanks, Rank, SplitNRanks, LocalFileName,
955                               HeaderSize, Header);
956      } else {
957        string Error = "invalid file-type identifier";
958        throw runtime_error("Won't read " + LocalFileName + ": " + Error);
959      }
960
961#ifndef GENERICIO_NO_MPI
962      close();
963      MPI_Bcast(&True, 1, MPI_BYTE, 0, SplitComm);
964#endif
965    } catch (...) {
966#ifndef GENERICIO_NO_MPI
967      MPI_Bcast(&False, 1, MPI_BYTE, 0, SplitComm);
968#endif
969      close();
970      throw;
971    }
972  } else {
973#ifndef GENERICIO_NO_MPI
974    char Okay;
975    MPI_Bcast(&Okay, 1, MPI_BYTE, 0, SplitComm);
976    if (!Okay)
977      throw runtime_error("Failure broadcast from rank 0");
978#endif
979  }
980
981#ifndef GENERICIO_NO_MPI
982  MPI_Bcast(&HeaderSize, 1, MPI_UINT64_T, 0, SplitComm);
983#endif
984
985  Header.resize(HeaderSize, 0xFD /* poison */);
986#ifndef GENERICIO_NO_MPI
987  MPI_Bcast(&Header[0], HeaderSize, MPI_BYTE, 0, SplitComm);
988#endif
989
990  FH.getHeaderCache().clear();
991
992  GlobalHeader<false> *GH = (GlobalHeader<false> *) &Header[0];
993  FH.setIsBigEndian(string(GH->Magic, GH->Magic + MagicSize - 1) == MagicBE);
994
995  FH.getHeaderCache().swap(Header);
996  OpenFileName = LocalFileName;
997
998#ifndef GENERICIO_NO_MPI
999  if (!DisableCollErrChecking)
1000    MPI_Barrier(Comm);
1001
1002  if (FileIOType == FileIOMPI)
1003    FH.get() = new GenericFileIO_MPI(SplitComm);
1004  else if (FileIOType == FileIOMPICollective)
1005    FH.get() = new GenericFileIO_MPICollective(SplitComm);
1006  else
1007    FH.get() = new GenericFileIO_POSIX();
1008
1009  int OpenErr = 0, TotOpenErr;
1010  try {
1011    FH.get()->open(LocalFileName, true);
1012    MPI_Allreduce(&OpenErr, &TotOpenErr, 1, MPI_INT, MPI_SUM,
1013                  DisableCollErrChecking ? MPI_COMM_SELF : Comm);
1014  } catch (...) {
1015    OpenErr = 1;
1016    MPI_Allreduce(&OpenErr, &TotOpenErr, 1, MPI_INT, MPI_SUM,
1017                  DisableCollErrChecking ? MPI_COMM_SELF : Comm);
1018    throw;
1019  }
1020
1021  if (TotOpenErr > 0) {
1022    stringstream ss;
1023    ss << TotOpenErr << " ranks failed to open file: " << LocalFileName;
1024    throw runtime_error(ss.str());
1025  }
1026#endif
1027}
1028
1029int GenericIO::readNRanks() {
1030  if (FH.isBigEndian())
1031    return readNRanks<true>();
1032  return readNRanks<false>();
1033}
1034
1035template <bool IsBigEndian>
1036int GenericIO::readNRanks() {
1037  if (RankMap.size())
1038    return RankMap.size();
1039
1040  assert(FH.getHeaderCache().size() && "HeaderCache must not be empty");
1041  GlobalHeader<IsBigEndian> *GH = (GlobalHeader<IsBigEndian> *) &FH.getHeaderCache()[0];
1042  return (int) GH->NRanks;
1043}
1044
1045void GenericIO::readDims(int Dims[3]) {
1046  if (FH.isBigEndian())
1047    readDims<true>(Dims);
1048  else
1049    readDims<false>(Dims);
1050}
1051
1052template <bool IsBigEndian>
1053void GenericIO::readDims(int Dims[3]) {
1054  assert(FH.getHeaderCache().size() && "HeaderCache must not be empty");
1055  GlobalHeader<IsBigEndian> *GH = (GlobalHeader<IsBigEndian> *) &FH.getHeaderCache()[0];
1056  std::copy(GH->Dims, GH->Dims + 3, Dims);
1057}
1058
1059uint64_t GenericIO::readTotalNumElems() {
1060  if (FH.isBigEndian())
1061    return readTotalNumElems<true>();
1062  return readTotalNumElems<false>();
1063}
1064
1065template <bool IsBigEndian>
1066uint64_t GenericIO::readTotalNumElems() {
1067  if (RankMap.size())
1068    return (uint64_t) -1;
1069
1070  assert(FH.getHeaderCache().size() && "HeaderCache must not be empty");
1071  GlobalHeader<IsBigEndian> *GH = (GlobalHeader<IsBigEndian> *) &FH.getHeaderCache()[0];
1072  return GH->NElems;
1073}
1074
1075void GenericIO::readPhysOrigin(double Origin[3]) {
1076  if (FH.isBigEndian())
1077    readPhysOrigin<true>(Origin);
1078  else
1079    readPhysOrigin<false>(Origin);
1080}
1081
1082// Define a "safe" version of offsetof (offsetof itself might not work for
1083// non-POD types, and at least xlC v12.1 will complain about this if you try).
1084#define offsetof_safe(S, F) (size_t(&(S)->F) - size_t(S))
1085
1086template <bool IsBigEndian>
1087void GenericIO::readPhysOrigin(double Origin[3]) {
1088  assert(FH.getHeaderCache().size() && "HeaderCache must not be empty");
1089  GlobalHeader<IsBigEndian> *GH = (GlobalHeader<IsBigEndian> *) &FH.getHeaderCache()[0];
1090  if (offsetof_safe(GH, PhysOrigin) >= GH->GlobalHeaderSize) {
1091    std::fill(Origin, Origin + 3, 0.0);
1092    return;
1093  }
1094
1095  std::copy(GH->PhysOrigin, GH->PhysOrigin + 3, Origin);
1096}
1097
1098void GenericIO::readPhysScale(double Scale[3]) {
1099  if (FH.isBigEndian())
1100    readPhysScale<true>(Scale);
1101  else
1102    readPhysScale<false>(Scale);
1103}
1104
1105template <bool IsBigEndian>
1106void GenericIO::readPhysScale(double Scale[3]) {
1107  assert(FH.getHeaderCache().size() && "HeaderCache must not be empty");
1108  GlobalHeader<IsBigEndian> *GH = (GlobalHeader<IsBigEndian> *) &FH.getHeaderCache()[0];
1109  if (offsetof_safe(GH, PhysScale) >= GH->GlobalHeaderSize) {
1110    std::fill(Scale, Scale + 3, 0.0);
1111    return;
1112  }
1113
1114  std::copy(GH->PhysScale, GH->PhysScale + 3, Scale);
1115}
1116
1117template <bool IsBigEndian>
1118static size_t getRankIndex(int EffRank, GlobalHeader<IsBigEndian> *GH,
1119                           vector<int> &RankMap, vector<char> &HeaderCache) {
1120  if (RankMap.empty())
1121    return EffRank;
1122
1123  for (size_t i = 0; i < GH->NRanks; ++i) {
1124    RankHeader<IsBigEndian> *RH = (RankHeader<IsBigEndian> *) &HeaderCache[GH->RanksStart +
1125                                                 i*GH->RanksSize];
1126    if (offsetof_safe(RH, GlobalRank) >= GH->RanksSize)
1127      return EffRank;
1128
1129    if ((int) RH->GlobalRank == EffRank)
1130      return i;
1131  }
1132
1133  assert(false && "Index requested of an invalid rank");
1134  return (size_t) -1;
1135}
1136
1137int GenericIO::readGlobalRankNumber(int EffRank) {
1138  if (FH.isBigEndian())
1139    return readGlobalRankNumber<true>(EffRank);
1140  return readGlobalRankNumber<false>(EffRank);
1141}
1142
1143template <bool IsBigEndian>
1144int GenericIO::readGlobalRankNumber(int EffRank) {
1145  if (EffRank == -1) {
1146#ifndef GENERICIO_NO_MPI
1147    MPI_Comm_rank(Comm, &EffRank);
1148#else
1149    EffRank = 0;
1150#endif
1151  }
1152
1153  openAndReadHeader(MismatchAllowed, EffRank, false);
1154
1155  assert(FH.getHeaderCache().size() && "HeaderCache must not be empty");
1156
1157  GlobalHeader<IsBigEndian> *GH = (GlobalHeader<IsBigEndian> *) &FH.getHeaderCache()[0];
1158  size_t RankIndex = getRankIndex<IsBigEndian>(EffRank, GH, RankMap, FH.getHeaderCache());
1159
1160  assert(RankIndex < GH->NRanks && "Invalid rank specified");
1161
1162  RankHeader<IsBigEndian> *RH = (RankHeader<IsBigEndian> *) &FH.getHeaderCache()[GH->RanksStart +
1163                                               RankIndex*GH->RanksSize];
1164
1165  if (offsetof_safe(RH, GlobalRank) >= GH->RanksSize)
1166    return EffRank;
1167
1168  return (int) RH->GlobalRank;
1169}
1170
1171void GenericIO::getSourceRanks(vector<int> &SR) {
1172  SR.clear();
1173
1174  if (Redistributing) {
1175    std::copy(SourceRanks.begin(), SourceRanks.end(), std::back_inserter(SR));
1176    return;
1177  }
1178
1179  int Rank;
1180#ifndef GENERICIO_NO_MPI
1181  MPI_Comm_rank(Comm, &Rank);
1182#else
1183  Rank = 0;
1184#endif
1185
1186  SR.push_back(Rank);
1187}
1188
1189size_t GenericIO::readNumElems(int EffRank) {
1190  if (EffRank == -1 && Redistributing) {
1191    DisableCollErrChecking = true;
1192
1193    size_t TotalSize = 0;
1194    for (int i = 0, ie = SourceRanks.size(); i != ie; ++i)
1195      TotalSize += readNumElems(SourceRanks[i]);
1196
1197    DisableCollErrChecking = false;
1198    return TotalSize;
1199  }
1200
1201  if (FH.isBigEndian())
1202    return readNumElems<true>(EffRank);
1203  return readNumElems<false>(EffRank);
1204}
1205
1206template <bool IsBigEndian>
1207size_t GenericIO::readNumElems(int EffRank) {
1208  if (EffRank == -1) {
1209#ifndef GENERICIO_NO_MPI
1210    MPI_Comm_rank(Comm, &EffRank);
1211#else
1212    EffRank = 0;
1213#endif
1214  }
1215
1216  openAndReadHeader(Redistributing ? MismatchRedistribute : MismatchAllowed,
1217                    EffRank, false);
1218
1219  assert(FH.getHeaderCache().size() && "HeaderCache must not be empty");
1220
1221  GlobalHeader<IsBigEndian> *GH = (GlobalHeader<IsBigEndian> *) &FH.getHeaderCache()[0];
1222  size_t RankIndex = getRankIndex<IsBigEndian>(EffRank, GH, RankMap, FH.getHeaderCache());
1223
1224  assert(RankIndex < GH->NRanks && "Invalid rank specified");
1225
1226  RankHeader<IsBigEndian> *RH = (RankHeader<IsBigEndian> *) &FH.getHeaderCache()[GH->RanksStart +
1227                                               RankIndex*GH->RanksSize];
1228  return (size_t) RH->NElems;
1229}
1230
1231void GenericIO::readCoords(int Coords[3], int EffRank) {
1232  if (EffRank == -1 && Redistributing) {
1233    std::fill(Coords, Coords + 3, 0);
1234    return;
1235  }
1236
1237  if (FH.isBigEndian())
1238    readCoords<true>(Coords, EffRank);
1239  else
1240    readCoords<false>(Coords, EffRank);
1241}
1242
1243template <bool IsBigEndian>
1244void GenericIO::readCoords(int Coords[3], int EffRank) {
1245  if (EffRank == -1) {
1246#ifndef GENERICIO_NO_MPI
1247    MPI_Comm_rank(Comm, &EffRank);
1248#else
1249    EffRank = 0;
1250#endif
1251  }
1252
1253  openAndReadHeader(MismatchAllowed, EffRank, false);
1254
1255  assert(FH.getHeaderCache().size() && "HeaderCache must not be empty");
1256
1257  GlobalHeader<IsBigEndian> *GH = (GlobalHeader<IsBigEndian> *) &FH.getHeaderCache()[0];
1258  size_t RankIndex = getRankIndex<IsBigEndian>(EffRank, GH, RankMap, FH.getHeaderCache());
1259
1260  assert(RankIndex < GH->NRanks && "Invalid rank specified");
1261
1262  RankHeader<IsBigEndian> *RH = (RankHeader<IsBigEndian> *) &FH.getHeaderCache()[GH->RanksStart +
1263                                               RankIndex*GH->RanksSize];
1264
1265  std::copy(RH->Coords, RH->Coords + 3, Coords);
1266}
1267
1268void GenericIO::readData(int EffRank, bool PrintStats, bool CollStats) {
1269  int Rank;
1270#ifndef GENERICIO_NO_MPI
1271  MPI_Comm_rank(Comm, &Rank);
1272#else
1273  Rank = 0;
1274#endif
1275
1276  uint64_t TotalReadSize = 0;
1277#ifndef GENERICIO_NO_MPI
1278  double StartTime = MPI_Wtime();
1279#else
1280  double StartTime = double(clock())/CLOCKS_PER_SEC;
1281#endif
1282
1283  int NErrs[3] = { 0, 0, 0 };
1284
1285  if (EffRank == -1 && Redistributing) {
1286    DisableCollErrChecking = true;
1287
1288    size_t RowOffset = 0;
1289    for (int i = 0, ie = SourceRanks.size(); i != ie; ++i) {
1290      readData(SourceRanks[i], RowOffset, Rank, TotalReadSize, NErrs);
1291      RowOffset += readNumElems(SourceRanks[i]);
1292    }
1293
1294    DisableCollErrChecking = false;
1295  } else {
1296    readData(EffRank, 0, Rank, TotalReadSize, NErrs);
1297  }
1298
1299  int AllNErrs[3];
1300#ifndef GENERICIO_NO_MPI
1301  MPI_Allreduce(NErrs, AllNErrs, 3, MPI_INT, MPI_SUM, Comm);
1302#else
1303  AllNErrs[0] = NErrs[0]; AllNErrs[1] = NErrs[1]; AllNErrs[2] = NErrs[2];
1304#endif
1305
1306  if (AllNErrs[0] > 0 || AllNErrs[1] > 0 || AllNErrs[2] > 0) {
1307    stringstream ss;
1308    ss << "Experienced " << AllNErrs[0] << " I/O error(s), " <<
1309          AllNErrs[1] << " CRC error(s) and " << AllNErrs[2] <<
1310          " decompression CRC error(s) reading: " << OpenFileName;
1311    throw runtime_error(ss.str());
1312  }
1313
1314#ifndef GENERICIO_NO_MPI
1315  MPI_Barrier(Comm);
1316#endif
1317
1318#ifndef GENERICIO_NO_MPI
1319  double EndTime = MPI_Wtime();
1320#else
1321  double EndTime = double(clock())/CLOCKS_PER_SEC;
1322#endif
1323
1324  double TotalTime = EndTime - StartTime;
1325  double MaxTotalTime;
1326#ifndef GENERICIO_NO_MPI
1327  if (CollStats)
1328    MPI_Reduce(&TotalTime, &MaxTotalTime, 1, MPI_DOUBLE, MPI_MAX, 0, Comm);
1329  else
1330#endif
1331  MaxTotalTime = TotalTime;
1332
1333  uint64_t AllTotalReadSize;
1334#ifndef GENERICIO_NO_MPI
1335  if (CollStats)
1336    MPI_Reduce(&TotalReadSize, &AllTotalReadSize, 1, MPI_UINT64_T, MPI_SUM, 0, Comm);
1337  else
1338#endif
1339  AllTotalReadSize = TotalReadSize;
1340
1341  if (Rank == 0 && PrintStats) {
1342    double Rate = ((double) AllTotalReadSize) / MaxTotalTime / (1024.*1024.);
1343    std::cout << "Read " << Vars.size() << " variables from " << FileName <<
1344                 " (" << AllTotalReadSize << " bytes) in " << MaxTotalTime << "s: " <<
1345                 Rate << " MB/s [excluding header read]" << std::endl;
1346  }
1347}
1348
1349void GenericIO::readData(int EffRank, size_t RowOffset, int Rank,
1350                         uint64_t &TotalReadSize, int NErrs[3]) {
1351  if (FH.isBigEndian())
1352    readData<true>(EffRank, RowOffset, Rank, TotalReadSize, NErrs);
1353  else
1354    readData<false>(EffRank, RowOffset, Rank, TotalReadSize, NErrs);
1355}
1356
1357// Note: Errors from this function should be recoverable. This means that if
1358// one rank throws an exception, then all ranks should.
1359template <bool IsBigEndian>
1360void GenericIO::readData(int EffRank, size_t RowOffset, int Rank,
1361                         uint64_t &TotalReadSize, int NErrs[3]) {
1362  openAndReadHeader(Redistributing ? MismatchRedistribute : MismatchAllowed,
1363                    EffRank, false);
1364
1365  assert(FH.getHeaderCache().size() && "HeaderCache must not be empty");
1366
1367  if (EffRank == -1)
1368    EffRank = Rank;
1369
1370  GlobalHeader<IsBigEndian> *GH = (GlobalHeader<IsBigEndian> *) &FH.getHeaderCache()[0];
1371  size_t RankIndex = getRankIndex<IsBigEndian>(EffRank, GH, RankMap, FH.getHeaderCache());
1372
1373  assert(RankIndex < GH->NRanks && "Invalid rank specified");
1374
1375  RankHeader<IsBigEndian> *RH = (RankHeader<IsBigEndian> *) &FH.getHeaderCache()[GH->RanksStart +
1376                                               RankIndex*GH->RanksSize];
1377
1378  for (size_t i = 0; i < Vars.size(); ++i) {
1379    uint64_t Offset = RH->Start;
1380    bool VarFound = false;
1381    for (uint64_t j = 0; j < GH->NVars; ++j) {
1382      VariableHeader<IsBigEndian> *VH = (VariableHeader<IsBigEndian> *) &FH.getHeaderCache()[GH->VarsStart +
1383                                                           j*GH->VarsSize];
1384
1385      string VName(VH->Name, VH->Name + NameSize);
1386      size_t VNameNull = VName.find('\0');
1387      if (VNameNull < NameSize)
1388        VName.resize(VNameNull);
1389
1390      uint64_t ReadSize = RH->NElems*VH->Size + CRCSize;
1391      if (VName != Vars[i].Name) {
1392        Offset += ReadSize;
1393        continue;
1394      }
1395
1396      size_t ElementSize = VH->Size;
1397      if (offsetof_safe(VH, ElementSize) < GH->VarsSize)
1398        ElementSize = VH->ElementSize;
1399
1400      VarFound = true;
1401      bool IsFloat = (bool) (VH->Flags & FloatValue),
1402           IsSigned = (bool) (VH->Flags & SignedValue);
1403      if (VH->Size != Vars[i].Size) {
1404        stringstream ss;
1405        ss << "Size mismatch for variable " << Vars[i].Name <<
1406              " in: " << OpenFileName << ": current: " << Vars[i].Size <<
1407              ", file: " << VH->Size;
1408        throw runtime_error(ss.str());
1409      } else if (ElementSize != Vars[i].ElementSize) {
1410        stringstream ss;
1411        ss << "Element size mismatch for variable " << Vars[i].Name <<
1412              " in: " << OpenFileName << ": current: " << Vars[i].ElementSize <<
1413              ", file: " << ElementSize;
1414        throw runtime_error(ss.str());
1415      } else if (IsFloat != Vars[i].IsFloat) {
1416        string Float("float"), Int("integer");
1417        stringstream ss;
1418        ss << "Type mismatch for variable " << Vars[i].Name <<
1419              " in: " << OpenFileName << ": current: " <<
1420              (Vars[i].IsFloat ? Float : Int) <<
1421              ", file: " << (IsFloat ? Float : Int);
1422        throw runtime_error(ss.str());
1423      } else if (IsSigned != Vars[i].IsSigned) {
1424        string Signed("signed"), Uns("unsigned");
1425        stringstream ss;
1426        ss << "Type mismatch for variable " << Vars[i].Name <<
1427              " in: " << OpenFileName << ": current: " <<
1428              (Vars[i].IsSigned ? Signed : Uns) <<
1429              ", file: " << (IsSigned ? Signed : Uns);
1430        throw runtime_error(ss.str());
1431      }
1432
1433      size_t VarOffset = RowOffset*Vars[i].Size;
1434      void *VarData = ((char *) Vars[i].Data) + VarOffset;
1435
1436      vector<unsigned char> LData;
1437      void *Data = VarData;
1438      bool HasExtraSpace = Vars[i].HasExtraSpace;
1439      if (offsetof_safe(GH, BlocksStart) < GH->GlobalHeaderSize &&
1440          GH->BlocksSize > 0) {
1441        BlockHeader<IsBigEndian> *BH = (BlockHeader<IsBigEndian> *)
1442          &FH.getHeaderCache()[GH->BlocksStart +
1443                               (RankIndex*GH->NVars + j)*GH->BlocksSize];
1444        ReadSize = BH->Size + CRCSize;
1445        Offset = BH->Start;
1446
1447        if (strncmp(BH->Filters[0], CompressName, FilterNameSize) == 0) {
1448          LData.resize(ReadSize);
1449          Data = &LData[0];
1450          HasExtraSpace = true;
1451        } else if (BH->Filters[0][0] != '\0') {
1452          stringstream ss;
1453          ss << "Unknown filter \"" << BH->Filters[0] << "\" on variable " << Vars[i].Name;
1454          throw runtime_error(ss.str());
1455        }
1456      }
1457
1458      assert(HasExtraSpace && "Extra space required for reading");
1459
1460      char CRCSave[CRCSize];
1461      char *CRCLoc = ((char *) Data) + ReadSize - CRCSize;
1462      if (HasExtraSpace)
1463        std::copy(CRCLoc, CRCLoc + CRCSize, CRCSave);
1464
1465      int Retry = 0;
1466      {
1467        int RetryCount = 300;
1468        const char *EnvStr = getenv("GENERICIO_RETRY_COUNT");
1469        if (EnvStr)
1470          RetryCount = atoi(EnvStr);
1471
1472        int RetrySleep = 100; // ms
1473        EnvStr = getenv("GENERICIO_RETRY_SLEEP");
1474        if (EnvStr)
1475          RetrySleep = atoi(EnvStr);
1476
1477        for (; Retry < RetryCount; ++Retry) {
1478          try {
1479            FH.get()->read(Data, ReadSize, Offset, Vars[i].Name);
1480            break;
1481          } catch (...) { }
1482
1483          usleep(1000*RetrySleep);
1484        }
1485
1486        if (Retry == RetryCount) {
1487          ++NErrs[0];
1488          break;
1489        } else if (Retry > 0) {
1490          EnvStr = getenv("GENERICIO_VERBOSE");
1491          if (EnvStr) {
1492            int Mod = atoi(EnvStr);
1493            if (Mod > 0) {
1494              int Rank;
1495#ifndef GENERICIO_NO_MPI
1496              MPI_Comm_rank(MPI_COMM_WORLD, &Rank);
1497#else
1498              Rank = 0;
1499#endif
1500
1501              std::cerr << "Rank " << Rank << ": " << Retry <<
1502                           " I/O retries were necessary for reading " <<
1503                           Vars[i].Name << " from: " << OpenFileName << "\n";
1504
1505              std::cerr.flush();
1506            }
1507          }
1508        }
1509      }
1510
1511      TotalReadSize += ReadSize;
1512
1513      uint64_t CRC = crc64_omp(Data, ReadSize);
1514      if (CRC != (uint64_t) -1) {
1515        ++NErrs[1];
1516
1517        int Rank;
1518#ifndef GENERICIO_NO_MPI
1519        MPI_Comm_rank(MPI_COMM_WORLD, &Rank);
1520#else
1521        Rank = 0;
1522#endif
1523
1524        // All ranks will do this and have a good time!
1525        string dn = "gio_crc_errors";
1526        mkdir(dn.c_str(), 0777);
1527
1528        srand(time(0));
1529        int DumpNum = rand();
1530        stringstream ssd;
1531        ssd << dn << "/gio_crc_error_dump." << Rank << "." << DumpNum << ".bin";
1532
1533        stringstream ss;
1534        ss << dn << "/gio_crc_error_log." << Rank << ".txt";
1535
1536        ofstream ofs(ss.str().c_str(), ofstream::out | ofstream::app);
1537        ofs << "On-Disk CRC Error Report:\n";
1538        ofs << "Variable: " << Vars[i].Name << "\n";
1539        ofs << "File: " << OpenFileName << "\n";
1540        ofs << "I/O Retries: " << Retry << "\n";
1541        ofs << "Size: " << ReadSize << " bytes\n";
1542        ofs << "Offset: " << Offset << " bytes\n";
1543        ofs << "CRC: " << CRC << " (expected is -1)\n";
1544        ofs << "Dump file: " << ssd.str() << "\n";
1545        ofs << "\n";
1546        ofs.close();
1547
1548        ofstream dofs(ssd.str().c_str(), ofstream::out);
1549        dofs.write((const char *) Data, ReadSize);
1550        dofs.close();
1551
1552        uint64_t RawCRC = crc64_omp(Data, ReadSize - CRCSize);
1553        unsigned char *UData = (unsigned char *) Data;
1554        crc64_invert(RawCRC, &UData[ReadSize - CRCSize]);
1555        uint64_t NewCRC = crc64_omp(Data, ReadSize);
1556        std::cerr << "Recalulated CRC: " << NewCRC << ((NewCRC == -1) ? "ok" : "bad") << "\n";
1557        break;
1558      }
1559
1560      if (HasExtraSpace)
1561        std::copy(CRCSave, CRCSave + CRCSize, CRCLoc);
1562
1563      if (LData.size()) {
1564        CompressHeader<IsBigEndian> *CH = (CompressHeader<IsBigEndian>*) &LData[0];
1565
1566#ifdef _OPENMP
1567#pragma omp master
1568  {
1569#endif
1570
1571       if (!blosc_initialized) {
1572         blosc_init();
1573         blosc_initialized = true;
1574       }
1575
1576#ifdef _OPENMP
1577       blosc_set_nthreads(omp_get_max_threads());
1578  }
1579#endif
1580
1581        blosc_decompress(&LData[0] + sizeof(CompressHeader<IsBigEndian>),
1582                         VarData, Vars[i].Size*RH->NElems);
1583
1584        if (CH->OrigCRC != crc64_omp(VarData, Vars[i].Size*RH->NElems)) {
1585          ++NErrs[2];
1586          break;
1587        }
1588      }
1589
1590      // Byte swap the data if necessary.
1591      if (IsBigEndian != isBigEndian())
1592        for (size_t j = 0;
1593             j < RH->NElems*(Vars[i].Size/Vars[i].ElementSize); ++j) {
1594          char *Offset = ((char *) VarData) + j*Vars[i].ElementSize;
1595          bswap(Offset, Vars[i].ElementSize);
1596        }
1597
1598      break;
1599    }
1600
1601    if (!VarFound)
1602      throw runtime_error("Variable " + Vars[i].Name +
1603                          " not found in: " + OpenFileName);
1604
1605    // This is for debugging.
1606    if (NErrs[0] || NErrs[1] || NErrs[2]) {
1607      const char *EnvStr = getenv("GENERICIO_VERBOSE");
1608      if (EnvStr) {
1609        int Mod = atoi(EnvStr);
1610        if (Mod > 0) {
1611          int Rank;
1612#ifndef GENERICIO_NO_MPI
1613          MPI_Comm_rank(MPI_COMM_WORLD, &Rank);
1614#else
1615          Rank = 0;
1616#endif
1617
1618          std::cerr << "Rank " << Rank << ": " << NErrs[0] << " I/O error(s), " <<
1619          NErrs[1] << " CRC error(s) and " << NErrs[2] <<
1620          " decompression CRC error(s) reading: " << Vars[i].Name <<
1621          " from: " << OpenFileName << "\n";
1622
1623          std::cerr.flush();
1624        }
1625      }
1626    }
1627
1628    if (NErrs[0] || NErrs[1] || NErrs[2])
1629      break;
1630  }
1631}
1632
1633void GenericIO::getVariableInfo(vector<VariableInfo> &VI) {
1634  if (FH.isBigEndian())
1635    getVariableInfo<true>(VI);
1636  else
1637    getVariableInfo<false>(VI);
1638}
1639
1640template <bool IsBigEndian>
1641void GenericIO::getVariableInfo(vector<VariableInfo> &VI) {
1642  assert(FH.getHeaderCache().size() && "HeaderCache must not be empty");
1643
1644  GlobalHeader<IsBigEndian> *GH = (GlobalHeader<IsBigEndian> *) &FH.getHeaderCache()[0];
1645  for (uint64_t j = 0; j < GH->NVars; ++j) {
1646    VariableHeader<IsBigEndian> *VH = (VariableHeader<IsBigEndian> *) &FH.getHeaderCache()[GH->VarsStart +
1647                                                         j*GH->VarsSize];
1648
1649    string VName(VH->Name, VH->Name + NameSize);
1650    size_t VNameNull = VName.find('\0');
1651    if (VNameNull < NameSize)
1652      VName.resize(VNameNull);
1653
1654    size_t ElementSize = VH->Size;
1655    if (offsetof_safe(VH, ElementSize) < GH->VarsSize)
1656      ElementSize = VH->ElementSize;
1657
1658    bool IsFloat = (bool) (VH->Flags & FloatValue),
1659         IsSigned = (bool) (VH->Flags & SignedValue),
1660         IsPhysCoordX = (bool) (VH->Flags & ValueIsPhysCoordX),
1661         IsPhysCoordY = (bool) (VH->Flags & ValueIsPhysCoordY),
1662         IsPhysCoordZ = (bool) (VH->Flags & ValueIsPhysCoordZ),
1663         MaybePhysGhost = (bool) (VH->Flags & ValueMaybePhysGhost);
1664    VI.push_back(VariableInfo(VName, (size_t) VH->Size, IsFloat, IsSigned,
1665                              IsPhysCoordX, IsPhysCoordY, IsPhysCoordZ,
1666                              MaybePhysGhost, ElementSize));
1667  }
1668}
1669
1670void GenericIO::setNaturalDefaultPartition() {
1671#ifdef __bgq__
1672  DefaultPartition = MPIX_IO_link_id();
1673#else
1674#ifndef GENERICIO_NO_MPI
1675  bool UseName = true;
1676  const char *EnvStr = getenv("GENERICIO_PARTITIONS_USE_NAME");
1677  if (EnvStr) {
1678    int Mod = atoi(EnvStr);
1679    UseName = (Mod != 0);
1680  }
1681
1682  if (UseName) {
1683    // This is a heuristic to generate ~256 partitions based on the
1684    // names of the nodes.
1685    char Name[MPI_MAX_PROCESSOR_NAME];
1686    int Len = 0;
1687
1688    MPI_Get_processor_name(Name, &Len);
1689    unsigned char color = 0;
1690    for (int i = 0; i < Len; ++i)
1691      color += (unsigned char) Name[i];
1692
1693    DefaultPartition = color;
1694  }
1695
1696  // This is for debugging.
1697  EnvStr = getenv("GENERICIO_RANK_PARTITIONS");
1698  if (EnvStr) {
1699    int Mod = atoi(EnvStr);
1700    if (Mod > 0) {
1701      int Rank;
1702      MPI_Comm_rank(MPI_COMM_WORLD, &Rank);
1703      DefaultPartition += Rank % Mod;
1704    }
1705  }
1706#endif
1707#endif
1708}
1709
1710} /* END namespace cosmotk */
Note: See TracBrowser for help on using the repository browser.