source: GenericIO.cxx @ da65757

Revision da65757, 43.4 KB checked in by Hal Finkel <hfinkel@…>, 9 years ago (diff)

add license to all files

  • Property mode set to 100644
Line 
1/*
2 *                    Copyright (C) 2015, UChicago Argonne, LLC
3 *                               All Rights Reserved
4 *
5 *                               Generic IO (ANL-15-066)
6 *                     Hal Finkel, Argonne National Laboratory
7 *
8 *                              OPEN SOURCE LICENSE
9 *
10 * Under the terms of Contract No. DE-AC02-06CH11357 with UChicago Argonne,
11 * LLC, the U.S. Government retains certain rights in this software.
12 *
13 * Redistribution and use in source and binary forms, with or without
14 * modification, are permitted provided that the following conditions are met:
15 *
16 *   1. Redistributions of source code must retain the above copyright notice,
17 *      this list of conditions and the following disclaimer.
18 *
19 *   2. Redistributions in binary form must reproduce the above copyright
20 *      notice, this list of conditions and the following disclaimer in the
21 *      documentation and/or other materials provided with the distribution.
22 *
23 *   3. Neither the names of UChicago Argonne, LLC or the Department of Energy
24 *      nor the names of its contributors may be used to endorse or promote
25 *      products derived from this software without specific prior written
26 *      permission.
27 *
28 * *****************************************************************************
29 *
30 *                                  DISCLAIMER
31 * THE SOFTWARE IS SUPPLIED “AS IS” WITHOUT WARRANTY OF ANY KIND.  NEITHER THE
32 * UNTED STATES GOVERNMENT, NOR THE UNITED STATES DEPARTMENT OF ENERGY, NOR
33 * UCHICAGO ARGONNE, LLC, NOR ANY OF THEIR EMPLOYEES, MAKES ANY WARRANTY,
34 * EXPRESS OR IMPLIED, OR ASSUMES ANY LEGAL LIABILITY OR RESPONSIBILITY FOR THE
35 * ACCURACY, COMPLETENESS, OR USEFULNESS OF ANY INFORMATION, DATA, APPARATUS,
36 * PRODUCT, OR PROCESS DISCLOSED, OR REPRESENTS THAT ITS USE WOULD NOT INFRINGE
37 * PRIVATELY OWNED RIGHTS.
38 *
39 * *****************************************************************************
40 */
41
42#define _XOPEN_SOURCE 600
43#include "CRC64.h"
44#include "GenericIO.h"
45
46extern "C" {
47#include "blosc.h"
48}
49
50#include <sstream>
51#include <fstream>
52#include <stdexcept>
53#include <algorithm>
54#include <cassert>
55#include <cstddef>
56#include <cstring>
57
58#ifndef GENERICIO_NO_MPI
59#include <ctime>
60#endif
61
62#include <sys/types.h>
63#include <sys/stat.h>
64#include <fcntl.h>
65#include <errno.h>
66
67#ifdef __bgq__
68#include <spi/include/kernel/location.h>
69#include <spi/include/kernel/process.h>
70#include <firmware/include/personality.h>
71#endif
72
73#ifndef MPI_UINT64_T
74#define MPI_UINT64_T (sizeof(long) == 8 ? MPI_LONG : MPI_LONG_LONG)
75#endif
76
77using namespace std;
78
79namespace gio {
80
81
82#ifndef GENERICIO_NO_MPI
83GenericFileIO_MPI::~GenericFileIO_MPI() {
84  (void) MPI_File_close(&FH);
85}
86
87void GenericFileIO_MPI::open(const std::string &FN, bool ForReading) {
88  FileName = FN;
89
90  int amode = ForReading ? MPI_MODE_RDONLY : (MPI_MODE_WRONLY | MPI_MODE_CREATE);
91  if (MPI_File_open(Comm, const_cast<char *>(FileName.c_str()), amode,
92                    MPI_INFO_NULL, &FH) != MPI_SUCCESS)
93    throw runtime_error((!ForReading ? "Unable to create the file: " :
94                                       "Unable to open the file: ") +
95                        FileName);
96}
97
98void GenericFileIO_MPI::setSize(size_t sz) {
99  if (MPI_File_set_size(FH, sz) != MPI_SUCCESS)
100    throw runtime_error("Unable to set size for file: " + FileName);
101}
102
103void GenericFileIO_MPI::read(void *buf, size_t count, off_t offset,
104                             const std::string &D) {
105  while (count > 0) {
106    MPI_Status status;
107    if (MPI_File_read_at(FH, offset, buf, count, MPI_BYTE, &status) != MPI_SUCCESS)
108      throw runtime_error("Unable to read " + D + " from file: " + FileName);
109
110    int scount;
111    (void) MPI_Get_count(&status, MPI_BYTE, &scount);
112
113    count -= scount;
114    buf = ((char *) buf) + scount;
115    offset += scount;
116  }
117}
118
119void GenericFileIO_MPI::write(const void *buf, size_t count, off_t offset,
120                              const std::string &D) {
121  while (count > 0) {
122    MPI_Status status;
123    if (MPI_File_write_at(FH, offset, (void *) buf, count, MPI_BYTE, &status) != MPI_SUCCESS)
124      throw runtime_error("Unable to write " + D + " to file: " + FileName);
125
126    int scount;
127    (void) MPI_Get_count(&status, MPI_BYTE, &scount);
128
129    count -= scount;
130    buf = ((char *) buf) + scount;
131    offset += scount;
132  }
133}
134
135void GenericFileIO_MPICollective::read(void *buf, size_t count, off_t offset,
136                             const std::string &D) {
137  int Continue = 0;
138
139  do {
140    MPI_Status status;
141    if (MPI_File_read_at_all(FH, offset, buf, count, MPI_BYTE, &status) != MPI_SUCCESS)
142      throw runtime_error("Unable to read " + D + " from file: " + FileName);
143
144    int scount;
145    (void) MPI_Get_count(&status, MPI_BYTE, &scount);
146
147    count -= scount;
148    buf = ((char *) buf) + scount;
149    offset += scount;
150
151    int NeedContinue = (count > 0);
152    MPI_Allreduce(&NeedContinue, &Continue, 1, MPI_INT, MPI_SUM, Comm);
153  } while (Continue);
154}
155
156void GenericFileIO_MPICollective::write(const void *buf, size_t count, off_t offset,
157                              const std::string &D) {
158  int Continue = 0;
159
160  do {
161    MPI_Status status;
162    if (MPI_File_write_at_all(FH, offset, (void *) buf, count, MPI_BYTE, &status) != MPI_SUCCESS)
163      throw runtime_error("Unable to write " + D + " to file: " + FileName);
164
165    int scount;
166    (void) MPI_Get_count(&status, MPI_BYTE, &scount);
167
168    count -= scount;
169    buf = ((char *) buf) + scount;
170    offset += scount;
171
172    int NeedContinue = (count > 0);
173    MPI_Allreduce(&NeedContinue, &Continue, 1, MPI_INT, MPI_SUM, Comm);
174  } while (Continue);
175}
176#endif
177
178GenericFileIO_POSIX::~GenericFileIO_POSIX() {
179  if (FH != -1) close(FH);
180}
181
182void GenericFileIO_POSIX::open(const std::string &FN, bool ForReading) {
183  FileName = FN;
184
185  int flags = ForReading ? O_RDONLY : (O_WRONLY | O_CREAT);
186  int mode = S_IRUSR | S_IWUSR | S_IRGRP;
187  errno = 0;
188  if ((FH = ::open(FileName.c_str(), flags, mode)) == -1)
189    throw runtime_error((!ForReading ? "Unable to create the file: " :
190                                       "Unable to open the file: ") +
191                        FileName + ": " + strerror(errno));
192}
193
194void GenericFileIO_POSIX::setSize(size_t sz) {
195  if (ftruncate(FH, sz) == -1)
196    throw runtime_error("Unable to set size for file: " + FileName);
197}
198
199void GenericFileIO_POSIX::read(void *buf, size_t count, off_t offset,
200                               const std::string &D) {
201  while (count > 0) {
202    ssize_t scount;
203    errno = 0;
204    if ((scount = pread(FH, buf, count, offset)) == -1) {
205      if (errno == EINTR)
206        continue;
207
208      throw runtime_error("Unable to read " + D + " from file: " + FileName);
209    }
210
211    count -= scount;
212    buf = ((char *) buf) + scount;
213    offset += scount;
214  }
215}
216
217void GenericFileIO_POSIX::write(const void *buf, size_t count, off_t offset,
218                                const std::string &D) {
219  while (count > 0) {
220    ssize_t scount;
221    errno = 0;
222    if ((scount = pwrite(FH, buf, count, offset)) == -1) {
223      if (errno == EINTR)
224        continue;
225
226      throw runtime_error("Unable to write " + D + " to file: " + FileName);
227    }
228
229    count -= scount;
230    buf = ((char *) buf) + scount;
231    offset += scount;
232  }
233}
234
// Reports whether this host stores multi-byte integers most-significant byte
// first. The lowest-addressed byte of the 32-bit value 1 is zero exactly when
// the host is big-endian.
static bool isBigEndian() {
  const uint32_t Probe = 1;
  unsigned char FirstByte;
  std::memcpy(&FirstByte, &Probe, 1);
  return FirstByte == 0;
}
239
// Size in bytes of every CRC64 trailer (after the header and after each
// variable's data block).
static const size_t CRCSize = 8;

// File-type identifiers stored in GlobalHeader::Magic. The final character
// records the writer's byte order (B = big endian, L = little endian).
// write() copies exactly MagicSize bytes from these strings, and readers
// compare the first MagicSize - 1, so each literal (including its NUL) must be
// exactly MagicSize bytes long. The static_asserts enforce this.
static const size_t MagicSize = 8;
static const char MagicBE[] = "HACC01B";
static const char MagicLE[] = "HACC01L";
static_assert(sizeof(MagicBE) == MagicSize, "MagicBE must fill the Magic field");
static_assert(sizeof(MagicLE) == MagicSize, "MagicLE must fill the Magic field");

// On-disk global header, stored at offset 0 of each data file. It is packed
// because it is read and written as raw bytes.
struct GlobalHeader {
  char Magic[MagicSize];
  uint64_t HeaderSize;       // bytes of header, excluding the trailing header CRC
  uint64_t NElems;           // The global total
  uint64_t Dims[3];          // rank decomposition (Cartesian dims, or NRanks x 1 x 1)
  uint64_t NVars;
  uint64_t VarsSize;         // size of one VariableHeader record
  uint64_t VarsStart;        // offset of the first VariableHeader
  uint64_t NRanks;
  uint64_t RanksSize;        // size of one RankHeader record
  uint64_t RanksStart;       // offset of the first RankHeader
  uint64_t GlobalHeaderSize; // size of this struct as written by the producer
  double   PhysOrigin[3];
  double   PhysScale[3];
  uint64_t BlocksSize;       // size of one BlockHeader record (0 if none)
  uint64_t BlocksStart;      // offset of the first BlockHeader (0 if none)
} __attribute__((packed));

// The layout is the file format: any accidental change must fail to compile.
static_assert(sizeof(GlobalHeader) ==
                MagicSize + 14 * sizeof(uint64_t) + 6 * sizeof(double),
              "GlobalHeader on-disk layout changed");
263
// Bit flags stored in VariableHeader::Flags; write() sets one for each
// corresponding property of a registered variable.
enum {
  FloatValue          = 0x01, // variable holds floating-point values
  SignedValue         = 0x02, // variable holds signed values
  ValueIsPhysCoordX   = 0x04, // variable is the physical x coordinate
  ValueIsPhysCoordY   = 0x08, // variable is the physical y coordinate
  ValueIsPhysCoordZ   = 0x10, // variable is the physical z coordinate
  ValueMaybePhysGhost = 0x20  // variable may contain physical ghost values
};
272
// Width of the NUL-padded variable-name field.
static const size_t NameSize = 256;

// On-disk per-variable record (one per variable, starting at
// GlobalHeader::VarsStart).
struct VariableHeader {
  char Name[NameSize];
  uint64_t Flags; // bitwise OR of the FloatValue/SignedValue/... flags
  uint64_t Size;  // bytes per element
} __attribute__((packed));

// The layout is the file format: any accidental change must fail to compile.
static_assert(sizeof(VariableHeader) == NameSize + 2 * sizeof(uint64_t),
              "VariableHeader on-disk layout changed");
279
// On-disk per-rank record (one per writing rank, starting at
// GlobalHeader::RanksStart).
struct RankHeader {
  uint64_t Coords[3];  // the writer's Cartesian coordinates (or rank, 0, 0)
  uint64_t NElems;     // elements written by this rank
  uint64_t Start;      // file offset of this rank's first data block
  uint64_t GlobalRank; // the writer's rank in the full communicator
} __attribute__((packed));

// The layout is the file format: any accidental change must fail to compile.
static_assert(sizeof(RankHeader) == 6 * sizeof(uint64_t),
              "RankHeader on-disk layout changed");
286
// Filters applied to a data block are named in fixed-width, NUL-padded fields.
static const size_t FilterNameSize = 8;
static const size_t MaxFilters = 4;

// On-disk per-(rank, variable) block record, present only when block headers
// are enabled (GlobalHeader::BlocksSize != 0).
struct BlockHeader {
  char Filters[MaxFilters][FilterNameSize]; // all-zero means unfiltered
  uint64_t Start; // file offset of the block
  uint64_t Size;  // stored block size in bytes, excluding the trailing CRC
} __attribute__((packed));

// Prefix of every compressed block.
struct CompressHeader {
  uint64_t OrigCRC; // CRC64 of the uncompressed data
} __attribute__((packed));

// The layouts are the file format: any accidental change must fail to compile.
static_assert(sizeof(BlockHeader) ==
                MaxFilters * FilterNameSize + 2 * sizeof(uint64_t),
              "BlockHeader on-disk layout changed");
static_assert(sizeof(CompressHeader) == sizeof(uint64_t),
              "CompressHeader on-disk layout changed");

// Filter name recorded in BlockHeader::Filters for blosc-compressed blocks.
const char *CompressName = "BLOSC";
299
300unsigned GenericIO::DefaultFileIOType = FileIOPOSIX;
301int GenericIO::DefaultPartition = 0;
302bool GenericIO::DefaultShouldCompress = false;
303
304#ifndef GENERICIO_NO_MPI
305std::size_t GenericIO::CollectiveMPIIOThreshold = 0;
306#endif
307
308static bool blosc_initialized = false;
309
310#ifndef GENERICIO_NO_MPI
311// Note: writing errors are not currently recoverable (one rank may fail
312// while the others don't).
//
// GenericIO::write -- collectively writes every registered variable (Vars),
// NElems elements per rank, from all ranks of Comm.
//
// Ranks are grouped by their Partition value (MPI_Comm_split into SplitComm).
// If every rank has the same Partition, the data goes directly to FileName.
// Otherwise world rank 0 writes FileName as a small rank-map file (variables
// "$rank" and "$partition", plus "$x"/"$y"/"$z" when Comm has a Cartesian
// topology), and each group writes its data to "<FileName>#<Partition>".
//
// Layout of each data file (offsets computed by rank 0 of the group):
//   GlobalHeader | Vars.size() x VariableHeader | SplitNRanks x RankHeader |
//   [SplitNRanks*Vars.size() x BlockHeader] | header CRC (CRCSize bytes) |
//   then, rank by rank, each variable's bytes followed by a CRCSize-byte CRC.
//
// GENERICIO_COMPRESS, when set, overrides DefaultShouldCompress (blosc
// compression iff its integer value is positive). Compression implies block
// headers; a positive GENERICIO_FORCE_BLOCKS enables them without compression.
// Rank 0 of Comm prints the total size and the rate, based on the slowest rank.
313void GenericIO::write() {
314  const char *Magic = isBigEndian() ? MagicBE : MagicLE;
315
316  uint64_t FileSize = 0;
317
318  int NRanks, Rank;
319  MPI_Comm_rank(Comm, &Rank);
320  MPI_Comm_size(Comm, &NRanks);
321
322#ifdef __bgq__
323  MPI_Barrier(Comm);
324#endif
  // One sub-communicator per Partition value; ranks keep their Comm order.
325  MPI_Comm_split(Comm, Partition, Rank, &SplitComm);
326
327  int SplitNRanks, SplitRank;
328  MPI_Comm_rank(SplitComm, &SplitRank);
329  MPI_Comm_size(SplitComm, &SplitNRanks);
330
331  string LocalFileName;
332  if (SplitNRanks != NRanks) {
333    if (Rank == 0) {
334      // In split mode, the specified file becomes the rank map, and the real
335      // data is partitioned.
      // World rank 0 gathers every rank's Partition and writes the map on its
      // own (MPI_COMM_SELF); the other ranks only contribute to the gather.
336
337      vector<int> MapRank, MapPartition;
338      MapRank.resize(NRanks);
339      for (int i = 0; i < NRanks; ++i) MapRank[i] = i;
340
341      MapPartition.resize(NRanks);
342      MPI_Gather(&Partition, 1, MPI_INT, &MapPartition[0], 1, MPI_INT, 0, Comm);
343
344      GenericIO GIO(MPI_COMM_SELF, FileName, FileIOType);
345      GIO.setNumElems(NRanks);
346      GIO.addVariable("$rank", MapRank); /* this is for use by humans; the reading
347                                            code assumes that the partitions are in
348                                            rank order */
349      GIO.addVariable("$partition", MapPartition);
350
351      vector<int> CX, CY, CZ;
352      int TopoStatus;
353      MPI_Topo_test(Comm, &TopoStatus);
354      if (TopoStatus == MPI_CART) {
        // Also record each rank's Cartesian coordinates in the map.
355        CX.resize(NRanks);
356        CY.resize(NRanks);
357        CZ.resize(NRanks);
358
359        for (int i = 0; i < NRanks; ++i) {
360          int C[3];
361          MPI_Cart_coords(Comm, i, 3, C);
362
363          CX[i] = C[0];
364          CY[i] = C[1];
365          CZ[i] = C[2];
366        }
367
368        GIO.addVariable("$x", CX);
369        GIO.addVariable("$y", CY);
370        GIO.addVariable("$z", CZ);
371      }
372
373      GIO.write();
374    } else {
375      MPI_Gather(&Partition, 1, MPI_INT, 0, 0, MPI_INT, 0, Comm);
376    }
377
    // This group's data file is "<FileName>#<Partition>".
378    stringstream ss;
379    ss << FileName << "#" << Partition;
380    LocalFileName = ss.str();
381  } else {
382    LocalFileName = FileName;
383  }
384
  // Describe this rank: its Cartesian coordinates (or a 1-D layout
  // NRanks x 1 x 1 when Comm has no Cartesian topology) and element count.
  // Start is filled in later by the group's rank 0.
385  RankHeader RHLocal;
386  int Dims[3], Periods[3], Coords[3];
387
388  int TopoStatus;
389  MPI_Topo_test(Comm, &TopoStatus);
390  if (TopoStatus == MPI_CART) {
391    MPI_Cart_get(Comm, 3, Dims, Periods, Coords);
392  } else {
393    Dims[0] = NRanks;
394    std::fill(Dims + 1, Dims + 3, 1);
395    std::fill(Periods, Periods + 3, 0);
396    Coords[0] = Rank;
397    std::fill(Coords + 1, Coords + 3, 0);
398  }
399
400  std::copy(Coords, Coords + 3, RHLocal.Coords);
401  RHLocal.NElems = NElems;
402  RHLocal.Start = 0;
403  RHLocal.GlobalRank = Rank;
404
  // GENERICIO_COMPRESS, when set, overrides the default compression choice.
405  bool ShouldCompress = DefaultShouldCompress;
406  const char *EnvStr = getenv("GENERICIO_COMPRESS");
407  if (EnvStr) {
408    int Mod = atoi(EnvStr);
409    ShouldCompress = (Mod > 0);
410  }
411
412  bool NeedsBlockHeaders = ShouldCompress;
413  EnvStr = getenv("GENERICIO_FORCE_BLOCKS");
414  if (!NeedsBlockHeaders && EnvStr) {
415    int Mod = atoi(EnvStr);
416    NeedsBlockHeaders = (Mod > 0);
417  }
418
  // With block headers, prepare one block per variable: LocalData[i] is the
  // buffer to write, and LocalHasExtraSpace[i] says whether CRCSize spare
  // bytes follow it (used to write the CRC in place).
419  vector<BlockHeader> LocalBlockHeaders;
420  vector<void *> LocalData;
421  vector<bool> LocalHasExtraSpace;
422  vector<vector<unsigned char> > LocalCData;
423  if (NeedsBlockHeaders) {
424    LocalBlockHeaders.resize(Vars.size());
425    LocalData.resize(Vars.size());
426    LocalHasExtraSpace.resize(Vars.size());
427    if (ShouldCompress)
428      LocalCData.resize(Vars.size());
429
430    for (size_t i = 0; i < Vars.size(); ++i) {
431      // Filters null by default, leave null starting address (needs to be
432      // calculated by the header-writing rank).
433      memset(&LocalBlockHeaders[i], 0, sizeof(BlockHeader));
434      if (ShouldCompress) {
        // Compressed buffer: CompressHeader (CRC of the uncompressed data),
        // then the blosc output, then CRCSize spare bytes for the block CRC.
435        LocalCData[i].resize(sizeof(CompressHeader));
436
437        CompressHeader *CH = (CompressHeader*) &LocalCData[i][0];
438        CH->OrigCRC = crc64_omp(Vars[i].Data, Vars[i].Size*NElems);
439
        // Initialize blosc once (tracked by blosc_initialized); under OpenMP
        // this runs in a master region that also sets blosc's thread count.
440#ifdef _OPENMP
441#pragma omp master
442  {
443#endif
444
445       if (!blosc_initialized) {
446         blosc_init();
447         blosc_initialized = true;
448       }
449
450#ifdef _OPENMP
451       blosc_set_nthreads(omp_get_max_threads());
452  }
453#endif
454
        // The destination is capped at the uncompressed size; a result <= 0
        // (e.g. output would not fit) falls back to storing the raw data.
455        LocalCData[i].resize(LocalCData[i].size() + NElems*Vars[i].Size);
456        if (blosc_compress(9, 1, Vars[i].Size, NElems*Vars[i].Size, Vars[i].Data,
457                           &LocalCData[i][0] + sizeof(CompressHeader),
458                           NElems*Vars[i].Size) <= 0)
459          goto nocomp;
460
461        strncpy(LocalBlockHeaders[i].Filters[0], CompressName, FilterNameSize);
        // Shrink the buffer to the actual compressed size reported by blosc.
462        size_t CNBytes, CCBytes, CBlockSize;
463        blosc_cbuffer_sizes(&LocalCData[i][0] + sizeof(CompressHeader),
464                            &CNBytes, &CCBytes, &CBlockSize);
465        LocalCData[i].resize(CCBytes + sizeof(CompressHeader));
466
467        LocalBlockHeaders[i].Size = LocalCData[i].size();
468        LocalCData[i].resize(LocalCData[i].size() + CRCSize);
469        LocalData[i] = &LocalCData[i][0];
470        LocalHasExtraSpace[i] = true;
471      } else {
        // Uncompressed block (also the target of `goto nocomp` above).
472nocomp:
473        LocalBlockHeaders[i].Size = NElems*Vars[i].Size;
474        LocalData[i] = Vars[i].Data;
475        LocalHasExtraSpace[i] = Vars[i].HasExtraSpace;
476      }
477    }
478  }
479
480  double StartTime = MPI_Wtime();
481
  // Rank 0 of each group builds the header, gathers every rank's RankHeader
  // (and BlockHeaders), computes all data offsets, scatters them back, and
  // writes the header on its own file handle.
482  if (SplitRank == 0) {
483    uint64_t HeaderSize = sizeof(GlobalHeader) + Vars.size()*sizeof(VariableHeader) +
484                          SplitNRanks*sizeof(RankHeader) + CRCSize;
485    if (NeedsBlockHeaders)
486      HeaderSize += SplitNRanks*Vars.size()*sizeof(BlockHeader);
487
    // Zero-filled, so flags can simply be OR-ed in below.
488    vector<char> Header(HeaderSize, 0);
489    GlobalHeader *GH = (GlobalHeader *) &Header[0];
490    std::copy(Magic, Magic + MagicSize, GH->Magic);
491    GH->HeaderSize = HeaderSize - CRCSize;
492    GH->NElems = NElems; // This will be updated later
493    std::copy(Dims, Dims + 3, GH->Dims);
494    GH->NVars = Vars.size();
495    GH->VarsSize = sizeof(VariableHeader);
496    GH->VarsStart = sizeof(GlobalHeader);
497    GH->NRanks = SplitNRanks;
498    GH->RanksSize = sizeof(RankHeader);
499    GH->RanksStart = GH->VarsStart + Vars.size()*sizeof(VariableHeader);
500    GH->GlobalHeaderSize = sizeof(GlobalHeader);
501    std::copy(PhysOrigin, PhysOrigin + 3, GH->PhysOrigin);
502    std::copy(PhysScale,  PhysScale  + 3, GH->PhysScale);
503    if (!NeedsBlockHeaders) {
504      GH->BlocksSize = GH->BlocksStart = 0;
505    } else {
506      GH->BlocksSize = sizeof(BlockHeader);
507      GH->BlocksStart = GH->RanksStart + SplitNRanks*sizeof(RankHeader);
508    }
509
    // One VariableHeader per variable; RecordSize is the bytes per element
    // summed over all variables.
510    uint64_t RecordSize = 0;
511    VariableHeader *VH = (VariableHeader *) &Header[GH->VarsStart];
512    for (size_t i = 0; i < Vars.size(); ++i, ++VH) {
513      string VName(Vars[i].Name);
514      VName.resize(NameSize);
515
516      std::copy(VName.begin(), VName.end(), VH->Name);
517      if (Vars[i].IsFloat)  VH->Flags |= FloatValue;
518      if (Vars[i].IsSigned) VH->Flags |= SignedValue;
519      if (Vars[i].IsPhysCoordX) VH->Flags |= ValueIsPhysCoordX;
520      if (Vars[i].IsPhysCoordY) VH->Flags |= ValueIsPhysCoordY;
521      if (Vars[i].IsPhysCoordZ) VH->Flags |= ValueIsPhysCoordZ;
522      if (Vars[i].MaybePhysGhost) VH->Flags |= ValueMaybePhysGhost;
523      RecordSize += VH->Size = Vars[i].Size;
524    }
525
526    MPI_Gather(&RHLocal, sizeof(RHLocal), MPI_BYTE,
527               &Header[GH->RanksStart], sizeof(RHLocal),
528               MPI_BYTE, 0, SplitComm);
529
    // NOTE(review): if Vars is empty while block headers are enabled,
    // &LocalBlockHeaders[0] (empty vector) and BH[-1] below index out of
    // range -- confirm callers never write zero variables in this mode.
530    if (NeedsBlockHeaders) {
531      MPI_Gather(&LocalBlockHeaders[0],
532                 Vars.size()*sizeof(BlockHeader), MPI_BYTE,
533                 &Header[GH->BlocksStart],
534                 Vars.size()*sizeof(BlockHeader), MPI_BYTE,
535                 0, SplitComm);
536
      // Blocks are laid out rank by rank, variable by variable, right after
      // the header; each block is followed by its CRC.
537      BlockHeader *BH = (BlockHeader *) &Header[GH->BlocksStart];
538      for (int i = 0; i < SplitNRanks; ++i)
539      for (size_t j = 0; j < Vars.size(); ++j, ++BH) {
540        if (i == 0 && j == 0)
541          BH->Start = HeaderSize;
542        else
543          BH->Start = BH[-1].Start + BH[-1].Size + CRCSize;
544      }
545
      // Each rank's data starts at its first block; also total NElems.
546      RankHeader *RH = (RankHeader *) &Header[GH->RanksStart];
547      RH->Start = HeaderSize; ++RH;
548      for (int i = 1; i < SplitNRanks; ++i, ++RH) {
549        RH->Start =
550          ((BlockHeader *) &Header[GH->BlocksStart])[i*Vars.size()].Start;
551        GH->NElems += RH->NElems;
552      }
553
554      // Compute the total file size.
555      uint64_t LastData = BH[-1].Size + CRCSize;
556      FileSize = BH[-1].Start + LastData;
557    } else {
      // Without blocks, each rank occupies NElems*RecordSize bytes plus one
      // CRC per variable, packed back to back after the header.
558      RankHeader *RH = (RankHeader *) &Header[GH->RanksStart];
559      RH->Start = HeaderSize; ++RH;
560      for (int i = 1; i < SplitNRanks; ++i, ++RH) {
561        uint64_t PrevNElems = RH[-1].NElems;
562        uint64_t PrevData = PrevNElems*RecordSize + CRCSize*Vars.size();
563        RH->Start = RH[-1].Start + PrevData;
564        GH->NElems += RH->NElems;
565      }
566
567      // Compute the total file size.
568      uint64_t LastNElems = RH[-1].NElems;
569      uint64_t LastData = LastNElems*RecordSize + CRCSize*Vars.size();
570      FileSize = RH[-1].Start + LastData;
571    }
572
573    // Now that the starting offset has been computed, send it back to each rank.
574    MPI_Scatter(&Header[GH->RanksStart], sizeof(RHLocal),
575                MPI_BYTE, &RHLocal, sizeof(RHLocal),
576                MPI_BYTE, 0, SplitComm);
577
578    if (NeedsBlockHeaders)
579      MPI_Scatter(&Header[GH->BlocksStart],
580                  sizeof(BlockHeader)*Vars.size(), MPI_BYTE,
581                  &LocalBlockHeaders[0],
582                  sizeof(BlockHeader)*Vars.size(), MPI_BYTE,
583                  0, SplitComm);
584
    // The header's own CRC occupies its last CRCSize bytes.
585    uint64_t HeaderCRC = crc64_omp(&Header[0], HeaderSize - CRCSize);
586    crc64_invert(HeaderCRC, &Header[HeaderSize - CRCSize]);
587
    // Create the file at its final size and write the header alone.
588    if (FileIOType == FileIOMPI)
589      FH.get() = new GenericFileIO_MPI(MPI_COMM_SELF);
590    else if (FileIOType == FileIOMPICollective)
591      FH.get() = new GenericFileIO_MPICollective(MPI_COMM_SELF);
592    else
593      FH.get() = new GenericFileIO_POSIX();
594
595    FH.get()->open(LocalFileName);
596    FH.get()->setSize(FileSize);
597    FH.get()->write(&Header[0], HeaderSize, 0, "header");
598
599    close();
600  } else {
    // Non-root ranks: the matching halves of the gathers/scatters above.
601    MPI_Gather(&RHLocal, sizeof(RHLocal), MPI_BYTE, 0, 0, MPI_BYTE, 0, SplitComm);
602    if (NeedsBlockHeaders)
603      MPI_Gather(&LocalBlockHeaders[0], Vars.size()*sizeof(BlockHeader),
604                 MPI_BYTE, 0, 0, MPI_BYTE, 0, SplitComm);
605    MPI_Scatter(0, 0, MPI_BYTE, &RHLocal, sizeof(RHLocal), MPI_BYTE, 0, SplitComm);
606    if (NeedsBlockHeaders)
607      MPI_Scatter(0, 0, MPI_BYTE, &LocalBlockHeaders[0], sizeof(BlockHeader)*Vars.size(),
608                  MPI_BYTE, 0, SplitComm);
609  }
610
  // The header-writing rank has created and sized the file before anyone
  // writes data into it.
611  MPI_Barrier(SplitComm);
612
613  if (FileIOType == FileIOMPI)
614    FH.get() = new GenericFileIO_MPI(SplitComm);
615  else if (FileIOType == FileIOMPICollective)
616    FH.get() = new GenericFileIO_MPICollective(SplitComm);
617  else
618    FH.get() = new GenericFileIO_POSIX();
619
620  FH.get()->open(LocalFileName);
621
  // Write each variable followed by its CRC. If the buffer has CRCSize spare
  // bytes after the data, the CRC is placed there temporarily (original bytes
  // saved and restored) so data and CRC go out in a single write.
622  uint64_t Offset = RHLocal.Start;
623  for (size_t i = 0; i < Vars.size(); ++i) {
624    uint64_t WriteSize = NeedsBlockHeaders ?
625                         LocalBlockHeaders[i].Size : NElems*Vars[i].Size;
626    void *Data = NeedsBlockHeaders ? LocalData[i] : Vars[i].Data;
627    uint64_t CRC = crc64_omp(Data, WriteSize);
628    bool HasExtraSpace = NeedsBlockHeaders ?
629                         LocalHasExtraSpace[i] : Vars[i].HasExtraSpace;
630    char *CRCLoc = HasExtraSpace ?  ((char *) Data) + WriteSize : (char *) &CRC;
631
632    if (NeedsBlockHeaders)
633      Offset = LocalBlockHeaders[i].Start;
634
635    // When using extra space for the CRC write, preserve the original contents.
636    char CRCSave[CRCSize];
637    if (HasExtraSpace)
638      std::copy(CRCLoc, CRCLoc + CRCSize, CRCSave);
639
640    crc64_invert(CRC, CRCLoc);
641
642    if (HasExtraSpace) {
643      FH.get()->write(Data, WriteSize + CRCSize, Offset, Vars[i].Name + " with CRC");
644    } else {
645      FH.get()->write(Data, WriteSize, Offset, Vars[i].Name);
646      FH.get()->write(CRCLoc, CRCSize, Offset + WriteSize, Vars[i].Name + " CRC");
647    }
648
649    if (HasExtraSpace)
650       std::copy(CRCSave, CRCSave + CRCSize, CRCLoc);
651
652    Offset += WriteSize + CRCSize;
653  }
654
655  close();
656  MPI_Barrier(Comm);
657
  // Report on Comm's rank 0: slowest rank's time, and (in split mode) the
  // sum of every group's file size.
658  double EndTime = MPI_Wtime();
659  double TotalTime = EndTime - StartTime;
660  double MaxTotalTime;
661  MPI_Reduce(&TotalTime, &MaxTotalTime, 1, MPI_DOUBLE, MPI_MAX, 0, Comm);
662
663  if (SplitNRanks != NRanks) {
664    uint64_t ContribFileSize = (SplitRank == 0) ? FileSize : 0;
665    MPI_Reduce(&ContribFileSize, &FileSize, 1, MPI_UINT64_T, MPI_SUM, 0, Comm);
666  }
667
668  if (Rank == 0) {
669    double Rate = ((double) FileSize) / MaxTotalTime / (1024.*1024.);
670    cout << "Wrote " << Vars.size() << " variables to " << FileName <<
671            " (" << FileSize << " bytes) in " << MaxTotalTime << "s: " <<
672            Rate << " MB/s" << endl;
673  }
674
675  MPI_Comm_free(&SplitComm);
676  SplitComm = MPI_COMM_NULL;
677}
678#endif // GENERICIO_NO_MPI
679
680// Note: Errors from this function should be recoverable. This means that if
681// one rank throws an exception, then all ranks should.
//
// GenericIO::openAndReadHeader -- opens the file this rank should read and
// stores its header (HeaderSize bytes, CRC-checked by rank 0 of SplitComm)
// in FH's header cache.
//   MustMatch    - require the file's rank count (and, for a Cartesian Comm,
//                  its Dims) to match the current communicator.
//   EffRank      - rank whose partition file is opened when a rank map is
//                  present; -1 means this process's rank in Comm.
//   CheckPartMap - if no rank map is loaded yet, first probe FileName as a
//                  rank map (its "$partition" variable).
// Returns early, without re-reading, when the selected file name equals
// OpenFileName (the file opened by a previous call).
682void GenericIO::openAndReadHeader(bool MustMatch, int EffRank, bool CheckPartMap) {
683  const char *Magic = isBigEndian() ? MagicBE : MagicLE;
684  const char *MagicInv = isBigEndian() ? MagicLE : MagicBE;
685
686  int NRanks, Rank;
687#ifndef GENERICIO_NO_MPI
688  MPI_Comm_rank(Comm, &Rank);
689  MPI_Comm_size(Comm, &NRanks);
690#else
691  Rank = 0;
692  NRanks = 1;
693#endif
694
695  if (EffRank == -1)
696    EffRank = Rank;
697
698  if (RankMap.empty() && CheckPartMap) {
699    // First, check to see if the file is a rank map.
    // Only Comm's rank 0 probes; any exception means "not a rank map".
    // RankMap is temporarily enlarged by the reader's requested extra space
    // for readData(), then trimmed back to one partition id per rank.
700    unsigned long RanksInMap = 0;
701    if (Rank == 0) {
702      try {
703#ifndef GENERICIO_NO_MPI
704        GenericIO GIO(MPI_COMM_SELF, FileName, FileIOType);
705#else
706        GenericIO GIO(FileName, FileIOType);
707#endif
708        GIO.openAndReadHeader(true, 0, false);
709        RanksInMap = GIO.readNumElems();
710
711        RankMap.resize(RanksInMap + GIO.requestedExtraSpace()/sizeof(int));
712        GIO.addVariable("$partition", RankMap, true);
713
714        GIO.readData(0, false);
715        RankMap.resize(RanksInMap);
716      } catch (...) {
717        RankMap.clear();
718        RanksInMap = 0;
719      }
720    }
721
    // Share the probe result (and the map itself) with every rank.
722#ifndef GENERICIO_NO_MPI
723    MPI_Bcast(&RanksInMap, 1, MPI_UNSIGNED_LONG, 0, Comm);
724    if (RanksInMap > 0) {
725      RankMap.resize(RanksInMap);
726      MPI_Bcast(&RankMap[0], RanksInMap, MPI_INT, 0, Comm);
727    }
728#endif
729  }
730
  // SplitComm is rebuilt on every call: a duplicate of Comm for a single
  // file, or one communicator per partition when a rank map is present.
731#ifndef GENERICIO_NO_MPI
732  if (SplitComm != MPI_COMM_NULL)
733    MPI_Comm_free(&SplitComm);
734#endif
735
736  string LocalFileName;
737  if (RankMap.empty()) {
738    LocalFileName = FileName;
739#ifndef GENERICIO_NO_MPI
740    MPI_Comm_dup(Comm, &SplitComm);
741#endif
742  } else {
    // NOTE(review): RankMap[EffRank] is not bounds-checked against
    // RankMap.size(); an EffRank beyond the map reads out of range.
743    stringstream ss;
744    ss << FileName << "#" << RankMap[EffRank];
745    LocalFileName = ss.str();
746#ifndef GENERICIO_NO_MPI
747#ifdef __bgq__
748    MPI_Barrier(Comm);
749#endif
750    MPI_Comm_split(Comm, RankMap[EffRank], Rank, &SplitComm);
751#endif
752  }
753
  // Already open with a cached header: nothing more to do.
754  if (LocalFileName == OpenFileName)
755    return;
756  FH.close();
757
758  int SplitNRanks, SplitRank;
759#ifndef GENERICIO_NO_MPI
760  MPI_Comm_rank(SplitComm, &SplitRank);
761  MPI_Comm_size(SplitComm, &SplitNRanks);
762#else
763  SplitRank = 0;
764  SplitNRanks = 1;
765#endif
766
767  uint64_t HeaderSize;
768  vector<char> Header;
769
  // Rank 0 of SplitComm reads and validates the header, then broadcasts a
  // success flag so that either every rank proceeds or every rank throws.
770  if (SplitRank == 0) {
771#ifndef GENERICIO_NO_MPI
772    if (FileIOType == FileIOMPI)
773      FH.get() = new GenericFileIO_MPI(MPI_COMM_SELF);
774    else if (FileIOType == FileIOMPICollective)
775      FH.get() = new GenericFileIO_MPICollective(MPI_COMM_SELF);
776    else
777#endif
778      FH.get() = new GenericFileIO_POSIX();
779
780#ifndef GENERICIO_NO_MPI
781    char True = 1, False = 0;
782#endif
783
784    try {
785      FH.get()->open(LocalFileName, true);
786
787      GlobalHeader GH;
788      FH.get()->read(&GH, sizeof(GlobalHeader), 0, "global header");
789
      // The magic's last character encodes the writer's byte order.
790      if (string(GH.Magic, GH.Magic + MagicSize - 1) != Magic) {
791        string Error;
792        if (string(GH.Magic, GH.Magic + MagicSize - 1) == MagicInv) {
793          Error = "wrong endianness";
794        } else {
795          Error = "invalid file-type identifier";
796        }
797        throw runtime_error("Won't read " + LocalFileName + ": " + Error);
798      }
799
800      if (MustMatch) {
801        if (SplitNRanks != (int) GH.NRanks) {
802          stringstream ss;
803          ss << "Won't read " << LocalFileName << ": communicator-size mismatch: " <<
804                "current: " << SplitNRanks << ", file: " << GH.NRanks;
805          throw runtime_error(ss.str());
806        }
807
808#ifndef GENERICIO_NO_MPI
809        int TopoStatus;
810        MPI_Topo_test(Comm, &TopoStatus);
811        if (TopoStatus == MPI_CART) {
812          int Dims[3], Periods[3], Coords[3];
813          MPI_Cart_get(Comm, 3, Dims, Periods, Coords);
814
815          bool DimsMatch = true;
816          for (int i = 0; i < 3; ++i) {
817            if ((uint64_t) Dims[i] != GH.Dims[i]) {
818              DimsMatch = false;
819              break;
820            }
821          }
822
823          if (!DimsMatch) {
824            stringstream ss;
825            ss << "Won't read " << LocalFileName <<
826                  ": communicator-decomposition mismatch: " <<
827                  "current: " << Dims[0] << "x" << Dims[1] << "x" << Dims[2] <<
828                  ", file: " << GH.Dims[0] << "x" << GH.Dims[1] << "x" <<
829                  GH.Dims[2];
830            throw runtime_error(ss.str());
831          }
832        }
833#endif
834      }
835
      // NOTE(review): GH.HeaderSize comes straight from the file and is used
      // unvalidated for the allocation and broadcast below.
836      HeaderSize = GH.HeaderSize;
837      Header.resize(HeaderSize + CRCSize, 0xFE /* poison */);
838      FH.get()->read(&Header[0], HeaderSize + CRCSize, 0, "header");
839
      // The CRC over the header plus its stored (inverted) CRC is all-ones
      // when the header is intact.
840      uint64_t CRC = crc64_omp(&Header[0], HeaderSize + CRCSize);
841      if (CRC != (uint64_t) -1) {
842        throw runtime_error("Header CRC check failed: " + LocalFileName);
843      }
844
      // Under MPI the rank-0-only handle is closed (all ranks reopen below);
      // without MPI the handle opened above stays in use.
845#ifndef GENERICIO_NO_MPI
846      close();
847      MPI_Bcast(&True, 1, MPI_BYTE, 0, SplitComm);
848#endif
849    } catch (...) {
850#ifndef GENERICIO_NO_MPI
851      MPI_Bcast(&False, 1, MPI_BYTE, 0, SplitComm);
852#endif
853      close();
854      throw;
855    }
856  } else {
857#ifndef GENERICIO_NO_MPI
858    char Okay;
859    MPI_Bcast(&Okay, 1, MPI_BYTE, 0, SplitComm);
860    if (!Okay)
861      throw runtime_error("Failure broadcast from rank 0");
862#endif
863  }
864
  // Distribute the header (without its trailing CRC) to every rank.
865#ifndef GENERICIO_NO_MPI
866  MPI_Bcast(&HeaderSize, 1, MPI_UINT64_T, 0, SplitComm);
867#endif
868
869  Header.resize(HeaderSize, 0xFD /* poison */);
870#ifndef GENERICIO_NO_MPI
871  MPI_Bcast(&Header[0], HeaderSize, MPI_BYTE, 0, SplitComm);
872#endif
873
874  FH.getHeaderCache().clear();
875  FH.getHeaderCache().swap(Header);
876  OpenFileName = LocalFileName;
877
  // Every rank opens the file for data reads; open failures are summed over
  // Comm so that all ranks throw together.
878#ifndef GENERICIO_NO_MPI
879  MPI_Barrier(Comm);
880
881  if (FileIOType == FileIOMPI)
882    FH.get() = new GenericFileIO_MPI(SplitComm);
883  else if (FileIOType == FileIOMPICollective)
884    FH.get() = new GenericFileIO_MPICollective(SplitComm);
885  else
886    FH.get() = new GenericFileIO_POSIX();
887
888  int OpenErr = 0, TotOpenErr;
889  try {
890    FH.get()->open(LocalFileName, true);
891    MPI_Allreduce(&OpenErr, &TotOpenErr, 1, MPI_INT, MPI_SUM, Comm);
892  } catch (...) {
893    OpenErr = 1;
894    MPI_Allreduce(&OpenErr, &TotOpenErr, 1, MPI_INT, MPI_SUM, Comm);
895    throw;
896  }
897
898  if (TotOpenErr > 0) {
899    stringstream ss;
900    ss << TotOpenErr << " ranks failed to open file: " << LocalFileName;
901    throw runtime_error(ss.str());
902  }
903#endif
904}
905
906int GenericIO::readNRanks() {
907  if (RankMap.size())
908    return RankMap.size();
909
910  assert(FH.getHeaderCache().size() && "HeaderCache must not be empty");
911  GlobalHeader *GH = (GlobalHeader *) &FH.getHeaderCache()[0];
912  return (int) GH->NRanks;
913}
914
915void GenericIO::readDims(int Dims[3]) {
916  assert(FH.getHeaderCache().size() && "HeaderCache must not be empty");
917  GlobalHeader *GH = (GlobalHeader *) &FH.getHeaderCache()[0];
918  std::copy(GH->Dims, GH->Dims + 3, Dims);
919}
920
921uint64_t GenericIO::readTotalNumElems() {
922  if (RankMap.size())
923    return (uint64_t) -1;
924
925  assert(FH.getHeaderCache().size() && "HeaderCache must not be empty");
926  GlobalHeader *GH = (GlobalHeader *) &FH.getHeaderCache()[0];
927  return GH->NElems;
928}
929
930void GenericIO::readPhysOrigin(double Origin[3]) {
931  assert(FH.getHeaderCache().size() && "HeaderCache must not be empty");
932  GlobalHeader *GH = (GlobalHeader *) &FH.getHeaderCache()[0];
933  if (offsetof(GlobalHeader, PhysOrigin) >= GH->GlobalHeaderSize) {
934    std::fill(Origin, Origin + 3, 0.0);
935    return;
936  }
937
938  std::copy(GH->PhysOrigin, GH->PhysOrigin + 3, Origin);
939}
940
941void GenericIO::readPhysScale(double Scale[3]) {
942  assert(FH.getHeaderCache().size() && "HeaderCache must not be empty");
943  GlobalHeader *GH = (GlobalHeader *) &FH.getHeaderCache()[0];
944  if (offsetof(GlobalHeader, PhysScale) >= GH->GlobalHeaderSize) {
945    std::fill(Scale, Scale + 3, 0.0);
946    return;
947  }
948
949  std::copy(GH->PhysScale, GH->PhysScale + 3, Scale);
950}
951
952static size_t getRankIndex(int EffRank, GlobalHeader *GH,
953                           vector<int> &RankMap, vector<char> &HeaderCache) {
954  if (offsetof(RankHeader, GlobalRank) >= GH->RanksSize || RankMap.empty())
955    return EffRank;
956
957  for (size_t i = 0; i < GH->NRanks; ++i) {
958    RankHeader *RH = (RankHeader *) &HeaderCache[GH->RanksStart +
959                                                 i*GH->RanksSize];
960    if ((int) RH->GlobalRank == EffRank)
961      return i;
962  }
963
964  assert(false && "Index requested of an invalid rank");
965  return (size_t) -1;
966}
967
968int GenericIO::readGlobalRankNumber(int EffRank) {
969  if (EffRank == -1) {
970#ifndef GENERICIO_NO_MPI
971    MPI_Comm_rank(Comm, &EffRank);
972#else
973    EffRank = 0;
974#endif
975  }
976
977  openAndReadHeader(false, EffRank, false);
978
979  assert(FH.getHeaderCache().size() && "HeaderCache must not be empty");
980
981  GlobalHeader *GH = (GlobalHeader *) &FH.getHeaderCache()[0];
982  size_t RankIndex = getRankIndex(EffRank, GH, RankMap, FH.getHeaderCache());
983
984  assert(RankIndex < GH->NRanks && "Invalid rank specified");
985
986  if (offsetof(RankHeader, GlobalRank) >= GH->RanksSize)
987    return EffRank;
988
989  RankHeader *RH = (RankHeader *) &FH.getHeaderCache()[GH->RanksStart +
990                                               RankIndex*GH->RanksSize];
991
992  return (int) RH->GlobalRank;
993}
994
995size_t GenericIO::readNumElems(int EffRank) {
996  if (EffRank == -1) {
997#ifndef GENERICIO_NO_MPI
998    MPI_Comm_rank(Comm, &EffRank);
999#else
1000    EffRank = 0;
1001#endif
1002  }
1003
1004  openAndReadHeader(false, EffRank, false);
1005
1006  assert(FH.getHeaderCache().size() && "HeaderCache must not be empty");
1007
1008  GlobalHeader *GH = (GlobalHeader *) &FH.getHeaderCache()[0];
1009  size_t RankIndex = getRankIndex(EffRank, GH, RankMap, FH.getHeaderCache());
1010
1011  assert(RankIndex < GH->NRanks && "Invalid rank specified");
1012
1013  RankHeader *RH = (RankHeader *) &FH.getHeaderCache()[GH->RanksStart +
1014                                               RankIndex*GH->RanksSize];
1015  return (size_t) RH->NElems;
1016}
1017
1018void GenericIO::readCoords(int Coords[3], int EffRank) {
1019  if (EffRank == -1) {
1020#ifndef GENERICIO_NO_MPI
1021    MPI_Comm_rank(Comm, &EffRank);
1022#else
1023    EffRank = 0;
1024#endif
1025  }
1026
1027  openAndReadHeader(false, EffRank, false);
1028
1029  assert(FH.getHeaderCache().size() && "HeaderCache must not be empty");
1030
1031  GlobalHeader *GH = (GlobalHeader *) &FH.getHeaderCache()[0];
1032  size_t RankIndex = getRankIndex(EffRank, GH, RankMap, FH.getHeaderCache());
1033
1034  assert(RankIndex < GH->NRanks && "Invalid rank specified");
1035
1036  RankHeader *RH = (RankHeader *) &FH.getHeaderCache()[GH->RanksStart +
1037                                               RankIndex*GH->RanksSize];
1038
1039  std::copy(RH->Coords, RH->Coords + 3, Coords);
1040}
1041
1042// Note: Errors from this function should be recoverable. This means that if
1043// one rank throws an exception, then all ranks should.
1044void GenericIO::readData(int EffRank, bool PrintStats, bool CollStats) {
1045  int Rank;
1046#ifndef GENERICIO_NO_MPI
1047  MPI_Comm_rank(Comm, &Rank);
1048#else
1049  Rank = 0;
1050#endif
1051
1052  openAndReadHeader(false, EffRank, false);
1053
1054  assert(FH.getHeaderCache().size() && "HeaderCache must not be empty");
1055
1056  if (EffRank == -1)
1057    EffRank = Rank;
1058
1059  GlobalHeader *GH = (GlobalHeader *) &FH.getHeaderCache()[0];
1060  size_t RankIndex = getRankIndex(EffRank, GH, RankMap, FH.getHeaderCache());
1061
1062  assert(RankIndex < GH->NRanks && "Invalid rank specified");
1063
1064  RankHeader *RH = (RankHeader *) &FH.getHeaderCache()[GH->RanksStart +
1065                                               RankIndex*GH->RanksSize];
1066
1067  uint64_t TotalReadSize = 0;
1068#ifndef GENERICIO_NO_MPI
1069  double StartTime = MPI_Wtime();
1070#else
1071  double StartTime = double(clock())/CLOCKS_PER_SEC;
1072#endif
1073
1074  int NErrs[3] = { 0, 0, 0 };
1075  for (size_t i = 0; i < Vars.size(); ++i) {
1076    uint64_t Offset = RH->Start;
1077    bool VarFound = false;
1078    for (uint64_t j = 0; j < GH->NVars; ++j) {
1079      VariableHeader *VH = (VariableHeader *) &FH.getHeaderCache()[GH->VarsStart +
1080                                                           j*GH->VarsSize];
1081
1082      string VName(VH->Name, VH->Name + NameSize);
1083      size_t VNameNull = VName.find('\0');
1084      if (VNameNull < NameSize)
1085        VName.resize(VNameNull);
1086
1087      uint64_t ReadSize = RH->NElems*VH->Size + CRCSize;
1088      if (VName != Vars[i].Name) {
1089        Offset += ReadSize;
1090        continue;
1091      }
1092
1093      VarFound = true;
1094      bool IsFloat = (bool) (VH->Flags & FloatValue),
1095           IsSigned = (bool) (VH->Flags & SignedValue);
1096      if (VH->Size != Vars[i].Size) {
1097        stringstream ss;
1098        ss << "Size mismatch for variable " << Vars[i].Name <<
1099              " in: " << OpenFileName << ": current: " << Vars[i].Size <<
1100              ", file: " << VH->Size;
1101        throw runtime_error(ss.str());
1102      } else if (IsFloat != Vars[i].IsFloat) {
1103        string Float("float"), Int("integer");
1104        stringstream ss;
1105        ss << "Type mismatch for variable " << Vars[i].Name <<
1106              " in: " << OpenFileName << ": current: " <<
1107              (Vars[i].IsFloat ? Float : Int) <<
1108              ", file: " << (IsFloat ? Float : Int);
1109        throw runtime_error(ss.str());
1110      } else if (IsSigned != Vars[i].IsSigned) {
1111        string Signed("signed"), Uns("unsigned");
1112        stringstream ss;
1113        ss << "Type mismatch for variable " << Vars[i].Name <<
1114              " in: " << OpenFileName << ": current: " <<
1115              (Vars[i].IsSigned ? Signed : Uns) <<
1116              ", file: " << (IsSigned ? Signed : Uns);
1117        throw runtime_error(ss.str());
1118      }
1119
1120      vector<unsigned char> LData;
1121      void *Data = Vars[i].Data;
1122      bool HasExtraSpace = Vars[i].HasExtraSpace;
1123      if (offsetof(GlobalHeader, BlocksStart) < GH->GlobalHeaderSize &&
1124          GH->BlocksSize > 0) {
1125        BlockHeader *BH = (BlockHeader *)
1126          &FH.getHeaderCache()[GH->BlocksStart +
1127                               (RankIndex*GH->NVars + j)*GH->BlocksSize];
1128        ReadSize = BH->Size + CRCSize;
1129        Offset = BH->Start;
1130
1131        if (strncmp(BH->Filters[0], CompressName, FilterNameSize) == 0) {
1132          LData.resize(ReadSize);
1133          Data = &LData[0];
1134          HasExtraSpace = true;
1135        } else if (BH->Filters[0][0] != '\0') {
1136          stringstream ss;
1137          ss << "Unknown filter \"" << BH->Filters[0] << "\" on variable " << Vars[i].Name;
1138          throw runtime_error(ss.str());
1139        }
1140      }
1141
1142      assert(HasExtraSpace && "Extra space required for reading");
1143
1144      char CRCSave[CRCSize];
1145      char *CRCLoc = ((char *) Data) + ReadSize - CRCSize;
1146      if (HasExtraSpace)
1147        std::copy(CRCLoc, CRCLoc + CRCSize, CRCSave);
1148
1149      int Retry = 0;
1150      {
1151        int RetryCount = 300;
1152        const char *EnvStr = getenv("GENERICIO_RETRY_COUNT");
1153        if (EnvStr)
1154          RetryCount = atoi(EnvStr);
1155
1156        int RetrySleep = 100; // ms
1157        EnvStr = getenv("GENERICIO_RETRY_SLEEP");
1158        if (EnvStr)
1159          RetrySleep = atoi(EnvStr);
1160
1161        for (; Retry < RetryCount; ++Retry) {
1162          try {
1163            FH.get()->read(Data, ReadSize, Offset, Vars[i].Name);
1164            break;
1165          } catch (...) { }
1166
1167          usleep(1000*RetrySleep);
1168        }
1169
1170        if (Retry == RetryCount) {
1171          ++NErrs[0];
1172          break;
1173        } else if (Retry > 0) {
1174          EnvStr = getenv("GENERICIO_VERBOSE");
1175          if (EnvStr) {
1176            int Mod = atoi(EnvStr);
1177            if (Mod > 0) {
1178              int Rank;
1179#ifndef GENERICIO_NO_MPI
1180              MPI_Comm_rank(MPI_COMM_WORLD, &Rank);
1181#else
1182              Rank = 0;
1183#endif
1184
1185              std::cerr << "Rank " << Rank << ": " << Retry <<
1186                           " I/O retries were necessary for reading " <<
1187                           Vars[i].Name << " from: " << OpenFileName << "\n";
1188
1189              std::cerr.flush();
1190            }
1191          }
1192        }
1193      }
1194
1195      TotalReadSize += ReadSize;
1196
1197      uint64_t CRC = crc64_omp(Data, ReadSize);
1198      if (CRC != (uint64_t) -1) {
1199        ++NErrs[1];
1200
1201        int Rank;
1202#ifndef GENERICIO_NO_MPI
1203        MPI_Comm_rank(MPI_COMM_WORLD, &Rank);
1204#else
1205        Rank = 0;
1206#endif
1207
1208        // All ranks will do this and have a good time!
1209        string dn = "gio_crc_errors";
1210        mkdir(dn.c_str(), 0777);
1211
1212        srand(time(0));
1213        int DumpNum = rand();
1214        stringstream ssd;
1215        ssd << dn << "/gio_crc_error_dump." << Rank << "." << DumpNum << ".bin";
1216
1217        stringstream ss;
1218        ss << dn << "/gio_crc_error_log." << Rank << ".txt";
1219
1220        ofstream ofs(ss.str().c_str(), ofstream::out | ofstream::app);
1221        ofs << "On-Disk CRC Error Report:\n";
1222        ofs << "Variable: " << Vars[i].Name << "\n";
1223        ofs << "File: " << OpenFileName << "\n";
1224        ofs << "I/O Retries: " << Retry << "\n";
1225        ofs << "Size: " << ReadSize << " bytes\n";
1226        ofs << "Offset: " << Offset << " bytes\n";
1227        ofs << "CRC: " << CRC << " (expected is -1)\n";
1228        ofs << "Dump file: " << ssd.str() << "\n";
1229        ofs << "\n";
1230        ofs.close();
1231
1232        ofstream dofs(ssd.str().c_str(), ofstream::out);
1233        dofs.write((const char *) Data, ReadSize);
1234        dofs.close();
1235        break;
1236      }
1237
1238      if (HasExtraSpace)
1239        std::copy(CRCSave, CRCSave + CRCSize, CRCLoc);
1240
1241      if (LData.size()) {
1242        CompressHeader *CH = (CompressHeader*) &LData[0];
1243
1244#ifdef _OPENMP
1245#pragma omp master
1246  {
1247#endif
1248
1249       if (!blosc_initialized) {
1250         blosc_init();
1251         blosc_initialized = true;
1252       }
1253
1254#ifdef _OPENMP
1255       blosc_set_nthreads(omp_get_max_threads());
1256  }
1257#endif
1258
1259        blosc_decompress(&LData[0] + sizeof(CompressHeader),
1260                         Vars[i].Data, Vars[i].Size*RH->NElems);
1261
1262        if (CH->OrigCRC != crc64_omp(Vars[i].Data, Vars[i].Size*RH->NElems)) {
1263          ++NErrs[2];
1264          break;
1265        }
1266      }
1267
1268      break;
1269    }
1270
1271    if (!VarFound)
1272      throw runtime_error("Variable " + Vars[i].Name +
1273                          " not found in: " + OpenFileName);
1274
1275    // This is for debugging.
1276    if (NErrs[0] || NErrs[1] || NErrs[2]) {
1277      const char *EnvStr = getenv("GENERICIO_VERBOSE");
1278      if (EnvStr) {
1279        int Mod = atoi(EnvStr);
1280        if (Mod > 0) {
1281          int Rank;
1282#ifndef GENERICIO_NO_MPI
1283          MPI_Comm_rank(MPI_COMM_WORLD, &Rank);
1284#else
1285          Rank = 0;
1286#endif
1287
1288          std::cerr << "Rank " << Rank << ": " << NErrs[0] << " I/O error(s), " <<
1289          NErrs[1] << " CRC error(s) and " << NErrs[2] <<
1290          " decompression CRC error(s) reading: " << Vars[i].Name <<
1291          " from: " << OpenFileName << "\n";
1292
1293          std::cerr.flush();
1294        }
1295      }
1296    }
1297
1298    if (NErrs[0] || NErrs[1] || NErrs[2])
1299      break;
1300  }
1301
1302  int AllNErrs[3];
1303#ifndef GENERICIO_NO_MPI
1304  MPI_Allreduce(NErrs, AllNErrs, 3, MPI_INT, MPI_SUM, Comm);
1305#else
1306  AllNErrs[0] = NErrs[0]; AllNErrs[1] = NErrs[1]; AllNErrs[2] = NErrs[2];
1307#endif
1308
1309  if (AllNErrs[0] > 0 || AllNErrs[1] > 0 || AllNErrs[2] > 0) {
1310    stringstream ss;
1311    ss << "Experienced " << AllNErrs[0] << " I/O error(s), " <<
1312          AllNErrs[1] << " CRC error(s) and " << AllNErrs[2] <<
1313          " decompression CRC error(s) reading: " << OpenFileName;
1314    throw runtime_error(ss.str());
1315  }
1316
1317#ifndef GENERICIO_NO_MPI
1318  MPI_Barrier(Comm);
1319#endif
1320
1321#ifndef GENERICIO_NO_MPI
1322  double EndTime = MPI_Wtime();
1323#else
1324  double EndTime = double(clock())/CLOCKS_PER_SEC;
1325#endif
1326
1327  double TotalTime = EndTime - StartTime;
1328  double MaxTotalTime;
1329#ifndef GENERICIO_NO_MPI
1330  if (CollStats)
1331    MPI_Reduce(&TotalTime, &MaxTotalTime, 1, MPI_DOUBLE, MPI_MAX, 0, Comm);
1332  else
1333#endif
1334  MaxTotalTime = TotalTime;
1335
1336  uint64_t AllTotalReadSize;
1337#ifndef GENERICIO_NO_MPI
1338  if (CollStats)
1339    MPI_Reduce(&TotalReadSize, &AllTotalReadSize, 1, MPI_UINT64_T, MPI_SUM, 0, Comm);
1340  else
1341#endif
1342  AllTotalReadSize = TotalReadSize;
1343
1344  if (Rank == 0 && PrintStats) {
1345    double Rate = ((double) AllTotalReadSize) / MaxTotalTime / (1024.*1024.);
1346    cout << "Read " << Vars.size() << " variables from " << FileName <<
1347            " (" << AllTotalReadSize << " bytes) in " << MaxTotalTime << "s: " <<
1348            Rate << " MB/s [excluding header read]" << endl;
1349  }
1350
1351}
1352
1353void GenericIO::getVariableInfo(vector<VariableInfo> &VI) {
1354  assert(FH.getHeaderCache().size() && "HeaderCache must not be empty");
1355
1356  GlobalHeader *GH = (GlobalHeader *) &FH.getHeaderCache()[0];
1357  for (uint64_t j = 0; j < GH->NVars; ++j) {
1358    VariableHeader *VH = (VariableHeader *) &FH.getHeaderCache()[GH->VarsStart +
1359                                                         j*GH->VarsSize];
1360
1361    string VName(VH->Name, VH->Name + NameSize);
1362    size_t VNameNull = VName.find('\0');
1363    if (VNameNull < NameSize)
1364      VName.resize(VNameNull);
1365
1366    bool IsFloat = (bool) (VH->Flags & FloatValue),
1367         IsSigned = (bool) (VH->Flags & SignedValue),
1368         IsPhysCoordX = (bool) (VH->Flags & ValueIsPhysCoordX),
1369         IsPhysCoordY = (bool) (VH->Flags & ValueIsPhysCoordY),
1370         IsPhysCoordZ = (bool) (VH->Flags & ValueIsPhysCoordZ),
1371         MaybePhysGhost = (bool) (VH->Flags & ValueMaybePhysGhost);
1372    VI.push_back(VariableInfo(VName, (size_t) VH->Size, IsFloat, IsSigned,
1373                              IsPhysCoordX, IsPhysCoordY, IsPhysCoordZ,
1374                              MaybePhysGhost));
1375  }
1376}
1377
1378void GenericIO::setNaturalDefaultPartition() {
1379#ifdef __bgq__
1380  Personality_t pers;
1381  Kernel_GetPersonality(&pers, sizeof(pers));
1382
1383  // Nodes in an ION Partition
1384  int SPLIT_A = 2;
1385  int SPLIT_B = 2;
1386  int SPLIT_C = 4;
1387  int SPLIT_D = 4;
1388  int SPLIT_E = 2;
1389
1390  int Anodes, Bnodes, Cnodes, Dnodes, Enodes;
1391  int Acoord, Bcoord, Ccoord, Dcoord, Ecoord;
1392  int A_color, B_color, C_color, D_color, E_color;
1393  int A_blocks, B_blocks, C_blocks, D_blocks, E_blocks;
1394  uint32_t id_on_node;
1395  int ranks_per_node, color;
1396
1397  Anodes = pers.Network_Config.Anodes;
1398  Acoord = pers.Network_Config.Acoord;
1399
1400  Bnodes = pers.Network_Config.Bnodes;
1401  Bcoord = pers.Network_Config.Bcoord;
1402
1403  Cnodes = pers.Network_Config.Cnodes;
1404  Ccoord = pers.Network_Config.Ccoord;
1405
1406  Dnodes = pers.Network_Config.Dnodes;
1407  Dcoord = pers.Network_Config.Dcoord;
1408
1409  Enodes = pers.Network_Config.Enodes;
1410  Ecoord = pers.Network_Config.Ecoord;
1411
1412  A_color  = Acoord /  SPLIT_A;
1413  B_color  = Bcoord /  SPLIT_B;
1414  C_color  = Ccoord /  SPLIT_C;
1415  D_color  = Dcoord /  SPLIT_D;
1416  E_color  = Ecoord /  SPLIT_E;
1417
1418  // Number of blocks
1419  A_blocks = Anodes / SPLIT_A;
1420  B_blocks = Bnodes / SPLIT_B;
1421  C_blocks = Cnodes / SPLIT_C;
1422  D_blocks = Dnodes / SPLIT_D;
1423  E_blocks = Enodes / SPLIT_E;
1424
1425  color = (A_color * (B_blocks * C_blocks * D_blocks * E_blocks))
1426    + (B_color * (C_blocks * D_blocks * E_blocks))
1427    + (C_color * ( D_blocks * E_blocks))
1428    + (D_color * ( E_blocks))
1429    + E_color;
1430
1431  DefaultPartition = color;
1432#else
1433#ifndef GENERICIO_NO_MPI
1434  bool UseName = true;
1435  const char *EnvStr = getenv("GENERICIO_PARTITIONS_USE_NAME");
1436  if (EnvStr) {
1437    int Mod = atoi(EnvStr);
1438    UseName = (Mod != 0);
1439  }
1440
1441  if (UseName) {
1442    // This is a heuristic to generate ~256 partitions based on the
1443    // names of the nodes.
1444    char Name[MPI_MAX_PROCESSOR_NAME];
1445    int Len = 0;
1446
1447    MPI_Get_processor_name(Name, &Len);
1448    unsigned char color = 0;
1449    for (int i = 0; i < Len; ++i)
1450      color += (unsigned char) Name[i];
1451
1452    DefaultPartition = color;
1453  }
1454
1455  // This is for debugging.
1456  EnvStr = getenv("GENERICIO_RANK_PARTITIONS");
1457  if (EnvStr) {
1458    int Mod = atoi(EnvStr);
1459    if (Mod > 0) {
1460      int Rank;
1461      MPI_Comm_rank(MPI_COMM_WORLD, &Rank);
1462      DefaultPartition += Rank % Mod;
1463    }
1464  }
1465#endif
1466#endif
1467}
1468
1469} /* END namespace cosmotk */
Note: See TracBrowser for help on using the repository browser.