source: thirdparty/blosc/blosc.c @ 981e22c

Revision 981e22c, 63.1 KB checked in by Hal Finkel <hfinkel@…>, 8 years ago (diff)

Upgrade to latest blosc library

blosc git: e394f327ccc78319d90a06af0b88bce07034b8dd

  • Property mode set to 100644
Line 
1/*********************************************************************
2  Blosc - Blocked Shuffling and Compression Library
3
4  Author: Francesc Alted <[email protected]>
5  Creation date: 2009-05-20
6
7  See LICENSES/BLOSC.txt for details about copyright and rights to use.
8**********************************************************************/
9
10
11#include <stdio.h>
12#include <stdlib.h>
13#include <errno.h>
14#include <string.h>
15#include <sys/types.h>
16#include <sys/stat.h>
17#include <assert.h>
18#if defined(USING_CMAKE)
19  #include "config.h"
20#endif /*  USING_CMAKE */
21#include "blosc.h"
22#include "shuffle.h"
23#include "blosclz.h"
24#if defined(HAVE_LZ4)
25  #include "lz4.h"
26  #include "lz4hc.h"
27#endif /*  HAVE_LZ4 */
28#if defined(HAVE_SNAPPY)
29  #include "snappy-c.h"
30#endif /*  HAVE_SNAPPY */
31#if defined(HAVE_ZLIB)
32  #include "zlib.h"
33#endif /*  HAVE_ZLIB */
34#if defined(HAVE_ZSTD)
35  #include "zstd.h"
36#endif /*  HAVE_ZSTD */
37
38#if defined(_WIN32) && !defined(__MINGW32__)
39  #include <windows.h>
40  #include <malloc.h>
41
42  /* stdint.h only available in VS2010 (VC++ 16.0) and newer */
43  #if defined(_MSC_VER) && _MSC_VER < 1600
44    #include "win32/stdint-windows.h"
45  #else
46    #include <stdint.h>
47  #endif
48
49  #include <process.h>
50  #define getpid _getpid
51#else
52  #include <stdint.h>
53  #include <unistd.h>
54  #include <inttypes.h>
55#endif  /* _WIN32 */
56
57#if defined(_WIN32) && !defined(__GNUC__)
58  #include "win32/pthread.h"
59  #include "win32/pthread.c"
60#else
61  #include <pthread.h>
62#endif
63
64/* If C11 is supported, use it's built-in aligned allocation. */
65#if __STDC_VERSION__ >= 201112L
66  #include <stdalign.h>
67#endif
68
69
70/* Some useful units */
71#define KB 1024
72#define MB (1024*KB)
73
74/* Minimum buffer size to be compressed */
75#define MIN_BUFFERSIZE 128       /* Cannot be smaller than 66 */
76
77/* The maximum number of splits in a block for compression */
78#define MAX_SPLITS 16            /* Cannot be larger than 128 */
79
80/* The size of L1 cache.  32 KB is quite common nowadays. */
81#define L1 (32*KB)
82
83/* Have problems using posix barriers when symbol value is 200112L */
84/* This requires more investigation, but will work for the moment */
85#if defined(_POSIX_BARRIERS) && ( (_POSIX_BARRIERS - 20012L) >= 0 && _POSIX_BARRIERS != 200112L)
86#define _POSIX_BARRIERS_MINE
87#endif
88/* Synchronization variables */
89
90
91struct blosc_context {
92  int32_t compress;               /* 1 if we are doing compression 0 if decompress */
93
94  const uint8_t* src;
95  uint8_t* dest;                  /* The current pos in the destination buffer */
96  uint8_t* header_flags;          /* Flags for header.  Currently booked:
97                                    - 0: byte-shuffled?
98                                    - 1: memcpy'ed?
99                                    - 2: bit-shuffled? */
100  int32_t sourcesize;             /* Number of bytes in source buffer (or uncompressed bytes in compressed file) */
101  int32_t nblocks;                /* Number of total blocks in buffer */
102  int32_t leftover;               /* Extra bytes at end of buffer */
103  int32_t blocksize;              /* Length of the block in bytes */
104  int32_t typesize;               /* Type size */
105  int32_t num_output_bytes;       /* Counter for the number of output bytes */
106  int32_t destsize;               /* Maximum size for destination buffer */
107  uint8_t* bstarts;               /* Start of the buffer past header info */
108  int32_t compcode;               /* Compressor code to use */
109  int clevel;                     /* Compression level (1-9) */
110
111  /* Threading */
112  int32_t numthreads;
113  int32_t threads_started;
114  int32_t end_threads;
115  pthread_t threads[BLOSC_MAX_THREADS];
116  int32_t tids[BLOSC_MAX_THREADS];
117  pthread_mutex_t count_mutex;
118  #ifdef _POSIX_BARRIERS_MINE
119  pthread_barrier_t barr_init;
120  pthread_barrier_t barr_finish;
121  #else
122  int32_t count_threads;
123  pthread_mutex_t count_threads_mutex;
124  pthread_cond_t count_threads_cv;
125  #endif
126  #if !defined(_WIN32)
127  pthread_attr_t ct_attr;            /* creation time attrs for threads */
128  #endif
129  int32_t thread_giveup_code;               /* error code when give up */
130  int32_t thread_nblock;                    /* block counter */
131};
132
133struct thread_context {
134  struct blosc_context* parent_context;
135  int32_t tid;
136  uint8_t* tmp;
137  uint8_t* tmp2;
138  uint8_t* tmp3;
139  int32_t tmpblocksize; /* Used to keep track of how big the temporary buffers are */
140};
141
142/* Global context for non-contextual API */
143static struct blosc_context* g_global_context;
144static pthread_mutex_t global_comp_mutex;
145static int32_t g_compressor = BLOSC_BLOSCLZ;  /* the compressor to use by default */
146static int32_t g_threads = 1;
147static int32_t g_force_blocksize = 0;
148static int32_t g_initlib = 0;
149
150
151
152/* Wrapped function to adjust the number of threads used by blosc */
153int blosc_set_nthreads_(struct blosc_context*);
154
155/* Releases the global threadpool */
156int blosc_release_threadpool(struct blosc_context* context);
157
158/* Macros for synchronization */
159
160/* Wait until all threads are initialized */
161#ifdef _POSIX_BARRIERS_MINE
162#define WAIT_INIT(RET_VAL, CONTEXT_PTR)  \
163  rc = pthread_barrier_wait(&CONTEXT_PTR->barr_init); \
164  if (rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD) { \
165    printf("Could not wait on barrier (init): %d\n", rc); \
166    return((RET_VAL));                            \
167  }
168#else
169#define WAIT_INIT(RET_VAL, CONTEXT_PTR)   \
170  pthread_mutex_lock(&CONTEXT_PTR->count_threads_mutex); \
171  if (CONTEXT_PTR->count_threads < CONTEXT_PTR->numthreads) { \
172    CONTEXT_PTR->count_threads++;  \
173    pthread_cond_wait(&CONTEXT_PTR->count_threads_cv, &CONTEXT_PTR->count_threads_mutex); \
174  } \
175  else { \
176    pthread_cond_broadcast(&CONTEXT_PTR->count_threads_cv); \
177  } \
178  pthread_mutex_unlock(&CONTEXT_PTR->count_threads_mutex);
179#endif
180
181/* Wait for all threads to finish */
182#ifdef _POSIX_BARRIERS_MINE
183#define WAIT_FINISH(RET_VAL, CONTEXT_PTR)   \
184  rc = pthread_barrier_wait(&CONTEXT_PTR->barr_finish); \
185  if (rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD) { \
186    printf("Could not wait on barrier (finish)\n"); \
187    return((RET_VAL));                              \
188  }
189#else
190#define WAIT_FINISH(RET_VAL, CONTEXT_PTR)                           \
191  pthread_mutex_lock(&CONTEXT_PTR->count_threads_mutex); \
192  if (CONTEXT_PTR->count_threads > 0) { \
193    CONTEXT_PTR->count_threads--; \
194    pthread_cond_wait(&CONTEXT_PTR->count_threads_cv, &CONTEXT_PTR->count_threads_mutex); \
195  } \
196  else { \
197    pthread_cond_broadcast(&CONTEXT_PTR->count_threads_cv); \
198  } \
199  pthread_mutex_unlock(&CONTEXT_PTR->count_threads_mutex);
200#endif
201
202
203/* A function for aligned malloc that is portable */
204static uint8_t *my_malloc(size_t size)
205{
206  void *block = NULL;
207  int res = 0;
208
209/* Do an alignment to 32 bytes because AVX2 is supported */
210#if _ISOC11_SOURCE
211  /* C11 aligned allocation. 'size' must be a multiple of the alignment. */
212  block = aligned_alloc(32, size);
213#elif defined(_WIN32)
214  /* A (void *) cast needed for avoiding a warning with MINGW :-/ */
215  block = (void *)_aligned_malloc(size, 32);
216#elif defined __APPLE__
217  /* Mac OS X guarantees 16-byte alignment in small allocs */
218  block = malloc(size);
219#elif _POSIX_C_SOURCE >= 200112L || _XOPEN_SOURCE >= 600
220  /* Platform does have an implementation of posix_memalign */
221  res = posix_memalign(&block, 32, size);
222#else
223  block = malloc(size);
224#endif  /* _WIN32 */
225
226  if (block == NULL || res != 0) {
227    printf("Error allocating memory!");
228    return NULL;
229  }
230
231  return (uint8_t *)block;
232}
233
234
235/* Release memory booked by my_malloc */
236static void my_free(void *block)
237{
238#if defined(_WIN32)
239    _aligned_free(block);
240#else
241    free(block);
242#endif  /* _WIN32 */
243}
244
245
246/* Copy 4 bytes from `*pa` to int32_t, changing endianness if necessary. */
247static int32_t sw32_(const uint8_t *pa)
248{
249  int32_t idest;
250  uint8_t *dest = (uint8_t *)&idest;
251  int i = 1;                    /* for big/little endian detection */
252  char *p = (char *)&i;
253
254  if (p[0] != 1) {
255    /* big endian */
256    dest[0] = pa[3];
257    dest[1] = pa[2];
258    dest[2] = pa[1];
259    dest[3] = pa[0];
260  }
261  else {
262    /* little endian */
263    dest[0] = pa[0];
264    dest[1] = pa[1];
265    dest[2] = pa[2];
266    dest[3] = pa[3];
267  }
268  return idest;
269}
270
271
272/* Copy 4 bytes from `*pa` to `*dest`, changing endianness if necessary. */
273static void _sw32(uint8_t* dest, int32_t a)
274{
275  uint8_t *pa = (uint8_t *)&a;
276  int i = 1;                    /* for big/little endian detection */
277  char *p = (char *)&i;
278
279  if (p[0] != 1) {
280    /* big endian */
281    dest[0] = pa[3];
282    dest[1] = pa[2];
283    dest[2] = pa[1];
284    dest[3] = pa[0];
285  }
286  else {
287    /* little endian */
288    dest[0] = pa[0];
289    dest[1] = pa[1];
290    dest[2] = pa[2];
291    dest[3] = pa[3];
292  }
293}
294
295
296/*
297 * Conversion routines between compressor and compression libraries
298 */
299
300/* Return the library code associated with the compressor name */
301static int compname_to_clibcode(const char *compname)
302{
303  if (strcmp(compname, BLOSC_BLOSCLZ_COMPNAME) == 0)
304    return BLOSC_BLOSCLZ_LIB;
305  if (strcmp(compname, BLOSC_LZ4_COMPNAME) == 0)
306    return BLOSC_LZ4_LIB;
307  if (strcmp(compname, BLOSC_LZ4HC_COMPNAME) == 0)
308    return BLOSC_LZ4_LIB;
309  if (strcmp(compname, BLOSC_SNAPPY_COMPNAME) == 0)
310    return BLOSC_SNAPPY_LIB;
311  if (strcmp(compname, BLOSC_ZLIB_COMPNAME) == 0)
312    return BLOSC_ZLIB_LIB;
313  if (strcmp(compname, BLOSC_ZSTD_COMPNAME) == 0)
314    return BLOSC_ZSTD_LIB;
315  return -1;
316}
317
318/* Return the library name associated with the compressor code */
319static char *clibcode_to_clibname(int clibcode)
320{
321  if (clibcode == BLOSC_BLOSCLZ_LIB) return BLOSC_BLOSCLZ_LIBNAME;
322  if (clibcode == BLOSC_LZ4_LIB) return BLOSC_LZ4_LIBNAME;
323  if (clibcode == BLOSC_SNAPPY_LIB) return BLOSC_SNAPPY_LIBNAME;
324  if (clibcode == BLOSC_ZLIB_LIB) return BLOSC_ZLIB_LIBNAME;
325  if (clibcode == BLOSC_ZSTD_LIB) return BLOSC_ZSTD_LIBNAME;
326  return NULL;                  /* should never happen */
327}
328
329
330/*
331 * Conversion routines between compressor names and compressor codes
332 */
333
334/* Get the compressor name associated with the compressor code */
335int blosc_compcode_to_compname(int compcode, char **compname)
336{
337  int code = -1;    /* -1 means non-existent compressor code */
338  char *name = NULL;
339
340  /* Map the compressor code */
341  if (compcode == BLOSC_BLOSCLZ)
342    name = BLOSC_BLOSCLZ_COMPNAME;
343  else if (compcode == BLOSC_LZ4)
344    name = BLOSC_LZ4_COMPNAME;
345  else if (compcode == BLOSC_LZ4HC)
346    name = BLOSC_LZ4HC_COMPNAME;
347  else if (compcode == BLOSC_SNAPPY)
348    name = BLOSC_SNAPPY_COMPNAME;
349  else if (compcode == BLOSC_ZLIB)
350    name = BLOSC_ZLIB_COMPNAME;
351  else if (compcode == BLOSC_ZSTD)
352    name = BLOSC_ZSTD_COMPNAME;
353
354  *compname = name;
355
356  /* Guess if there is support for this code */
357  if (compcode == BLOSC_BLOSCLZ)
358    code = BLOSC_BLOSCLZ;
359#if defined(HAVE_LZ4)
360  else if (compcode == BLOSC_LZ4)
361    code = BLOSC_LZ4;
362  else if (compcode == BLOSC_LZ4HC)
363    code = BLOSC_LZ4HC;
364#endif /*  HAVE_LZ4 */
365#if defined(HAVE_SNAPPY)
366  else if (compcode == BLOSC_SNAPPY)
367    code = BLOSC_SNAPPY;
368#endif /*  HAVE_SNAPPY */
369#if defined(HAVE_ZLIB)
370  else if (compcode == BLOSC_ZLIB)
371    code = BLOSC_ZLIB;
372#endif /*  HAVE_ZLIB */
373#if defined(HAVE_ZSTD)
374  else if (compcode == BLOSC_ZSTD)
375    code = BLOSC_ZSTD;
376#endif /*  HAVE_ZSTD */
377
378  return code;
379}
380
381/* Get the compressor code for the compressor name. -1 if it is not available */
382int blosc_compname_to_compcode(const char *compname)
383{
384  int code = -1;  /* -1 means non-existent compressor code */
385
386  if (strcmp(compname, BLOSC_BLOSCLZ_COMPNAME) == 0) {
387    code = BLOSC_BLOSCLZ;
388  }
389#if defined(HAVE_LZ4)
390  else if (strcmp(compname, BLOSC_LZ4_COMPNAME) == 0) {
391    code = BLOSC_LZ4;
392  }
393  else if (strcmp(compname, BLOSC_LZ4HC_COMPNAME) == 0) {
394    code = BLOSC_LZ4HC;
395  }
396#endif /*  HAVE_LZ4 */
397#if defined(HAVE_SNAPPY)
398  else if (strcmp(compname, BLOSC_SNAPPY_COMPNAME) == 0) {
399    code = BLOSC_SNAPPY;
400  }
401#endif /*  HAVE_SNAPPY */
402#if defined(HAVE_ZLIB)
403  else if (strcmp(compname, BLOSC_ZLIB_COMPNAME) == 0) {
404    code = BLOSC_ZLIB;
405  }
406#endif /*  HAVE_ZLIB */
407#if defined(HAVE_ZSTD)
408  else if (strcmp(compname, BLOSC_ZSTD_COMPNAME) == 0) {
409    code = BLOSC_ZSTD;
410  }
411#endif /*  HAVE_ZSTD */
412
413return code;
414}
415
416
417#if defined(HAVE_LZ4)
418static int lz4_wrap_compress(const char* input, size_t input_length,
419                             char* output, size_t maxout, int accel)
420{
421  int cbytes;
422  cbytes = LZ4_compress_fast(input, output, (int)input_length, (int)maxout,
423                             accel);
424  return cbytes;
425}
426
427static int lz4hc_wrap_compress(const char* input, size_t input_length,
428                               char* output, size_t maxout, int clevel)
429{
430  int cbytes;
431  if (input_length > (size_t)(2<<30))
432    return -1;   /* input larger than 1 GB is not supported */
433  /* clevel for lz4hc goes up to 16, at least in LZ4 1.1.3 */
434  cbytes = LZ4_compressHC2_limitedOutput(input, output, (int)input_length,
435                                         (int)maxout, clevel*2-1);
436  return cbytes;
437}
438
439static int lz4_wrap_decompress(const char* input, size_t compressed_length,
440                               char* output, size_t maxout)
441{
442  size_t cbytes;
443  cbytes = LZ4_decompress_fast(input, output, (int)maxout);
444  if (cbytes != compressed_length) {
445    return 0;
446  }
447  return (int)maxout;
448}
449
450#endif /* HAVE_LZ4 */
451
452#if defined(HAVE_SNAPPY)
453static int snappy_wrap_compress(const char* input, size_t input_length,
454                                char* output, size_t maxout)
455{
456  snappy_status status;
457  size_t cl = maxout;
458  status = snappy_compress(input, input_length, output, &cl);
459  if (status != SNAPPY_OK){
460    return 0;
461  }
462  return (int)cl;
463}
464
465static int snappy_wrap_decompress(const char* input, size_t compressed_length,
466                                  char* output, size_t maxout)
467{
468  snappy_status status;
469  size_t ul = maxout;
470  status = snappy_uncompress(input, compressed_length, output, &ul);
471  if (status != SNAPPY_OK){
472    return 0;
473  }
474  return (int)ul;
475}
476#endif /* HAVE_SNAPPY */
477
478#if defined(HAVE_ZLIB)
479/* zlib is not very respectful with sharing name space with others.
480 Fortunately, its names do not collide with those already in blosc. */
481static int zlib_wrap_compress(const char* input, size_t input_length,
482                              char* output, size_t maxout, int clevel)
483{
484  int status;
485  uLongf cl = maxout;
486  status = compress2(
487             (Bytef*)output, &cl, (Bytef*)input, (uLong)input_length, clevel);
488  if (status != Z_OK){
489    return 0;
490  }
491  return (int)cl;
492}
493
494static int zlib_wrap_decompress(const char* input, size_t compressed_length,
495                                char* output, size_t maxout)
496{
497  int status;
498  uLongf ul = maxout;
499  status = uncompress(
500             (Bytef*)output, &ul, (Bytef*)input, (uLong)compressed_length);
501  if (status != Z_OK){
502    return 0;
503  }
504  return (int)ul;
505}
506#endif /*  HAVE_ZLIB */
507
508#if defined(HAVE_ZSTD)
509static int zstd_wrap_compress(const char* input, size_t input_length,
510                              char* output, size_t maxout, int clevel) {
511  size_t code;
512  // clevel = (clevel < 9) ? clevel * 2 - 1 : ZSTD_maxCLevel();  // see zstd#254
513  clevel = (clevel < 9) ? clevel * 2 - 1 : 22;
514  code = ZSTD_compress(
515      (void*)output, maxout, (void*)input, input_length, clevel);
516  if (ZSTD_isError(code)) {
517    return 0;
518  }
519  return (int)code;
520}
521
522static int zstd_wrap_decompress(const char* input, size_t compressed_length,
523                                char* output, size_t maxout) {
524  size_t code;
525  code = ZSTD_decompress(
526      (void*)output, maxout, (void*)input, compressed_length);
527  if (ZSTD_isError(code)) {
528    fprintf(stderr, "error decompressing with Zstd: %s \n", ZSTD_getErrorName(code));
529    return 0;
530  }
531  return (int)code;
532}
533#endif /*  HAVE_ZSTD */
534
535/* Compute acceleration for blosclz */
536static int get_accel(const struct blosc_context* context) {
537  int32_t clevel = context->clevel;
538  int32_t typesize = context->typesize;
539
540  if (clevel == 9) {
541    return 1;
542  }
543  if (context->compcode == BLOSC_BLOSCLZ) {
544    /* Compute the power of 2. See:
545     * http://www.exploringbinary.com/ten-ways-to-check-if-an-integer-is-a-power-of-two-in-c/
546     */
547    int32_t tspow2 = ((typesize != 0) && !(typesize & (typesize - 1)));
548    if (tspow2 && typesize < 32) {
549      return 32;
550    }
551  }
552  else if (context->compcode == BLOSC_LZ4) {
553    /* This acceleration setting based on discussions held in:
554     * https://groups.google.com/forum/#!topic/lz4c/zosy90P8MQw
555     */
556    return (10 - clevel);
557  }
558  return 1;
559}
560
561/* Shuffle & compress a single block */
562static int blosc_c(const struct blosc_context* context, int32_t blocksize,
563                   int32_t leftoverblock, int32_t ntbytes, int32_t maxbytes,
564                   const uint8_t *src, uint8_t *dest, uint8_t *tmp,
565                   uint8_t *tmp2)
566{
567  int32_t j, neblock, nsplits;
568  int32_t cbytes;                   /* number of compressed bytes in split */
569  int32_t ctbytes = 0;              /* number of compressed bytes in block */
570  int32_t maxout;
571  int32_t typesize = context->typesize;
572  const uint8_t *_tmp = src;
573  char *compname;
574  int accel;
575  int bscount;
576
577  if (*(context->header_flags) & BLOSC_DOSHUFFLE & (typesize > 1)) {
578    /* Byte shuffling only makes sense if typesize > 1 */
579    shuffle(typesize, blocksize, src, tmp);
580    _tmp = tmp;
581  }
582  /* We don't allow more than 1 filter at the same time (yet) */
583  else if (*(context->header_flags) & BLOSC_DOBITSHUFFLE) {
584    bscount = bitshuffle(typesize, blocksize, src, tmp, tmp2);
585    if (bscount < 0)
586      return bscount;
587    _tmp = tmp;
588  }
589
590  /* Calculate acceleration for different compressors */
591  accel = get_accel(context);
592
593  /* Compress for each shuffled slice split for this block. */
594  /* If typesize is too large, neblock is too small or we are in a
595     leftover block, do not split at all. */
596  if ((typesize <= MAX_SPLITS) && (blocksize/typesize) >= MIN_BUFFERSIZE &&
597      (!leftoverblock)) {
598    nsplits = typesize;
599  }
600  else {
601    nsplits = 1;
602  }
603  neblock = blocksize / nsplits;
604  for (j = 0; j < nsplits; j++) {
605    dest += sizeof(int32_t);
606    ntbytes += (int32_t)sizeof(int32_t);
607    ctbytes += (int32_t)sizeof(int32_t);
608    maxout = neblock;
609    #if defined(HAVE_SNAPPY)
610    if (context->compcode == BLOSC_SNAPPY) {
611      /* TODO perhaps refactor this to keep the value stashed somewhere */
612      maxout = snappy_max_compressed_length(neblock);
613    }
614    #endif /*  HAVE_SNAPPY */
615    if (ntbytes+maxout > maxbytes) {
616      maxout = maxbytes - ntbytes;   /* avoid buffer overrun */
617      if (maxout <= 0) {
618        return 0;                  /* non-compressible block */
619      }
620    }
621    if (context->compcode == BLOSC_BLOSCLZ) {
622      cbytes = blosclz_compress(context->clevel, _tmp+j*neblock, neblock,
623                                dest, maxout, accel);
624    }
625    #if defined(HAVE_LZ4)
626    else if (context->compcode == BLOSC_LZ4) {
627      cbytes = lz4_wrap_compress((char *)_tmp+j*neblock, (size_t)neblock,
628                                 (char *)dest, (size_t)maxout, accel);
629    }
630    else if (context->compcode == BLOSC_LZ4HC) {
631      cbytes = lz4hc_wrap_compress((char *)_tmp+j*neblock, (size_t)neblock,
632                                   (char *)dest, (size_t)maxout,
633                                   context->clevel);
634    }
635    #endif /* HAVE_LZ4 */
636    #if defined(HAVE_SNAPPY)
637    else if (context->compcode == BLOSC_SNAPPY) {
638      cbytes = snappy_wrap_compress((char *)_tmp+j*neblock, (size_t)neblock,
639                                    (char *)dest, (size_t)maxout);
640    }
641    #endif /* HAVE_SNAPPY */
642    #if defined(HAVE_ZLIB)
643    else if (context->compcode == BLOSC_ZLIB) {
644      cbytes = zlib_wrap_compress((char *)_tmp+j*neblock, (size_t)neblock,
645                                  (char *)dest, (size_t)maxout,
646                                  context->clevel);
647    }
648    #endif /* HAVE_ZLIB */
649    #if defined(HAVE_ZSTD)
650    else if (context->compcode == BLOSC_ZSTD) {
651      cbytes = zstd_wrap_compress((char*)_tmp + j * neblock, (size_t)neblock,
652                                  (char*)dest, (size_t)maxout, context->clevel);
653    }
654    #endif /* HAVE_ZSTD */
655
656    else {
657      blosc_compcode_to_compname(context->compcode, &compname);
658      fprintf(stderr, "Blosc has not been compiled with '%s' ", compname);
659      fprintf(stderr, "compression support.  Please use one having it.");
660      return -5;    /* signals no compression support */
661    }
662
663    if (cbytes > maxout) {
664      /* Buffer overrun caused by compression (should never happen) */
665      return -1;
666    }
667    else if (cbytes < 0) {
668      /* cbytes should never be negative */
669      return -2;
670    }
671    else if (cbytes == 0 || cbytes == neblock) {
672      /* The compressor has been unable to compress data at all. */
673      /* Before doing the copy, check that we are not running into a
674         buffer overflow. */
675      if ((ntbytes+neblock) > maxbytes) {
676        return 0;    /* Non-compressible data */
677      }
678      memcpy(dest, _tmp+j*neblock, neblock);
679      cbytes = neblock;
680    }
681    _sw32(dest - 4, cbytes);
682    dest += cbytes;
683    ntbytes += cbytes;
684    ctbytes += cbytes;
685  }  /* Closes j < nsplits */
686
687  return ctbytes;
688}
689
690/* Decompress & unshuffle a single block */
691static int blosc_d(struct blosc_context* context, int32_t blocksize, int32_t leftoverblock,
692                   const uint8_t *src, uint8_t *dest, uint8_t *tmp, uint8_t *tmp2)
693{
694  int32_t j, neblock, nsplits;
695  int32_t nbytes;                /* number of decompressed bytes in split */
696  int32_t cbytes;                /* number of compressed bytes in split */
697  int32_t ctbytes = 0;           /* number of compressed bytes in block */
698  int32_t ntbytes = 0;           /* number of uncompressed bytes in block */
699  uint8_t *_tmp = dest;
700  int32_t typesize = context->typesize;
701  int32_t compformat;
702  char *compname;
703  int bscount;
704
705  if ((*(context->header_flags) & BLOSC_DOSHUFFLE & (typesize > 1)) ||  \
706      (*(context->header_flags) & BLOSC_DOBITSHUFFLE)) {
707    _tmp = tmp;
708  }
709
710  compformat = (*(context->header_flags) & 0xe0) >> 5;
711
712  /* Compress for each shuffled slice split for this block. */
713  if ((typesize <= MAX_SPLITS) && (blocksize/typesize) >= MIN_BUFFERSIZE &&
714      (!leftoverblock)) {
715    nsplits = typesize;
716  }
717  else {
718    nsplits = 1;
719  }
720  neblock = blocksize / nsplits;
721  for (j = 0; j < nsplits; j++) {
722    cbytes = sw32_(src);      /* amount of compressed bytes */
723    src += sizeof(int32_t);
724    ctbytes += (int32_t)sizeof(int32_t);
725    /* Uncompress */
726    if (cbytes == neblock) {
727      memcpy(_tmp, src, neblock);
728      nbytes = neblock;
729    }
730    else {
731      if (compformat == BLOSC_BLOSCLZ_FORMAT) {
732        nbytes = blosclz_decompress(src, cbytes, _tmp, neblock);
733      }
734      #if defined(HAVE_LZ4)
735      else if (compformat == BLOSC_LZ4_FORMAT) {
736        nbytes = lz4_wrap_decompress((char *)src, (size_t)cbytes,
737                                     (char*)_tmp, (size_t)neblock);
738      }
739      #endif /*  HAVE_LZ4 */
740      #if defined(HAVE_SNAPPY)
741      else if (compformat == BLOSC_SNAPPY_FORMAT) {
742        nbytes = snappy_wrap_decompress((char *)src, (size_t)cbytes,
743                                        (char*)_tmp, (size_t)neblock);
744      }
745      #endif /*  HAVE_SNAPPY */
746      #if defined(HAVE_ZLIB)
747      else if (compformat == BLOSC_ZLIB_FORMAT) {
748        nbytes = zlib_wrap_decompress((char *)src, (size_t)cbytes,
749                                      (char*)_tmp, (size_t)neblock);
750      }
751      #endif /*  HAVE_ZLIB */
752      #if defined(HAVE_ZSTD)
753      else if (compformat == BLOSC_ZSTD_FORMAT) {
754        nbytes = zstd_wrap_decompress((char*)src, (size_t)cbytes,
755                                      (char*)_tmp, (size_t)neblock);
756      }
757      #endif /*  HAVE_ZSTD */
758      else {
759        compname = clibcode_to_clibname(compformat);
760        fprintf(stderr,
761                "Blosc has not been compiled with decompression "
762                "support for '%s' format. ", compname);
763        fprintf(stderr, "Please recompile for adding this support.\n");
764        return -5;    /* signals no decompression support */
765      }
766
767      /* Check that decompressed bytes number is correct */
768      if (nbytes != neblock) {
769          return -2;
770      }
771
772    }
773    src += cbytes;
774    ctbytes += cbytes;
775    _tmp += nbytes;
776    ntbytes += nbytes;
777  } /* Closes j < nsplits */
778
779  if (*(context->header_flags) & BLOSC_DOSHUFFLE & (typesize > 1)) {
780    unshuffle(typesize, blocksize, tmp, dest);
781  }
782  else if (*(context->header_flags) & BLOSC_DOBITSHUFFLE) {
783    bscount = bitunshuffle(typesize, blocksize, tmp, dest, tmp2);
784    if (bscount < 0)
785      return bscount;
786  }
787
788  /* Return the number of uncompressed bytes */
789  return ntbytes;
790}
791
792
793/* Serial version for compression/decompression */
794static int serial_blosc(struct blosc_context* context)
795{
796  int32_t j, bsize, leftoverblock;
797  int32_t cbytes;
798
799  int32_t ebsize = context->blocksize + context->typesize * (int32_t)sizeof(int32_t);
800  int32_t ntbytes = context->num_output_bytes;
801
802  uint8_t *tmp = my_malloc(context->blocksize + ebsize);
803  uint8_t *tmp2 = tmp + context->blocksize;
804
805  for (j = 0; j < context->nblocks; j++) {
806    if (context->compress && !(*(context->header_flags) & BLOSC_MEMCPYED)) {
807      _sw32(context->bstarts + j * 4, ntbytes);
808    }
809    bsize = context->blocksize;
810    leftoverblock = 0;
811    if ((j == context->nblocks - 1) && (context->leftover > 0)) {
812      bsize = context->leftover;
813      leftoverblock = 1;
814    }
815    if (context->compress) {
816      if (*(context->header_flags) & BLOSC_MEMCPYED) {
817        /* We want to memcpy only */
818        memcpy(context->dest+BLOSC_MAX_OVERHEAD+j*context->blocksize,
819                context->src+j*context->blocksize,
820                bsize);
821        cbytes = bsize;
822      }
823      else {
824        /* Regular compression */
825        cbytes = blosc_c(context, bsize, leftoverblock, ntbytes,
826                         context->destsize, context->src+j*context->blocksize,
827                         context->dest+ntbytes, tmp, tmp2);
828        if (cbytes == 0) {
829          ntbytes = 0;              /* uncompressible data */
830          break;
831        }
832      }
833    }
834    else {
835      if (*(context->header_flags) & BLOSC_MEMCPYED) {
836        /* We want to memcpy only */
837        memcpy(context->dest+j*context->blocksize,
838                context->src+BLOSC_MAX_OVERHEAD+j*context->blocksize,
839                bsize);
840        cbytes = bsize;
841      }
842      else {
843        /* Regular decompression */
844        cbytes = blosc_d(context, bsize, leftoverblock,
845                          context->src + sw32_(context->bstarts + j * 4),
846                          context->dest+j*context->blocksize, tmp, tmp2);
847      }
848    }
849    if (cbytes < 0) {
850      ntbytes = cbytes;         /* error in blosc_c or blosc_d */
851      break;
852    }
853    ntbytes += cbytes;
854  }
855
856  // Free temporaries
857  my_free(tmp);
858
859  return ntbytes;
860}
861
862
863/* Threaded version for compression/decompression */
864static int parallel_blosc(struct blosc_context* context)
865{
866  int rc;
867
868  /* Check whether we need to restart threads */
869  blosc_set_nthreads_(context);
870
871  /* Set sentinels */
872  context->thread_giveup_code = 1;
873  context->thread_nblock = -1;
874
875  /* Synchronization point for all threads (wait for initialization) */
876  WAIT_INIT(-1, context);
877
878  /* Synchronization point for all threads (wait for finalization) */
879  WAIT_FINISH(-1, context);
880
881  if (context->thread_giveup_code > 0) {
882    /* Return the total bytes (de-)compressed in threads */
883    return context->num_output_bytes;
884  }
885  else {
886    /* Compression/decompression gave up.  Return error code. */
887    return context->thread_giveup_code;
888  }
889}
890
891
892/* Do the compression or decompression of the buffer depending on the
893   global params. */
894static int do_job(struct blosc_context* context)
895{
896  int32_t ntbytes;
897
898  /* Run the serial version when nthreads is 1 or when the buffers are
899     not much larger than blocksize */
900  if (context->numthreads == 1 || (context->sourcesize / context->blocksize) <= 1) {
901    ntbytes = serial_blosc(context);
902  }
903  else {
904    ntbytes = parallel_blosc(context);
905  }
906
907  return ntbytes;
908}
909
910
911static int32_t compute_blocksize(struct blosc_context* context, int32_t clevel,
912                                 int32_t typesize, int32_t nbytes,
913                                 int32_t forced_blocksize)
914{
915  int32_t blocksize;
916
917  /* Protection against very small buffers */
918  if (nbytes < (int32_t)typesize) {
919    return 1;
920  }
921
922  blocksize = nbytes;           /* Start by a whole buffer as blocksize */
923
924  if (forced_blocksize) {
925    blocksize = forced_blocksize;
926    /* Check that forced blocksize is not too small */
927    if (blocksize < MIN_BUFFERSIZE) {
928      blocksize = MIN_BUFFERSIZE;
929    }
930  }
931  else if (nbytes >= L1) {
932    blocksize = L1;
933
934    /* For LZ4HC, increase the block sizes by a factor of 8 because it
935       is meant for compressing large blocks (it shows a big overhead
936       when compressing small ones). */
937    if (context->compcode == BLOSC_LZ4HC) {
938      blocksize *= 8;
939    }
940
941    /* For Zlib, increase the block sizes by a factor of 8 because it
942       is meant for compressing large blocks (it shows a big overhead
943       when compressing small ones). */
944    if (context->compcode == BLOSC_ZLIB) {
945      blocksize *= 8;
946    }
947
948    /* For Zstd, increase the block sizes by a factor of 8 because it
949       is meant for compressing large blocks (it shows a big overhead
950       when compressing small ones). */
951    if (context->compcode == BLOSC_ZSTD) {
952      blocksize *= 8;
953    }
954
955    if (clevel == 0) {
956      blocksize /= 4;
957    }
958    else if (clevel <= 3) {
959      blocksize /= 2;
960    }
961    else if (clevel <= 5) {
962      blocksize *= 1;
963    }
964    else if (clevel <= 6) {
965      blocksize *= 2;
966    }
967    else if (clevel < 9) {
968      blocksize *= 4;
969    }
970    else {
971      blocksize *= 16;
972    }
973  }
974
975  /* Check that blocksize is not too large */
976  if (blocksize > (int32_t)nbytes) {
977    blocksize = nbytes;
978  }
979
980  /* blocksize *must absolutely* be a multiple of the typesize */
981  if (blocksize > typesize) {
982    blocksize = blocksize / typesize * typesize;
983  }
984
985  return blocksize;
986}
987
988static int initialize_context_compression(struct blosc_context* context,
989                          int clevel,
990                          int doshuffle,
991                          size_t typesize,
992                          size_t sourcesize,
993                          const void* src,
994                          void* dest,
995                          size_t destsize,
996                          int32_t compressor,
997                          int32_t blocksize,
998                          int32_t numthreads)
999{
1000  /* Set parameters */
1001  context->compress = 1;
1002  context->src = (const uint8_t*)src;
1003  context->dest = (uint8_t *)(dest);
1004  context->num_output_bytes = 0;
1005  context->destsize = (int32_t)destsize;
1006  context->sourcesize = sourcesize;
1007  context->typesize = typesize;
1008  context->compcode = compressor;
1009  context->numthreads = numthreads;
1010  context->end_threads = 0;
1011  context->clevel = clevel;
1012
1013  /* Check buffer size limits */
1014  if (sourcesize > BLOSC_MAX_BUFFERSIZE) {
1015    /* If buffer is too large, give up. */
1016    fprintf(stderr, "Input buffer size cannot exceed %d bytes\n",
1017            BLOSC_MAX_BUFFERSIZE);
1018    return -1;
1019  }
1020
1021  /* Compression level */
1022  if (clevel < 0 || clevel > 9) {
1023    /* If clevel not in 0..9, print an error */
1024    fprintf(stderr, "`clevel` parameter must be between 0 and 9!\n");
1025    return -10;
1026  }
1027
1028  /* Shuffle */
1029  if (doshuffle != 0 && doshuffle != 1 && doshuffle != 2) {
1030    fprintf(stderr, "`shuffle` parameter must be either 0, 1 or 2!\n");
1031    return -10;
1032  }
1033
1034  /* Check typesize limits */
1035  if (context->typesize > BLOSC_MAX_TYPESIZE) {
1036    /* If typesize is too large, treat buffer as an 1-byte stream. */
1037    context->typesize = 1;
1038  }
1039
1040  /* Get the blocksize */
1041  context->blocksize = compute_blocksize(context, clevel, (int32_t)context->typesize, context->sourcesize, blocksize);
1042
1043  /* Compute number of blocks in buffer */
1044  context->nblocks = context->sourcesize / context->blocksize;
1045  context->leftover = context->sourcesize % context->blocksize;
1046  context->nblocks = (context->leftover > 0) ? (context->nblocks + 1) : context->nblocks;
1047
1048  return 1;
1049}
1050
1051static int write_compression_header(struct blosc_context* context, int clevel, int doshuffle)
1052{
1053  int32_t compformat;
1054
1055  /* Write version header for this block */
1056  context->dest[0] = BLOSC_VERSION_FORMAT;              /* blosc format version */
1057
1058  /* Write compressor format */
1059  compformat = -1;
1060  switch (context->compcode)
1061  {
1062  case BLOSC_BLOSCLZ:
1063    compformat = BLOSC_BLOSCLZ_FORMAT;
1064    context->dest[1] = BLOSC_BLOSCLZ_VERSION_FORMAT; /* blosclz format version */
1065    break;
1066
1067#if defined(HAVE_LZ4)
1068  case BLOSC_LZ4:
1069    compformat = BLOSC_LZ4_FORMAT;
1070    context->dest[1] = BLOSC_LZ4_VERSION_FORMAT;  /* lz4 format version */
1071    break;
1072  case BLOSC_LZ4HC:
1073    compformat = BLOSC_LZ4HC_FORMAT;
1074    context->dest[1] = BLOSC_LZ4HC_VERSION_FORMAT; /* lz4hc is the same as lz4 */
1075    break;
1076#endif /* HAVE_LZ4 */
1077
1078#if defined(HAVE_SNAPPY)
1079  case BLOSC_SNAPPY:
1080    compformat = BLOSC_SNAPPY_FORMAT;
1081    context->dest[1] = BLOSC_SNAPPY_VERSION_FORMAT;    /* snappy format version */
1082    break;
1083#endif /* HAVE_SNAPPY */
1084
1085#if defined(HAVE_ZLIB)
1086  case BLOSC_ZLIB:
1087    compformat = BLOSC_ZLIB_FORMAT;
1088    context->dest[1] = BLOSC_ZLIB_VERSION_FORMAT;      /* zlib format version */
1089    break;
1090#endif /* HAVE_ZLIB */
1091
1092#if defined(HAVE_ZSTD)
1093  case BLOSC_ZSTD:
1094    compformat = BLOSC_ZSTD_FORMAT;
1095    context->dest[1] = BLOSC_ZSTD_VERSION_FORMAT;      /* zstd format version */
1096    break;
1097#endif /* HAVE_ZSTD */
1098
1099  default:
1100  {
1101    char *compname;
1102    compname = clibcode_to_clibname(compformat);
1103    fprintf(stderr, "Blosc has not been compiled with '%s' ", compname);
1104    fprintf(stderr, "compression support.  Please use one having it.");
1105    return -5;    /* signals no compression support */
1106    break;
1107  }
1108  }
1109
1110  context->header_flags = context->dest+2;  /* flags */
1111  context->dest[2] = 0;  /* zeroes flags */
1112  context->dest[3] = (uint8_t)context->typesize;  /* type size */
1113  _sw32(context->dest + 4, context->sourcesize);  /* size of the buffer */
1114  _sw32(context->dest + 8, context->blocksize);  /* block size */
1115  context->bstarts = context->dest + 16;  /* starts for every block */
1116  context->num_output_bytes = 16 + sizeof(int32_t)*context->nblocks;  /* space for header and pointers */
1117
1118  if (context->clevel == 0) {
1119    /* Compression level 0 means buffer to be memcpy'ed */
1120    *(context->header_flags) |= BLOSC_MEMCPYED;
1121  }
1122
1123  if (context->sourcesize < MIN_BUFFERSIZE) {
1124    /* Buffer is too small.  Try memcpy'ing. */
1125    *(context->header_flags) |= BLOSC_MEMCPYED;
1126  }
1127
1128  if (doshuffle == BLOSC_SHUFFLE) {
1129    /* Byte-shuffle is active */
1130    *(context->header_flags) |= BLOSC_DOSHUFFLE;     /* bit 0 set to one in flags */
1131  }
1132
1133  if (doshuffle == BLOSC_BITSHUFFLE) {
1134    /* Bit-shuffle is active */
1135    *(context->header_flags) |= BLOSC_DOBITSHUFFLE;  /* bit 2 set to one in flags */
1136  }
1137
1138  *(context->header_flags) |= compformat << 5;      /* compressor format start at bit 5 */
1139
1140  return 1;
1141}
1142
1143int blosc_compress_context(struct blosc_context* context)
1144{
1145  int32_t ntbytes = 0;
1146
1147  if (!(*(context->header_flags) & BLOSC_MEMCPYED)) {
1148    /* Do the actual compression */
1149    ntbytes = do_job(context);
1150    if (ntbytes < 0) {
1151      return -1;
1152    }
1153    if ((ntbytes == 0) && (context->sourcesize+BLOSC_MAX_OVERHEAD <= context->destsize)) {
1154      /* Last chance for fitting `src` buffer in `dest`.  Update flags
1155       and do a memcpy later on. */
1156      *(context->header_flags) |= BLOSC_MEMCPYED;
1157    }
1158  }
1159
1160  if (*(context->header_flags) & BLOSC_MEMCPYED) {
1161    if (context->sourcesize + BLOSC_MAX_OVERHEAD > context->destsize) {
1162      /* We are exceeding maximum output size */
1163      ntbytes = 0;
1164    }
1165    else {
1166      memcpy(context->dest+BLOSC_MAX_OVERHEAD, context->src,
1167             context->sourcesize);
1168      ntbytes = context->sourcesize + BLOSC_MAX_OVERHEAD;
1169    }
1170  }
1171
1172  /* Set the number of compressed bytes in header */
1173  _sw32(context->dest + 12, ntbytes);
1174
1175  assert(ntbytes <= context->destsize);
1176  return ntbytes;
1177}
1178
1179/* The public routine for compression with context. */
1180int blosc_compress_ctx(int clevel, int doshuffle, size_t typesize,
1181                       size_t nbytes, const void* src, void* dest,
1182                       size_t destsize, const char* compressor,
1183                       size_t blocksize, int numinternalthreads)
1184{
1185  int error, result;
1186  struct blosc_context context;
1187
1188  context.threads_started = 0;
1189  error = initialize_context_compression(&context, clevel, doshuffle, typesize,
1190                                         nbytes, src, dest, destsize,
1191                                         blosc_compname_to_compcode(compressor),
1192                                         blocksize, numinternalthreads);
1193  if (error < 0) { return error; }
1194
1195  error = write_compression_header(&context, clevel, doshuffle);
1196  if (error < 0) { return error; }
1197
1198  result = blosc_compress_context(&context);
1199
1200  if (numinternalthreads > 1)
1201  {
1202    blosc_release_threadpool(&context);
1203  }
1204
1205  return result;
1206}
1207
1208/* The public routine for compression.  See blosc.h for docstrings. */
1209int blosc_compress(int clevel, int doshuffle, size_t typesize, size_t nbytes,
1210                   const void *src, void *dest, size_t destsize)
1211{
1212  int error;
1213  int result;
1214  char* envvar;
1215
1216  /* Check if should initialize */
1217  if (!g_initlib) blosc_init();
1218
1219  /* Check for a BLOSC_CLEVEL environment variable */
1220  envvar = getenv("BLOSC_CLEVEL");
1221  if (envvar != NULL) {
1222    long value;
1223    value = strtol(envvar, NULL, 10);
1224    if ((value != EINVAL) && (value >= 0)) {
1225      clevel = (int)value;
1226    }
1227  }
1228
1229  /* Check for a BLOSC_SHUFFLE environment variable */
1230  envvar = getenv("BLOSC_SHUFFLE");
1231  if (envvar != NULL) {
1232    if (strcmp(envvar, "NOSHUFFLE") == 0) {
1233      doshuffle = BLOSC_NOSHUFFLE;
1234    }
1235    if (strcmp(envvar, "SHUFFLE") == 0) {
1236      doshuffle = BLOSC_SHUFFLE;
1237    }
1238    if (strcmp(envvar, "BITSHUFFLE") == 0) {
1239      doshuffle = BLOSC_BITSHUFFLE;
1240    }
1241  }
1242
1243  /* Check for a BLOSC_TYPESIZE environment variable */
1244  envvar = getenv("BLOSC_TYPESIZE");
1245  if (envvar != NULL) {
1246    long value;
1247    value = strtol(envvar, NULL, 10);
1248    if ((value != EINVAL) && (value > 0)) {
1249      typesize = (int)value;
1250    }
1251  }
1252
1253  /* Check for a BLOSC_COMPRESSOR environment variable */
1254  envvar = getenv("BLOSC_COMPRESSOR");
1255  if (envvar != NULL) {
1256    result = blosc_set_compressor(envvar);
1257    if (result < 0) { return result; }
1258  }
1259
1260  /* Check for a BLOSC_COMPRESSOR environment variable */
1261  envvar = getenv("BLOSC_BLOCKSIZE");
1262  if (envvar != NULL) {
1263    long blocksize;
1264    blocksize = strtol(envvar, NULL, 10);
1265    if ((blocksize != EINVAL) && (blocksize > 0)) {
1266      blosc_set_blocksize((size_t)blocksize);
1267    }
1268  }
1269
1270  /* Check for a BLOSC_NTHREADS environment variable */
1271  envvar = getenv("BLOSC_NTHREADS");
1272  if (envvar != NULL) {
1273    long nthreads;
1274    nthreads = strtol(envvar, NULL, 10);
1275    if ((nthreads != EINVAL) && (nthreads > 0)) {
1276      result = blosc_set_nthreads((int)nthreads);
1277      if (result < 0) { return result; }
1278    }
1279  }
1280
1281  /* Check for a BLOSC_NOLOCK environment variable.  It is important
1282     that this should be the last env var so that it can take the
1283     previous ones into account */
1284  envvar = getenv("BLOSC_NOLOCK");
1285  if (envvar != NULL) {
1286    char *compname;
1287    blosc_compcode_to_compname(g_compressor, &compname);
1288    result = blosc_compress_ctx(clevel, doshuffle, typesize,
1289                                nbytes, src, dest, destsize,
1290                                compname, g_force_blocksize, g_threads);
1291    return result;
1292  }
1293
1294  pthread_mutex_lock(&global_comp_mutex);
1295
1296  error = initialize_context_compression(g_global_context, clevel, doshuffle,
1297                                         typesize, nbytes, src, dest, destsize,
1298                                         g_compressor, g_force_blocksize,
1299                                         g_threads);
1300  if (error < 0) { return error; }
1301
1302  error = write_compression_header(g_global_context, clevel, doshuffle);
1303  if (error < 0) { return error; }
1304
1305  result = blosc_compress_context(g_global_context);
1306
1307  pthread_mutex_unlock(&global_comp_mutex);
1308
1309  return result;
1310}
1311
1312int blosc_run_decompression_with_context(struct blosc_context* context,
1313                                         const void* src,
1314                                         void* dest,
1315                                         size_t destsize,
1316                                         int numinternalthreads)
1317{
1318  uint8_t version;
1319  uint8_t versionlz;
1320  uint32_t ctbytes;
1321  int32_t ntbytes;
1322
1323  context->compress = 0;
1324  context->src = (const uint8_t*)src;
1325  context->dest = (uint8_t*)dest;
1326  context->destsize = destsize;
1327  context->num_output_bytes = 0;
1328  context->numthreads = numinternalthreads;
1329  context->end_threads = 0;
1330
1331  /* Read the header block */
1332  version = context->src[0];                        /* blosc format version */
1333  versionlz = context->src[1];                      /* blosclz format version */
1334
1335  context->header_flags = (uint8_t*)(context->src + 2);           /* flags */
1336  context->typesize = (int32_t)context->src[3];      /* typesize */
1337  context->sourcesize = sw32_(context->src + 4);     /* buffer size */
1338  context->blocksize = sw32_(context->src + 8);      /* block size */
1339  ctbytes = sw32_(context->src + 12);               /* compressed buffer size */
1340
1341  /* Unused values */
1342  version += 0;                             /* shut up compiler warning */
1343  versionlz += 0;                           /* shut up compiler warning */
1344  ctbytes += 0;                             /* shut up compiler warning */
1345
1346  context->bstarts = (uint8_t*)(context->src + 16);
1347  /* Compute some params */
1348  /* Total blocks */
1349  context->nblocks = context->sourcesize / context->blocksize;
1350  context->leftover = context->sourcesize % context->blocksize;
1351  context->nblocks = (context->leftover>0)? context->nblocks+1: context->nblocks;
1352
1353  /* Check that we have enough space to decompress */
1354  if (context->sourcesize > (int32_t)destsize) {
1355    return -1;
1356  }
1357
1358  /* Check whether this buffer is memcpy'ed */
1359  if (*(context->header_flags) & BLOSC_MEMCPYED) {
1360      memcpy(dest, (uint8_t *)src+BLOSC_MAX_OVERHEAD, context->sourcesize);
1361      ntbytes = context->sourcesize;
1362  }
1363  else {
1364    /* Do the actual decompression */
1365    ntbytes = do_job(context);
1366    if (ntbytes < 0) {
1367      return -1;
1368    }
1369  }
1370
1371  assert(ntbytes <= (int32_t)destsize);
1372  return ntbytes;
1373}
1374
1375/* The public routine for decompression with context. */
1376int blosc_decompress_ctx(const void *src, void *dest, size_t destsize,
1377                         int numinternalthreads)
1378{
1379  int result;
1380  struct blosc_context context;
1381
1382  context.threads_started = 0;
1383  result = blosc_run_decompression_with_context(&context, src, dest, destsize, numinternalthreads);
1384
1385  if (numinternalthreads > 1)
1386  {
1387    blosc_release_threadpool(&context);
1388  }
1389
1390  return result;
1391}
1392
1393
1394/* The public routine for decompression.  See blosc.h for docstrings. */
1395int blosc_decompress(const void *src, void *dest, size_t destsize)
1396{
1397  int result;
1398  char* envvar;
1399  long nthreads;
1400
1401  /* Check if should initialize */
1402  if (!g_initlib) blosc_init();
1403
1404  /* Check for a BLOSC_NTHREADS environment variable */
1405  envvar = getenv("BLOSC_NTHREADS");
1406  if (envvar != NULL) {
1407    nthreads = strtol(envvar, NULL, 10);
1408    if ((nthreads != EINVAL) && (nthreads > 0)) {
1409      result = blosc_set_nthreads((int)nthreads);
1410      if (result < 0) { return result; }
1411    }
1412  }
1413
1414  /* Check for a BLOSC_NOLOCK environment variable.  It is important
1415     that this should be the last env var so that it can take the
1416     previous ones into account */
1417  envvar = getenv("BLOSC_NOLOCK");
1418  if (envvar != NULL) {
1419    result = blosc_decompress_ctx(src, dest, destsize, g_threads);
1420    return result;
1421  }
1422
1423  pthread_mutex_lock(&global_comp_mutex);
1424
1425  result = blosc_run_decompression_with_context(g_global_context, src, dest,
1426                                                destsize, g_threads);
1427
1428  pthread_mutex_unlock(&global_comp_mutex);
1429
1430  return result;
1431}
1432
1433
1434/* Specific routine optimized for decompression a small number of
1435   items out of a compressed chunk.  This does not use threads because
1436   it would affect negatively to performance. */
1437int blosc_getitem(const void *src, int start, int nitems, void *dest)
1438{
1439  uint8_t *_src=NULL;               /* current pos for source buffer */
1440  uint8_t version, versionlz;       /* versions for compressed header */
1441  uint8_t flags;                    /* flags for header */
1442  int32_t ntbytes = 0;              /* the number of uncompressed bytes */
1443  int32_t nblocks;                  /* number of total blocks in buffer */
1444  int32_t leftover;                 /* extra bytes at end of buffer */
1445  uint8_t *bstarts;                 /* start pointers for each block */
1446  int tmp_init = 0;
1447  int32_t typesize, blocksize, nbytes, ctbytes;
1448  int32_t j, bsize, bsize2, leftoverblock;
1449  int32_t cbytes, startb, stopb;
1450  int stop = start + nitems;
1451  uint8_t *tmp;
1452  uint8_t *tmp2;
1453  uint8_t *tmp3;
1454  int32_t ebsize;
1455
1456  _src = (uint8_t *)(src);
1457
1458  /* Read the header block */
1459  version = _src[0];                        /* blosc format version */
1460  versionlz = _src[1];                      /* blosclz format version */
1461  flags = _src[2];                          /* flags */
1462  typesize = (int32_t)_src[3];              /* typesize */
1463  nbytes = sw32_(_src + 4);                 /* buffer size */
1464  blocksize = sw32_(_src + 8);              /* block size */
1465  ctbytes = sw32_(_src + 12);               /* compressed buffer size */
1466
1467  ebsize = blocksize + typesize * (int32_t)sizeof(int32_t);
1468  tmp = my_malloc(blocksize + ebsize + blocksize);
1469  tmp2 = tmp + blocksize;
1470  tmp3 = tmp + blocksize + ebsize;
1471
1472  version += 0;                             /* shut up compiler warning */
1473  versionlz += 0;                           /* shut up compiler warning */
1474  ctbytes += 0;                             /* shut up compiler warning */
1475
1476  _src += 16;
1477  bstarts = _src;
1478  /* Compute some params */
1479  /* Total blocks */
1480  nblocks = nbytes / blocksize;
1481  leftover = nbytes % blocksize;
1482  nblocks = (leftover>0)? nblocks+1: nblocks;
1483  _src += sizeof(int32_t)*nblocks;
1484
1485  /* Check region boundaries */
1486  if ((start < 0) || (start*typesize > nbytes)) {
1487    fprintf(stderr, "`start` out of bounds");
1488    return -1;
1489  }
1490
1491  if ((stop < 0) || (stop*typesize > nbytes)) {
1492    fprintf(stderr, "`start`+`nitems` out of bounds");
1493    return -1;
1494  }
1495
1496  for (j = 0; j < nblocks; j++) {
1497    bsize = blocksize;
1498    leftoverblock = 0;
1499    if ((j == nblocks - 1) && (leftover > 0)) {
1500      bsize = leftover;
1501      leftoverblock = 1;
1502    }
1503
1504    /* Compute start & stop for each block */
1505    startb = start * typesize - j * blocksize;
1506    stopb = stop * typesize - j * blocksize;
1507    if ((startb >= (int)blocksize) || (stopb <= 0)) {
1508      continue;
1509    }
1510    if (startb < 0) {
1511      startb = 0;
1512    }
1513    if (stopb > (int)blocksize) {
1514      stopb = blocksize;
1515    }
1516    bsize2 = stopb - startb;
1517
1518    /* Do the actual data copy */
1519    if (flags & BLOSC_MEMCPYED) {
1520      /* We want to memcpy only */
1521      memcpy((uint8_t *)dest + ntbytes,
1522          (uint8_t *)src + BLOSC_MAX_OVERHEAD + j*blocksize + startb,
1523             bsize2);
1524      cbytes = bsize2;
1525    }
1526    else {
1527      struct blosc_context context;
1528      /* blosc_d only uses typesize and flags */
1529      context.typesize = typesize;
1530      context.header_flags = &flags;
1531
1532      /* Regular decompression.  Put results in tmp2. */
1533      cbytes = blosc_d(&context, bsize, leftoverblock,
1534                       (uint8_t *)src + sw32_(bstarts + j * 4),
1535                       tmp2, tmp, tmp3);
1536      if (cbytes < 0) {
1537        ntbytes = cbytes;
1538        break;
1539      }
1540      /* Copy to destination */
1541      memcpy((uint8_t *)dest + ntbytes, tmp2 + startb, bsize2);
1542      cbytes = bsize2;
1543    }
1544    ntbytes += cbytes;
1545  }
1546
1547  my_free(tmp);
1548
1549  return ntbytes;
1550}
1551
1552
1553/* Decompress & unshuffle several blocks in a single thread */
1554static void *t_blosc(void *ctxt)
1555{
1556  struct thread_context* context = (struct thread_context*)ctxt;
1557  int32_t cbytes, ntdest;
1558  int32_t tblocks;              /* number of blocks per thread */
1559  int32_t leftover2;
1560  int32_t tblock;               /* limit block on a thread */
1561  int32_t nblock_;              /* private copy of nblock */
1562  int32_t bsize, leftoverblock;
1563  /* Parameters for threads */
1564  int32_t blocksize;
1565  int32_t ebsize;
1566  int32_t compress;
1567  int32_t maxbytes;
1568  int32_t ntbytes;
1569  int32_t flags;
1570  int32_t nblocks;
1571  int32_t leftover;
1572  uint8_t *bstarts;
1573  const uint8_t *src;
1574  uint8_t *dest;
1575  uint8_t *tmp;
1576  uint8_t *tmp2;
1577  uint8_t *tmp3;
1578  int rc;
1579
1580  while(1)
1581  {
1582    /* Synchronization point for all threads (wait for initialization) */
1583    WAIT_INIT(NULL, context->parent_context);
1584
1585    if(context->parent_context->end_threads)
1586    {
1587      break;
1588    }
1589
1590    /* Get parameters for this thread before entering the main loop */
1591    blocksize = context->parent_context->blocksize;
1592    ebsize = blocksize + context->parent_context->typesize * (int32_t)sizeof(int32_t);
1593    compress = context->parent_context->compress;
1594    flags = *(context->parent_context->header_flags);
1595    maxbytes = context->parent_context->destsize;
1596    nblocks = context->parent_context->nblocks;
1597    leftover = context->parent_context->leftover;
1598    bstarts = context->parent_context->bstarts;
1599    src = context->parent_context->src;
1600    dest = context->parent_context->dest;
1601
1602    if (blocksize > context->tmpblocksize)
1603    {
1604      my_free(context->tmp);
1605      context->tmp = my_malloc(blocksize + ebsize + blocksize);
1606      context->tmp2 = context->tmp + blocksize;
1607      context->tmp3 = context->tmp + blocksize + ebsize;
1608    }
1609
1610    tmp = context->tmp;
1611    tmp2 = context->tmp2;
1612    tmp3 = context->tmp3;
1613
1614    ntbytes = 0;                /* only useful for decompression */
1615
1616    if (compress && !(flags & BLOSC_MEMCPYED)) {
1617      /* Compression always has to follow the block order */
1618      pthread_mutex_lock(&context->parent_context->count_mutex);
1619      context->parent_context->thread_nblock++;
1620      nblock_ = context->parent_context->thread_nblock;
1621      pthread_mutex_unlock(&context->parent_context->count_mutex);
1622      tblock = nblocks;
1623    }
1624    else {
1625      /* Decompression can happen using any order.  We choose
1626       sequential block order on each thread */
1627
1628      /* Blocks per thread */
1629      tblocks = nblocks / context->parent_context->numthreads;
1630      leftover2 = nblocks % context->parent_context->numthreads;
1631      tblocks = (leftover2>0)? tblocks+1: tblocks;
1632
1633      nblock_ = context->tid*tblocks;
1634      tblock = nblock_ + tblocks;
1635      if (tblock > nblocks) {
1636        tblock = nblocks;
1637      }
1638    }
1639
1640    /* Loop over blocks */
1641    leftoverblock = 0;
1642    while ((nblock_ < tblock) && context->parent_context->thread_giveup_code > 0) {
1643      bsize = blocksize;
1644      if (nblock_ == (nblocks - 1) && (leftover > 0)) {
1645        bsize = leftover;
1646        leftoverblock = 1;
1647      }
1648      if (compress) {
1649        if (flags & BLOSC_MEMCPYED) {
1650          /* We want to memcpy only */
1651          memcpy(dest+BLOSC_MAX_OVERHEAD+nblock_*blocksize,
1652                 src+nblock_*blocksize, bsize);
1653          cbytes = bsize;
1654        }
1655        else {
1656          /* Regular compression */
1657          cbytes = blosc_c(context->parent_context, bsize, leftoverblock, 0, ebsize,
1658                           src+nblock_*blocksize, tmp2, tmp, tmp3);
1659        }
1660      }
1661      else {
1662        if (flags & BLOSC_MEMCPYED) {
1663          /* We want to memcpy only */
1664          memcpy(dest+nblock_*blocksize,
1665                 src+BLOSC_MAX_OVERHEAD+nblock_*blocksize, bsize);
1666          cbytes = bsize;
1667        }
1668        else {
1669          cbytes = blosc_d(context->parent_context, bsize, leftoverblock,
1670                           src + sw32_(bstarts + nblock_ * 4),
1671                           dest+nblock_*blocksize,
1672                           tmp, tmp2);
1673        }
1674      }
1675
1676      /* Check whether current thread has to giveup */
1677      if (context->parent_context->thread_giveup_code <= 0) {
1678        break;
1679      }
1680
1681      /* Check results for the compressed/decompressed block */
1682      if (cbytes < 0) {            /* compr/decompr failure */
1683        /* Set giveup_code error */
1684        pthread_mutex_lock(&context->parent_context->count_mutex);
1685        context->parent_context->thread_giveup_code = cbytes;
1686        pthread_mutex_unlock(&context->parent_context->count_mutex);
1687        break;
1688      }
1689
1690      if (compress && !(flags & BLOSC_MEMCPYED)) {
1691        /* Start critical section */
1692        pthread_mutex_lock(&context->parent_context->count_mutex);
1693        ntdest = context->parent_context->num_output_bytes;
1694        _sw32(bstarts + nblock_ * 4, ntdest); /* update block start counter */
1695        if ( (cbytes == 0) || (ntdest+cbytes > maxbytes) ) {
1696          context->parent_context->thread_giveup_code = 0;  /* uncompressible buffer */
1697          pthread_mutex_unlock(&context->parent_context->count_mutex);
1698          break;
1699        }
1700        context->parent_context->thread_nblock++;
1701        nblock_ = context->parent_context->thread_nblock;
1702        context->parent_context->num_output_bytes += cbytes;           /* update return bytes counter */
1703        pthread_mutex_unlock(&context->parent_context->count_mutex);
1704        /* End of critical section */
1705
1706        /* Copy the compressed buffer to destination */
1707        memcpy(dest+ntdest, tmp2, cbytes);
1708      }
1709      else {
1710        nblock_++;
1711        /* Update counter for this thread */
1712        ntbytes += cbytes;
1713      }
1714
1715    } /* closes while (nblock_) */
1716
1717    /* Sum up all the bytes decompressed */
1718    if ((!compress || (flags & BLOSC_MEMCPYED)) && context->parent_context->thread_giveup_code > 0) {
1719      /* Update global counter for all threads (decompression only) */
1720      pthread_mutex_lock(&context->parent_context->count_mutex);
1721      context->parent_context->num_output_bytes += ntbytes;
1722      pthread_mutex_unlock(&context->parent_context->count_mutex);
1723    }
1724
1725    /* Meeting point for all threads (wait for finalization) */
1726    WAIT_FINISH(NULL, context->parent_context);
1727  }
1728
1729  /* Cleanup our working space and context */
1730  my_free(context->tmp);
1731  my_free(context);
1732
1733  return(NULL);
1734}
1735
1736
1737static int init_threads(struct blosc_context* context)
1738{
1739  int32_t tid;
1740  int rc2;
1741  int32_t ebsize;
1742  struct thread_context* thread_context;
1743
1744  /* Initialize mutex and condition variable objects */
1745  pthread_mutex_init(&context->count_mutex, NULL);
1746
1747  /* Set context thread sentinels */
1748  context->thread_giveup_code = 1;
1749  context->thread_nblock = -1;
1750
1751  /* Barrier initialization */
1752#ifdef _POSIX_BARRIERS_MINE
1753  pthread_barrier_init(&context->barr_init, NULL, context->numthreads+1);
1754  pthread_barrier_init(&context->barr_finish, NULL, context->numthreads+1);
1755#else
1756  pthread_mutex_init(&context->count_threads_mutex, NULL);
1757  pthread_cond_init(&context->count_threads_cv, NULL);
1758  context->count_threads = 0;      /* Reset threads counter */
1759#endif
1760
1761#if !defined(_WIN32)
1762  /* Initialize and set thread detached attribute */
1763  pthread_attr_init(&context->ct_attr);
1764  pthread_attr_setdetachstate(&context->ct_attr, PTHREAD_CREATE_JOINABLE);
1765#endif
1766
1767  /* Finally, create the threads in detached state */
1768  for (tid = 0; tid < context->numthreads; tid++) {
1769    context->tids[tid] = tid;
1770
1771    /* Create a thread context thread owns context (will destroy when finished) */
1772    thread_context = (struct thread_context*)my_malloc(sizeof(struct thread_context));
1773    thread_context->parent_context = context;
1774    thread_context->tid = tid;
1775
1776    ebsize = context->blocksize + context->typesize * (int32_t)sizeof(int32_t);
1777    thread_context->tmp = my_malloc(context->blocksize + ebsize + context->blocksize);
1778    thread_context->tmp2 = thread_context->tmp + context->blocksize;
1779    thread_context->tmp3 = thread_context->tmp + context->blocksize + ebsize;
1780    thread_context->tmpblocksize = context->blocksize;
1781
1782#if !defined(_WIN32)
1783    rc2 = pthread_create(&context->threads[tid], &context->ct_attr, t_blosc, (void *)thread_context);
1784#else
1785    rc2 = pthread_create(&context->threads[tid], NULL, t_blosc, (void *)thread_context);
1786#endif
1787    if (rc2) {
1788      fprintf(stderr, "ERROR; return code from pthread_create() is %d\n", rc2);
1789      fprintf(stderr, "\tError detail: %s\n", strerror(rc2));
1790      return(-1);
1791    }
1792  }
1793
1794
1795  return(0);
1796}
1797
1798int blosc_get_nthreads(void)
1799{
1800  int ret = g_threads;
1801
1802  return ret;
1803}
1804
1805int blosc_set_nthreads(int nthreads_new)
1806{
1807  int ret = g_threads;
1808
1809  /* Check if should initialize */
1810  if (!g_initlib) blosc_init();
1811
1812  if (nthreads_new != ret){
1813    /* Re-initialize Blosc */
1814    blosc_destroy();
1815    blosc_init();
1816    g_threads = nthreads_new;
1817  }
1818
1819  return ret;
1820}
1821
1822int blosc_set_nthreads_(struct blosc_context* context)
1823{
1824  if (context->numthreads > BLOSC_MAX_THREADS) {
1825    fprintf(stderr,
1826            "Error.  nthreads cannot be larger than BLOSC_MAX_THREADS (%d)",
1827            BLOSC_MAX_THREADS);
1828    return -1;
1829  }
1830  else if (context->numthreads <= 0) {
1831    fprintf(stderr, "Error.  nthreads must be a positive integer");
1832    return -1;
1833  }
1834
1835  /* Launch a new pool of threads */
1836  if (context->numthreads > 1 && context->numthreads != context->threads_started) {
1837    blosc_release_threadpool(context);
1838    init_threads(context);
1839  }
1840
1841  /* We have now started the threads */
1842  context->threads_started = context->numthreads;
1843
1844  return context->numthreads;
1845}
1846
1847char* blosc_get_compressor(void)
1848{
1849  char* compname;
1850  blosc_compcode_to_compname(g_compressor, &compname);
1851
1852  return compname;
1853}
1854
1855int blosc_set_compressor(const char *compname)
1856{
1857  int code = blosc_compname_to_compcode(compname);
1858
1859  g_compressor = code;
1860
1861  /* Check if should initialize */
1862  if (!g_initlib) blosc_init();
1863
1864  return code;
1865}
1866
1867char* blosc_list_compressors(void)
1868{
1869  static int compressors_list_done = 0;
1870  static char ret[256];
1871
1872  if (compressors_list_done) return ret;
1873  ret[0] = '\0';
1874  strcat(ret, BLOSC_BLOSCLZ_COMPNAME);
1875#if defined(HAVE_LZ4)
1876  strcat(ret, ","); strcat(ret, BLOSC_LZ4_COMPNAME);
1877  strcat(ret, ","); strcat(ret, BLOSC_LZ4HC_COMPNAME);
1878#endif /* HAVE_LZ4 */
1879#if defined(HAVE_SNAPPY)
1880  strcat(ret, ","); strcat(ret, BLOSC_SNAPPY_COMPNAME);
1881#endif /* HAVE_SNAPPY */
1882#if defined(HAVE_ZLIB)
1883  strcat(ret, ","); strcat(ret, BLOSC_ZLIB_COMPNAME);
1884#endif /* HAVE_ZLIB */
1885#if defined(HAVE_ZSTD)
1886  strcat(ret, ","); strcat(ret, BLOSC_ZSTD_COMPNAME);
1887#endif /* HAVE_ZSTD */
1888  compressors_list_done = 1;
1889  return ret;
1890}
1891
1892char* blosc_get_version_string(void)
1893{
1894  static char ret[256];
1895  strcpy(ret, BLOSC_VERSION_STRING);
1896  return ret;
1897}
1898
1899int blosc_get_complib_info(char *compname, char **complib, char **version)
1900{
1901  int clibcode;
1902  char *clibname;
1903  char *clibversion = "unknown";
1904
1905#if (defined(HAVE_LZ4) && defined(LZ4_VERSION_MAJOR)) || (defined(HAVE_SNAPPY) && defined(SNAPPY_VERSION)) || defined(ZSTD_VERSION_MAJOR)
1906  char sbuffer[256];
1907#endif
1908
1909  clibcode = compname_to_clibcode(compname);
1910  clibname = clibcode_to_clibname(clibcode);
1911
1912  /* complib version */
1913  if (clibcode == BLOSC_BLOSCLZ_LIB) {
1914    clibversion = BLOSCLZ_VERSION_STRING;
1915  }
1916#if defined(HAVE_LZ4)
1917  else if (clibcode == BLOSC_LZ4_LIB) {
1918#if defined(LZ4_VERSION_MAJOR)
1919    sprintf(sbuffer, "%d.%d.%d",
1920            LZ4_VERSION_MAJOR, LZ4_VERSION_MINOR, LZ4_VERSION_RELEASE);
1921    clibversion = sbuffer;
1922#endif /* LZ4_VERSION_MAJOR */
1923  }
1924#endif /* HAVE_LZ4 */
1925#if defined(HAVE_SNAPPY)
1926  else if (clibcode == BLOSC_SNAPPY_LIB) {
1927#if defined(SNAPPY_VERSION)
1928    sprintf(sbuffer, "%d.%d.%d", SNAPPY_MAJOR, SNAPPY_MINOR, SNAPPY_PATCHLEVEL);
1929    clibversion = sbuffer;
1930#endif /* SNAPPY_VERSION */
1931  }
1932#endif /* HAVE_SNAPPY */
1933#if defined(HAVE_ZLIB)
1934  else if (clibcode == BLOSC_ZLIB_LIB) {
1935    clibversion = ZLIB_VERSION;
1936  }
1937#endif /* HAVE_ZLIB */
1938#if defined(HAVE_ZSTD)
1939  else if (clibcode == BLOSC_ZSTD_LIB) {
1940    sprintf(sbuffer, "%d.%d.%d",
1941            ZSTD_VERSION_MAJOR, ZSTD_VERSION_MINOR, ZSTD_VERSION_RELEASE);
1942    clibversion = sbuffer;
1943  }
1944#endif /* HAVE_ZSTD */
1945
1946  *complib = strdup(clibname);
1947  *version = strdup(clibversion);
1948  return clibcode;
1949}
1950
1951/* Return `nbytes`, `cbytes` and `blocksize` from a compressed buffer. */
1952void blosc_cbuffer_sizes(const void *cbuffer, size_t *nbytes,
1953                         size_t *cbytes, size_t *blocksize)
1954{
1955  uint8_t *_src = (uint8_t *)(cbuffer);    /* current pos for source buffer */
1956  uint8_t version, versionlz;              /* versions for compressed header */
1957
1958  /* Read the version info (could be useful in the future) */
1959  version = _src[0];                       /* blosc format version */
1960  versionlz = _src[1];                     /* blosclz format version */
1961
1962  version += 0;                            /* shut up compiler warning */
1963  versionlz += 0;                          /* shut up compiler warning */
1964
1965  /* Read the interesting values */
1966  *nbytes = (size_t)sw32_(_src + 4);       /* uncompressed buffer size */
1967  *blocksize = (size_t)sw32_(_src + 8);    /* block size */
1968  *cbytes = (size_t)sw32_(_src + 12);      /* compressed buffer size */
1969}
1970
1971
1972/* Return `typesize` and `flags` from a compressed buffer. */
1973void blosc_cbuffer_metainfo(const void *cbuffer, size_t *typesize,
1974                            int *flags)
1975{
1976  uint8_t *_src = (uint8_t *)(cbuffer);  /* current pos for source buffer */
1977  uint8_t version, versionlz;            /* versions for compressed header */
1978
1979  /* Read the version info (could be useful in the future) */
1980  version = _src[0];                     /* blosc format version */
1981  versionlz = _src[1];                   /* blosclz format version */
1982
1983  version += 0;                             /* shut up compiler warning */
1984  versionlz += 0;                           /* shut up compiler warning */
1985
1986  /* Read the interesting values */
1987  *flags = (int)_src[2];                 /* flags */
1988  *typesize = (size_t)_src[3];           /* typesize */
1989}
1990
1991
1992/* Return version information from a compressed buffer. */
1993void blosc_cbuffer_versions(const void *cbuffer, int *version,
1994                            int *versionlz)
1995{
1996  uint8_t *_src = (uint8_t *)(cbuffer);  /* current pos for source buffer */
1997
1998  /* Read the version info */
1999  *version = (int)_src[0];         /* blosc format version */
2000  *versionlz = (int)_src[1];       /* Lempel-Ziv compressor format version */
2001}
2002
2003
2004/* Return the compressor library/format used in a compressed buffer. */
2005char *blosc_cbuffer_complib(const void *cbuffer)
2006{
2007  uint8_t *_src = (uint8_t *)(cbuffer);  /* current pos for source buffer */
2008  int clibcode;
2009  char *complib;
2010
2011  /* Read the compressor format/library info */
2012  clibcode = (_src[2] & 0xe0) >> 5;
2013  complib = clibcode_to_clibname(clibcode);
2014  return complib;
2015}
2016
2017/* Get the internal blocksize to be used during compression.  0 means
2018   that an automatic blocksize is computed internally. */
2019int blosc_get_blocksize(void)
2020{
2021  return (int)g_force_blocksize;
2022}
2023
2024/* Force the use of a specific blocksize.  If 0, an automatic
2025   blocksize will be used (the default). */
2026void blosc_set_blocksize(size_t size)
2027{
2028  g_force_blocksize = (int32_t)size;
2029}
2030
2031void blosc_init(void)
2032{
2033  /* Return if we are already initialized */
2034  if (g_initlib) return;
2035
2036  pthread_mutex_init(&global_comp_mutex, NULL);
2037  g_global_context = (struct blosc_context*)my_malloc(sizeof(struct blosc_context));
2038  g_global_context->threads_started = 0;
2039  g_initlib = 1;
2040}
2041
2042void blosc_destroy(void)
2043{
2044  /* Return if Blosc is not initialized */
2045  if (!g_initlib) return;
2046
2047  g_initlib = 0;
2048  blosc_release_threadpool(g_global_context);
2049  my_free(g_global_context);
2050  pthread_mutex_destroy(&global_comp_mutex);
2051}
2052
2053int blosc_release_threadpool(struct blosc_context* context)
2054{
2055  int32_t t;
2056  void* status;
2057  int rc;
2058  int rc2;
2059
2060  if (context->threads_started > 0)
2061  {
2062    /* Tell all existing threads to finish */
2063    context->end_threads = 1;
2064
2065    /* Sync threads */
2066    WAIT_INIT(-1, context);
2067
2068    /* Join exiting threads */
2069    for (t=0; t<context->threads_started; t++) {
2070      rc2 = pthread_join(context->threads[t], &status);
2071      if (rc2) {
2072        fprintf(stderr, "ERROR; return code from pthread_join() is %d\n", rc2);
2073        fprintf(stderr, "\tError detail: %s\n", strerror(rc2));
2074      }
2075    }
2076
2077    /* Release mutex and condition variable objects */
2078    pthread_mutex_destroy(&context->count_mutex);
2079
2080    /* Barriers */
2081  #ifdef _POSIX_BARRIERS_MINE
2082      pthread_barrier_destroy(&context->barr_init);
2083      pthread_barrier_destroy(&context->barr_finish);
2084  #else
2085      pthread_mutex_destroy(&context->count_threads_mutex);
2086      pthread_cond_destroy(&context->count_threads_cv);
2087  #endif
2088
2089      /* Thread attributes */
2090  #if !defined(_WIN32)
2091      pthread_attr_destroy(&context->ct_attr);
2092  #endif
2093
2094  }
2095
2096  context->threads_started = 0;
2097
2098  return 0;
2099}
2100
2101int blosc_free_resources(void)
2102{
2103  /* Return if Blosc is not initialized */
2104  if (!g_initlib) return -1;
2105
2106  return blosc_release_threadpool(g_global_context);
2107}
Note: See TracBrowser for help on using the repository browser.