source: GenericIO.cxx @ 5d57155

Revision 5d57155, 51.9 KB checked in by Hal Finkel <hfinkel@…>, 7 years ago (diff)

Add support for float4 (and other homogeneous aggregates)

  • Property mode set to 100644
Line 
1/*
2 *                    Copyright (C) 2015, UChicago Argonne, LLC
3 *                               All Rights Reserved
4 *
5 *                               Generic IO (ANL-15-066)
6 *                     Hal Finkel, Argonne National Laboratory
7 *
8 *                              OPEN SOURCE LICENSE
9 *
10 * Under the terms of Contract No. DE-AC02-06CH11357 with UChicago Argonne,
11 * LLC, the U.S. Government retains certain rights in this software.
12 *
13 * Redistribution and use in source and binary forms, with or without
14 * modification, are permitted provided that the following conditions are met:
15 *
16 *   1. Redistributions of source code must retain the above copyright notice,
17 *      this list of conditions and the following disclaimer.
18 *
19 *   2. Redistributions in binary form must reproduce the above copyright
20 *      notice, this list of conditions and the following disclaimer in the
21 *      documentation and/or other materials provided with the distribution.
22 *
23 *   3. Neither the names of UChicago Argonne, LLC or the Department of Energy
24 *      nor the names of its contributors may be used to endorse or promote
25 *      products derived from this software without specific prior written
26 *      permission.
27 *
28 * *****************************************************************************
29 *
30 *                                  DISCLAIMER
31 * THE SOFTWARE IS SUPPLIED “AS IS” WITHOUT WARRANTY OF ANY KIND.  NEITHER THE
32 * UNTED STATES GOVERNMENT, NOR THE UNITED STATES DEPARTMENT OF ENERGY, NOR
33 * UCHICAGO ARGONNE, LLC, NOR ANY OF THEIR EMPLOYEES, MAKES ANY WARRANTY,
34 * EXPRESS OR IMPLIED, OR ASSUMES ANY LEGAL LIABILITY OR RESPONSIBILITY FOR THE
35 * ACCURACY, COMPLETENESS, OR USEFULNESS OF ANY INFORMATION, DATA, APPARATUS,
36 * PRODUCT, OR PROCESS DISCLOSED, OR REPRESENTS THAT ITS USE WOULD NOT INFRINGE
37 * PRIVATELY OWNED RIGHTS.
38 *
39 * *****************************************************************************
40 */
41
42#define _XOPEN_SOURCE 600
43#include "CRC64.h"
44#include "GenericIO.h"
45
46extern "C" {
47#include "blosc.h"
48}
49
50#include <sstream>
51#include <fstream>
52#include <stdexcept>
53#include <iterator>
54#include <algorithm>
55#include <cassert>
56#include <cstddef>
57#include <cstring>
58
59#ifndef GENERICIO_NO_MPI
60#include <ctime>
61#endif
62
63#include <sys/types.h>
64#include <sys/stat.h>
65#include <fcntl.h>
66#include <errno.h>
67
68#ifdef __bgq__
69#include <mpix.h>
70#endif
71
72#ifndef MPI_UINT64_T
73#define MPI_UINT64_T (sizeof(long) == 8 ? MPI_LONG : MPI_LONG_LONG)
74#endif
75
76using namespace std;
77
78namespace gio {
79
80
81#ifndef GENERICIO_NO_MPI
82GenericFileIO_MPI::~GenericFileIO_MPI() {
83  (void) MPI_File_close(&FH);
84}
85
86void GenericFileIO_MPI::open(const std::string &FN, bool ForReading) {
87  FileName = FN;
88
89  int amode = ForReading ? MPI_MODE_RDONLY : (MPI_MODE_WRONLY | MPI_MODE_CREATE);
90  if (MPI_File_open(Comm, const_cast<char *>(FileName.c_str()), amode,
91                    MPI_INFO_NULL, &FH) != MPI_SUCCESS)
92    throw runtime_error((!ForReading ? "Unable to create the file: " :
93                                       "Unable to open the file: ") +
94                        FileName);
95}
96
97void GenericFileIO_MPI::setSize(size_t sz) {
98  if (MPI_File_set_size(FH, sz) != MPI_SUCCESS)
99    throw runtime_error("Unable to set size for file: " + FileName);
100}
101
102void GenericFileIO_MPI::read(void *buf, size_t count, off_t offset,
103                             const std::string &D) {
104  while (count > 0) {
105    MPI_Status status;
106    if (MPI_File_read_at(FH, offset, buf, count, MPI_BYTE, &status) != MPI_SUCCESS)
107      throw runtime_error("Unable to read " + D + " from file: " + FileName);
108
109    int scount;
110    (void) MPI_Get_count(&status, MPI_BYTE, &scount);
111
112    count -= scount;
113    buf = ((char *) buf) + scount;
114    offset += scount;
115  }
116}
117
118void GenericFileIO_MPI::write(const void *buf, size_t count, off_t offset,
119                              const std::string &D) {
120  while (count > 0) {
121    MPI_Status status;
122    if (MPI_File_write_at(FH, offset, (void *) buf, count, MPI_BYTE, &status) != MPI_SUCCESS)
123      throw runtime_error("Unable to write " + D + " to file: " + FileName);
124
125    int scount;
126    (void) MPI_Get_count(&status, MPI_BYTE, &scount);
127
128    count -= scount;
129    buf = ((char *) buf) + scount;
130    offset += scount;
131  }
132}
133
134void GenericFileIO_MPICollective::read(void *buf, size_t count, off_t offset,
135                             const std::string &D) {
136  int Continue = 0;
137
138  do {
139    MPI_Status status;
140    if (MPI_File_read_at_all(FH, offset, buf, count, MPI_BYTE, &status) != MPI_SUCCESS)
141      throw runtime_error("Unable to read " + D + " from file: " + FileName);
142
143    int scount;
144    (void) MPI_Get_count(&status, MPI_BYTE, &scount);
145
146    count -= scount;
147    buf = ((char *) buf) + scount;
148    offset += scount;
149
150    int NeedContinue = (count > 0);
151    MPI_Allreduce(&NeedContinue, &Continue, 1, MPI_INT, MPI_SUM, Comm);
152  } while (Continue);
153}
154
155void GenericFileIO_MPICollective::write(const void *buf, size_t count, off_t offset,
156                              const std::string &D) {
157  int Continue = 0;
158
159  do {
160    MPI_Status status;
161    if (MPI_File_write_at_all(FH, offset, (void *) buf, count, MPI_BYTE, &status) != MPI_SUCCESS)
162      throw runtime_error("Unable to write " + D + " to file: " + FileName);
163
164    int scount;
165    (void) MPI_Get_count(&status, MPI_BYTE, &scount);
166
167    count -= scount;
168    buf = ((char *) buf) + scount;
169    offset += scount;
170
171    int NeedContinue = (count > 0);
172    MPI_Allreduce(&NeedContinue, &Continue, 1, MPI_INT, MPI_SUM, Comm);
173  } while (Continue);
174}
175#endif
176
177GenericFileIO_POSIX::~GenericFileIO_POSIX() {
178  if (FH != -1) close(FH);
179}
180
181void GenericFileIO_POSIX::open(const std::string &FN, bool ForReading) {
182  FileName = FN;
183
184  int flags = ForReading ? O_RDONLY : (O_WRONLY | O_CREAT);
185  int mode = S_IRUSR | S_IWUSR | S_IRGRP;
186  errno = 0;
187  if ((FH = ::open(FileName.c_str(), flags, mode)) == -1)
188    throw runtime_error((!ForReading ? "Unable to create the file: " :
189                                       "Unable to open the file: ") +
190                        FileName + ": " + strerror(errno));
191}
192
193void GenericFileIO_POSIX::setSize(size_t sz) {
194  if (ftruncate(FH, sz) == -1)
195    throw runtime_error("Unable to set size for file: " + FileName);
196}
197
198void GenericFileIO_POSIX::read(void *buf, size_t count, off_t offset,
199                               const std::string &D) {
200  while (count > 0) {
201    ssize_t scount;
202    errno = 0;
203    if ((scount = pread(FH, buf, count, offset)) == -1) {
204      if (errno == EINTR)
205        continue;
206
207      throw runtime_error("Unable to read " + D + " from file: " + FileName +
208                          ": " + strerror(errno));
209    }
210
211    count -= scount;
212    buf = ((char *) buf) + scount;
213    offset += scount;
214  }
215}
216
217void GenericFileIO_POSIX::write(const void *buf, size_t count, off_t offset,
218                                const std::string &D) {
219  while (count > 0) {
220    ssize_t scount;
221    errno = 0;
222    if ((scount = pwrite(FH, buf, count, offset)) == -1) {
223      if (errno == EINTR)
224        continue;
225
226      throw runtime_error("Unable to write " + D + " to file: " + FileName +
227                          ": " + strerror(errno));
228    }
229
230    count -= scount;
231    buf = ((char *) buf) + scount;
232    offset += scount;
233  }
234}
235
236static bool isBigEndian() {
237  const uint32_t one = 1;
238  return !(*((char *)(&one)));
239}
240
241static void bswap(void *v, size_t s) {
242  char *p = (char *) v;
243  for (size_t i = 0; i < s/2; ++i)
244    std::swap(p[i], p[s - (i+1)]);
245}
246
247// Using #pragma pack here, instead of __attribute__((packed)) because xlc, at
248// least as of v12.1, won't take __attribute__((packed)) on non-POD and/or
249// templated types.
250#pragma pack(1)
251
252template <typename T, bool IsBigEndian>
253struct endian_specific_value {
254  operator T() const {
255    T rvalue = value;
256    if (IsBigEndian != isBigEndian())
257      bswap(&rvalue, sizeof(T));
258
259    return rvalue;
260  };
261
262  endian_specific_value &operator = (T nvalue) {
263    if (IsBigEndian != isBigEndian())
264      bswap(&nvalue, sizeof(T));
265
266    value = nvalue;
267    return *this;
268  }
269
270  endian_specific_value &operator += (T nvalue) {
271    *this = *this + nvalue;
272    return *this;
273  }
274
275  endian_specific_value &operator -= (T nvalue) {
276    *this = *this - nvalue;
277    return *this;
278  }
279
280private:
281  T value;
282};
283
284static const size_t CRCSize = 8;
285
286static const size_t MagicSize = 8;
287static const char *MagicBE = "HACC01B";
288static const char *MagicLE = "HACC01L";
289
290template <bool IsBigEndian>
291struct GlobalHeader {
292  char Magic[MagicSize];
293  endian_specific_value<uint64_t, IsBigEndian> HeaderSize;
294  endian_specific_value<uint64_t, IsBigEndian> NElems; // The global total
295  endian_specific_value<uint64_t, IsBigEndian> Dims[3];
296  endian_specific_value<uint64_t, IsBigEndian> NVars;
297  endian_specific_value<uint64_t, IsBigEndian> VarsSize;
298  endian_specific_value<uint64_t, IsBigEndian> VarsStart;
299  endian_specific_value<uint64_t, IsBigEndian> NRanks;
300  endian_specific_value<uint64_t, IsBigEndian> RanksSize;
301  endian_specific_value<uint64_t, IsBigEndian> RanksStart;
302  endian_specific_value<uint64_t, IsBigEndian> GlobalHeaderSize;
303  endian_specific_value<double,   IsBigEndian> PhysOrigin[3];
304  endian_specific_value<double,   IsBigEndian> PhysScale[3];
305  endian_specific_value<uint64_t, IsBigEndian> BlocksSize;
306  endian_specific_value<uint64_t, IsBigEndian> BlocksStart;
307};
308
309enum {
310  FloatValue          = (1 << 0),
311  SignedValue         = (1 << 1),
312  ValueIsPhysCoordX   = (1 << 2),
313  ValueIsPhysCoordY   = (1 << 3),
314  ValueIsPhysCoordZ   = (1 << 4),
315  ValueMaybePhysGhost = (1 << 5)
316};
317
318static const size_t NameSize = 256;
319template <bool IsBigEndian>
320struct VariableHeader {
321  char Name[NameSize];
322  endian_specific_value<uint64_t, IsBigEndian> Flags;
323  endian_specific_value<uint64_t, IsBigEndian> Size;
324  endian_specific_value<uint64_t, IsBigEndian> ElementSize;
325};
326
327template <bool IsBigEndian>
328struct RankHeader {
329  endian_specific_value<uint64_t, IsBigEndian> Coords[3];
330  endian_specific_value<uint64_t, IsBigEndian> NElems;
331  endian_specific_value<uint64_t, IsBigEndian> Start;
332  endian_specific_value<uint64_t, IsBigEndian> GlobalRank;
333};
334
335static const size_t FilterNameSize = 8;
336static const size_t MaxFilters = 4;
337template <bool IsBigEndian>
338struct BlockHeader {
339  char Filters[MaxFilters][FilterNameSize];
340  endian_specific_value<uint64_t, IsBigEndian> Start;
341  endian_specific_value<uint64_t, IsBigEndian> Size;
342};
343
344template <bool IsBigEndian>
345struct CompressHeader {
346  endian_specific_value<uint64_t, IsBigEndian> OrigCRC;
347};
348const char *CompressName = "BLOSC";
349
350#pragma pack()
351
352unsigned GenericIO::DefaultFileIOType = FileIOPOSIX;
353int GenericIO::DefaultPartition = 0;
354bool GenericIO::DefaultShouldCompress = false;
355
356#ifndef GENERICIO_NO_MPI
357std::size_t GenericIO::CollectiveMPIIOThreshold = 0;
358#endif
359
360static bool blosc_initialized = false;
361
362#ifndef GENERICIO_NO_MPI
363void GenericIO::write() {
364  if (isBigEndian())
365    write<true>();
366  else
367    write<false>();
368}
369
370// Note: writing errors are not currently recoverable (one rank may fail
371// while the others don't).
372template <bool IsBigEndian>
373void GenericIO::write() {
374  const char *Magic = IsBigEndian ? MagicBE : MagicLE;
375
376  uint64_t FileSize = 0;
377
378  int NRanks, Rank;
379  MPI_Comm_rank(Comm, &Rank);
380  MPI_Comm_size(Comm, &NRanks);
381
382#ifdef __bgq__
383  MPI_Barrier(Comm);
384#endif
385  MPI_Comm_split(Comm, Partition, Rank, &SplitComm);
386
387  int SplitNRanks, SplitRank;
388  MPI_Comm_rank(SplitComm, &SplitRank);
389  MPI_Comm_size(SplitComm, &SplitNRanks);
390
391  string LocalFileName;
392  if (SplitNRanks != NRanks) {
393    if (Rank == 0) {
394      // In split mode, the specified file becomes the rank map, and the real
395      // data is partitioned.
396
397      vector<int> MapRank, MapPartition;
398      MapRank.resize(NRanks);
399      for (int i = 0; i < NRanks; ++i) MapRank[i] = i;
400
401      MapPartition.resize(NRanks);
402      MPI_Gather(&Partition, 1, MPI_INT, &MapPartition[0], 1, MPI_INT, 0, Comm);
403
404      GenericIO GIO(MPI_COMM_SELF, FileName, FileIOType);
405      GIO.setNumElems(NRanks);
406      GIO.addVariable("$rank", MapRank); /* this is for use by humans; the reading
407                                            code assumes that the partitions are in
408                                            rank order */
409      GIO.addVariable("$partition", MapPartition);
410
411      vector<int> CX, CY, CZ;
412      int TopoStatus;
413      MPI_Topo_test(Comm, &TopoStatus);
414      if (TopoStatus == MPI_CART) {
415        CX.resize(NRanks);
416        CY.resize(NRanks);
417        CZ.resize(NRanks);
418
419        for (int i = 0; i < NRanks; ++i) {
420          int C[3];
421          MPI_Cart_coords(Comm, i, 3, C);
422
423          CX[i] = C[0];
424          CY[i] = C[1];
425          CZ[i] = C[2];
426        }
427
428        GIO.addVariable("$x", CX);
429        GIO.addVariable("$y", CY);
430        GIO.addVariable("$z", CZ);
431      }
432
433      GIO.write();
434    } else {
435      MPI_Gather(&Partition, 1, MPI_INT, 0, 0, MPI_INT, 0, Comm);
436    }
437
438    stringstream ss;
439    ss << FileName << "#" << Partition;
440    LocalFileName = ss.str();
441  } else {
442    LocalFileName = FileName;
443  }
444
445  RankHeader<IsBigEndian> RHLocal;
446  int Dims[3], Periods[3], Coords[3];
447
448  int TopoStatus;
449  MPI_Topo_test(Comm, &TopoStatus);
450  if (TopoStatus == MPI_CART) {
451    MPI_Cart_get(Comm, 3, Dims, Periods, Coords);
452  } else {
453    Dims[0] = NRanks;
454    std::fill(Dims + 1, Dims + 3, 1);
455    std::fill(Periods, Periods + 3, 0);
456    Coords[0] = Rank;
457    std::fill(Coords + 1, Coords + 3, 0);
458  }
459
460  std::copy(Coords, Coords + 3, RHLocal.Coords);
461  RHLocal.NElems = NElems;
462  RHLocal.Start = 0;
463  RHLocal.GlobalRank = Rank;
464
465  bool ShouldCompress = DefaultShouldCompress;
466  const char *EnvStr = getenv("GENERICIO_COMPRESS");
467  if (EnvStr) {
468    int Mod = atoi(EnvStr);
469    ShouldCompress = (Mod > 0);
470  }
471
472  bool NeedsBlockHeaders = ShouldCompress;
473  EnvStr = getenv("GENERICIO_FORCE_BLOCKS");
474  if (!NeedsBlockHeaders && EnvStr) {
475    int Mod = atoi(EnvStr);
476    NeedsBlockHeaders = (Mod > 0);
477  }
478
479  vector<BlockHeader<IsBigEndian> > LocalBlockHeaders;
480  vector<void *> LocalData;
481  vector<bool> LocalHasExtraSpace;
482  vector<vector<unsigned char> > LocalCData;
483  if (NeedsBlockHeaders) {
484    LocalBlockHeaders.resize(Vars.size());
485    LocalData.resize(Vars.size());
486    LocalHasExtraSpace.resize(Vars.size());
487    if (ShouldCompress)
488      LocalCData.resize(Vars.size());
489
490    for (size_t i = 0; i < Vars.size(); ++i) {
491      // Filters null by default, leave null starting address (needs to be
492      // calculated by the header-writing rank).
493      memset(&LocalBlockHeaders[i], 0, sizeof(BlockHeader<IsBigEndian>));
494      if (ShouldCompress) {
495        LocalCData[i].resize(sizeof(CompressHeader<IsBigEndian>));
496
497        CompressHeader<IsBigEndian> *CH = (CompressHeader<IsBigEndian>*) &LocalCData[i][0];
498        CH->OrigCRC = crc64_omp(Vars[i].Data, Vars[i].Size*NElems);
499
500#ifdef _OPENMP
501#pragma omp master
502  {
503#endif
504
505       if (!blosc_initialized) {
506         blosc_init();
507         blosc_initialized = true;
508       }
509
510#ifdef _OPENMP
511       blosc_set_nthreads(omp_get_max_threads());
512  }
513#endif
514
515        LocalCData[i].resize(LocalCData[i].size() + NElems*Vars[i].Size);
516        if (blosc_compress(9, 1, Vars[i].Size, NElems*Vars[i].Size, Vars[i].Data,
517                           &LocalCData[i][0] + sizeof(CompressHeader<IsBigEndian>),
518                           NElems*Vars[i].Size) <= 0)
519          goto nocomp;
520
521        strncpy(LocalBlockHeaders[i].Filters[0], CompressName, FilterNameSize);
522        size_t CNBytes, CCBytes, CBlockSize;
523        blosc_cbuffer_sizes(&LocalCData[i][0] + sizeof(CompressHeader<IsBigEndian>),
524                            &CNBytes, &CCBytes, &CBlockSize);
525        LocalCData[i].resize(CCBytes + sizeof(CompressHeader<IsBigEndian>));
526
527        LocalBlockHeaders[i].Size = LocalCData[i].size();
528        LocalCData[i].resize(LocalCData[i].size() + CRCSize);
529        LocalData[i] = &LocalCData[i][0];
530        LocalHasExtraSpace[i] = true;
531      } else {
532nocomp:
533        LocalBlockHeaders[i].Size = NElems*Vars[i].Size;
534        LocalData[i] = Vars[i].Data;
535        LocalHasExtraSpace[i] = Vars[i].HasExtraSpace;
536      }
537    }
538  }
539
540  double StartTime = MPI_Wtime();
541
542  if (SplitRank == 0) {
543    uint64_t HeaderSize = sizeof(GlobalHeader<IsBigEndian>) + Vars.size()*sizeof(VariableHeader<IsBigEndian>) +
544                          SplitNRanks*sizeof(RankHeader<IsBigEndian>) + CRCSize;
545    if (NeedsBlockHeaders)
546      HeaderSize += SplitNRanks*Vars.size()*sizeof(BlockHeader<IsBigEndian>);
547
548    vector<char> Header(HeaderSize, 0);
549    GlobalHeader<IsBigEndian> *GH = (GlobalHeader<IsBigEndian> *) &Header[0];
550    std::copy(Magic, Magic + MagicSize, GH->Magic);
551    GH->HeaderSize = HeaderSize - CRCSize;
552    GH->NElems = NElems; // This will be updated later
553    std::copy(Dims, Dims + 3, GH->Dims);
554    GH->NVars = Vars.size();
555    GH->VarsSize = sizeof(VariableHeader<IsBigEndian>);
556    GH->VarsStart = sizeof(GlobalHeader<IsBigEndian>);
557    GH->NRanks = SplitNRanks;
558    GH->RanksSize = sizeof(RankHeader<IsBigEndian>);
559    GH->RanksStart = GH->VarsStart + Vars.size()*sizeof(VariableHeader<IsBigEndian>);
560    GH->GlobalHeaderSize = sizeof(GlobalHeader<IsBigEndian>);
561    std::copy(PhysOrigin, PhysOrigin + 3, GH->PhysOrigin);
562    std::copy(PhysScale,  PhysScale  + 3, GH->PhysScale);
563    if (!NeedsBlockHeaders) {
564      GH->BlocksSize = GH->BlocksStart = 0;
565    } else {
566      GH->BlocksSize = sizeof(BlockHeader<IsBigEndian>);
567      GH->BlocksStart = GH->RanksStart + SplitNRanks*sizeof(RankHeader<IsBigEndian>);
568    }
569
570    uint64_t RecordSize = 0;
571    VariableHeader<IsBigEndian> *VH = (VariableHeader<IsBigEndian> *) &Header[GH->VarsStart];
572    for (size_t i = 0; i < Vars.size(); ++i, ++VH) {
573      string VName(Vars[i].Name);
574      VName.resize(NameSize);
575
576      std::copy(VName.begin(), VName.end(), VH->Name);
577      uint64_t VFlags = 0;
578      if (Vars[i].IsFloat)  VFlags |= FloatValue;
579      if (Vars[i].IsSigned) VFlags |= SignedValue;
580      if (Vars[i].IsPhysCoordX) VFlags |= ValueIsPhysCoordX;
581      if (Vars[i].IsPhysCoordY) VFlags |= ValueIsPhysCoordY;
582      if (Vars[i].IsPhysCoordZ) VFlags |= ValueIsPhysCoordZ;
583      if (Vars[i].MaybePhysGhost) VFlags |= ValueMaybePhysGhost;
584      VH->Flags = VFlags;
585      RecordSize += VH->Size = Vars[i].Size;
586      VH->ElementSize = Vars[i].ElementSize;
587    }
588
589    MPI_Gather(&RHLocal, sizeof(RHLocal), MPI_BYTE,
590               &Header[GH->RanksStart], sizeof(RHLocal),
591               MPI_BYTE, 0, SplitComm);
592
593    if (NeedsBlockHeaders) {
594      MPI_Gather(&LocalBlockHeaders[0],
595                 Vars.size()*sizeof(BlockHeader<IsBigEndian>), MPI_BYTE,
596                 &Header[GH->BlocksStart],
597                 Vars.size()*sizeof(BlockHeader<IsBigEndian>), MPI_BYTE,
598                 0, SplitComm);
599
600      BlockHeader<IsBigEndian> *BH = (BlockHeader<IsBigEndian> *) &Header[GH->BlocksStart];
601      for (int i = 0; i < SplitNRanks; ++i)
602      for (size_t j = 0; j < Vars.size(); ++j, ++BH) {
603        if (i == 0 && j == 0)
604          BH->Start = HeaderSize;
605        else
606          BH->Start = BH[-1].Start + BH[-1].Size + CRCSize;
607      }
608
609      RankHeader<IsBigEndian> *RH = (RankHeader<IsBigEndian> *) &Header[GH->RanksStart];
610      RH->Start = HeaderSize; ++RH;
611      for (int i = 1; i < SplitNRanks; ++i, ++RH) {
612        RH->Start =
613          ((BlockHeader<IsBigEndian> *) &Header[GH->BlocksStart])[i*Vars.size()].Start;
614        GH->NElems += RH->NElems;
615      }
616
617      // Compute the total file size.
618      uint64_t LastData = BH[-1].Size + CRCSize;
619      FileSize = BH[-1].Start + LastData;
620    } else {
621      RankHeader<IsBigEndian> *RH = (RankHeader<IsBigEndian> *) &Header[GH->RanksStart];
622      RH->Start = HeaderSize; ++RH;
623      for (int i = 1; i < SplitNRanks; ++i, ++RH) {
624        uint64_t PrevNElems = RH[-1].NElems;
625        uint64_t PrevData = PrevNElems*RecordSize + CRCSize*Vars.size();
626        RH->Start = RH[-1].Start + PrevData;
627        GH->NElems += RH->NElems;
628      }
629
630      // Compute the total file size.
631      uint64_t LastNElems = RH[-1].NElems;
632      uint64_t LastData = LastNElems*RecordSize + CRCSize*Vars.size();
633      FileSize = RH[-1].Start + LastData;
634    }
635
636    // Now that the starting offset has been computed, send it back to each rank.
637    MPI_Scatter(&Header[GH->RanksStart], sizeof(RHLocal),
638                MPI_BYTE, &RHLocal, sizeof(RHLocal),
639                MPI_BYTE, 0, SplitComm);
640
641    if (NeedsBlockHeaders)
642      MPI_Scatter(&Header[GH->BlocksStart],
643                  sizeof(BlockHeader<IsBigEndian>)*Vars.size(), MPI_BYTE,
644                  &LocalBlockHeaders[0],
645                  sizeof(BlockHeader<IsBigEndian>)*Vars.size(), MPI_BYTE,
646                  0, SplitComm);
647
648    uint64_t HeaderCRC = crc64_omp(&Header[0], HeaderSize - CRCSize);
649    crc64_invert(HeaderCRC, &Header[HeaderSize - CRCSize]);
650
651    if (FileIOType == FileIOMPI)
652      FH.get() = new GenericFileIO_MPI(MPI_COMM_SELF);
653    else if (FileIOType == FileIOMPICollective)
654      FH.get() = new GenericFileIO_MPICollective(MPI_COMM_SELF);
655    else
656      FH.get() = new GenericFileIO_POSIX();
657
658    FH.get()->open(LocalFileName);
659    FH.get()->setSize(FileSize);
660    FH.get()->write(&Header[0], HeaderSize, 0, "header");
661
662    close();
663  } else {
664    MPI_Gather(&RHLocal, sizeof(RHLocal), MPI_BYTE, 0, 0, MPI_BYTE, 0, SplitComm);
665    if (NeedsBlockHeaders)
666      MPI_Gather(&LocalBlockHeaders[0], Vars.size()*sizeof(BlockHeader<IsBigEndian>),
667                 MPI_BYTE, 0, 0, MPI_BYTE, 0, SplitComm);
668    MPI_Scatter(0, 0, MPI_BYTE, &RHLocal, sizeof(RHLocal), MPI_BYTE, 0, SplitComm);
669    if (NeedsBlockHeaders)
670      MPI_Scatter(0, 0, MPI_BYTE, &LocalBlockHeaders[0], sizeof(BlockHeader<IsBigEndian>)*Vars.size(),
671                  MPI_BYTE, 0, SplitComm);
672  }
673
674  MPI_Barrier(SplitComm);
675
676  if (FileIOType == FileIOMPI)
677    FH.get() = new GenericFileIO_MPI(SplitComm);
678  else if (FileIOType == FileIOMPICollective)
679    FH.get() = new GenericFileIO_MPICollective(SplitComm);
680  else
681    FH.get() = new GenericFileIO_POSIX();
682
683  FH.get()->open(LocalFileName);
684
685  uint64_t Offset = RHLocal.Start;
686  for (size_t i = 0; i < Vars.size(); ++i) {
687    uint64_t WriteSize = NeedsBlockHeaders ?
688                         LocalBlockHeaders[i].Size : NElems*Vars[i].Size;
689    void *Data = NeedsBlockHeaders ? LocalData[i] : Vars[i].Data;
690    uint64_t CRC = crc64_omp(Data, WriteSize);
691    bool HasExtraSpace = NeedsBlockHeaders ?
692                         LocalHasExtraSpace[i] : Vars[i].HasExtraSpace;
693    char *CRCLoc = HasExtraSpace ?  ((char *) Data) + WriteSize : (char *) &CRC;
694
695    if (NeedsBlockHeaders)
696      Offset = LocalBlockHeaders[i].Start;
697
698    // When using extra space for the CRC write, preserve the original contents.
699    char CRCSave[CRCSize];
700    if (HasExtraSpace)
701      std::copy(CRCLoc, CRCLoc + CRCSize, CRCSave);
702
703    crc64_invert(CRC, CRCLoc);
704
705    if (HasExtraSpace) {
706      FH.get()->write(Data, WriteSize + CRCSize, Offset, Vars[i].Name + " with CRC");
707    } else {
708      FH.get()->write(Data, WriteSize, Offset, Vars[i].Name);
709      FH.get()->write(CRCLoc, CRCSize, Offset + WriteSize, Vars[i].Name + " CRC");
710    }
711
712    if (HasExtraSpace)
713       std::copy(CRCSave, CRCSave + CRCSize, CRCLoc);
714
715    Offset += WriteSize + CRCSize;
716  }
717
718  close();
719  MPI_Barrier(Comm);
720
721  double EndTime = MPI_Wtime();
722  double TotalTime = EndTime - StartTime;
723  double MaxTotalTime;
724  MPI_Reduce(&TotalTime, &MaxTotalTime, 1, MPI_DOUBLE, MPI_MAX, 0, Comm);
725
726  if (SplitNRanks != NRanks) {
727    uint64_t ContribFileSize = (SplitRank == 0) ? FileSize : 0;
728    MPI_Reduce(&ContribFileSize, &FileSize, 1, MPI_UINT64_T, MPI_SUM, 0, Comm);
729  }
730
731  if (Rank == 0) {
732    double Rate = ((double) FileSize) / MaxTotalTime / (1024.*1024.);
733    cout << "Wrote " << Vars.size() << " variables to " << FileName <<
734            " (" << FileSize << " bytes) in " << MaxTotalTime << "s: " <<
735            Rate << " MB/s" << endl;
736  }
737
738  MPI_Comm_free(&SplitComm);
739  SplitComm = MPI_COMM_NULL;
740}
741#endif // GENERICIO_NO_MPI
742
743template <bool IsBigEndian>
744void GenericIO::readHeaderLeader(void *GHPtr, MismatchBehavior MB, int NRanks,
745                                 int Rank, int SplitNRanks,
746                                 string &LocalFileName, uint64_t &HeaderSize,
747                                 vector<char> &Header) {
748  GlobalHeader<IsBigEndian> &GH = *(GlobalHeader<IsBigEndian> *) GHPtr;
749
750  if (MB == MismatchDisallowed) {
751    if (SplitNRanks != (int) GH.NRanks) {
752      stringstream ss;
753      ss << "Won't read " << LocalFileName << ": communicator-size mismatch: " <<
754            "current: " << SplitNRanks << ", file: " << GH.NRanks;
755      throw runtime_error(ss.str());
756    }
757
758#ifndef GENERICIO_NO_MPI
759    int TopoStatus;
760    MPI_Topo_test(Comm, &TopoStatus);
761    if (TopoStatus == MPI_CART) {
762      int Dims[3], Periods[3], Coords[3];
763      MPI_Cart_get(Comm, 3, Dims, Periods, Coords);
764
765      bool DimsMatch = true;
766      for (int i = 0; i < 3; ++i) {
767        if ((uint64_t) Dims[i] != GH.Dims[i]) {
768          DimsMatch = false;
769          break;
770        }
771      }
772
773      if (!DimsMatch) {
774        stringstream ss;
775        ss << "Won't read " << LocalFileName <<
776              ": communicator-decomposition mismatch: " <<
777              "current: " << Dims[0] << "x" << Dims[1] << "x" << Dims[2] <<
778              ", file: " << GH.Dims[0] << "x" << GH.Dims[1] << "x" <<
779              GH.Dims[2];
780        throw runtime_error(ss.str());
781      }
782    }
783#endif
784  } else if (MB == MismatchRedistribute && !Redistributing) {
785    Redistributing = true;
786
787    int NFileRanks = RankMap.empty() ? (int) GH.NRanks : (int) RankMap.size();
788    int NFileRanksPerRank = NFileRanks/NRanks;
789    int NRemFileRank = NFileRanks % NRanks;
790
791    if (!NFileRanksPerRank) {
792      // We have only the remainder, so the last NRemFileRank ranks get one
793      // file rank, and the others don't.
794      if (NRemFileRank && NRanks - Rank <= NRemFileRank)
795        SourceRanks.push_back(NRanks - (Rank + 1));
796    } else {
797      // Since NRemFileRank < NRanks, and we don't want to put any extra memory
798      // load on rank 0 (because rank 0's memory load is normally higher than
799      // the other ranks anyway), the last NRemFileRank will each take
800      // (NFileRanksPerRank+1) file ranks.
801
802      int FirstFileRank = 0, LastFileRank = NFileRanksPerRank - 1;
803      for (int i = 1; i <= Rank; ++i) {
804        FirstFileRank = LastFileRank + 1;
805        LastFileRank  = FirstFileRank + NFileRanksPerRank - 1;
806
807        if (NRemFileRank && NRanks - i <= NRemFileRank)
808          ++LastFileRank;
809      }
810
811      for (int i = FirstFileRank; i <= LastFileRank; ++i)
812        SourceRanks.push_back(i);
813    }
814  }
815
816  HeaderSize = GH.HeaderSize;
817  Header.resize(HeaderSize + CRCSize, 0xFE /* poison */);
818  FH.get()->read(&Header[0], HeaderSize + CRCSize, 0, "header");
819
820  uint64_t CRC = crc64_omp(&Header[0], HeaderSize + CRCSize);
821  if (CRC != (uint64_t) -1) {
822    throw runtime_error("Header CRC check failed: " + LocalFileName);
823  }
824}
825
826// Note: Errors from this function should be recoverable. This means that if
827// one rank throws an exception, then all ranks should.
828void GenericIO::openAndReadHeader(MismatchBehavior MB, int EffRank, bool CheckPartMap) {
829  int NRanks, Rank;
830#ifndef GENERICIO_NO_MPI
831  MPI_Comm_rank(Comm, &Rank);
832  MPI_Comm_size(Comm, &NRanks);
833#else
834  Rank = 0;
835  NRanks = 1;
836#endif
837
838  if (EffRank == -1)
839    EffRank = MB == MismatchRedistribute ? 0 : Rank;
840
841  if (RankMap.empty() && CheckPartMap) {
842    // First, check to see if the file is a rank map.
843    unsigned long RanksInMap = 0;
844    if (Rank == 0) {
845      try {
846#ifndef GENERICIO_NO_MPI
847        GenericIO GIO(MPI_COMM_SELF, FileName, FileIOType);
848#else
849        GenericIO GIO(FileName, FileIOType);
850#endif
851        GIO.openAndReadHeader(MismatchDisallowed, 0, false);
852        RanksInMap = GIO.readNumElems();
853
854        RankMap.resize(RanksInMap + GIO.requestedExtraSpace()/sizeof(int));
855        GIO.addVariable("$partition", RankMap, true);
856
857        GIO.readData(0, false);
858        RankMap.resize(RanksInMap);
859      } catch (...) {
860        RankMap.clear();
861        RanksInMap = 0;
862      }
863    }
864
865#ifndef GENERICIO_NO_MPI
866    MPI_Bcast(&RanksInMap, 1, MPI_UNSIGNED_LONG, 0, Comm);
867    if (RanksInMap > 0) {
868      RankMap.resize(RanksInMap);
869      MPI_Bcast(&RankMap[0], RanksInMap, MPI_INT, 0, Comm);
870    }
871#endif
872  }
873
874#ifndef GENERICIO_NO_MPI
875  if (SplitComm != MPI_COMM_NULL)
876    MPI_Comm_free(&SplitComm);
877#endif
878
879  string LocalFileName;
880  if (RankMap.empty()) {
881    LocalFileName = FileName;
882#ifndef GENERICIO_NO_MPI
883    MPI_Comm_dup(MB == MismatchRedistribute ? MPI_COMM_SELF : Comm, &SplitComm);
884#endif
885  } else {
886    stringstream ss;
887    ss << FileName << "#" << RankMap[EffRank];
888    LocalFileName = ss.str();
889#ifndef GENERICIO_NO_MPI
890    if (MB == MismatchRedistribute) {
891      MPI_Comm_dup(MPI_COMM_SELF, &SplitComm);
892    } else {
893#ifdef __bgq__
894      MPI_Barrier(Comm);
895#endif
896      MPI_Comm_split(Comm, RankMap[EffRank], Rank, &SplitComm);
897    }
898#endif
899  }
900
901  if (LocalFileName == OpenFileName)
902    return;
903  FH.close();
904
905  int SplitNRanks, SplitRank;
906#ifndef GENERICIO_NO_MPI
907  MPI_Comm_rank(SplitComm, &SplitRank);
908  MPI_Comm_size(SplitComm, &SplitNRanks);
909#else
910  SplitRank = 0;
911  SplitNRanks = 1;
912#endif
913
914  uint64_t HeaderSize;
915  vector<char> Header;
916
917  if (SplitRank == 0) {
918#ifndef GENERICIO_NO_MPI
919    if (FileIOType == FileIOMPI)
920      FH.get() = new GenericFileIO_MPI(MPI_COMM_SELF);
921    else if (FileIOType == FileIOMPICollective)
922      FH.get() = new GenericFileIO_MPICollective(MPI_COMM_SELF);
923    else
924#endif
925      FH.get() = new GenericFileIO_POSIX();
926
927#ifndef GENERICIO_NO_MPI
928    char True = 1, False = 0;
929#endif
930
931    try {
932      FH.get()->open(LocalFileName, true);
933
934      GlobalHeader<false> GH; // endianness does not matter yet...
935      FH.get()->read(&GH, sizeof(GlobalHeader<false>), 0, "global header");
936
937      if (string(GH.Magic, GH.Magic + MagicSize - 1) == MagicLE) {
938        readHeaderLeader<false>(&GH, MB, NRanks, Rank, SplitNRanks, LocalFileName,
939                                HeaderSize, Header);
940      } else if (string(GH.Magic, GH.Magic + MagicSize - 1) == MagicBE) {
941        readHeaderLeader<true>(&GH, MB, NRanks, Rank, SplitNRanks, LocalFileName,
942                               HeaderSize, Header);
943      } else {
944        string Error = "invalid file-type identifier";
945        throw runtime_error("Won't read " + LocalFileName + ": " + Error);
946      }
947
948#ifndef GENERICIO_NO_MPI
949      close();
950      MPI_Bcast(&True, 1, MPI_BYTE, 0, SplitComm);
951#endif
952    } catch (...) {
953#ifndef GENERICIO_NO_MPI
954      MPI_Bcast(&False, 1, MPI_BYTE, 0, SplitComm);
955#endif
956      close();
957      throw;
958    }
959  } else {
960#ifndef GENERICIO_NO_MPI
961    char Okay;
962    MPI_Bcast(&Okay, 1, MPI_BYTE, 0, SplitComm);
963    if (!Okay)
964      throw runtime_error("Failure broadcast from rank 0");
965#endif
966  }
967
968#ifndef GENERICIO_NO_MPI
969  MPI_Bcast(&HeaderSize, 1, MPI_UINT64_T, 0, SplitComm);
970#endif
971
972  Header.resize(HeaderSize, 0xFD /* poison */);
973#ifndef GENERICIO_NO_MPI
974  MPI_Bcast(&Header[0], HeaderSize, MPI_BYTE, 0, SplitComm);
975#endif
976
977  FH.getHeaderCache().clear();
978
979  GlobalHeader<false> *GH = (GlobalHeader<false> *) &Header[0];
980  FH.setIsBigEndian(string(GH->Magic, GH->Magic + MagicSize - 1) == MagicBE);
981
982  FH.getHeaderCache().swap(Header);
983  OpenFileName = LocalFileName;
984
985#ifndef GENERICIO_NO_MPI
986  if (!DisableCollErrChecking)
987    MPI_Barrier(Comm);
988
989  if (FileIOType == FileIOMPI)
990    FH.get() = new GenericFileIO_MPI(SplitComm);
991  else if (FileIOType == FileIOMPICollective)
992    FH.get() = new GenericFileIO_MPICollective(SplitComm);
993  else
994    FH.get() = new GenericFileIO_POSIX();
995
996  int OpenErr = 0, TotOpenErr;
997  try {
998    FH.get()->open(LocalFileName, true);
999    MPI_Allreduce(&OpenErr, &TotOpenErr, 1, MPI_INT, MPI_SUM,
1000                  DisableCollErrChecking ? MPI_COMM_SELF : Comm);
1001  } catch (...) {
1002    OpenErr = 1;
1003    MPI_Allreduce(&OpenErr, &TotOpenErr, 1, MPI_INT, MPI_SUM,
1004                  DisableCollErrChecking ? MPI_COMM_SELF : Comm);
1005    throw;
1006  }
1007
1008  if (TotOpenErr > 0) {
1009    stringstream ss;
1010    ss << TotOpenErr << " ranks failed to open file: " << LocalFileName;
1011    throw runtime_error(ss.str());
1012  }
1013#endif
1014}
1015
1016int GenericIO::readNRanks() {
1017  if (FH.isBigEndian())
1018    return readNRanks<true>();
1019  return readNRanks<false>();
1020}
1021
1022template <bool IsBigEndian>
1023int GenericIO::readNRanks() {
1024  if (RankMap.size())
1025    return RankMap.size();
1026
1027  assert(FH.getHeaderCache().size() && "HeaderCache must not be empty");
1028  GlobalHeader<IsBigEndian> *GH = (GlobalHeader<IsBigEndian> *) &FH.getHeaderCache()[0];
1029  return (int) GH->NRanks;
1030}
1031
1032void GenericIO::readDims(int Dims[3]) {
1033  if (FH.isBigEndian())
1034    readDims<true>(Dims);
1035  else
1036    readDims<false>(Dims);
1037}
1038
1039template <bool IsBigEndian>
1040void GenericIO::readDims(int Dims[3]) {
1041  assert(FH.getHeaderCache().size() && "HeaderCache must not be empty");
1042  GlobalHeader<IsBigEndian> *GH = (GlobalHeader<IsBigEndian> *) &FH.getHeaderCache()[0];
1043  std::copy(GH->Dims, GH->Dims + 3, Dims);
1044}
1045
1046uint64_t GenericIO::readTotalNumElems() {
1047  if (FH.isBigEndian())
1048    return readTotalNumElems<true>();
1049  return readTotalNumElems<false>();
1050}
1051
1052template <bool IsBigEndian>
1053uint64_t GenericIO::readTotalNumElems() {
1054  if (RankMap.size())
1055    return (uint64_t) -1;
1056
1057  assert(FH.getHeaderCache().size() && "HeaderCache must not be empty");
1058  GlobalHeader<IsBigEndian> *GH = (GlobalHeader<IsBigEndian> *) &FH.getHeaderCache()[0];
1059  return GH->NElems;
1060}
1061
1062void GenericIO::readPhysOrigin(double Origin[3]) {
1063  if (FH.isBigEndian())
1064    readPhysOrigin<true>(Origin);
1065  else
1066    readPhysOrigin<false>(Origin);
1067}
1068
1069// Define a "safe" version of offsetof (offsetof itself might not work for
1070// non-POD types, and at least xlC v12.1 will complain about this if you try).
1071#define offsetof_safe(S, F) (size_t(&(S)->F) - size_t(S))
1072
1073template <bool IsBigEndian>
1074void GenericIO::readPhysOrigin(double Origin[3]) {
1075  assert(FH.getHeaderCache().size() && "HeaderCache must not be empty");
1076  GlobalHeader<IsBigEndian> *GH = (GlobalHeader<IsBigEndian> *) &FH.getHeaderCache()[0];
1077  if (offsetof_safe(GH, PhysOrigin) >= GH->GlobalHeaderSize) {
1078    std::fill(Origin, Origin + 3, 0.0);
1079    return;
1080  }
1081
1082  std::copy(GH->PhysOrigin, GH->PhysOrigin + 3, Origin);
1083}
1084
1085void GenericIO::readPhysScale(double Scale[3]) {
1086  if (FH.isBigEndian())
1087    readPhysScale<true>(Scale);
1088  else
1089    readPhysScale<false>(Scale);
1090}
1091
1092template <bool IsBigEndian>
1093void GenericIO::readPhysScale(double Scale[3]) {
1094  assert(FH.getHeaderCache().size() && "HeaderCache must not be empty");
1095  GlobalHeader<IsBigEndian> *GH = (GlobalHeader<IsBigEndian> *) &FH.getHeaderCache()[0];
1096  if (offsetof_safe(GH, PhysScale) >= GH->GlobalHeaderSize) {
1097    std::fill(Scale, Scale + 3, 0.0);
1098    return;
1099  }
1100
1101  std::copy(GH->PhysScale, GH->PhysScale + 3, Scale);
1102}
1103
1104template <bool IsBigEndian>
1105static size_t getRankIndex(int EffRank, GlobalHeader<IsBigEndian> *GH,
1106                           vector<int> &RankMap, vector<char> &HeaderCache) {
1107  if (RankMap.empty())
1108    return EffRank;
1109
1110  for (size_t i = 0; i < GH->NRanks; ++i) {
1111    RankHeader<IsBigEndian> *RH = (RankHeader<IsBigEndian> *) &HeaderCache[GH->RanksStart +
1112                                                 i*GH->RanksSize];
1113    if (offsetof_safe(RH, GlobalRank) >= GH->RanksSize)
1114      return EffRank;
1115
1116    if ((int) RH->GlobalRank == EffRank)
1117      return i;
1118  }
1119
1120  assert(false && "Index requested of an invalid rank");
1121  return (size_t) -1;
1122}
1123
1124int GenericIO::readGlobalRankNumber(int EffRank) {
1125  if (FH.isBigEndian())
1126    return readGlobalRankNumber<true>(EffRank);
1127  return readGlobalRankNumber<false>(EffRank);
1128}
1129
1130template <bool IsBigEndian>
1131int GenericIO::readGlobalRankNumber(int EffRank) {
1132  if (EffRank == -1) {
1133#ifndef GENERICIO_NO_MPI
1134    MPI_Comm_rank(Comm, &EffRank);
1135#else
1136    EffRank = 0;
1137#endif
1138  }
1139
1140  openAndReadHeader(MismatchAllowed, EffRank, false);
1141
1142  assert(FH.getHeaderCache().size() && "HeaderCache must not be empty");
1143
1144  GlobalHeader<IsBigEndian> *GH = (GlobalHeader<IsBigEndian> *) &FH.getHeaderCache()[0];
1145  size_t RankIndex = getRankIndex<IsBigEndian>(EffRank, GH, RankMap, FH.getHeaderCache());
1146
1147  assert(RankIndex < GH->NRanks && "Invalid rank specified");
1148
1149  RankHeader<IsBigEndian> *RH = (RankHeader<IsBigEndian> *) &FH.getHeaderCache()[GH->RanksStart +
1150                                               RankIndex*GH->RanksSize];
1151
1152  if (offsetof_safe(RH, GlobalRank) >= GH->RanksSize)
1153    return EffRank;
1154
1155  return (int) RH->GlobalRank;
1156}
1157
1158void GenericIO::getSourceRanks(vector<int> &SR) {
1159  SR.clear();
1160
1161  if (Redistributing) {
1162    std::copy(SourceRanks.begin(), SourceRanks.end(), std::back_inserter(SR));
1163    return;
1164  }
1165
1166  int Rank;
1167#ifndef GENERICIO_NO_MPI
1168  MPI_Comm_rank(Comm, &Rank);
1169#else
1170  Rank = 0;
1171#endif
1172
1173  SR.push_back(Rank);
1174}
1175
1176size_t GenericIO::readNumElems(int EffRank) {
1177  if (EffRank == -1 && Redistributing) {
1178    DisableCollErrChecking = true;
1179
1180    size_t TotalSize = 0;
1181    for (int i = 0, ie = SourceRanks.size(); i != ie; ++i)
1182      TotalSize += readNumElems(SourceRanks[i]);
1183
1184    DisableCollErrChecking = false;
1185    return TotalSize;
1186  }
1187
1188  if (FH.isBigEndian())
1189    return readNumElems<true>(EffRank);
1190  return readNumElems<false>(EffRank);
1191}
1192
1193template <bool IsBigEndian>
1194size_t GenericIO::readNumElems(int EffRank) {
1195  if (EffRank == -1) {
1196#ifndef GENERICIO_NO_MPI
1197    MPI_Comm_rank(Comm, &EffRank);
1198#else
1199    EffRank = 0;
1200#endif
1201  }
1202
1203  openAndReadHeader(Redistributing ? MismatchRedistribute : MismatchAllowed,
1204                    EffRank, false);
1205
1206  assert(FH.getHeaderCache().size() && "HeaderCache must not be empty");
1207
1208  GlobalHeader<IsBigEndian> *GH = (GlobalHeader<IsBigEndian> *) &FH.getHeaderCache()[0];
1209  size_t RankIndex = getRankIndex<IsBigEndian>(EffRank, GH, RankMap, FH.getHeaderCache());
1210
1211  assert(RankIndex < GH->NRanks && "Invalid rank specified");
1212
1213  RankHeader<IsBigEndian> *RH = (RankHeader<IsBigEndian> *) &FH.getHeaderCache()[GH->RanksStart +
1214                                               RankIndex*GH->RanksSize];
1215  return (size_t) RH->NElems;
1216}
1217
1218void GenericIO::readCoords(int Coords[3], int EffRank) {
1219  if (EffRank == -1 && Redistributing) {
1220    std::fill(Coords, Coords + 3, 0);
1221    return;
1222  }
1223
1224  if (FH.isBigEndian())
1225    readCoords<true>(Coords, EffRank);
1226  else
1227    readCoords<false>(Coords, EffRank);
1228}
1229
1230template <bool IsBigEndian>
1231void GenericIO::readCoords(int Coords[3], int EffRank) {
1232  if (EffRank == -1) {
1233#ifndef GENERICIO_NO_MPI
1234    MPI_Comm_rank(Comm, &EffRank);
1235#else
1236    EffRank = 0;
1237#endif
1238  }
1239
1240  openAndReadHeader(MismatchAllowed, EffRank, false);
1241
1242  assert(FH.getHeaderCache().size() && "HeaderCache must not be empty");
1243
1244  GlobalHeader<IsBigEndian> *GH = (GlobalHeader<IsBigEndian> *) &FH.getHeaderCache()[0];
1245  size_t RankIndex = getRankIndex<IsBigEndian>(EffRank, GH, RankMap, FH.getHeaderCache());
1246
1247  assert(RankIndex < GH->NRanks && "Invalid rank specified");
1248
1249  RankHeader<IsBigEndian> *RH = (RankHeader<IsBigEndian> *) &FH.getHeaderCache()[GH->RanksStart +
1250                                               RankIndex*GH->RanksSize];
1251
1252  std::copy(RH->Coords, RH->Coords + 3, Coords);
1253}
1254
1255void GenericIO::readData(int EffRank, bool PrintStats, bool CollStats) {
1256  int Rank;
1257#ifndef GENERICIO_NO_MPI
1258  MPI_Comm_rank(Comm, &Rank);
1259#else
1260  Rank = 0;
1261#endif
1262
1263  uint64_t TotalReadSize = 0;
1264#ifndef GENERICIO_NO_MPI
1265  double StartTime = MPI_Wtime();
1266#else
1267  double StartTime = double(clock())/CLOCKS_PER_SEC;
1268#endif
1269
1270  int NErrs[3] = { 0, 0, 0 };
1271
1272  if (EffRank == -1 && Redistributing) {
1273    DisableCollErrChecking = true;
1274
1275    size_t RowOffset = 0;
1276    for (int i = 0, ie = SourceRanks.size(); i != ie; ++i) {
1277      readData(SourceRanks[i], RowOffset, Rank, TotalReadSize, NErrs);
1278      RowOffset += readNumElems(SourceRanks[i]);
1279    }
1280
1281    DisableCollErrChecking = false;
1282  } else {
1283    readData(EffRank, 0, Rank, TotalReadSize, NErrs);
1284  }
1285
1286  int AllNErrs[3];
1287#ifndef GENERICIO_NO_MPI
1288  MPI_Allreduce(NErrs, AllNErrs, 3, MPI_INT, MPI_SUM, Comm);
1289#else
1290  AllNErrs[0] = NErrs[0]; AllNErrs[1] = NErrs[1]; AllNErrs[2] = NErrs[2];
1291#endif
1292
1293  if (AllNErrs[0] > 0 || AllNErrs[1] > 0 || AllNErrs[2] > 0) {
1294    stringstream ss;
1295    ss << "Experienced " << AllNErrs[0] << " I/O error(s), " <<
1296          AllNErrs[1] << " CRC error(s) and " << AllNErrs[2] <<
1297          " decompression CRC error(s) reading: " << OpenFileName;
1298    throw runtime_error(ss.str());
1299  }
1300
1301#ifndef GENERICIO_NO_MPI
1302  MPI_Barrier(Comm);
1303#endif
1304
1305#ifndef GENERICIO_NO_MPI
1306  double EndTime = MPI_Wtime();
1307#else
1308  double EndTime = double(clock())/CLOCKS_PER_SEC;
1309#endif
1310
1311  double TotalTime = EndTime - StartTime;
1312  double MaxTotalTime;
1313#ifndef GENERICIO_NO_MPI
1314  if (CollStats)
1315    MPI_Reduce(&TotalTime, &MaxTotalTime, 1, MPI_DOUBLE, MPI_MAX, 0, Comm);
1316  else
1317#endif
1318  MaxTotalTime = TotalTime;
1319
1320  uint64_t AllTotalReadSize;
1321#ifndef GENERICIO_NO_MPI
1322  if (CollStats)
1323    MPI_Reduce(&TotalReadSize, &AllTotalReadSize, 1, MPI_UINT64_T, MPI_SUM, 0, Comm);
1324  else
1325#endif
1326  AllTotalReadSize = TotalReadSize;
1327
1328  if (Rank == 0 && PrintStats) {
1329    double Rate = ((double) AllTotalReadSize) / MaxTotalTime / (1024.*1024.);
1330    cout << "Read " << Vars.size() << " variables from " << FileName <<
1331            " (" << AllTotalReadSize << " bytes) in " << MaxTotalTime << "s: " <<
1332            Rate << " MB/s [excluding header read]" << endl;
1333  }
1334}
1335
1336void GenericIO::readData(int EffRank, size_t RowOffset, int Rank,
1337                         uint64_t &TotalReadSize, int NErrs[3]) {
1338  if (FH.isBigEndian())
1339    readData<true>(EffRank, RowOffset, Rank, TotalReadSize, NErrs);
1340  else
1341    readData<false>(EffRank, RowOffset, Rank, TotalReadSize, NErrs);
1342}
1343
1344// Note: Errors from this function should be recoverable. This means that if
1345// one rank throws an exception, then all ranks should.
1346template <bool IsBigEndian>
1347void GenericIO::readData(int EffRank, size_t RowOffset, int Rank,
1348                         uint64_t &TotalReadSize, int NErrs[3]) {
1349  openAndReadHeader(Redistributing ? MismatchRedistribute : MismatchAllowed,
1350                    EffRank, false);
1351
1352  assert(FH.getHeaderCache().size() && "HeaderCache must not be empty");
1353
1354  if (EffRank == -1)
1355    EffRank = Rank;
1356
1357  GlobalHeader<IsBigEndian> *GH = (GlobalHeader<IsBigEndian> *) &FH.getHeaderCache()[0];
1358  size_t RankIndex = getRankIndex<IsBigEndian>(EffRank, GH, RankMap, FH.getHeaderCache());
1359
1360  assert(RankIndex < GH->NRanks && "Invalid rank specified");
1361
1362  RankHeader<IsBigEndian> *RH = (RankHeader<IsBigEndian> *) &FH.getHeaderCache()[GH->RanksStart +
1363                                               RankIndex*GH->RanksSize];
1364
1365  for (size_t i = 0; i < Vars.size(); ++i) {
1366    uint64_t Offset = RH->Start;
1367    bool VarFound = false;
1368    for (uint64_t j = 0; j < GH->NVars; ++j) {
1369      VariableHeader<IsBigEndian> *VH = (VariableHeader<IsBigEndian> *) &FH.getHeaderCache()[GH->VarsStart +
1370                                                           j*GH->VarsSize];
1371
1372      string VName(VH->Name, VH->Name + NameSize);
1373      size_t VNameNull = VName.find('\0');
1374      if (VNameNull < NameSize)
1375        VName.resize(VNameNull);
1376
1377      uint64_t ReadSize = RH->NElems*VH->Size + CRCSize;
1378      if (VName != Vars[i].Name) {
1379        Offset += ReadSize;
1380        continue;
1381      }
1382
1383      size_t ElementSize = VH->Size;
1384      if (offsetof_safe(VH, ElementSize) < GH->VarsSize)
1385        ElementSize = VH->ElementSize;
1386
1387      VarFound = true;
1388      bool IsFloat = (bool) (VH->Flags & FloatValue),
1389           IsSigned = (bool) (VH->Flags & SignedValue);
1390      if (VH->Size != Vars[i].Size) {
1391        stringstream ss;
1392        ss << "Size mismatch for variable " << Vars[i].Name <<
1393              " in: " << OpenFileName << ": current: " << Vars[i].Size <<
1394              ", file: " << VH->Size;
1395        throw runtime_error(ss.str());
1396      } else if (ElementSize != Vars[i].ElementSize) {
1397        stringstream ss;
1398        ss << "Element size mismatch for variable " << Vars[i].Name <<
1399              " in: " << OpenFileName << ": current: " << Vars[i].ElementSize <<
1400              ", file: " << ElementSize;
1401        throw runtime_error(ss.str());
1402      } else if (IsFloat != Vars[i].IsFloat) {
1403        string Float("float"), Int("integer");
1404        stringstream ss;
1405        ss << "Type mismatch for variable " << Vars[i].Name <<
1406              " in: " << OpenFileName << ": current: " <<
1407              (Vars[i].IsFloat ? Float : Int) <<
1408              ", file: " << (IsFloat ? Float : Int);
1409        throw runtime_error(ss.str());
1410      } else if (IsSigned != Vars[i].IsSigned) {
1411        string Signed("signed"), Uns("unsigned");
1412        stringstream ss;
1413        ss << "Type mismatch for variable " << Vars[i].Name <<
1414              " in: " << OpenFileName << ": current: " <<
1415              (Vars[i].IsSigned ? Signed : Uns) <<
1416              ", file: " << (IsSigned ? Signed : Uns);
1417        throw runtime_error(ss.str());
1418      }
1419
1420      size_t VarOffset = RowOffset*Vars[i].Size;
1421      void *VarData = ((char *) Vars[i].Data) + VarOffset;
1422
1423      vector<unsigned char> LData;
1424      void *Data = VarData;
1425      bool HasExtraSpace = Vars[i].HasExtraSpace;
1426      if (offsetof_safe(GH, BlocksStart) < GH->GlobalHeaderSize &&
1427          GH->BlocksSize > 0) {
1428        BlockHeader<IsBigEndian> *BH = (BlockHeader<IsBigEndian> *)
1429          &FH.getHeaderCache()[GH->BlocksStart +
1430                               (RankIndex*GH->NVars + j)*GH->BlocksSize];
1431        ReadSize = BH->Size + CRCSize;
1432        Offset = BH->Start;
1433
1434        if (strncmp(BH->Filters[0], CompressName, FilterNameSize) == 0) {
1435          LData.resize(ReadSize);
1436          Data = &LData[0];
1437          HasExtraSpace = true;
1438        } else if (BH->Filters[0][0] != '\0') {
1439          stringstream ss;
1440          ss << "Unknown filter \"" << BH->Filters[0] << "\" on variable " << Vars[i].Name;
1441          throw runtime_error(ss.str());
1442        }
1443      }
1444
1445      assert(HasExtraSpace && "Extra space required for reading");
1446
1447      char CRCSave[CRCSize];
1448      char *CRCLoc = ((char *) Data) + ReadSize - CRCSize;
1449      if (HasExtraSpace)
1450        std::copy(CRCLoc, CRCLoc + CRCSize, CRCSave);
1451
1452      int Retry = 0;
1453      {
1454        int RetryCount = 300;
1455        const char *EnvStr = getenv("GENERICIO_RETRY_COUNT");
1456        if (EnvStr)
1457          RetryCount = atoi(EnvStr);
1458
1459        int RetrySleep = 100; // ms
1460        EnvStr = getenv("GENERICIO_RETRY_SLEEP");
1461        if (EnvStr)
1462          RetrySleep = atoi(EnvStr);
1463
1464        for (; Retry < RetryCount; ++Retry) {
1465          try {
1466            FH.get()->read(Data, ReadSize, Offset, Vars[i].Name);
1467            break;
1468          } catch (...) { }
1469
1470          usleep(1000*RetrySleep);
1471        }
1472
1473        if (Retry == RetryCount) {
1474          ++NErrs[0];
1475          break;
1476        } else if (Retry > 0) {
1477          EnvStr = getenv("GENERICIO_VERBOSE");
1478          if (EnvStr) {
1479            int Mod = atoi(EnvStr);
1480            if (Mod > 0) {
1481              int Rank;
1482#ifndef GENERICIO_NO_MPI
1483              MPI_Comm_rank(MPI_COMM_WORLD, &Rank);
1484#else
1485              Rank = 0;
1486#endif
1487
1488              std::cerr << "Rank " << Rank << ": " << Retry <<
1489                           " I/O retries were necessary for reading " <<
1490                           Vars[i].Name << " from: " << OpenFileName << "\n";
1491
1492              std::cerr.flush();
1493            }
1494          }
1495        }
1496      }
1497
1498      TotalReadSize += ReadSize;
1499
1500      uint64_t CRC = crc64_omp(Data, ReadSize);
1501      if (CRC != (uint64_t) -1) {
1502        ++NErrs[1];
1503
1504        int Rank;
1505#ifndef GENERICIO_NO_MPI
1506        MPI_Comm_rank(MPI_COMM_WORLD, &Rank);
1507#else
1508        Rank = 0;
1509#endif
1510
1511        // All ranks will do this and have a good time!
1512        string dn = "gio_crc_errors";
1513        mkdir(dn.c_str(), 0777);
1514
1515        srand(time(0));
1516        int DumpNum = rand();
1517        stringstream ssd;
1518        ssd << dn << "/gio_crc_error_dump." << Rank << "." << DumpNum << ".bin";
1519
1520        stringstream ss;
1521        ss << dn << "/gio_crc_error_log." << Rank << ".txt";
1522
1523        ofstream ofs(ss.str().c_str(), ofstream::out | ofstream::app);
1524        ofs << "On-Disk CRC Error Report:\n";
1525        ofs << "Variable: " << Vars[i].Name << "\n";
1526        ofs << "File: " << OpenFileName << "\n";
1527        ofs << "I/O Retries: " << Retry << "\n";
1528        ofs << "Size: " << ReadSize << " bytes\n";
1529        ofs << "Offset: " << Offset << " bytes\n";
1530        ofs << "CRC: " << CRC << " (expected is -1)\n";
1531        ofs << "Dump file: " << ssd.str() << "\n";
1532        ofs << "\n";
1533        ofs.close();
1534
1535        ofstream dofs(ssd.str().c_str(), ofstream::out);
1536        dofs.write((const char *) Data, ReadSize);
1537        dofs.close();
1538
1539        uint64_t RawCRC = crc64_omp(Data, ReadSize - CRCSize);
1540        unsigned char *UData = (unsigned char *) Data;
1541        crc64_invert(RawCRC, &UData[ReadSize - CRCSize]);
1542        uint64_t NewCRC = crc64_omp(Data, ReadSize);
1543        std::cerr << "Recalulated CRC: " << NewCRC << ((NewCRC == -1) ? "ok" : "bad") << "\n";
1544        break;
1545      }
1546
1547      if (HasExtraSpace)
1548        std::copy(CRCSave, CRCSave + CRCSize, CRCLoc);
1549
1550      if (LData.size()) {
1551        CompressHeader<IsBigEndian> *CH = (CompressHeader<IsBigEndian>*) &LData[0];
1552
1553#ifdef _OPENMP
1554#pragma omp master
1555  {
1556#endif
1557
1558       if (!blosc_initialized) {
1559         blosc_init();
1560         blosc_initialized = true;
1561       }
1562
1563#ifdef _OPENMP
1564       blosc_set_nthreads(omp_get_max_threads());
1565  }
1566#endif
1567
1568        blosc_decompress(&LData[0] + sizeof(CompressHeader<IsBigEndian>),
1569                         VarData, Vars[i].Size*RH->NElems);
1570
1571        if (CH->OrigCRC != crc64_omp(VarData, Vars[i].Size*RH->NElems)) {
1572          ++NErrs[2];
1573          break;
1574        }
1575      }
1576
1577      // Byte swap the data if necessary.
1578      if (IsBigEndian != isBigEndian())
1579        for (size_t j = 0;
1580             j < RH->NElems*(Vars[i].Size/Vars[i].ElementSize); ++j) {
1581          char *Offset = ((char *) VarData) + j*Vars[i].ElementSize;
1582          bswap(Offset, Vars[i].ElementSize);
1583        }
1584
1585      break;
1586    }
1587
1588    if (!VarFound)
1589      throw runtime_error("Variable " + Vars[i].Name +
1590                          " not found in: " + OpenFileName);
1591
1592    // This is for debugging.
1593    if (NErrs[0] || NErrs[1] || NErrs[2]) {
1594      const char *EnvStr = getenv("GENERICIO_VERBOSE");
1595      if (EnvStr) {
1596        int Mod = atoi(EnvStr);
1597        if (Mod > 0) {
1598          int Rank;
1599#ifndef GENERICIO_NO_MPI
1600          MPI_Comm_rank(MPI_COMM_WORLD, &Rank);
1601#else
1602          Rank = 0;
1603#endif
1604
1605          std::cerr << "Rank " << Rank << ": " << NErrs[0] << " I/O error(s), " <<
1606          NErrs[1] << " CRC error(s) and " << NErrs[2] <<
1607          " decompression CRC error(s) reading: " << Vars[i].Name <<
1608          " from: " << OpenFileName << "\n";
1609
1610          std::cerr.flush();
1611        }
1612      }
1613    }
1614
1615    if (NErrs[0] || NErrs[1] || NErrs[2])
1616      break;
1617  }
1618}
1619
1620void GenericIO::getVariableInfo(vector<VariableInfo> &VI) {
1621  if (FH.isBigEndian())
1622    getVariableInfo<true>(VI);
1623  else
1624    getVariableInfo<false>(VI);
1625}
1626
1627template <bool IsBigEndian>
1628void GenericIO::getVariableInfo(vector<VariableInfo> &VI) {
1629  assert(FH.getHeaderCache().size() && "HeaderCache must not be empty");
1630
1631  GlobalHeader<IsBigEndian> *GH = (GlobalHeader<IsBigEndian> *) &FH.getHeaderCache()[0];
1632  for (uint64_t j = 0; j < GH->NVars; ++j) {
1633    VariableHeader<IsBigEndian> *VH = (VariableHeader<IsBigEndian> *) &FH.getHeaderCache()[GH->VarsStart +
1634                                                         j*GH->VarsSize];
1635
1636    string VName(VH->Name, VH->Name + NameSize);
1637    size_t VNameNull = VName.find('\0');
1638    if (VNameNull < NameSize)
1639      VName.resize(VNameNull);
1640
1641    size_t ElementSize = VH->Size;
1642    if (offsetof_safe(VH, ElementSize) < GH->VarsSize)
1643      ElementSize = VH->ElementSize;
1644
1645    bool IsFloat = (bool) (VH->Flags & FloatValue),
1646         IsSigned = (bool) (VH->Flags & SignedValue),
1647         IsPhysCoordX = (bool) (VH->Flags & ValueIsPhysCoordX),
1648         IsPhysCoordY = (bool) (VH->Flags & ValueIsPhysCoordY),
1649         IsPhysCoordZ = (bool) (VH->Flags & ValueIsPhysCoordZ),
1650         MaybePhysGhost = (bool) (VH->Flags & ValueMaybePhysGhost);
1651    VI.push_back(VariableInfo(VName, (size_t) VH->Size, IsFloat, IsSigned,
1652                              IsPhysCoordX, IsPhysCoordY, IsPhysCoordZ,
1653                              MaybePhysGhost, ElementSize));
1654  }
1655}
1656
1657void GenericIO::setNaturalDefaultPartition() {
1658#ifdef __bgq__
1659  DefaultPartition = MPIX_IO_link_id();
1660#else
1661#ifndef GENERICIO_NO_MPI
1662  bool UseName = true;
1663  const char *EnvStr = getenv("GENERICIO_PARTITIONS_USE_NAME");
1664  if (EnvStr) {
1665    int Mod = atoi(EnvStr);
1666    UseName = (Mod != 0);
1667  }
1668
1669  if (UseName) {
1670    // This is a heuristic to generate ~256 partitions based on the
1671    // names of the nodes.
1672    char Name[MPI_MAX_PROCESSOR_NAME];
1673    int Len = 0;
1674
1675    MPI_Get_processor_name(Name, &Len);
1676    unsigned char color = 0;
1677    for (int i = 0; i < Len; ++i)
1678      color += (unsigned char) Name[i];
1679
1680    DefaultPartition = color;
1681  }
1682
1683  // This is for debugging.
1684  EnvStr = getenv("GENERICIO_RANK_PARTITIONS");
1685  if (EnvStr) {
1686    int Mod = atoi(EnvStr);
1687    if (Mod > 0) {
1688      int Rank;
1689      MPI_Comm_rank(MPI_COMM_WORLD, &Rank);
1690      DefaultPartition += Rank % Mod;
1691    }
1692  }
1693#endif
1694#endif
1695}
1696
1697} /* END namespace cosmotk */
Note: See TracBrowser for help on using the repository browser.