source: GenericIO.cxx @ 8f0a211

Revision 8f0a211, 51.8 KB checked in by Hal Finkel <hfinkel@…>, 8 years ago (diff)

Add redistribution support

  • Property mode set to 100644
Line 
1/*
2 *                    Copyright (C) 2015, UChicago Argonne, LLC
3 *                               All Rights Reserved
4 *
5 *                               Generic IO (ANL-15-066)
6 *                     Hal Finkel, Argonne National Laboratory
7 *
8 *                              OPEN SOURCE LICENSE
9 *
10 * Under the terms of Contract No. DE-AC02-06CH11357 with UChicago Argonne,
11 * LLC, the U.S. Government retains certain rights in this software.
12 *
13 * Redistribution and use in source and binary forms, with or without
14 * modification, are permitted provided that the following conditions are met:
15 *
16 *   1. Redistributions of source code must retain the above copyright notice,
17 *      this list of conditions and the following disclaimer.
18 *
19 *   2. Redistributions in binary form must reproduce the above copyright
20 *      notice, this list of conditions and the following disclaimer in the
21 *      documentation and/or other materials provided with the distribution.
22 *
23 *   3. Neither the names of UChicago Argonne, LLC or the Department of Energy
24 *      nor the names of its contributors may be used to endorse or promote
25 *      products derived from this software without specific prior written
26 *      permission.
27 *
28 * *****************************************************************************
29 *
30 *                                  DISCLAIMER
31 * THE SOFTWARE IS SUPPLIED “AS IS” WITHOUT WARRANTY OF ANY KIND.  NEITHER THE
32 * UNTED STATES GOVERNMENT, NOR THE UNITED STATES DEPARTMENT OF ENERGY, NOR
33 * UCHICAGO ARGONNE, LLC, NOR ANY OF THEIR EMPLOYEES, MAKES ANY WARRANTY,
34 * EXPRESS OR IMPLIED, OR ASSUMES ANY LEGAL LIABILITY OR RESPONSIBILITY FOR THE
35 * ACCURACY, COMPLETENESS, OR USEFULNESS OF ANY INFORMATION, DATA, APPARATUS,
36 * PRODUCT, OR PROCESS DISCLOSED, OR REPRESENTS THAT ITS USE WOULD NOT INFRINGE
37 * PRIVATELY OWNED RIGHTS.
38 *
39 * *****************************************************************************
40 */
41
42#define _XOPEN_SOURCE 600
43#include "CRC64.h"
44#include "GenericIO.h"
45
46extern "C" {
47#include "blosc.h"
48}
49
50#include <sstream>
51#include <fstream>
52#include <stdexcept>
53#include <algorithm>
54#include <cassert>
55#include <cstddef>
56#include <cstring>
57
58#ifndef GENERICIO_NO_MPI
59#include <ctime>
60#endif
61
62#include <sys/types.h>
63#include <sys/stat.h>
64#include <fcntl.h>
65#include <errno.h>
66
67#ifdef __bgq__
68#include <spi/include/kernel/location.h>
69#include <spi/include/kernel/process.h>
70#include <firmware/include/personality.h>
71#endif
72
73#ifndef MPI_UINT64_T
74#define MPI_UINT64_T (sizeof(long) == 8 ? MPI_LONG : MPI_LONG_LONG)
75#endif
76
77using namespace std;
78
79namespace gio {
80
81
82#ifndef GENERICIO_NO_MPI
83GenericFileIO_MPI::~GenericFileIO_MPI() {
84  (void) MPI_File_close(&FH);
85}
86
87void GenericFileIO_MPI::open(const std::string &FN, bool ForReading) {
88  FileName = FN;
89
90  int amode = ForReading ? MPI_MODE_RDONLY : (MPI_MODE_WRONLY | MPI_MODE_CREATE);
91  if (MPI_File_open(Comm, const_cast<char *>(FileName.c_str()), amode,
92                    MPI_INFO_NULL, &FH) != MPI_SUCCESS)
93    throw runtime_error((!ForReading ? "Unable to create the file: " :
94                                       "Unable to open the file: ") +
95                        FileName);
96}
97
98void GenericFileIO_MPI::setSize(size_t sz) {
99  if (MPI_File_set_size(FH, sz) != MPI_SUCCESS)
100    throw runtime_error("Unable to set size for file: " + FileName);
101}
102
103void GenericFileIO_MPI::read(void *buf, size_t count, off_t offset,
104                             const std::string &D) {
105  while (count > 0) {
106    MPI_Status status;
107    if (MPI_File_read_at(FH, offset, buf, count, MPI_BYTE, &status) != MPI_SUCCESS)
108      throw runtime_error("Unable to read " + D + " from file: " + FileName);
109
110    int scount;
111    (void) MPI_Get_count(&status, MPI_BYTE, &scount);
112
113    count -= scount;
114    buf = ((char *) buf) + scount;
115    offset += scount;
116  }
117}
118
119void GenericFileIO_MPI::write(const void *buf, size_t count, off_t offset,
120                              const std::string &D) {
121  while (count > 0) {
122    MPI_Status status;
123    if (MPI_File_write_at(FH, offset, (void *) buf, count, MPI_BYTE, &status) != MPI_SUCCESS)
124      throw runtime_error("Unable to write " + D + " to file: " + FileName);
125
126    int scount;
127    (void) MPI_Get_count(&status, MPI_BYTE, &scount);
128
129    count -= scount;
130    buf = ((char *) buf) + scount;
131    offset += scount;
132  }
133}
134
135void GenericFileIO_MPICollective::read(void *buf, size_t count, off_t offset,
136                             const std::string &D) {
137  int Continue = 0;
138
139  do {
140    MPI_Status status;
141    if (MPI_File_read_at_all(FH, offset, buf, count, MPI_BYTE, &status) != MPI_SUCCESS)
142      throw runtime_error("Unable to read " + D + " from file: " + FileName);
143
144    int scount;
145    (void) MPI_Get_count(&status, MPI_BYTE, &scount);
146
147    count -= scount;
148    buf = ((char *) buf) + scount;
149    offset += scount;
150
151    int NeedContinue = (count > 0);
152    MPI_Allreduce(&NeedContinue, &Continue, 1, MPI_INT, MPI_SUM, Comm);
153  } while (Continue);
154}
155
156void GenericFileIO_MPICollective::write(const void *buf, size_t count, off_t offset,
157                              const std::string &D) {
158  int Continue = 0;
159
160  do {
161    MPI_Status status;
162    if (MPI_File_write_at_all(FH, offset, (void *) buf, count, MPI_BYTE, &status) != MPI_SUCCESS)
163      throw runtime_error("Unable to write " + D + " to file: " + FileName);
164
165    int scount;
166    (void) MPI_Get_count(&status, MPI_BYTE, &scount);
167
168    count -= scount;
169    buf = ((char *) buf) + scount;
170    offset += scount;
171
172    int NeedContinue = (count > 0);
173    MPI_Allreduce(&NeedContinue, &Continue, 1, MPI_INT, MPI_SUM, Comm);
174  } while (Continue);
175}
176#endif
177
178GenericFileIO_POSIX::~GenericFileIO_POSIX() {
179  if (FH != -1) close(FH);
180}
181
182void GenericFileIO_POSIX::open(const std::string &FN, bool ForReading) {
183  FileName = FN;
184
185  int flags = ForReading ? O_RDONLY : (O_WRONLY | O_CREAT);
186  int mode = S_IRUSR | S_IWUSR | S_IRGRP;
187  errno = 0;
188  if ((FH = ::open(FileName.c_str(), flags, mode)) == -1)
189    throw runtime_error((!ForReading ? "Unable to create the file: " :
190                                       "Unable to open the file: ") +
191                        FileName + ": " + strerror(errno));
192}
193
194void GenericFileIO_POSIX::setSize(size_t sz) {
195  if (ftruncate(FH, sz) == -1)
196    throw runtime_error("Unable to set size for file: " + FileName);
197}
198
199void GenericFileIO_POSIX::read(void *buf, size_t count, off_t offset,
200                               const std::string &D) {
201  while (count > 0) {
202    ssize_t scount;
203    errno = 0;
204    if ((scount = pread(FH, buf, count, offset)) == -1) {
205      if (errno == EINTR)
206        continue;
207
208      throw runtime_error("Unable to read " + D + " from file: " + FileName);
209    }
210
211    count -= scount;
212    buf = ((char *) buf) + scount;
213    offset += scount;
214  }
215}
216
217void GenericFileIO_POSIX::write(const void *buf, size_t count, off_t offset,
218                                const std::string &D) {
219  while (count > 0) {
220    ssize_t scount;
221    errno = 0;
222    if ((scount = pwrite(FH, buf, count, offset)) == -1) {
223      if (errno == EINTR)
224        continue;
225
226      throw runtime_error("Unable to write " + D + " to file: " + FileName);
227    }
228
229    count -= scount;
230    buf = ((char *) buf) + scount;
231    offset += scount;
232  }
233}
234
235static bool isBigEndian() {
236  const uint32_t one = 1;
237  return !(*((char *)(&one)));
238}
239
240static void bswap(void *v, size_t s) {
241  char *p = (char *) v;
242  for (size_t i = 0; i < s/2; ++i)
243    std::swap(p[i], p[s - (i+1)]);
244}
245
246// Using #pragma pack here, instead of __attribute__((packed)) because xlc, at
247// least as of v12.1, won't take __attribute__((packed)) on non-POD and/or
248// templated types.
249#pragma pack(1)
250
251template <typename T, bool IsBigEndian>
252struct endian_specific_value {
253  operator T() const {
254    T rvalue = value;
255    if (IsBigEndian != isBigEndian())
256      bswap(&rvalue, sizeof(T));
257
258    return rvalue;
259  };
260
261  endian_specific_value &operator = (T nvalue) {
262    if (IsBigEndian != isBigEndian())
263      bswap(&nvalue, sizeof(T));
264
265    value = nvalue;
266    return *this;
267  }
268
269  endian_specific_value &operator += (T nvalue) {
270    *this = *this + nvalue;
271    return *this;
272  }
273
274  endian_specific_value &operator -= (T nvalue) {
275    *this = *this - nvalue;
276    return *this;
277  }
278
279private:
280  T value;
281};
282
283static const size_t CRCSize = 8;
284
285static const size_t MagicSize = 8;
286static const char *MagicBE = "HACC01B";
287static const char *MagicLE = "HACC01L";
288
289template <bool IsBigEndian>
290struct GlobalHeader {
291  char Magic[MagicSize];
292  endian_specific_value<uint64_t, IsBigEndian> HeaderSize;
293  endian_specific_value<uint64_t, IsBigEndian> NElems; // The global total
294  endian_specific_value<uint64_t, IsBigEndian> Dims[3];
295  endian_specific_value<uint64_t, IsBigEndian> NVars;
296  endian_specific_value<uint64_t, IsBigEndian> VarsSize;
297  endian_specific_value<uint64_t, IsBigEndian> VarsStart;
298  endian_specific_value<uint64_t, IsBigEndian> NRanks;
299  endian_specific_value<uint64_t, IsBigEndian> RanksSize;
300  endian_specific_value<uint64_t, IsBigEndian> RanksStart;
301  endian_specific_value<uint64_t, IsBigEndian> GlobalHeaderSize;
302  endian_specific_value<double,   IsBigEndian> PhysOrigin[3];
303  endian_specific_value<double,   IsBigEndian> PhysScale[3];
304  endian_specific_value<uint64_t, IsBigEndian> BlocksSize;
305  endian_specific_value<uint64_t, IsBigEndian> BlocksStart;
306};
307
308enum {
309  FloatValue          = (1 << 0),
310  SignedValue         = (1 << 1),
311  ValueIsPhysCoordX   = (1 << 2),
312  ValueIsPhysCoordY   = (1 << 3),
313  ValueIsPhysCoordZ   = (1 << 4),
314  ValueMaybePhysGhost = (1 << 5)
315};
316
317static const size_t NameSize = 256;
318template <bool IsBigEndian>
319struct VariableHeader {
320  char Name[NameSize];
321  endian_specific_value<uint64_t, IsBigEndian> Flags;
322  endian_specific_value<uint64_t, IsBigEndian> Size;
323};
324
325template <bool IsBigEndian>
326struct RankHeader {
327  endian_specific_value<uint64_t, IsBigEndian> Coords[3];
328  endian_specific_value<uint64_t, IsBigEndian> NElems;
329  endian_specific_value<uint64_t, IsBigEndian> Start;
330  endian_specific_value<uint64_t, IsBigEndian> GlobalRank;
331};
332
333static const size_t FilterNameSize = 8;
334static const size_t MaxFilters = 4;
335template <bool IsBigEndian>
336struct BlockHeader {
337  char Filters[MaxFilters][FilterNameSize];
338  endian_specific_value<uint64_t, IsBigEndian> Start;
339  endian_specific_value<uint64_t, IsBigEndian> Size;
340};
341
342template <bool IsBigEndian>
343struct CompressHeader {
344  endian_specific_value<uint64_t, IsBigEndian> OrigCRC;
345};
346const char *CompressName = "BLOSC";
347
348#pragma pack()
349
350unsigned GenericIO::DefaultFileIOType = FileIOPOSIX;
351int GenericIO::DefaultPartition = 0;
352bool GenericIO::DefaultShouldCompress = false;
353
354#ifndef GENERICIO_NO_MPI
355std::size_t GenericIO::CollectiveMPIIOThreshold = 0;
356#endif
357
358static bool blosc_initialized = false;
359
360#ifndef GENERICIO_NO_MPI
361void GenericIO::write() {
362  if (isBigEndian())
363    write<true>();
364  else
365    write<false>();
366}
367
368// Note: writing errors are not currently recoverable (one rank may fail
369// while the others don't).
370template <bool IsBigEndian>
371void GenericIO::write() {
372  const char *Magic = IsBigEndian ? MagicBE : MagicLE;
373
374  uint64_t FileSize = 0;
375
376  int NRanks, Rank;
377  MPI_Comm_rank(Comm, &Rank);
378  MPI_Comm_size(Comm, &NRanks);
379
380#ifdef __bgq__
381  MPI_Barrier(Comm);
382#endif
383  MPI_Comm_split(Comm, Partition, Rank, &SplitComm);
384
385  int SplitNRanks, SplitRank;
386  MPI_Comm_rank(SplitComm, &SplitRank);
387  MPI_Comm_size(SplitComm, &SplitNRanks);
388
389  string LocalFileName;
390  if (SplitNRanks != NRanks) {
391    if (Rank == 0) {
392      // In split mode, the specified file becomes the rank map, and the real
393      // data is partitioned.
394
395      vector<int> MapRank, MapPartition;
396      MapRank.resize(NRanks);
397      for (int i = 0; i < NRanks; ++i) MapRank[i] = i;
398
399      MapPartition.resize(NRanks);
400      MPI_Gather(&Partition, 1, MPI_INT, &MapPartition[0], 1, MPI_INT, 0, Comm);
401
402      GenericIO GIO(MPI_COMM_SELF, FileName, FileIOType);
403      GIO.setNumElems(NRanks);
404      GIO.addVariable("$rank", MapRank); /* this is for use by humans; the reading
405                                            code assumes that the partitions are in
406                                            rank order */
407      GIO.addVariable("$partition", MapPartition);
408
409      vector<int> CX, CY, CZ;
410      int TopoStatus;
411      MPI_Topo_test(Comm, &TopoStatus);
412      if (TopoStatus == MPI_CART) {
413        CX.resize(NRanks);
414        CY.resize(NRanks);
415        CZ.resize(NRanks);
416
417        for (int i = 0; i < NRanks; ++i) {
418          int C[3];
419          MPI_Cart_coords(Comm, i, 3, C);
420
421          CX[i] = C[0];
422          CY[i] = C[1];
423          CZ[i] = C[2];
424        }
425
426        GIO.addVariable("$x", CX);
427        GIO.addVariable("$y", CY);
428        GIO.addVariable("$z", CZ);
429      }
430
431      GIO.write();
432    } else {
433      MPI_Gather(&Partition, 1, MPI_INT, 0, 0, MPI_INT, 0, Comm);
434    }
435
436    stringstream ss;
437    ss << FileName << "#" << Partition;
438    LocalFileName = ss.str();
439  } else {
440    LocalFileName = FileName;
441  }
442
443  RankHeader<IsBigEndian> RHLocal;
444  int Dims[3], Periods[3], Coords[3];
445
446  int TopoStatus;
447  MPI_Topo_test(Comm, &TopoStatus);
448  if (TopoStatus == MPI_CART) {
449    MPI_Cart_get(Comm, 3, Dims, Periods, Coords);
450  } else {
451    Dims[0] = NRanks;
452    std::fill(Dims + 1, Dims + 3, 1);
453    std::fill(Periods, Periods + 3, 0);
454    Coords[0] = Rank;
455    std::fill(Coords + 1, Coords + 3, 0);
456  }
457
458  std::copy(Coords, Coords + 3, RHLocal.Coords);
459  RHLocal.NElems = NElems;
460  RHLocal.Start = 0;
461  RHLocal.GlobalRank = Rank;
462
463  bool ShouldCompress = DefaultShouldCompress;
464  const char *EnvStr = getenv("GENERICIO_COMPRESS");
465  if (EnvStr) {
466    int Mod = atoi(EnvStr);
467    ShouldCompress = (Mod > 0);
468  }
469
470  bool NeedsBlockHeaders = ShouldCompress;
471  EnvStr = getenv("GENERICIO_FORCE_BLOCKS");
472  if (!NeedsBlockHeaders && EnvStr) {
473    int Mod = atoi(EnvStr);
474    NeedsBlockHeaders = (Mod > 0);
475  }
476
477  vector<BlockHeader<IsBigEndian> > LocalBlockHeaders;
478  vector<void *> LocalData;
479  vector<bool> LocalHasExtraSpace;
480  vector<vector<unsigned char> > LocalCData;
481  if (NeedsBlockHeaders) {
482    LocalBlockHeaders.resize(Vars.size());
483    LocalData.resize(Vars.size());
484    LocalHasExtraSpace.resize(Vars.size());
485    if (ShouldCompress)
486      LocalCData.resize(Vars.size());
487
488    for (size_t i = 0; i < Vars.size(); ++i) {
489      // Filters null by default, leave null starting address (needs to be
490      // calculated by the header-writing rank).
491      memset(&LocalBlockHeaders[i], 0, sizeof(BlockHeader<IsBigEndian>));
492      if (ShouldCompress) {
493        LocalCData[i].resize(sizeof(CompressHeader<IsBigEndian>));
494
495        CompressHeader<IsBigEndian> *CH = (CompressHeader<IsBigEndian>*) &LocalCData[i][0];
496        CH->OrigCRC = crc64_omp(Vars[i].Data, Vars[i].Size*NElems);
497
498#ifdef _OPENMP
499#pragma omp master
500  {
501#endif
502
503       if (!blosc_initialized) {
504         blosc_init();
505         blosc_initialized = true;
506       }
507
508#ifdef _OPENMP
509       blosc_set_nthreads(omp_get_max_threads());
510  }
511#endif
512
513        LocalCData[i].resize(LocalCData[i].size() + NElems*Vars[i].Size);
514        if (blosc_compress(9, 1, Vars[i].Size, NElems*Vars[i].Size, Vars[i].Data,
515                           &LocalCData[i][0] + sizeof(CompressHeader<IsBigEndian>),
516                           NElems*Vars[i].Size) <= 0)
517          goto nocomp;
518
519        strncpy(LocalBlockHeaders[i].Filters[0], CompressName, FilterNameSize);
520        size_t CNBytes, CCBytes, CBlockSize;
521        blosc_cbuffer_sizes(&LocalCData[i][0] + sizeof(CompressHeader<IsBigEndian>),
522                            &CNBytes, &CCBytes, &CBlockSize);
523        LocalCData[i].resize(CCBytes + sizeof(CompressHeader<IsBigEndian>));
524
525        LocalBlockHeaders[i].Size = LocalCData[i].size();
526        LocalCData[i].resize(LocalCData[i].size() + CRCSize);
527        LocalData[i] = &LocalCData[i][0];
528        LocalHasExtraSpace[i] = true;
529      } else {
530nocomp:
531        LocalBlockHeaders[i].Size = NElems*Vars[i].Size;
532        LocalData[i] = Vars[i].Data;
533        LocalHasExtraSpace[i] = Vars[i].HasExtraSpace;
534      }
535    }
536  }
537
538  double StartTime = MPI_Wtime();
539
540  if (SplitRank == 0) {
541    uint64_t HeaderSize = sizeof(GlobalHeader<IsBigEndian>) + Vars.size()*sizeof(VariableHeader<IsBigEndian>) +
542                          SplitNRanks*sizeof(RankHeader<IsBigEndian>) + CRCSize;
543    if (NeedsBlockHeaders)
544      HeaderSize += SplitNRanks*Vars.size()*sizeof(BlockHeader<IsBigEndian>);
545
546    vector<char> Header(HeaderSize, 0);
547    GlobalHeader<IsBigEndian> *GH = (GlobalHeader<IsBigEndian> *) &Header[0];
548    std::copy(Magic, Magic + MagicSize, GH->Magic);
549    GH->HeaderSize = HeaderSize - CRCSize;
550    GH->NElems = NElems; // This will be updated later
551    std::copy(Dims, Dims + 3, GH->Dims);
552    GH->NVars = Vars.size();
553    GH->VarsSize = sizeof(VariableHeader<IsBigEndian>);
554    GH->VarsStart = sizeof(GlobalHeader<IsBigEndian>);
555    GH->NRanks = SplitNRanks;
556    GH->RanksSize = sizeof(RankHeader<IsBigEndian>);
557    GH->RanksStart = GH->VarsStart + Vars.size()*sizeof(VariableHeader<IsBigEndian>);
558    GH->GlobalHeaderSize = sizeof(GlobalHeader<IsBigEndian>);
559    std::copy(PhysOrigin, PhysOrigin + 3, GH->PhysOrigin);
560    std::copy(PhysScale,  PhysScale  + 3, GH->PhysScale);
561    if (!NeedsBlockHeaders) {
562      GH->BlocksSize = GH->BlocksStart = 0;
563    } else {
564      GH->BlocksSize = sizeof(BlockHeader<IsBigEndian>);
565      GH->BlocksStart = GH->RanksStart + SplitNRanks*sizeof(RankHeader<IsBigEndian>);
566    }
567
568    uint64_t RecordSize = 0;
569    VariableHeader<IsBigEndian> *VH = (VariableHeader<IsBigEndian> *) &Header[GH->VarsStart];
570    for (size_t i = 0; i < Vars.size(); ++i, ++VH) {
571      string VName(Vars[i].Name);
572      VName.resize(NameSize);
573
574      std::copy(VName.begin(), VName.end(), VH->Name);
575      uint64_t VFlags = 0;
576      if (Vars[i].IsFloat)  VFlags |= FloatValue;
577      if (Vars[i].IsSigned) VFlags |= SignedValue;
578      if (Vars[i].IsPhysCoordX) VFlags |= ValueIsPhysCoordX;
579      if (Vars[i].IsPhysCoordY) VFlags |= ValueIsPhysCoordY;
580      if (Vars[i].IsPhysCoordZ) VFlags |= ValueIsPhysCoordZ;
581      if (Vars[i].MaybePhysGhost) VFlags |= ValueMaybePhysGhost;
582      VH->Flags = VFlags;
583      RecordSize += VH->Size = Vars[i].Size;
584    }
585
586    MPI_Gather(&RHLocal, sizeof(RHLocal), MPI_BYTE,
587               &Header[GH->RanksStart], sizeof(RHLocal),
588               MPI_BYTE, 0, SplitComm);
589
590    if (NeedsBlockHeaders) {
591      MPI_Gather(&LocalBlockHeaders[0],
592                 Vars.size()*sizeof(BlockHeader<IsBigEndian>), MPI_BYTE,
593                 &Header[GH->BlocksStart],
594                 Vars.size()*sizeof(BlockHeader<IsBigEndian>), MPI_BYTE,
595                 0, SplitComm);
596
597      BlockHeader<IsBigEndian> *BH = (BlockHeader<IsBigEndian> *) &Header[GH->BlocksStart];
598      for (int i = 0; i < SplitNRanks; ++i)
599      for (size_t j = 0; j < Vars.size(); ++j, ++BH) {
600        if (i == 0 && j == 0)
601          BH->Start = HeaderSize;
602        else
603          BH->Start = BH[-1].Start + BH[-1].Size + CRCSize;
604      }
605
606      RankHeader<IsBigEndian> *RH = (RankHeader<IsBigEndian> *) &Header[GH->RanksStart];
607      RH->Start = HeaderSize; ++RH;
608      for (int i = 1; i < SplitNRanks; ++i, ++RH) {
609        RH->Start =
610          ((BlockHeader<IsBigEndian> *) &Header[GH->BlocksStart])[i*Vars.size()].Start;
611        GH->NElems += RH->NElems;
612      }
613
614      // Compute the total file size.
615      uint64_t LastData = BH[-1].Size + CRCSize;
616      FileSize = BH[-1].Start + LastData;
617    } else {
618      RankHeader<IsBigEndian> *RH = (RankHeader<IsBigEndian> *) &Header[GH->RanksStart];
619      RH->Start = HeaderSize; ++RH;
620      for (int i = 1; i < SplitNRanks; ++i, ++RH) {
621        uint64_t PrevNElems = RH[-1].NElems;
622        uint64_t PrevData = PrevNElems*RecordSize + CRCSize*Vars.size();
623        RH->Start = RH[-1].Start + PrevData;
624        GH->NElems += RH->NElems;
625      }
626
627      // Compute the total file size.
628      uint64_t LastNElems = RH[-1].NElems;
629      uint64_t LastData = LastNElems*RecordSize + CRCSize*Vars.size();
630      FileSize = RH[-1].Start + LastData;
631    }
632
633    // Now that the starting offset has been computed, send it back to each rank.
634    MPI_Scatter(&Header[GH->RanksStart], sizeof(RHLocal),
635                MPI_BYTE, &RHLocal, sizeof(RHLocal),
636                MPI_BYTE, 0, SplitComm);
637
638    if (NeedsBlockHeaders)
639      MPI_Scatter(&Header[GH->BlocksStart],
640                  sizeof(BlockHeader<IsBigEndian>)*Vars.size(), MPI_BYTE,
641                  &LocalBlockHeaders[0],
642                  sizeof(BlockHeader<IsBigEndian>)*Vars.size(), MPI_BYTE,
643                  0, SplitComm);
644
645    uint64_t HeaderCRC = crc64_omp(&Header[0], HeaderSize - CRCSize);
646    crc64_invert(HeaderCRC, &Header[HeaderSize - CRCSize]);
647
648    if (FileIOType == FileIOMPI)
649      FH.get() = new GenericFileIO_MPI(MPI_COMM_SELF);
650    else if (FileIOType == FileIOMPICollective)
651      FH.get() = new GenericFileIO_MPICollective(MPI_COMM_SELF);
652    else
653      FH.get() = new GenericFileIO_POSIX();
654
655    FH.get()->open(LocalFileName);
656    FH.get()->setSize(FileSize);
657    FH.get()->write(&Header[0], HeaderSize, 0, "header");
658
659    close();
660  } else {
661    MPI_Gather(&RHLocal, sizeof(RHLocal), MPI_BYTE, 0, 0, MPI_BYTE, 0, SplitComm);
662    if (NeedsBlockHeaders)
663      MPI_Gather(&LocalBlockHeaders[0], Vars.size()*sizeof(BlockHeader<IsBigEndian>),
664                 MPI_BYTE, 0, 0, MPI_BYTE, 0, SplitComm);
665    MPI_Scatter(0, 0, MPI_BYTE, &RHLocal, sizeof(RHLocal), MPI_BYTE, 0, SplitComm);
666    if (NeedsBlockHeaders)
667      MPI_Scatter(0, 0, MPI_BYTE, &LocalBlockHeaders[0], sizeof(BlockHeader<IsBigEndian>)*Vars.size(),
668                  MPI_BYTE, 0, SplitComm);
669  }
670
671  MPI_Barrier(SplitComm);
672
673  if (FileIOType == FileIOMPI)
674    FH.get() = new GenericFileIO_MPI(SplitComm);
675  else if (FileIOType == FileIOMPICollective)
676    FH.get() = new GenericFileIO_MPICollective(SplitComm);
677  else
678    FH.get() = new GenericFileIO_POSIX();
679
680  FH.get()->open(LocalFileName);
681
682  uint64_t Offset = RHLocal.Start;
683  for (size_t i = 0; i < Vars.size(); ++i) {
684    uint64_t WriteSize = NeedsBlockHeaders ?
685                         LocalBlockHeaders[i].Size : NElems*Vars[i].Size;
686    void *Data = NeedsBlockHeaders ? LocalData[i] : Vars[i].Data;
687    uint64_t CRC = crc64_omp(Data, WriteSize);
688    bool HasExtraSpace = NeedsBlockHeaders ?
689                         LocalHasExtraSpace[i] : Vars[i].HasExtraSpace;
690    char *CRCLoc = HasExtraSpace ?  ((char *) Data) + WriteSize : (char *) &CRC;
691
692    if (NeedsBlockHeaders)
693      Offset = LocalBlockHeaders[i].Start;
694
695    // When using extra space for the CRC write, preserve the original contents.
696    char CRCSave[CRCSize];
697    if (HasExtraSpace)
698      std::copy(CRCLoc, CRCLoc + CRCSize, CRCSave);
699
700    crc64_invert(CRC, CRCLoc);
701
702    if (HasExtraSpace) {
703      FH.get()->write(Data, WriteSize + CRCSize, Offset, Vars[i].Name + " with CRC");
704    } else {
705      FH.get()->write(Data, WriteSize, Offset, Vars[i].Name);
706      FH.get()->write(CRCLoc, CRCSize, Offset + WriteSize, Vars[i].Name + " CRC");
707    }
708
709    if (HasExtraSpace)
710       std::copy(CRCSave, CRCSave + CRCSize, CRCLoc);
711
712    Offset += WriteSize + CRCSize;
713  }
714
715  close();
716  MPI_Barrier(Comm);
717
718  double EndTime = MPI_Wtime();
719  double TotalTime = EndTime - StartTime;
720  double MaxTotalTime;
721  MPI_Reduce(&TotalTime, &MaxTotalTime, 1, MPI_DOUBLE, MPI_MAX, 0, Comm);
722
723  if (SplitNRanks != NRanks) {
724    uint64_t ContribFileSize = (SplitRank == 0) ? FileSize : 0;
725    MPI_Reduce(&ContribFileSize, &FileSize, 1, MPI_UINT64_T, MPI_SUM, 0, Comm);
726  }
727
728  if (Rank == 0) {
729    double Rate = ((double) FileSize) / MaxTotalTime / (1024.*1024.);
730    cout << "Wrote " << Vars.size() << " variables to " << FileName <<
731            " (" << FileSize << " bytes) in " << MaxTotalTime << "s: " <<
732            Rate << " MB/s" << endl;
733  }
734
735  MPI_Comm_free(&SplitComm);
736  SplitComm = MPI_COMM_NULL;
737}
738#endif // GENERICIO_NO_MPI
739
740template <bool IsBigEndian>
741void GenericIO::readHeaderLeader(void *GHPtr, MismatchBehavior MB, int NRanks,
742                                 int Rank, int SplitNRanks,
743                                 string &LocalFileName, uint64_t &HeaderSize,
744                                 vector<char> &Header) {
745  GlobalHeader<IsBigEndian> &GH = *(GlobalHeader<IsBigEndian> *) GHPtr;
746
747  if (MB == MismatchDisallowed) {
748    if (SplitNRanks != (int) GH.NRanks) {
749      stringstream ss;
750      ss << "Won't read " << LocalFileName << ": communicator-size mismatch: " <<
751            "current: " << SplitNRanks << ", file: " << GH.NRanks;
752      throw runtime_error(ss.str());
753    }
754
755#ifndef GENERICIO_NO_MPI
756    int TopoStatus;
757    MPI_Topo_test(Comm, &TopoStatus);
758    if (TopoStatus == MPI_CART) {
759      int Dims[3], Periods[3], Coords[3];
760      MPI_Cart_get(Comm, 3, Dims, Periods, Coords);
761
762      bool DimsMatch = true;
763      for (int i = 0; i < 3; ++i) {
764        if ((uint64_t) Dims[i] != GH.Dims[i]) {
765          DimsMatch = false;
766          break;
767        }
768      }
769
770      if (!DimsMatch) {
771        stringstream ss;
772        ss << "Won't read " << LocalFileName <<
773              ": communicator-decomposition mismatch: " <<
774              "current: " << Dims[0] << "x" << Dims[1] << "x" << Dims[2] <<
775              ", file: " << GH.Dims[0] << "x" << GH.Dims[1] << "x" <<
776              GH.Dims[2];
777        throw runtime_error(ss.str());
778      }
779    }
780#endif
781  } else if (MB == MismatchRedistribute && !Redistributing) {
782    Redistributing = true;
783
784    int NFileRanks = RankMap.empty() ? (int) GH.NRanks : (int) RankMap.size();
785    int NFileRanksPerRank = NFileRanks/NRanks;
786    int NRemFileRank = NFileRanks % NRanks;
787
788    if (!NFileRanksPerRank) {
789      // We have only the remainder, so the last NRemFileRank ranks get one
790      // file rank, and the others don't.
791      if (NRemFileRank && NRanks - Rank <= NRemFileRank)
792        SourceRanks.push_back(NRanks - (Rank + 1));
793    } else {
794      // Since NRemFileRank < NRanks, and we don't want to put any extra memory
795      // load on rank 0 (because rank 0's memory load is normally higher than
796      // the other ranks anyway), the last NRemFileRank will each take
797      // (NFileRanksPerRank+1) file ranks.
798
799      int FirstFileRank = 0, LastFileRank = NFileRanksPerRank - 1;
800      for (int i = 1; i <= Rank; ++i) {
801        FirstFileRank = LastFileRank + 1;
802        LastFileRank  = FirstFileRank + NFileRanksPerRank - 1;
803
804        if (NRemFileRank && NRanks - i <= NRemFileRank)
805          ++LastFileRank;
806      }
807
808      for (int i = FirstFileRank; i <= LastFileRank; ++i)
809        SourceRanks.push_back(i);
810    }
811  }
812
813  HeaderSize = GH.HeaderSize;
814  Header.resize(HeaderSize + CRCSize, 0xFE /* poison */);
815  FH.get()->read(&Header[0], HeaderSize + CRCSize, 0, "header");
816
817  uint64_t CRC = crc64_omp(&Header[0], HeaderSize + CRCSize);
818  if (CRC != (uint64_t) -1) {
819    throw runtime_error("Header CRC check failed: " + LocalFileName);
820  }
821}
822
823// Note: Errors from this function should be recoverable. This means that if
824// one rank throws an exception, then all ranks should.
825void GenericIO::openAndReadHeader(MismatchBehavior MB, int EffRank, bool CheckPartMap) {
826  int NRanks, Rank;
827#ifndef GENERICIO_NO_MPI
828  MPI_Comm_rank(Comm, &Rank);
829  MPI_Comm_size(Comm, &NRanks);
830#else
831  Rank = 0;
832  NRanks = 1;
833#endif
834
835  if (EffRank == -1)
836    EffRank = MB == MismatchRedistribute ? 0 : Rank;
837
838  if (RankMap.empty() && CheckPartMap) {
839    // First, check to see if the file is a rank map.
840    unsigned long RanksInMap = 0;
841    if (Rank == 0) {
842      try {
843#ifndef GENERICIO_NO_MPI
844        GenericIO GIO(MPI_COMM_SELF, FileName, FileIOType);
845#else
846        GenericIO GIO(FileName, FileIOType);
847#endif
848        GIO.openAndReadHeader(MismatchDisallowed, 0, false);
849        RanksInMap = GIO.readNumElems();
850
851        RankMap.resize(RanksInMap + GIO.requestedExtraSpace()/sizeof(int));
852        GIO.addVariable("$partition", RankMap, true);
853
854        GIO.readData(0, false);
855        RankMap.resize(RanksInMap);
856      } catch (...) {
857        RankMap.clear();
858        RanksInMap = 0;
859      }
860    }
861
862#ifndef GENERICIO_NO_MPI
863    MPI_Bcast(&RanksInMap, 1, MPI_UNSIGNED_LONG, 0, Comm);
864    if (RanksInMap > 0) {
865      RankMap.resize(RanksInMap);
866      MPI_Bcast(&RankMap[0], RanksInMap, MPI_INT, 0, Comm);
867    }
868#endif
869  }
870
871#ifndef GENERICIO_NO_MPI
872  if (SplitComm != MPI_COMM_NULL)
873    MPI_Comm_free(&SplitComm);
874#endif
875
876  string LocalFileName;
877  if (RankMap.empty()) {
878    LocalFileName = FileName;
879#ifndef GENERICIO_NO_MPI
880    MPI_Comm_dup(MB == MismatchRedistribute ? MPI_COMM_SELF : Comm, &SplitComm);
881#endif
882  } else {
883    stringstream ss;
884    ss << FileName << "#" << RankMap[EffRank];
885    LocalFileName = ss.str();
886#ifndef GENERICIO_NO_MPI
887    if (MB == MismatchRedistribute) {
888      MPI_Comm_dup(MPI_COMM_SELF, &SplitComm);
889    } else {
890#ifdef __bgq__
891      MPI_Barrier(Comm);
892#endif
893      MPI_Comm_split(Comm, RankMap[EffRank], Rank, &SplitComm);
894    }
895#endif
896  }
897
898  if (LocalFileName == OpenFileName)
899    return;
900  FH.close();
901
902  int SplitNRanks, SplitRank;
903#ifndef GENERICIO_NO_MPI
904  MPI_Comm_rank(SplitComm, &SplitRank);
905  MPI_Comm_size(SplitComm, &SplitNRanks);
906#else
907  SplitRank = 0;
908  SplitNRanks = 1;
909#endif
910
911  uint64_t HeaderSize;
912  vector<char> Header;
913
914  if (SplitRank == 0) {
915#ifndef GENERICIO_NO_MPI
916    if (FileIOType == FileIOMPI)
917      FH.get() = new GenericFileIO_MPI(MPI_COMM_SELF);
918    else if (FileIOType == FileIOMPICollective)
919      FH.get() = new GenericFileIO_MPICollective(MPI_COMM_SELF);
920    else
921#endif
922      FH.get() = new GenericFileIO_POSIX();
923
924#ifndef GENERICIO_NO_MPI
925    char True = 1, False = 0;
926#endif
927
928    try {
929      FH.get()->open(LocalFileName, true);
930
931      GlobalHeader<false> GH; // endianness does not matter yet...
932      FH.get()->read(&GH, sizeof(GlobalHeader<false>), 0, "global header");
933
934      if (string(GH.Magic, GH.Magic + MagicSize - 1) == MagicLE) {
935        readHeaderLeader<false>(&GH, MB, NRanks, Rank, SplitNRanks, LocalFileName,
936                                HeaderSize, Header);
937      } else if (string(GH.Magic, GH.Magic + MagicSize - 1) == MagicBE) {
938        readHeaderLeader<true>(&GH, MB, NRanks, Rank, SplitNRanks, LocalFileName,
939                               HeaderSize, Header);
940      } else {
941        string Error = "invalid file-type identifier";
942        throw runtime_error("Won't read " + LocalFileName + ": " + Error);
943      }
944
945#ifndef GENERICIO_NO_MPI
946      close();
947      MPI_Bcast(&True, 1, MPI_BYTE, 0, SplitComm);
948#endif
949    } catch (...) {
950#ifndef GENERICIO_NO_MPI
951      MPI_Bcast(&False, 1, MPI_BYTE, 0, SplitComm);
952#endif
953      close();
954      throw;
955    }
956  } else {
957#ifndef GENERICIO_NO_MPI
958    char Okay;
959    MPI_Bcast(&Okay, 1, MPI_BYTE, 0, SplitComm);
960    if (!Okay)
961      throw runtime_error("Failure broadcast from rank 0");
962#endif
963  }
964
965#ifndef GENERICIO_NO_MPI
966  MPI_Bcast(&HeaderSize, 1, MPI_UINT64_T, 0, SplitComm);
967#endif
968
969  Header.resize(HeaderSize, 0xFD /* poison */);
970#ifndef GENERICIO_NO_MPI
971  MPI_Bcast(&Header[0], HeaderSize, MPI_BYTE, 0, SplitComm);
972#endif
973
974  FH.getHeaderCache().clear();
975
976  GlobalHeader<false> *GH = (GlobalHeader<false> *) &Header[0];
977  FH.setIsBigEndian(string(GH->Magic, GH->Magic + MagicSize - 1) == MagicBE);
978
979  FH.getHeaderCache().swap(Header);
980  OpenFileName = LocalFileName;
981
982#ifndef GENERICIO_NO_MPI
983  if (!DisableCollErrChecking)
984    MPI_Barrier(Comm);
985
986  if (FileIOType == FileIOMPI)
987    FH.get() = new GenericFileIO_MPI(SplitComm);
988  else if (FileIOType == FileIOMPICollective)
989    FH.get() = new GenericFileIO_MPICollective(SplitComm);
990  else
991    FH.get() = new GenericFileIO_POSIX();
992
993  int OpenErr = 0, TotOpenErr;
994  try {
995    FH.get()->open(LocalFileName, true);
996    MPI_Allreduce(&OpenErr, &TotOpenErr, 1, MPI_INT, MPI_SUM,
997                  DisableCollErrChecking ? MPI_COMM_SELF : Comm);
998  } catch (...) {
999    OpenErr = 1;
1000    MPI_Allreduce(&OpenErr, &TotOpenErr, 1, MPI_INT, MPI_SUM,
1001                  DisableCollErrChecking ? MPI_COMM_SELF : Comm);
1002    throw;
1003  }
1004
1005  if (TotOpenErr > 0) {
1006    stringstream ss;
1007    ss << TotOpenErr << " ranks failed to open file: " << LocalFileName;
1008    throw runtime_error(ss.str());
1009  }
1010#endif
1011}
1012
1013int GenericIO::readNRanks() {
1014  if (FH.isBigEndian())
1015    return readNRanks<true>();
1016  return readNRanks<false>();
1017}
1018
1019template <bool IsBigEndian>
1020int GenericIO::readNRanks() {
1021  if (RankMap.size())
1022    return RankMap.size();
1023
1024  assert(FH.getHeaderCache().size() && "HeaderCache must not be empty");
1025  GlobalHeader<IsBigEndian> *GH = (GlobalHeader<IsBigEndian> *) &FH.getHeaderCache()[0];
1026  return (int) GH->NRanks;
1027}
1028
1029void GenericIO::readDims(int Dims[3]) {
1030  if (FH.isBigEndian())
1031    readDims<true>(Dims);
1032  else
1033    readDims<false>(Dims);
1034}
1035
1036template <bool IsBigEndian>
1037void GenericIO::readDims(int Dims[3]) {
1038  assert(FH.getHeaderCache().size() && "HeaderCache must not be empty");
1039  GlobalHeader<IsBigEndian> *GH = (GlobalHeader<IsBigEndian> *) &FH.getHeaderCache()[0];
1040  std::copy(GH->Dims, GH->Dims + 3, Dims);
1041}
1042
1043uint64_t GenericIO::readTotalNumElems() {
1044  if (FH.isBigEndian())
1045    return readTotalNumElems<true>();
1046  return readTotalNumElems<false>();
1047}
1048
1049template <bool IsBigEndian>
1050uint64_t GenericIO::readTotalNumElems() {
1051  if (RankMap.size())
1052    return (uint64_t) -1;
1053
1054  assert(FH.getHeaderCache().size() && "HeaderCache must not be empty");
1055  GlobalHeader<IsBigEndian> *GH = (GlobalHeader<IsBigEndian> *) &FH.getHeaderCache()[0];
1056  return GH->NElems;
1057}
1058
1059void GenericIO::readPhysOrigin(double Origin[3]) {
1060  if (FH.isBigEndian())
1061    readPhysOrigin<true>(Origin);
1062  else
1063    readPhysOrigin<false>(Origin);
1064}
1065
1066// Define a "safe" version of offsetof (offsetof itself might not work for
1067// non-POD types, and at least xlC v12.1 will complain about this if you try).
1068#define offsetof_safe(S, F) (size_t(&(S)->F) - size_t(S))
1069
1070template <bool IsBigEndian>
1071void GenericIO::readPhysOrigin(double Origin[3]) {
1072  assert(FH.getHeaderCache().size() && "HeaderCache must not be empty");
1073  GlobalHeader<IsBigEndian> *GH = (GlobalHeader<IsBigEndian> *) &FH.getHeaderCache()[0];
1074  if (offsetof_safe(GH, PhysOrigin) >= GH->GlobalHeaderSize) {
1075    std::fill(Origin, Origin + 3, 0.0);
1076    return;
1077  }
1078
1079  std::copy(GH->PhysOrigin, GH->PhysOrigin + 3, Origin);
1080}
1081
1082void GenericIO::readPhysScale(double Scale[3]) {
1083  if (FH.isBigEndian())
1084    readPhysScale<true>(Scale);
1085  else
1086    readPhysScale<false>(Scale);
1087}
1088
1089template <bool IsBigEndian>
1090void GenericIO::readPhysScale(double Scale[3]) {
1091  assert(FH.getHeaderCache().size() && "HeaderCache must not be empty");
1092  GlobalHeader<IsBigEndian> *GH = (GlobalHeader<IsBigEndian> *) &FH.getHeaderCache()[0];
1093  if (offsetof_safe(GH, PhysScale) >= GH->GlobalHeaderSize) {
1094    std::fill(Scale, Scale + 3, 0.0);
1095    return;
1096  }
1097
1098  std::copy(GH->PhysScale, GH->PhysScale + 3, Scale);
1099}
1100
1101template <bool IsBigEndian>
1102static size_t getRankIndex(int EffRank, GlobalHeader<IsBigEndian> *GH,
1103                           vector<int> &RankMap, vector<char> &HeaderCache) {
1104  if (RankMap.empty())
1105    return EffRank;
1106
1107  for (size_t i = 0; i < GH->NRanks; ++i) {
1108    RankHeader<IsBigEndian> *RH = (RankHeader<IsBigEndian> *) &HeaderCache[GH->RanksStart +
1109                                                 i*GH->RanksSize];
1110    if (offsetof_safe(RH, GlobalRank) >= GH->RanksSize)
1111      return EffRank;
1112
1113    if ((int) RH->GlobalRank == EffRank)
1114      return i;
1115  }
1116
1117  assert(false && "Index requested of an invalid rank");
1118  return (size_t) -1;
1119}
1120
1121int GenericIO::readGlobalRankNumber(int EffRank) {
1122  if (FH.isBigEndian())
1123    return readGlobalRankNumber<true>(EffRank);
1124  return readGlobalRankNumber<false>(EffRank);
1125}
1126
1127template <bool IsBigEndian>
1128int GenericIO::readGlobalRankNumber(int EffRank) {
1129  if (EffRank == -1) {
1130#ifndef GENERICIO_NO_MPI
1131    MPI_Comm_rank(Comm, &EffRank);
1132#else
1133    EffRank = 0;
1134#endif
1135  }
1136
1137  openAndReadHeader(MismatchAllowed, EffRank, false);
1138
1139  assert(FH.getHeaderCache().size() && "HeaderCache must not be empty");
1140
1141  GlobalHeader<IsBigEndian> *GH = (GlobalHeader<IsBigEndian> *) &FH.getHeaderCache()[0];
1142  size_t RankIndex = getRankIndex<IsBigEndian>(EffRank, GH, RankMap, FH.getHeaderCache());
1143
1144  assert(RankIndex < GH->NRanks && "Invalid rank specified");
1145
1146  RankHeader<IsBigEndian> *RH = (RankHeader<IsBigEndian> *) &FH.getHeaderCache()[GH->RanksStart +
1147                                               RankIndex*GH->RanksSize];
1148
1149  if (offsetof_safe(RH, GlobalRank) >= GH->RanksSize)
1150    return EffRank;
1151
1152  return (int) RH->GlobalRank;
1153}
1154
1155size_t GenericIO::readNumElems(int EffRank) {
1156  if (EffRank == -1 && Redistributing) {
1157    DisableCollErrChecking = true;
1158
1159    size_t TotalSize = 0;
1160    for (int i = 0, ie = SourceRanks.size(); i != ie; ++i)
1161      TotalSize += readNumElems(SourceRanks[i]);
1162
1163    DisableCollErrChecking = false;
1164    return TotalSize;
1165  }
1166
1167  if (FH.isBigEndian())
1168    return readNumElems<true>(EffRank);
1169  return readNumElems<false>(EffRank);
1170}
1171
1172template <bool IsBigEndian>
1173size_t GenericIO::readNumElems(int EffRank) {
1174  if (EffRank == -1) {
1175#ifndef GENERICIO_NO_MPI
1176    MPI_Comm_rank(Comm, &EffRank);
1177#else
1178    EffRank = 0;
1179#endif
1180  }
1181
1182  openAndReadHeader(Redistributing ? MismatchRedistribute : MismatchAllowed,
1183                    EffRank, false);
1184
1185  assert(FH.getHeaderCache().size() && "HeaderCache must not be empty");
1186
1187  GlobalHeader<IsBigEndian> *GH = (GlobalHeader<IsBigEndian> *) &FH.getHeaderCache()[0];
1188  size_t RankIndex = getRankIndex<IsBigEndian>(EffRank, GH, RankMap, FH.getHeaderCache());
1189
1190  assert(RankIndex < GH->NRanks && "Invalid rank specified");
1191
1192  RankHeader<IsBigEndian> *RH = (RankHeader<IsBigEndian> *) &FH.getHeaderCache()[GH->RanksStart +
1193                                               RankIndex*GH->RanksSize];
1194  return (size_t) RH->NElems;
1195}
1196
1197void GenericIO::readCoords(int Coords[3], int EffRank) {
1198  if (EffRank == -1 && Redistributing) {
1199    std::fill(Coords, Coords + 3, 0);
1200    return;
1201  }
1202
1203  if (FH.isBigEndian())
1204    readCoords<true>(Coords, EffRank);
1205  else
1206    readCoords<false>(Coords, EffRank);
1207}
1208
1209template <bool IsBigEndian>
1210void GenericIO::readCoords(int Coords[3], int EffRank) {
1211  if (EffRank == -1) {
1212#ifndef GENERICIO_NO_MPI
1213    MPI_Comm_rank(Comm, &EffRank);
1214#else
1215    EffRank = 0;
1216#endif
1217  }
1218
1219  openAndReadHeader(MismatchAllowed, EffRank, false);
1220
1221  assert(FH.getHeaderCache().size() && "HeaderCache must not be empty");
1222
1223  GlobalHeader<IsBigEndian> *GH = (GlobalHeader<IsBigEndian> *) &FH.getHeaderCache()[0];
1224  size_t RankIndex = getRankIndex<IsBigEndian>(EffRank, GH, RankMap, FH.getHeaderCache());
1225
1226  assert(RankIndex < GH->NRanks && "Invalid rank specified");
1227
1228  RankHeader<IsBigEndian> *RH = (RankHeader<IsBigEndian> *) &FH.getHeaderCache()[GH->RanksStart +
1229                                               RankIndex*GH->RanksSize];
1230
1231  std::copy(RH->Coords, RH->Coords + 3, Coords);
1232}
1233
1234void GenericIO::readData(int EffRank, bool PrintStats, bool CollStats) {
1235  int Rank;
1236#ifndef GENERICIO_NO_MPI
1237  MPI_Comm_rank(Comm, &Rank);
1238#else
1239  Rank = 0;
1240#endif
1241
1242  uint64_t TotalReadSize = 0;
1243#ifndef GENERICIO_NO_MPI
1244  double StartTime = MPI_Wtime();
1245#else
1246  double StartTime = double(clock())/CLOCKS_PER_SEC;
1247#endif
1248
1249  int NErrs[3] = { 0, 0, 0 };
1250
1251  if (EffRank == -1 && Redistributing) {
1252    DisableCollErrChecking = true;
1253
1254    size_t RowOffset = 0;
1255    for (int i = 0, ie = SourceRanks.size(); i != ie; ++i) {
1256      readData(SourceRanks[i], RowOffset, Rank, TotalReadSize, NErrs);
1257      RowOffset += readNumElems(SourceRanks[i]);
1258    }
1259
1260    DisableCollErrChecking = false;
1261  } else {
1262    readData(EffRank, 0, Rank, TotalReadSize, NErrs);
1263  }
1264
1265  int AllNErrs[3];
1266#ifndef GENERICIO_NO_MPI
1267  MPI_Allreduce(NErrs, AllNErrs, 3, MPI_INT, MPI_SUM, Comm);
1268#else
1269  AllNErrs[0] = NErrs[0]; AllNErrs[1] = NErrs[1]; AllNErrs[2] = NErrs[2];
1270#endif
1271
1272  if (AllNErrs[0] > 0 || AllNErrs[1] > 0 || AllNErrs[2] > 0) {
1273    stringstream ss;
1274    ss << "Experienced " << AllNErrs[0] << " I/O error(s), " <<
1275          AllNErrs[1] << " CRC error(s) and " << AllNErrs[2] <<
1276          " decompression CRC error(s) reading: " << OpenFileName;
1277    throw runtime_error(ss.str());
1278  }
1279
1280#ifndef GENERICIO_NO_MPI
1281  MPI_Barrier(Comm);
1282#endif
1283
1284#ifndef GENERICIO_NO_MPI
1285  double EndTime = MPI_Wtime();
1286#else
1287  double EndTime = double(clock())/CLOCKS_PER_SEC;
1288#endif
1289
1290  double TotalTime = EndTime - StartTime;
1291  double MaxTotalTime;
1292#ifndef GENERICIO_NO_MPI
1293  if (CollStats)
1294    MPI_Reduce(&TotalTime, &MaxTotalTime, 1, MPI_DOUBLE, MPI_MAX, 0, Comm);
1295  else
1296#endif
1297  MaxTotalTime = TotalTime;
1298
1299  uint64_t AllTotalReadSize;
1300#ifndef GENERICIO_NO_MPI
1301  if (CollStats)
1302    MPI_Reduce(&TotalReadSize, &AllTotalReadSize, 1, MPI_UINT64_T, MPI_SUM, 0, Comm);
1303  else
1304#endif
1305  AllTotalReadSize = TotalReadSize;
1306
1307  if (Rank == 0 && PrintStats) {
1308    double Rate = ((double) AllTotalReadSize) / MaxTotalTime / (1024.*1024.);
1309    cout << "Read " << Vars.size() << " variables from " << FileName <<
1310            " (" << AllTotalReadSize << " bytes) in " << MaxTotalTime << "s: " <<
1311            Rate << " MB/s [excluding header read]" << endl;
1312  }
1313}
1314
1315void GenericIO::readData(int EffRank, size_t RowOffset, int Rank,
1316                         uint64_t &TotalReadSize, int NErrs[3]) {
1317  if (FH.isBigEndian())
1318    readData<true>(EffRank, RowOffset, Rank, TotalReadSize, NErrs);
1319  else
1320    readData<false>(EffRank, RowOffset, Rank, TotalReadSize, NErrs);
1321}
1322
1323// Note: Errors from this function should be recoverable. This means that if
1324// one rank throws an exception, then all ranks should.
1325template <bool IsBigEndian>
1326void GenericIO::readData(int EffRank, size_t RowOffset, int Rank,
1327                         uint64_t &TotalReadSize, int NErrs[3]) {
1328  openAndReadHeader(Redistributing ? MismatchRedistribute : MismatchAllowed,
1329                    EffRank, false);
1330
1331  assert(FH.getHeaderCache().size() && "HeaderCache must not be empty");
1332
1333  if (EffRank == -1)
1334    EffRank = Rank;
1335
1336  GlobalHeader<IsBigEndian> *GH = (GlobalHeader<IsBigEndian> *) &FH.getHeaderCache()[0];
1337  size_t RankIndex = getRankIndex<IsBigEndian>(EffRank, GH, RankMap, FH.getHeaderCache());
1338
1339  assert(RankIndex < GH->NRanks && "Invalid rank specified");
1340
1341  RankHeader<IsBigEndian> *RH = (RankHeader<IsBigEndian> *) &FH.getHeaderCache()[GH->RanksStart +
1342                                               RankIndex*GH->RanksSize];
1343
1344  for (size_t i = 0; i < Vars.size(); ++i) {
1345    uint64_t Offset = RH->Start;
1346    bool VarFound = false;
1347    for (uint64_t j = 0; j < GH->NVars; ++j) {
1348      VariableHeader<IsBigEndian> *VH = (VariableHeader<IsBigEndian> *) &FH.getHeaderCache()[GH->VarsStart +
1349                                                           j*GH->VarsSize];
1350
1351      string VName(VH->Name, VH->Name + NameSize);
1352      size_t VNameNull = VName.find('\0');
1353      if (VNameNull < NameSize)
1354        VName.resize(VNameNull);
1355
1356      uint64_t ReadSize = RH->NElems*VH->Size + CRCSize;
1357      if (VName != Vars[i].Name) {
1358        Offset += ReadSize;
1359        continue;
1360      }
1361
1362      VarFound = true;
1363      bool IsFloat = (bool) (VH->Flags & FloatValue),
1364           IsSigned = (bool) (VH->Flags & SignedValue);
1365      if (VH->Size != Vars[i].Size) {
1366        stringstream ss;
1367        ss << "Size mismatch for variable " << Vars[i].Name <<
1368              " in: " << OpenFileName << ": current: " << Vars[i].Size <<
1369              ", file: " << VH->Size;
1370        throw runtime_error(ss.str());
1371      } else if (IsFloat != Vars[i].IsFloat) {
1372        string Float("float"), Int("integer");
1373        stringstream ss;
1374        ss << "Type mismatch for variable " << Vars[i].Name <<
1375              " in: " << OpenFileName << ": current: " <<
1376              (Vars[i].IsFloat ? Float : Int) <<
1377              ", file: " << (IsFloat ? Float : Int);
1378        throw runtime_error(ss.str());
1379      } else if (IsSigned != Vars[i].IsSigned) {
1380        string Signed("signed"), Uns("unsigned");
1381        stringstream ss;
1382        ss << "Type mismatch for variable " << Vars[i].Name <<
1383              " in: " << OpenFileName << ": current: " <<
1384              (Vars[i].IsSigned ? Signed : Uns) <<
1385              ", file: " << (IsSigned ? Signed : Uns);
1386        throw runtime_error(ss.str());
1387      }
1388
1389      size_t VarOffset = RowOffset*Vars[i].Size;
1390      void *VarData = ((char *) Vars[i].Data) + VarOffset;
1391
1392      vector<unsigned char> LData;
1393      void *Data = VarData;
1394      bool HasExtraSpace = Vars[i].HasExtraSpace;
1395      if (offsetof_safe(GH, BlocksStart) < GH->GlobalHeaderSize &&
1396          GH->BlocksSize > 0) {
1397        BlockHeader<IsBigEndian> *BH = (BlockHeader<IsBigEndian> *)
1398          &FH.getHeaderCache()[GH->BlocksStart +
1399                               (RankIndex*GH->NVars + j)*GH->BlocksSize];
1400        ReadSize = BH->Size + CRCSize;
1401        Offset = BH->Start;
1402
1403        if (strncmp(BH->Filters[0], CompressName, FilterNameSize) == 0) {
1404          LData.resize(ReadSize);
1405          Data = &LData[0];
1406          HasExtraSpace = true;
1407        } else if (BH->Filters[0][0] != '\0') {
1408          stringstream ss;
1409          ss << "Unknown filter \"" << BH->Filters[0] << "\" on variable " << Vars[i].Name;
1410          throw runtime_error(ss.str());
1411        }
1412      }
1413
1414      assert(HasExtraSpace && "Extra space required for reading");
1415
1416      char CRCSave[CRCSize];
1417      char *CRCLoc = ((char *) Data) + ReadSize - CRCSize;
1418      if (HasExtraSpace)
1419        std::copy(CRCLoc, CRCLoc + CRCSize, CRCSave);
1420
1421      int Retry = 0;
1422      {
1423        int RetryCount = 300;
1424        const char *EnvStr = getenv("GENERICIO_RETRY_COUNT");
1425        if (EnvStr)
1426          RetryCount = atoi(EnvStr);
1427
1428        int RetrySleep = 100; // ms
1429        EnvStr = getenv("GENERICIO_RETRY_SLEEP");
1430        if (EnvStr)
1431          RetrySleep = atoi(EnvStr);
1432
1433        for (; Retry < RetryCount; ++Retry) {
1434          try {
1435            FH.get()->read(Data, ReadSize, Offset, Vars[i].Name);
1436            break;
1437          } catch (...) { }
1438
1439          usleep(1000*RetrySleep);
1440        }
1441
1442        if (Retry == RetryCount) {
1443          ++NErrs[0];
1444          break;
1445        } else if (Retry > 0) {
1446          EnvStr = getenv("GENERICIO_VERBOSE");
1447          if (EnvStr) {
1448            int Mod = atoi(EnvStr);
1449            if (Mod > 0) {
1450              int Rank;
1451#ifndef GENERICIO_NO_MPI
1452              MPI_Comm_rank(MPI_COMM_WORLD, &Rank);
1453#else
1454              Rank = 0;
1455#endif
1456
1457              std::cerr << "Rank " << Rank << ": " << Retry <<
1458                           " I/O retries were necessary for reading " <<
1459                           Vars[i].Name << " from: " << OpenFileName << "\n";
1460
1461              std::cerr.flush();
1462            }
1463          }
1464        }
1465      }
1466
1467      TotalReadSize += ReadSize;
1468
1469      uint64_t CRC = crc64_omp(Data, ReadSize);
1470      if (CRC != (uint64_t) -1) {
1471        ++NErrs[1];
1472
1473        int Rank;
1474#ifndef GENERICIO_NO_MPI
1475        MPI_Comm_rank(MPI_COMM_WORLD, &Rank);
1476#else
1477        Rank = 0;
1478#endif
1479
1480        // All ranks will do this and have a good time!
1481        string dn = "gio_crc_errors";
1482        mkdir(dn.c_str(), 0777);
1483
1484        srand(time(0));
1485        int DumpNum = rand();
1486        stringstream ssd;
1487        ssd << dn << "/gio_crc_error_dump." << Rank << "." << DumpNum << ".bin";
1488
1489        stringstream ss;
1490        ss << dn << "/gio_crc_error_log." << Rank << ".txt";
1491
1492        ofstream ofs(ss.str().c_str(), ofstream::out | ofstream::app);
1493        ofs << "On-Disk CRC Error Report:\n";
1494        ofs << "Variable: " << Vars[i].Name << "\n";
1495        ofs << "File: " << OpenFileName << "\n";
1496        ofs << "I/O Retries: " << Retry << "\n";
1497        ofs << "Size: " << ReadSize << " bytes\n";
1498        ofs << "Offset: " << Offset << " bytes\n";
1499        ofs << "CRC: " << CRC << " (expected is -1)\n";
1500        ofs << "Dump file: " << ssd.str() << "\n";
1501        ofs << "\n";
1502        ofs.close();
1503
1504        ofstream dofs(ssd.str().c_str(), ofstream::out);
1505        dofs.write((const char *) Data, ReadSize);
1506        dofs.close();
1507        break;
1508      }
1509
1510      if (HasExtraSpace)
1511        std::copy(CRCSave, CRCSave + CRCSize, CRCLoc);
1512
1513      if (LData.size()) {
1514        CompressHeader<IsBigEndian> *CH = (CompressHeader<IsBigEndian>*) &LData[0];
1515
1516#ifdef _OPENMP
1517#pragma omp master
1518  {
1519#endif
1520
1521       if (!blosc_initialized) {
1522         blosc_init();
1523         blosc_initialized = true;
1524       }
1525
1526#ifdef _OPENMP
1527       blosc_set_nthreads(omp_get_max_threads());
1528  }
1529#endif
1530
1531        blosc_decompress(&LData[0] + sizeof(CompressHeader<IsBigEndian>),
1532                         VarData, Vars[i].Size*RH->NElems);
1533
1534        if (CH->OrigCRC != crc64_omp(VarData, Vars[i].Size*RH->NElems)) {
1535          ++NErrs[2];
1536          break;
1537        }
1538      }
1539
1540      // Byte swap the data if necessary.
1541      if (IsBigEndian != isBigEndian())
1542        for (size_t j = 0; j < RH->NElems; ++j) {
1543          char *Offset = ((char *) VarData) + j*Vars[i].Size;
1544          bswap(Offset, Vars[i].Size);
1545        }
1546
1547      break;
1548    }
1549
1550    if (!VarFound)
1551      throw runtime_error("Variable " + Vars[i].Name +
1552                          " not found in: " + OpenFileName);
1553
1554    // This is for debugging.
1555    if (NErrs[0] || NErrs[1] || NErrs[2]) {
1556      const char *EnvStr = getenv("GENERICIO_VERBOSE");
1557      if (EnvStr) {
1558        int Mod = atoi(EnvStr);
1559        if (Mod > 0) {
1560          int Rank;
1561#ifndef GENERICIO_NO_MPI
1562          MPI_Comm_rank(MPI_COMM_WORLD, &Rank);
1563#else
1564          Rank = 0;
1565#endif
1566
1567          std::cerr << "Rank " << Rank << ": " << NErrs[0] << " I/O error(s), " <<
1568          NErrs[1] << " CRC error(s) and " << NErrs[2] <<
1569          " decompression CRC error(s) reading: " << Vars[i].Name <<
1570          " from: " << OpenFileName << "\n";
1571
1572          std::cerr.flush();
1573        }
1574      }
1575    }
1576
1577    if (NErrs[0] || NErrs[1] || NErrs[2])
1578      break;
1579  }
1580}
1581
1582void GenericIO::getVariableInfo(vector<VariableInfo> &VI) {
1583  if (FH.isBigEndian())
1584    getVariableInfo<true>(VI);
1585  else
1586    getVariableInfo<false>(VI);
1587}
1588
1589template <bool IsBigEndian>
1590void GenericIO::getVariableInfo(vector<VariableInfo> &VI) {
1591  assert(FH.getHeaderCache().size() && "HeaderCache must not be empty");
1592
1593  GlobalHeader<IsBigEndian> *GH = (GlobalHeader<IsBigEndian> *) &FH.getHeaderCache()[0];
1594  for (uint64_t j = 0; j < GH->NVars; ++j) {
1595    VariableHeader<IsBigEndian> *VH = (VariableHeader<IsBigEndian> *) &FH.getHeaderCache()[GH->VarsStart +
1596                                                         j*GH->VarsSize];
1597
1598    string VName(VH->Name, VH->Name + NameSize);
1599    size_t VNameNull = VName.find('\0');
1600    if (VNameNull < NameSize)
1601      VName.resize(VNameNull);
1602
1603    bool IsFloat = (bool) (VH->Flags & FloatValue),
1604         IsSigned = (bool) (VH->Flags & SignedValue),
1605         IsPhysCoordX = (bool) (VH->Flags & ValueIsPhysCoordX),
1606         IsPhysCoordY = (bool) (VH->Flags & ValueIsPhysCoordY),
1607         IsPhysCoordZ = (bool) (VH->Flags & ValueIsPhysCoordZ),
1608         MaybePhysGhost = (bool) (VH->Flags & ValueMaybePhysGhost);
1609    VI.push_back(VariableInfo(VName, (size_t) VH->Size, IsFloat, IsSigned,
1610                              IsPhysCoordX, IsPhysCoordY, IsPhysCoordZ,
1611                              MaybePhysGhost));
1612  }
1613}
1614
1615void GenericIO::setNaturalDefaultPartition() {
1616#ifdef __bgq__
1617  Personality_t pers;
1618  Kernel_GetPersonality(&pers, sizeof(pers));
1619
1620  // Nodes in an ION Partition
1621  int SPLIT_A = 2;
1622  int SPLIT_B = 2;
1623  int SPLIT_C = 4;
1624  int SPLIT_D = 4;
1625  int SPLIT_E = 2;
1626
1627  int Anodes, Bnodes, Cnodes, Dnodes, Enodes;
1628  int Acoord, Bcoord, Ccoord, Dcoord, Ecoord;
1629  int A_color, B_color, C_color, D_color, E_color;
1630  int A_blocks, B_blocks, C_blocks, D_blocks, E_blocks;
1631  uint32_t id_on_node;
1632  int ranks_per_node, color;
1633
1634  Anodes = pers.Network_Config.Anodes;
1635  Acoord = pers.Network_Config.Acoord;
1636
1637  Bnodes = pers.Network_Config.Bnodes;
1638  Bcoord = pers.Network_Config.Bcoord;
1639
1640  Cnodes = pers.Network_Config.Cnodes;
1641  Ccoord = pers.Network_Config.Ccoord;
1642
1643  Dnodes = pers.Network_Config.Dnodes;
1644  Dcoord = pers.Network_Config.Dcoord;
1645
1646  Enodes = pers.Network_Config.Enodes;
1647  Ecoord = pers.Network_Config.Ecoord;
1648
1649  A_color  = Acoord /  SPLIT_A;
1650  B_color  = Bcoord /  SPLIT_B;
1651  C_color  = Ccoord /  SPLIT_C;
1652  D_color  = Dcoord /  SPLIT_D;
1653  E_color  = Ecoord /  SPLIT_E;
1654
1655  // Number of blocks
1656  A_blocks = Anodes / SPLIT_A;
1657  B_blocks = Bnodes / SPLIT_B;
1658  C_blocks = Cnodes / SPLIT_C;
1659  D_blocks = Dnodes / SPLIT_D;
1660  E_blocks = Enodes / SPLIT_E;
1661
1662  color = (A_color * (B_blocks * C_blocks * D_blocks * E_blocks))
1663    + (B_color * (C_blocks * D_blocks * E_blocks))
1664    + (C_color * ( D_blocks * E_blocks))
1665    + (D_color * ( E_blocks))
1666    + E_color;
1667
1668  DefaultPartition = color;
1669#else
1670#ifndef GENERICIO_NO_MPI
1671  bool UseName = true;
1672  const char *EnvStr = getenv("GENERICIO_PARTITIONS_USE_NAME");
1673  if (EnvStr) {
1674    int Mod = atoi(EnvStr);
1675    UseName = (Mod != 0);
1676  }
1677
1678  if (UseName) {
1679    // This is a heuristic to generate ~256 partitions based on the
1680    // names of the nodes.
1681    char Name[MPI_MAX_PROCESSOR_NAME];
1682    int Len = 0;
1683
1684    MPI_Get_processor_name(Name, &Len);
1685    unsigned char color = 0;
1686    for (int i = 0; i < Len; ++i)
1687      color += (unsigned char) Name[i];
1688
1689    DefaultPartition = color;
1690  }
1691
1692  // This is for debugging.
1693  EnvStr = getenv("GENERICIO_RANK_PARTITIONS");
1694  if (EnvStr) {
1695    int Mod = atoi(EnvStr);
1696    if (Mod > 0) {
1697      int Rank;
1698      MPI_Comm_rank(MPI_COMM_WORLD, &Rank);
1699      DefaultPartition += Rank % Mod;
1700    }
1701  }
1702#endif
1703#endif
1704}
1705
1706} /* END namespace cosmotk */
Note: See TracBrowser for help on using the repository browser.