Ignore:
Timestamp:
08/26/16 19:35:26 (8 years ago)
Author:
Hal Finkel <hfinkel@…>
Branches:
master, pympi
Children:
8ebc79b
Parents:
cda87e9
git-author:
Hal Finkel <hfinkel@…> (08/26/16 19:35:26)
git-committer:
Hal Finkel <hfinkel@…> (08/26/16 19:35:26)
Message:

Upgrade to latest blosc library

blosc git: e394f327ccc78319d90a06af0b88bce07034b8dd

File:
1 edited

Legend:

Unmodified
Added
Removed
  • thirdparty/blosc/blosc.c

    r00587dc r981e22c  
    11/********************************************************************* 
    2   Blosc - Blocked Suffling and Compression Library 
    3  
    4   Author: Francesc Alted <f[email protected]> 
     2  Blosc - Blocked Shuffling and Compression Library 
     3 
     4  Author: Francesc Alted <f[email protected]> 
    55  Creation date: 2009-05-20 
    66 
     
    99 
    1010 
     11#include <stdio.h> 
    1112#include <stdlib.h> 
    12 #include <stdio.h> 
     13#include <errno.h> 
    1314#include <string.h> 
    1415#include <sys/types.h> 
    1516#include <sys/stat.h> 
    1617#include <assert.h> 
     18#if defined(USING_CMAKE) 
     19  #include "config.h" 
     20#endif /*  USING_CMAKE */ 
    1721#include "blosc.h" 
     22#include "shuffle.h" 
    1823#include "blosclz.h" 
    19 #include "shuffle.h" 
     24#if defined(HAVE_LZ4) 
     25  #include "lz4.h" 
     26  #include "lz4hc.h" 
     27#endif /*  HAVE_LZ4 */ 
     28#if defined(HAVE_SNAPPY) 
     29  #include "snappy-c.h" 
     30#endif /*  HAVE_SNAPPY */ 
     31#if defined(HAVE_ZLIB) 
     32  #include "zlib.h" 
     33#endif /*  HAVE_ZLIB */ 
     34#if defined(HAVE_ZSTD) 
     35  #include "zstd.h" 
     36#endif /*  HAVE_ZSTD */ 
    2037 
    2138#if defined(_WIN32) && !defined(__MINGW32__) 
    2239  #include <windows.h> 
    23   #include "win32/stdint-windows.h" 
     40  #include <malloc.h> 
     41 
     42  /* stdint.h only available in VS2010 (VC++ 16.0) and newer */ 
     43  #if defined(_MSC_VER) && _MSC_VER < 1600 
     44    #include "win32/stdint-windows.h" 
     45  #else 
     46    #include <stdint.h> 
     47  #endif 
     48 
    2449  #include <process.h> 
    2550  #define getpid _getpid 
     
    3055#endif  /* _WIN32 */ 
    3156 
    32 #if defined(_WIN32) 
     57#if defined(_WIN32) && !defined(__GNUC__) 
    3358  #include "win32/pthread.h" 
    3459  #include "win32/pthread.c" 
     
    3762#endif 
    3863 
     64/* If C11 is supported, use it's built-in aligned allocation. */ 
     65#if __STDC_VERSION__ >= 201112L 
     66  #include <stdalign.h> 
     67#endif 
     68 
    3969 
    4070/* Some useful units */ 
     
    5080/* The size of L1 cache.  32 KB is quite common nowadays. */ 
    5181#define L1 (32*KB) 
    52  
    53 /* Wrapped function to adjust the number of threads used by blosc */ 
    54 int blosc_set_nthreads_(int); 
    55  
    56 /* Global variables for main logic */ 
    57 static int32_t init_temps_done = 0;    /* temp for compr/decompr initialized? */ 
    58 static int32_t force_blocksize = 0;    /* force the use of a blocksize? */ 
    59 static int pid = 0;                    /* the PID for this process */ 
    60 static int init_lib = 0;               /* is library initalized? */ 
    61  
    62 /* Global variables for threads */ 
    63 static int32_t nthreads = 1;            /* number of desired threads in pool */ 
    64 static int32_t init_threads_done = 0;   /* pool of threads initialized? */ 
    65 static int32_t end_threads = 0;         /* should exisiting threads end? */ 
    66 static int32_t init_sentinels_done = 0; /* sentinels initialized? */ 
    67 static int32_t giveup_code;             /* error code when give up */ 
    68 static int32_t nblock;                  /* block counter */ 
    69 static pthread_t threads[BLOSC_MAX_THREADS];  /* opaque structure for threads */ 
    70 static int32_t tids[BLOSC_MAX_THREADS];       /* ID per each thread */ 
    71 #if !defined(_WIN32) 
    72 static pthread_attr_t ct_attr;          /* creation time attrs for threads */ 
    73 #endif 
    7482 
    7583/* Have problems using posix barriers when symbol value is 200112L */ 
     
    7886#define _POSIX_BARRIERS_MINE 
    7987#endif 
    80  
    8188/* Synchronization variables */ 
    82 static pthread_mutex_t count_mutex; 
     89 
     90 
     91struct blosc_context { 
     92  int32_t compress;               /* 1 if we are doing compression 0 if decompress */ 
     93 
     94  const uint8_t* src; 
     95  uint8_t* dest;                  /* The current pos in the destination buffer */ 
     96  uint8_t* header_flags;          /* Flags for header.  Currently booked: 
     97                                    - 0: byte-shuffled? 
     98                                    - 1: memcpy'ed? 
     99                                    - 2: bit-shuffled? */ 
     100  int32_t sourcesize;             /* Number of bytes in source buffer (or uncompressed bytes in compressed file) */ 
     101  int32_t nblocks;                /* Number of total blocks in buffer */ 
     102  int32_t leftover;               /* Extra bytes at end of buffer */ 
     103  int32_t blocksize;              /* Length of the block in bytes */ 
     104  int32_t typesize;               /* Type size */ 
     105  int32_t num_output_bytes;       /* Counter for the number of output bytes */ 
     106  int32_t destsize;               /* Maximum size for destination buffer */ 
     107  uint8_t* bstarts;               /* Start of the buffer past header info */ 
     108  int32_t compcode;               /* Compressor code to use */ 
     109  int clevel;                     /* Compression level (1-9) */ 
     110 
     111  /* Threading */ 
     112  int32_t numthreads; 
     113  int32_t threads_started; 
     114  int32_t end_threads; 
     115  pthread_t threads[BLOSC_MAX_THREADS]; 
     116  int32_t tids[BLOSC_MAX_THREADS]; 
     117  pthread_mutex_t count_mutex; 
     118  #ifdef _POSIX_BARRIERS_MINE 
     119  pthread_barrier_t barr_init; 
     120  pthread_barrier_t barr_finish; 
     121  #else 
     122  int32_t count_threads; 
     123  pthread_mutex_t count_threads_mutex; 
     124  pthread_cond_t count_threads_cv; 
     125  #endif 
     126  #if !defined(_WIN32) 
     127  pthread_attr_t ct_attr;            /* creation time attrs for threads */ 
     128  #endif 
     129  int32_t thread_giveup_code;               /* error code when give up */ 
     130  int32_t thread_nblock;                    /* block counter */ 
     131}; 
     132 
     133struct thread_context { 
     134  struct blosc_context* parent_context; 
     135  int32_t tid; 
     136  uint8_t* tmp; 
     137  uint8_t* tmp2; 
     138  uint8_t* tmp3; 
     139  int32_t tmpblocksize; /* Used to keep track of how big the temporary buffers are */ 
     140}; 
     141 
     142/* Global context for non-contextual API */ 
     143static struct blosc_context* g_global_context; 
    83144static pthread_mutex_t global_comp_mutex; 
    84 #ifdef _POSIX_BARRIERS_MINE 
    85 static pthread_barrier_t barr_init; 
    86 static pthread_barrier_t barr_finish; 
    87 #else 
    88 static int32_t count_threads; 
    89 static pthread_mutex_t count_threads_mutex; 
    90 static pthread_cond_t count_threads_cv; 
    91 #endif 
    92  
    93  
    94 /* Structure for parameters in (de-)compression threads */ 
    95 static struct thread_data { 
    96   int32_t typesize; 
    97   int32_t blocksize; 
    98   int32_t compress; 
    99   int32_t clevel; 
    100   int32_t flags; 
    101   int32_t memcpyed; 
    102   int32_t ntbytes; 
    103   int32_t nbytes; 
    104   int32_t maxbytes; 
    105   int32_t nblocks; 
    106   int32_t leftover; 
    107   int32_t *bstarts;             /* start pointers for each block */ 
    108   uint8_t *src; 
    109   uint8_t *dest; 
    110   uint8_t *tmp[BLOSC_MAX_THREADS]; 
    111   uint8_t *tmp2[BLOSC_MAX_THREADS]; 
    112 } params; 
    113  
    114  
    115 /* Structure for parameters meant for keeping track of current temporaries */ 
    116 static struct temp_data { 
    117   int32_t nthreads; 
    118   int32_t typesize; 
    119   int32_t blocksize; 
    120 } current_temp; 
    121  
     145static int32_t g_compressor = BLOSC_BLOSCLZ;  /* the compressor to use by default */ 
     146static int32_t g_threads = 1; 
     147static int32_t g_force_blocksize = 0; 
     148static int32_t g_initlib = 0; 
     149 
     150 
     151 
     152/* Wrapped function to adjust the number of threads used by blosc */ 
     153int blosc_set_nthreads_(struct blosc_context*); 
     154 
     155/* Releases the global threadpool */ 
     156int blosc_release_threadpool(struct blosc_context* context); 
    122157 
    123158/* Macros for synchronization */ 
     
    125160/* Wait until all threads are initialized */ 
    126161#ifdef _POSIX_BARRIERS_MINE 
    127 static int rc; 
    128 #define WAIT_INIT \ 
    129   rc = pthread_barrier_wait(&barr_init); \ 
     162#define WAIT_INIT(RET_VAL, CONTEXT_PTR)  \ 
     163  rc = pthread_barrier_wait(&CONTEXT_PTR->barr_init); \ 
    130164  if (rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD) { \ 
    131     printf("Could not wait on barrier (init)\n"); \ 
    132     return(-1); \ 
     165    printf("Could not wait on barrier (init): %d\n", rc); \ 
     166    return((RET_VAL));                            \ 
    133167  } 
    134168#else 
    135 #define WAIT_INIT \ 
    136   pthread_mutex_lock(&count_threads_mutex); \ 
    137   if (count_threads < nthreads) { \ 
    138     count_threads++; \ 
    139     pthread_cond_wait(&count_threads_cv, &count_threads_mutex); \ 
     169#define WAIT_INIT(RET_VAL, CONTEXT_PTR)  \ 
     170  pthread_mutex_lock(&CONTEXT_PTR->count_threads_mutex); \ 
     171  if (CONTEXT_PTR->count_threads < CONTEXT_PTR->numthreads) { \ 
     172    CONTEXT_PTR->count_threads++; \ 
     173    pthread_cond_wait(&CONTEXT_PTR->count_threads_cv, &CONTEXT_PTR->count_threads_mutex); \ 
    140174  } \ 
    141175  else { \ 
    142     pthread_cond_broadcast(&count_threads_cv); \ 
     176    pthread_cond_broadcast(&CONTEXT_PTR->count_threads_cv); \ 
    143177  } \ 
    144   pthread_mutex_unlock(&count_threads_mutex); 
     178  pthread_mutex_unlock(&CONTEXT_PTR->count_threads_mutex); 
    145179#endif 
    146180 
    147181/* Wait for all threads to finish */ 
    148182#ifdef _POSIX_BARRIERS_MINE 
    149 #define WAIT_FINISH \ 
    150   rc = pthread_barrier_wait(&barr_finish); \ 
     183#define WAIT_FINISH(RET_VAL, CONTEXT_PTR)  \ 
     184  rc = pthread_barrier_wait(&CONTEXT_PTR->barr_finish); \ 
    151185  if (rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD) { \ 
    152186    printf("Could not wait on barrier (finish)\n"); \ 
    153     return(-1);                                       \ 
     187    return((RET_VAL));                              \ 
    154188  } 
    155189#else 
    156 #define WAIT_FINISH \ 
    157   pthread_mutex_lock(&count_threads_mutex); \ 
    158   if (count_threads > 0) { \ 
    159     count_threads--; \ 
    160     pthread_cond_wait(&count_threads_cv, &count_threads_mutex); \ 
     190#define WAIT_FINISH(RET_VAL, CONTEXT_PTR)                          \ 
     191  pthread_mutex_lock(&CONTEXT_PTR->count_threads_mutex); \ 
     192  if (CONTEXT_PTR->count_threads > 0) { \ 
     193    CONTEXT_PTR->count_threads--; \ 
     194    pthread_cond_wait(&CONTEXT_PTR->count_threads_cv, &CONTEXT_PTR->count_threads_mutex); \ 
    161195  } \ 
    162196  else { \ 
    163     pthread_cond_broadcast(&count_threads_cv); \ 
     197    pthread_cond_broadcast(&CONTEXT_PTR->count_threads_cv); \ 
    164198  } \ 
    165   pthread_mutex_unlock(&count_threads_mutex); 
     199  pthread_mutex_unlock(&CONTEXT_PTR->count_threads_mutex); 
    166200#endif 
    167201 
     
    173207  int res = 0; 
    174208 
    175 #if defined(_WIN32) 
     209/* Do an alignment to 32 bytes because AVX2 is supported */ 
     210#if _ISOC11_SOURCE 
     211  /* C11 aligned allocation. 'size' must be a multiple of the alignment. */ 
     212  block = aligned_alloc(32, size); 
     213#elif defined(_WIN32) 
    176214  /* A (void *) cast needed for avoiding a warning with MINGW :-/ */ 
    177   block = (void *)_aligned_malloc(size, 16); 
     215  block = (void *)_aligned_malloc(size, 32); 
    178216#elif defined __APPLE__ 
    179217  /* Mac OS X guarantees 16-byte alignment in small allocs */ 
     
    181219#elif _POSIX_C_SOURCE >= 200112L || _XOPEN_SOURCE >= 600 
    182220  /* Platform does have an implementation of posix_memalign */ 
    183   res = posix_memalign(&block, 16, size); 
     221  res = posix_memalign(&block, 32, size); 
    184222#else 
    185223  block = malloc(size); 
     
    206244 
    207245 
    208 /* If `a` is little-endian, return it as-is.  If not, return a copy, 
    209    with the endianness changed */ 
    210 static int32_t sw32(int32_t a) 
    211 { 
    212   int32_t tmp; 
    213   char *pa = (char *)&a; 
    214   char *ptmp = (char *)&tmp; 
     246/* Copy 4 bytes from `*pa` to int32_t, changing endianness if necessary. */ 
     247static int32_t sw32_(const uint8_t *pa) 
     248{ 
     249  int32_t idest; 
     250  uint8_t *dest = (uint8_t *)&idest; 
    215251  int i = 1;                    /* for big/little endian detection */ 
    216252  char *p = (char *)&i; 
     
    218254  if (p[0] != 1) { 
    219255    /* big endian */ 
    220     ptmp[0] = pa[3]; 
    221     ptmp[1] = pa[2]; 
    222     ptmp[2] = pa[1]; 
    223     ptmp[3] = pa[0]; 
    224     return tmp; 
     256    dest[0] = pa[3]; 
     257    dest[1] = pa[2]; 
     258    dest[2] = pa[1]; 
     259    dest[3] = pa[0]; 
    225260  } 
    226261  else { 
    227262    /* little endian */ 
    228     return a; 
    229   } 
    230 } 
    231  
     263    dest[0] = pa[0]; 
     264    dest[1] = pa[1]; 
     265    dest[2] = pa[2]; 
     266    dest[3] = pa[3]; 
     267  } 
     268  return idest; 
     269} 
     270 
     271 
     272/* Copy 4 bytes from `*pa` to `*dest`, changing endianness if necessary. */ 
     273static void _sw32(uint8_t* dest, int32_t a) 
     274{ 
     275  uint8_t *pa = (uint8_t *)&a; 
     276  int i = 1;                    /* for big/little endian detection */ 
     277  char *p = (char *)&i; 
     278 
     279  if (p[0] != 1) { 
     280    /* big endian */ 
     281    dest[0] = pa[3]; 
     282    dest[1] = pa[2]; 
     283    dest[2] = pa[1]; 
     284    dest[3] = pa[0]; 
     285  } 
     286  else { 
     287    /* little endian */ 
     288    dest[0] = pa[0]; 
     289    dest[1] = pa[1]; 
     290    dest[2] = pa[2]; 
     291    dest[3] = pa[3]; 
     292  } 
     293} 
     294 
     295 
     296/* 
     297 * Conversion routines between compressor and compression libraries 
     298 */ 
     299 
     300/* Return the library code associated with the compressor name */ 
     301static int compname_to_clibcode(const char *compname) 
     302{ 
     303  if (strcmp(compname, BLOSC_BLOSCLZ_COMPNAME) == 0) 
     304    return BLOSC_BLOSCLZ_LIB; 
     305  if (strcmp(compname, BLOSC_LZ4_COMPNAME) == 0) 
     306    return BLOSC_LZ4_LIB; 
     307  if (strcmp(compname, BLOSC_LZ4HC_COMPNAME) == 0) 
     308    return BLOSC_LZ4_LIB; 
     309  if (strcmp(compname, BLOSC_SNAPPY_COMPNAME) == 0) 
     310    return BLOSC_SNAPPY_LIB; 
     311  if (strcmp(compname, BLOSC_ZLIB_COMPNAME) == 0) 
     312    return BLOSC_ZLIB_LIB; 
     313  if (strcmp(compname, BLOSC_ZSTD_COMPNAME) == 0) 
     314    return BLOSC_ZSTD_LIB; 
     315  return -1; 
     316} 
     317 
     318/* Return the library name associated with the compressor code */ 
     319static char *clibcode_to_clibname(int clibcode) 
     320{ 
     321  if (clibcode == BLOSC_BLOSCLZ_LIB) return BLOSC_BLOSCLZ_LIBNAME; 
     322  if (clibcode == BLOSC_LZ4_LIB) return BLOSC_LZ4_LIBNAME; 
     323  if (clibcode == BLOSC_SNAPPY_LIB) return BLOSC_SNAPPY_LIBNAME; 
     324  if (clibcode == BLOSC_ZLIB_LIB) return BLOSC_ZLIB_LIBNAME; 
     325  if (clibcode == BLOSC_ZSTD_LIB) return BLOSC_ZSTD_LIBNAME; 
     326  return NULL;                  /* should never happen */ 
     327} 
     328 
     329 
     330/* 
     331 * Conversion routines between compressor names and compressor codes 
     332 */ 
     333 
     334/* Get the compressor name associated with the compressor code */ 
     335int blosc_compcode_to_compname(int compcode, char **compname) 
     336{ 
     337  int code = -1;    /* -1 means non-existent compressor code */ 
     338  char *name = NULL; 
     339 
     340  /* Map the compressor code */ 
     341  if (compcode == BLOSC_BLOSCLZ) 
     342    name = BLOSC_BLOSCLZ_COMPNAME; 
     343  else if (compcode == BLOSC_LZ4) 
     344    name = BLOSC_LZ4_COMPNAME; 
     345  else if (compcode == BLOSC_LZ4HC) 
     346    name = BLOSC_LZ4HC_COMPNAME; 
     347  else if (compcode == BLOSC_SNAPPY) 
     348    name = BLOSC_SNAPPY_COMPNAME; 
     349  else if (compcode == BLOSC_ZLIB) 
     350    name = BLOSC_ZLIB_COMPNAME; 
     351  else if (compcode == BLOSC_ZSTD) 
     352    name = BLOSC_ZSTD_COMPNAME; 
     353 
     354  *compname = name; 
     355 
     356  /* Guess if there is support for this code */ 
     357  if (compcode == BLOSC_BLOSCLZ) 
     358    code = BLOSC_BLOSCLZ; 
     359#if defined(HAVE_LZ4) 
     360  else if (compcode == BLOSC_LZ4) 
     361    code = BLOSC_LZ4; 
     362  else if (compcode == BLOSC_LZ4HC) 
     363    code = BLOSC_LZ4HC; 
     364#endif /*  HAVE_LZ4 */ 
     365#if defined(HAVE_SNAPPY) 
     366  else if (compcode == BLOSC_SNAPPY) 
     367    code = BLOSC_SNAPPY; 
     368#endif /*  HAVE_SNAPPY */ 
     369#if defined(HAVE_ZLIB) 
     370  else if (compcode == BLOSC_ZLIB) 
     371    code = BLOSC_ZLIB; 
     372#endif /*  HAVE_ZLIB */ 
     373#if defined(HAVE_ZSTD) 
     374  else if (compcode == BLOSC_ZSTD) 
     375    code = BLOSC_ZSTD; 
     376#endif /*  HAVE_ZSTD */ 
     377 
     378  return code; 
     379} 
     380 
     381/* Get the compressor code for the compressor name. -1 if it is not available */ 
     382int blosc_compname_to_compcode(const char *compname) 
     383{ 
     384  int code = -1;  /* -1 means non-existent compressor code */ 
     385 
     386  if (strcmp(compname, BLOSC_BLOSCLZ_COMPNAME) == 0) { 
     387    code = BLOSC_BLOSCLZ; 
     388  } 
     389#if defined(HAVE_LZ4) 
     390  else if (strcmp(compname, BLOSC_LZ4_COMPNAME) == 0) { 
     391    code = BLOSC_LZ4; 
     392  } 
     393  else if (strcmp(compname, BLOSC_LZ4HC_COMPNAME) == 0) { 
     394    code = BLOSC_LZ4HC; 
     395  } 
     396#endif /*  HAVE_LZ4 */ 
     397#if defined(HAVE_SNAPPY) 
     398  else if (strcmp(compname, BLOSC_SNAPPY_COMPNAME) == 0) { 
     399    code = BLOSC_SNAPPY; 
     400  } 
     401#endif /*  HAVE_SNAPPY */ 
     402#if defined(HAVE_ZLIB) 
     403  else if (strcmp(compname, BLOSC_ZLIB_COMPNAME) == 0) { 
     404    code = BLOSC_ZLIB; 
     405  } 
     406#endif /*  HAVE_ZLIB */ 
     407#if defined(HAVE_ZSTD) 
     408  else if (strcmp(compname, BLOSC_ZSTD_COMPNAME) == 0) { 
     409    code = BLOSC_ZSTD; 
     410  } 
     411#endif /*  HAVE_ZSTD */ 
     412 
     413return code; 
     414} 
     415 
     416 
     417#if defined(HAVE_LZ4) 
     418static int lz4_wrap_compress(const char* input, size_t input_length, 
     419                             char* output, size_t maxout, int accel) 
     420{ 
     421  int cbytes; 
     422  cbytes = LZ4_compress_fast(input, output, (int)input_length, (int)maxout, 
     423                             accel); 
     424  return cbytes; 
     425} 
     426 
     427static int lz4hc_wrap_compress(const char* input, size_t input_length, 
     428                               char* output, size_t maxout, int clevel) 
     429{ 
     430  int cbytes; 
     431  if (input_length > (size_t)(2<<30)) 
     432    return -1;   /* input larger than 1 GB is not supported */ 
     433  /* clevel for lz4hc goes up to 16, at least in LZ4 1.1.3 */ 
     434  cbytes = LZ4_compressHC2_limitedOutput(input, output, (int)input_length, 
     435                                         (int)maxout, clevel*2-1); 
     436  return cbytes; 
     437} 
     438 
     439static int lz4_wrap_decompress(const char* input, size_t compressed_length, 
     440                               char* output, size_t maxout) 
     441{ 
     442  size_t cbytes; 
     443  cbytes = LZ4_decompress_fast(input, output, (int)maxout); 
     444  if (cbytes != compressed_length) { 
     445    return 0; 
     446  } 
     447  return (int)maxout; 
     448} 
     449 
     450#endif /* HAVE_LZ4 */ 
     451 
     452#if defined(HAVE_SNAPPY) 
     453static int snappy_wrap_compress(const char* input, size_t input_length, 
     454                                char* output, size_t maxout) 
     455{ 
     456  snappy_status status; 
     457  size_t cl = maxout; 
     458  status = snappy_compress(input, input_length, output, &cl); 
     459  if (status != SNAPPY_OK){ 
     460    return 0; 
     461  } 
     462  return (int)cl; 
     463} 
     464 
     465static int snappy_wrap_decompress(const char* input, size_t compressed_length, 
     466                                  char* output, size_t maxout) 
     467{ 
     468  snappy_status status; 
     469  size_t ul = maxout; 
     470  status = snappy_uncompress(input, compressed_length, output, &ul); 
     471  if (status != SNAPPY_OK){ 
     472    return 0; 
     473  } 
     474  return (int)ul; 
     475} 
     476#endif /* HAVE_SNAPPY */ 
     477 
     478#if defined(HAVE_ZLIB) 
     479/* zlib is not very respectful with sharing name space with others. 
     480 Fortunately, its names do not collide with those already in blosc. */ 
     481static int zlib_wrap_compress(const char* input, size_t input_length, 
     482                              char* output, size_t maxout, int clevel) 
     483{ 
     484  int status; 
     485  uLongf cl = maxout; 
     486  status = compress2( 
     487             (Bytef*)output, &cl, (Bytef*)input, (uLong)input_length, clevel); 
     488  if (status != Z_OK){ 
     489    return 0; 
     490  } 
     491  return (int)cl; 
     492} 
     493 
     494static int zlib_wrap_decompress(const char* input, size_t compressed_length, 
     495                                char* output, size_t maxout) 
     496{ 
     497  int status; 
     498  uLongf ul = maxout; 
     499  status = uncompress( 
     500             (Bytef*)output, &ul, (Bytef*)input, (uLong)compressed_length); 
     501  if (status != Z_OK){ 
     502    return 0; 
     503  } 
     504  return (int)ul; 
     505} 
     506#endif /*  HAVE_ZLIB */ 
     507 
     508#if defined(HAVE_ZSTD) 
     509static int zstd_wrap_compress(const char* input, size_t input_length, 
     510                              char* output, size_t maxout, int clevel) { 
     511  size_t code; 
     512  // clevel = (clevel < 9) ? clevel * 2 - 1 : ZSTD_maxCLevel();  // see zstd#254 
     513  clevel = (clevel < 9) ? clevel * 2 - 1 : 22; 
     514  code = ZSTD_compress( 
     515      (void*)output, maxout, (void*)input, input_length, clevel); 
     516  if (ZSTD_isError(code)) { 
     517    return 0; 
     518  } 
     519  return (int)code; 
     520} 
     521 
     522static int zstd_wrap_decompress(const char* input, size_t compressed_length, 
     523                                char* output, size_t maxout) { 
     524  size_t code; 
     525  code = ZSTD_decompress( 
     526      (void*)output, maxout, (void*)input, compressed_length); 
     527  if (ZSTD_isError(code)) { 
     528    fprintf(stderr, "error decompressing with Zstd: %s \n", ZSTD_getErrorName(code)); 
     529    return 0; 
     530  } 
     531  return (int)code; 
     532} 
     533#endif /*  HAVE_ZSTD */ 
     534 
     535/* Compute acceleration for blosclz */ 
     536static int get_accel(const struct blosc_context* context) { 
     537  int32_t clevel = context->clevel; 
     538  int32_t typesize = context->typesize; 
     539 
     540  if (clevel == 9) { 
     541    return 1; 
     542  } 
     543  if (context->compcode == BLOSC_BLOSCLZ) { 
     544    /* Compute the power of 2. See: 
     545     * http://www.exploringbinary.com/ten-ways-to-check-if-an-integer-is-a-power-of-two-in-c/ 
     546     */ 
     547    int32_t tspow2 = ((typesize != 0) && !(typesize & (typesize - 1))); 
     548    if (tspow2 && typesize < 32) { 
     549      return 32; 
     550    } 
     551  } 
     552  else if (context->compcode == BLOSC_LZ4) { 
     553    /* This acceleration setting based on discussions held in: 
     554     * https://groups.google.com/forum/#!topic/lz4c/zosy90P8MQw 
     555     */ 
     556    return (10 - clevel); 
     557  } 
     558  return 1; 
     559} 
    232560 
    233561/* Shuffle & compress a single block */ 
    234 static int blosc_c(int32_t blocksize, int32_t leftoverblock, 
    235                    int32_t ntbytes, int32_t maxbytes, 
    236                    uint8_t *src, uint8_t *dest, uint8_t *tmp) 
     562static int blosc_c(const struct blosc_context* context, int32_t blocksize, 
     563                   int32_t leftoverblock, int32_t ntbytes, int32_t maxbytes, 
     564                   const uint8_t *src, uint8_t *dest, uint8_t *tmp, 
     565                   uint8_t *tmp2) 
    237566{ 
    238567  int32_t j, neblock, nsplits; 
     
    240569  int32_t ctbytes = 0;              /* number of compressed bytes in block */ 
    241570  int32_t maxout; 
    242   int32_t typesize = params.typesize; 
    243   uint8_t *_tmp; 
    244  
    245   if ((params.flags & BLOSC_DOSHUFFLE) && (typesize > 1)) { 
    246     /* Shuffle this block (this makes sense only if typesize > 1) */ 
     571  int32_t typesize = context->typesize; 
     572  const uint8_t *_tmp = src; 
     573  char *compname; 
     574  int accel; 
     575  int bscount; 
     576 
     577  if (*(context->header_flags) & BLOSC_DOSHUFFLE & (typesize > 1)) { 
     578    /* Byte shuffling only makes sense if typesize > 1 */ 
    247579    shuffle(typesize, blocksize, src, tmp); 
    248580    _tmp = tmp; 
    249581  } 
    250   else { 
    251     _tmp = src; 
    252   } 
     582  /* We don't allow more than 1 filter at the same time (yet) */ 
     583  else if (*(context->header_flags) & BLOSC_DOBITSHUFFLE) { 
     584    bscount = bitshuffle(typesize, blocksize, src, tmp, tmp2); 
     585    if (bscount < 0) 
     586      return bscount; 
     587    _tmp = tmp; 
     588  } 
     589 
     590  /* Calculate acceleration for different compressors */ 
     591  accel = get_accel(context); 
    253592 
    254593  /* Compress for each shuffled slice split for this block. */ 
     
    268607    ctbytes += (int32_t)sizeof(int32_t); 
    269608    maxout = neblock; 
     609    #if defined(HAVE_SNAPPY) 
     610    if (context->compcode == BLOSC_SNAPPY) { 
     611      /* TODO perhaps refactor this to keep the value stashed somewhere */ 
     612      maxout = snappy_max_compressed_length(neblock); 
     613    } 
     614    #endif /*  HAVE_SNAPPY */ 
    270615    if (ntbytes+maxout > maxbytes) { 
    271616      maxout = maxbytes - ntbytes;   /* avoid buffer overrun */ 
     
    274619      } 
    275620    } 
    276     cbytes = blosclz_compress(params.clevel, _tmp+j*neblock, neblock, 
    277                               dest, maxout); 
    278     if (cbytes >= maxout) { 
    279       /* Buffer overrun caused by blosclz_compress (should never happen) */ 
     621    if (context->compcode == BLOSC_BLOSCLZ) { 
     622      cbytes = blosclz_compress(context->clevel, _tmp+j*neblock, neblock, 
     623                                dest, maxout, accel); 
     624    } 
     625    #if defined(HAVE_LZ4) 
     626    else if (context->compcode == BLOSC_LZ4) { 
     627      cbytes = lz4_wrap_compress((char *)_tmp+j*neblock, (size_t)neblock, 
     628                                 (char *)dest, (size_t)maxout, accel); 
     629    } 
     630    else if (context->compcode == BLOSC_LZ4HC) { 
     631      cbytes = lz4hc_wrap_compress((char *)_tmp+j*neblock, (size_t)neblock, 
     632                                   (char *)dest, (size_t)maxout, 
     633                                   context->clevel); 
     634    } 
     635    #endif /* HAVE_LZ4 */ 
     636    #if defined(HAVE_SNAPPY) 
     637    else if (context->compcode == BLOSC_SNAPPY) { 
     638      cbytes = snappy_wrap_compress((char *)_tmp+j*neblock, (size_t)neblock, 
     639                                    (char *)dest, (size_t)maxout); 
     640    } 
     641    #endif /* HAVE_SNAPPY */ 
     642    #if defined(HAVE_ZLIB) 
     643    else if (context->compcode == BLOSC_ZLIB) { 
     644      cbytes = zlib_wrap_compress((char *)_tmp+j*neblock, (size_t)neblock, 
     645                                  (char *)dest, (size_t)maxout, 
     646                                  context->clevel); 
     647    } 
     648    #endif /* HAVE_ZLIB */ 
     649    #if defined(HAVE_ZSTD) 
     650    else if (context->compcode == BLOSC_ZSTD) { 
     651      cbytes = zstd_wrap_compress((char*)_tmp + j * neblock, (size_t)neblock, 
     652                                  (char*)dest, (size_t)maxout, context->clevel); 
     653    } 
     654    #endif /* HAVE_ZSTD */ 
     655 
     656    else { 
     657      blosc_compcode_to_compname(context->compcode, &compname); 
     658      fprintf(stderr, "Blosc has not been compiled with '%s' ", compname); 
     659      fprintf(stderr, "compression support.  Please use one having it."); 
     660      return -5;    /* signals no compression support */ 
     661    } 
     662 
     663    if (cbytes > maxout) { 
     664      /* Buffer overrun caused by compression (should never happen) */ 
    280665      return -1; 
    281666    } 
     
    284669      return -2; 
    285670    } 
    286     else if (cbytes == 0) { 
    287       /* The compressor has been unable to compress data significantly. */ 
     671    else if (cbytes == 0 || cbytes == neblock) { 
     672      /* The compressor has been unable to compress data at all. */ 
    288673      /* Before doing the copy, check that we are not running into a 
    289674         buffer overflow. */ 
     
    294679      cbytes = neblock; 
    295680    } 
    296     ((int32_t *)(dest))[-1] = sw32(cbytes); 
     681    _sw32(dest - 4, cbytes); 
    297682    dest += cbytes; 
    298683    ntbytes += cbytes; 
     
    303688} 
    304689 
    305  
    306690/* Decompress & unshuffle a single block */ 
    307 static int blosc_d(int32_t blocksize, int32_t leftoverblock, 
    308                    uint8_t *src, uint8_t *dest, uint8_t *tmp, uint8_t *tmp2) 
     691static int blosc_d(struct blosc_context* context, int32_t blocksize, int32_t leftoverblock, 
     692                   const uint8_t *src, uint8_t *dest, uint8_t *tmp, uint8_t *tmp2) 
    309693{ 
    310694  int32_t j, neblock, nsplits; 
     
    313697  int32_t ctbytes = 0;           /* number of compressed bytes in block */ 
    314698  int32_t ntbytes = 0;           /* number of uncompressed bytes in block */ 
    315   uint8_t *_tmp; 
    316   int32_t typesize = params.typesize; 
    317  
    318   if ((params.flags & BLOSC_DOSHUFFLE) && (typesize > 1)) { 
     699  uint8_t *_tmp = dest; 
     700  int32_t typesize = context->typesize; 
     701  int32_t compformat; 
     702  char *compname; 
     703  int bscount; 
     704 
     705  if ((*(context->header_flags) & BLOSC_DOSHUFFLE & (typesize > 1)) ||  \ 
     706      (*(context->header_flags) & BLOSC_DOBITSHUFFLE)) { 
    319707    _tmp = tmp; 
    320708  } 
    321   else { 
    322     _tmp = dest; 
    323   } 
     709 
     710  compformat = (*(context->header_flags) & 0xe0) >> 5; 
    324711 
    325712  /* Compress for each shuffled slice split for this block. */ 
     
    333720  neblock = blocksize / nsplits; 
    334721  for (j = 0; j < nsplits; j++) { 
    335     cbytes = sw32(((int32_t *)(src))[0]);   /* amount of compressed bytes */ 
     722    cbytes = sw32_(src);      /* amount of compressed bytes */ 
    336723    src += sizeof(int32_t); 
    337724    ctbytes += (int32_t)sizeof(int32_t); 
     
    342729    } 
    343730    else { 
    344       nbytes = blosclz_decompress(src, cbytes, _tmp, neblock); 
     731      if (compformat == BLOSC_BLOSCLZ_FORMAT) { 
     732        nbytes = blosclz_decompress(src, cbytes, _tmp, neblock); 
     733      } 
     734      #if defined(HAVE_LZ4) 
     735      else if (compformat == BLOSC_LZ4_FORMAT) { 
     736        nbytes = lz4_wrap_decompress((char *)src, (size_t)cbytes, 
     737                                     (char*)_tmp, (size_t)neblock); 
     738      } 
     739      #endif /*  HAVE_LZ4 */ 
     740      #if defined(HAVE_SNAPPY) 
     741      else if (compformat == BLOSC_SNAPPY_FORMAT) { 
     742        nbytes = snappy_wrap_decompress((char *)src, (size_t)cbytes, 
     743                                        (char*)_tmp, (size_t)neblock); 
     744      } 
     745      #endif /*  HAVE_SNAPPY */ 
     746      #if defined(HAVE_ZLIB) 
     747      else if (compformat == BLOSC_ZLIB_FORMAT) { 
     748        nbytes = zlib_wrap_decompress((char *)src, (size_t)cbytes, 
     749                                      (char*)_tmp, (size_t)neblock); 
     750      } 
     751      #endif /*  HAVE_ZLIB */ 
     752      #if defined(HAVE_ZSTD) 
     753      else if (compformat == BLOSC_ZSTD_FORMAT) { 
     754        nbytes = zstd_wrap_decompress((char*)src, (size_t)cbytes, 
     755                                      (char*)_tmp, (size_t)neblock); 
     756      } 
     757      #endif /*  HAVE_ZSTD */ 
     758      else { 
     759        compname = clibcode_to_clibname(compformat); 
     760        fprintf(stderr, 
     761                "Blosc has not been compiled with decompression " 
     762                "support for '%s' format. ", compname); 
     763        fprintf(stderr, "Please recompile for adding this support.\n"); 
     764        return -5;    /* signals no decompression support */ 
     765      } 
     766 
     767      /* Check that decompressed bytes number is correct */ 
    345768      if (nbytes != neblock) { 
    346         return -2; 
    347       } 
     769          return -2; 
     770      } 
     771 
    348772    } 
    349773    src += cbytes; 
     
    353777  } /* Closes j < nsplits */ 
    354778 
    355   if ((params.flags & BLOSC_DOSHUFFLE) && (typesize > 1)) { 
    356     if ((uintptr_t)dest % 16 == 0) { 
    357       /* 16-bytes aligned dest.  SSE2 unshuffle will work. */ 
    358       unshuffle(typesize, blocksize, tmp, dest); 
    359     } 
    360     else { 
    361       /* dest is not aligned.  Use tmp2, which is aligned, and copy. */ 
    362       unshuffle(typesize, blocksize, tmp, tmp2); 
    363       if (tmp2 != dest) { 
    364         /* Copy only when dest is not tmp2 (e.g. not blosc_getitem())  */ 
    365         memcpy(dest, tmp2, blocksize); 
    366       } 
    367     } 
     779  if (*(context->header_flags) & BLOSC_DOSHUFFLE & (typesize > 1)) { 
     780    unshuffle(typesize, blocksize, tmp, dest); 
     781  } 
     782  else if (*(context->header_flags) & BLOSC_DOBITSHUFFLE) { 
     783    bscount = bitunshuffle(typesize, blocksize, tmp, dest, tmp2); 
     784    if (bscount < 0) 
     785      return bscount; 
    368786  } 
    369787 
     
    374792 
    375793/* Serial version for compression/decompression */ 
    376 static int serial_blosc(void) 
     794static int serial_blosc(struct blosc_context* context) 
    377795{ 
    378796  int32_t j, bsize, leftoverblock; 
    379797  int32_t cbytes; 
    380   int32_t compress = params.compress; 
    381   int32_t blocksize = params.blocksize; 
    382   int32_t ntbytes = params.ntbytes; 
    383   int32_t flags = params.flags; 
    384   int32_t maxbytes = params.maxbytes; 
    385   int32_t nblocks = params.nblocks; 
    386   int32_t leftover = params.nbytes % params.blocksize; 
    387   int32_t *bstarts = params.bstarts; 
    388   uint8_t *src = params.src; 
    389   uint8_t *dest = params.dest; 
    390   uint8_t *tmp = params.tmp[0];     /* tmp for thread 0 */ 
    391   uint8_t *tmp2 = params.tmp2[0];   /* tmp2 for thread 0 */ 
    392  
    393   for (j = 0; j < nblocks; j++) { 
    394     if (compress && !(flags & BLOSC_MEMCPYED)) { 
    395       bstarts[j] = sw32(ntbytes); 
    396     } 
    397     bsize = blocksize; 
     798 
     799  int32_t ebsize = context->blocksize + context->typesize * (int32_t)sizeof(int32_t); 
     800  int32_t ntbytes = context->num_output_bytes; 
     801 
     802  uint8_t *tmp = my_malloc(context->blocksize + ebsize); 
     803  uint8_t *tmp2 = tmp + context->blocksize; 
     804 
     805  for (j = 0; j < context->nblocks; j++) { 
     806    if (context->compress && !(*(context->header_flags) & BLOSC_MEMCPYED)) { 
     807      _sw32(context->bstarts + j * 4, ntbytes); 
     808    } 
     809    bsize = context->blocksize; 
    398810    leftoverblock = 0; 
    399     if ((j == nblocks - 1) && (leftover > 0)) { 
    400       bsize = leftover; 
     811    if ((j == context->nblocks - 1) && (context->leftover > 0)) { 
     812      bsize = context->leftover; 
    401813      leftoverblock = 1; 
    402814    } 
    403     if (compress) { 
    404       if (flags & BLOSC_MEMCPYED) { 
     815    if (context->compress) { 
     816      if (*(context->header_flags) & BLOSC_MEMCPYED) { 
    405817        /* We want to memcpy only */ 
    406         memcpy(dest+BLOSC_MAX_OVERHEAD+j*blocksize, src+j*blocksize, bsize); 
     818        memcpy(context->dest+BLOSC_MAX_OVERHEAD+j*context->blocksize, 
     819                context->src+j*context->blocksize, 
     820                bsize); 
    407821        cbytes = bsize; 
    408822      } 
    409823      else { 
    410824        /* Regular compression */ 
    411         cbytes = blosc_c(bsize, leftoverblock, ntbytes, maxbytes, 
    412                          src+j*blocksize, dest+ntbytes, tmp); 
     825        cbytes = blosc_c(context, bsize, leftoverblock, ntbytes, 
     826                         context->destsize, context->src+j*context->blocksize, 
     827                         context->dest+ntbytes, tmp, tmp2); 
    413828        if (cbytes == 0) { 
    414829          ntbytes = 0;              /* uncompressible data */ 
     
    418833    } 
    419834    else { 
    420       if (flags & BLOSC_MEMCPYED) { 
     835      if (*(context->header_flags) & BLOSC_MEMCPYED) { 
    421836        /* We want to memcpy only */ 
    422         memcpy(dest+j*blocksize, src+BLOSC_MAX_OVERHEAD+j*blocksize, bsize); 
     837        memcpy(context->dest+j*context->blocksize, 
     838                context->src+BLOSC_MAX_OVERHEAD+j*context->blocksize, 
     839                bsize); 
    423840        cbytes = bsize; 
    424841      } 
    425842      else { 
    426843        /* Regular decompression */ 
    427         cbytes = blosc_d(bsize, leftoverblock, 
    428                          src+sw32(bstarts[j]), dest+j*blocksize, tmp, tmp2); 
     844        cbytes = blosc_d(context, bsize, leftoverblock, 
     845                          context->src + sw32_(context->bstarts + j * 4), 
     846                          context->dest+j*context->blocksize, tmp, tmp2); 
    429847      } 
    430848    } 
     
    436854  } 
    437855 
     856  // Free temporaries 
     857  my_free(tmp); 
     858 
    438859  return ntbytes; 
    439860} 
     
    441862 
    442863/* Threaded version for compression/decompression */ 
    443 static int parallel_blosc(void) 
    444 { 
     864static int parallel_blosc(struct blosc_context* context) 
     865{ 
     866  int rc; 
    445867 
    446868  /* Check whether we need to restart threads */ 
    447   if (!init_threads_done || pid != getpid()) { 
    448     blosc_set_nthreads_(nthreads); 
    449   } 
     869  blosc_set_nthreads_(context); 
     870 
     871  /* Set sentinels */ 
     872  context->thread_giveup_code = 1; 
     873  context->thread_nblock = -1; 
    450874 
    451875  /* Synchronization point for all threads (wait for initialization) */ 
    452   WAIT_INIT; 
     876  WAIT_INIT(-1, context); 
     877 
    453878  /* Synchronization point for all threads (wait for finalization) */ 
    454   WAIT_FINISH; 
    455  
    456   if (giveup_code > 0) { 
     879  WAIT_FINISH(-1, context); 
     880 
     881  if (context->thread_giveup_code > 0) { 
    457882    /* Return the total bytes (de-)compressed in threads */ 
    458     return params.ntbytes; 
     883    return context->num_output_bytes; 
    459884  } 
    460885  else { 
    461886    /* Compression/decompression gave up.  Return error code. */ 
    462     return giveup_code; 
    463   } 
    464 } 
    465  
    466  
    467 /* Convenience functions for creating and releasing temporaries */ 
    468 static int create_temporaries(void) 
    469 { 
    470   int32_t tid; 
    471   int32_t typesize = params.typesize; 
    472   int32_t blocksize = params.blocksize; 
    473   /* Extended blocksize for temporary destination.  Extended blocksize 
    474    is only useful for compression in parallel mode, but it doesn't 
    475    hurt serial mode either. */ 
    476   int32_t ebsize = blocksize + typesize*(int32_t)sizeof(int32_t); 
    477  
    478   /* Create temporary area for each thread */ 
    479   for (tid = 0; tid < nthreads; tid++) { 
    480     uint8_t *tmp = my_malloc(blocksize); 
    481     uint8_t *tmp2; 
    482     if (tmp == NULL) { 
    483       return -1; 
    484     } 
    485     params.tmp[tid] = tmp; 
    486     tmp2 = my_malloc(ebsize); 
    487     if (tmp2 == NULL) { 
    488       return -1; 
    489     } 
    490     params.tmp2[tid] = tmp2; 
    491   } 
    492  
    493   init_temps_done = 1; 
    494   /* Update params for current temporaries */ 
    495   current_temp.nthreads = nthreads; 
    496   current_temp.typesize = typesize; 
    497   current_temp.blocksize = blocksize; 
    498   return 0; 
    499 } 
    500  
    501  
    502 static void release_temporaries(void) 
    503 { 
    504   int32_t tid; 
    505  
    506   /* Release buffers */ 
    507   for (tid = 0; tid < nthreads; tid++) { 
    508     my_free(params.tmp[tid]); 
    509     my_free(params.tmp2[tid]); 
    510   } 
    511  
    512   init_temps_done = 0; 
     887    return context->thread_giveup_code; 
     888  } 
    513889} 
    514890 
     
    516892/* Do the compression or decompression of the buffer depending on the 
    517893   global params. */ 
    518 static int do_job(void) 
     894static int do_job(struct blosc_context* context) 
    519895{ 
    520896  int32_t ntbytes; 
    521  
    522   /* Initialize/reset temporaries if needed */ 
    523   if (!init_temps_done) { 
    524     int ret; 
    525     ret = create_temporaries(); 
    526     if (ret < 0) { 
    527       return -1; 
    528     } 
    529   } 
    530   else if (current_temp.nthreads != nthreads || 
    531            current_temp.typesize != params.typesize || 
    532            current_temp.blocksize != params.blocksize) { 
    533     int ret; 
    534     release_temporaries(); 
    535     ret = create_temporaries(); 
    536     if (ret < 0) { 
    537       return -1; 
    538     } 
    539   } 
    540897 
    541898  /* Run the serial version when nthreads is 1 or when the buffers are 
    542899     not much larger than blocksize */ 
    543   if (nthreads == 1 || (params.nbytes / params.blocksize) <= 1) { 
    544     ntbytes = serial_blosc(); 
     900  if (context->numthreads == 1 || (context->sourcesize / context->blocksize) <= 1) { 
     901    ntbytes = serial_blosc(context); 
    545902  } 
    546903  else { 
    547     ntbytes = parallel_blosc(); 
     904    ntbytes = parallel_blosc(context); 
    548905  } 
    549906 
     
    552909 
    553910 
    554 static int32_t compute_blocksize(int32_t clevel, int32_t typesize, 
    555                                  int32_t nbytes) 
     911static int32_t compute_blocksize(struct blosc_context* context, int32_t clevel, 
     912                                 int32_t typesize, int32_t nbytes, 
     913                                 int32_t forced_blocksize) 
    556914{ 
    557915  int32_t blocksize; 
     
    564922  blocksize = nbytes;           /* Start by a whole buffer as blocksize */ 
    565923 
    566   if (force_blocksize) { 
    567     blocksize = force_blocksize; 
    568     /* Check that forced blocksize is not too small nor too large */ 
     924  if (forced_blocksize) { 
     925    blocksize = forced_blocksize; 
     926    /* Check that forced blocksize is not too small */ 
    569927    if (blocksize < MIN_BUFFERSIZE) { 
    570928      blocksize = MIN_BUFFERSIZE; 
    571929    } 
    572930  } 
    573   else if (nbytes >= L1*4) { 
    574     blocksize = L1 * 4; 
     931  else if (nbytes >= L1) { 
     932    blocksize = L1; 
     933 
     934    /* For LZ4HC, increase the block sizes by a factor of 8 because it 
     935       is meant for compressing large blocks (it shows a big overhead 
     936       when compressing small ones). */ 
     937    if (context->compcode == BLOSC_LZ4HC) { 
     938      blocksize *= 8; 
     939    } 
     940 
     941    /* For Zlib, increase the block sizes by a factor of 8 because it 
     942       is meant for compressing large blocks (it shows a big overhead 
     943       when compressing small ones). */ 
     944    if (context->compcode == BLOSC_ZLIB) { 
     945      blocksize *= 8; 
     946    } 
     947 
     948    /* For Zstd, increase the block sizes by a factor of 8 because it 
     949       is meant for compressing large blocks (it shows a big overhead 
     950       when compressing small ones). */ 
     951    if (context->compcode == BLOSC_ZSTD) { 
     952      blocksize *= 8; 
     953    } 
     954 
    575955    if (clevel == 0) { 
    576       blocksize /= 16; 
     956      blocksize /= 4; 
    577957    } 
    578958    else if (clevel <= 3) { 
    579       blocksize /= 8; 
     959      blocksize /= 2; 
    580960    } 
    581961    else if (clevel <= 5) { 
    582       blocksize /= 4; 
     962      blocksize *= 1; 
    583963    } 
    584964    else if (clevel <= 6) { 
    585       blocksize /= 2; 
     965      blocksize *= 2; 
    586966    } 
    587967    else if (clevel < 9) { 
    588       blocksize *= 1; 
     968      blocksize *= 4; 
    589969    } 
    590970    else { 
    591       blocksize *= 2; 
     971      blocksize *= 16; 
    592972    } 
    593973  } 
     
    598978  } 
    599979 
    600   /* blocksize must be a multiple of the typesize */ 
     980  /* blocksize *must absolutely* be a multiple of the typesize */ 
    601981  if (blocksize > typesize) { 
    602982    blocksize = blocksize / typesize * typesize; 
    603983  } 
    604984 
    605   /* blocksize must not exceed (64 KB * typesize) in order to allow 
    606      BloscLZ to achieve better compression ratios (the ultimate reason 
    607      for this is that hash_log in BloscLZ cannot be larger than 15) */ 
    608   if ((blocksize / typesize) > 64*KB) { 
    609     blocksize = 64 * KB * typesize; 
    610   } 
    611  
    612985  return blocksize; 
    613986} 
    614987 
    615  
    616 /* The public routine for compression.  See blosc.h for docstrings. */ 
    617 int blosc_compress(int clevel, int doshuffle, size_t typesize, size_t nbytes, 
    618       const void *src, void *dest, size_t destsize) 
    619 { 
    620   uint8_t *_dest=NULL;         /* current pos for destination buffer */ 
    621   uint8_t *flags;              /* flags for header.  Currently booked: 
    622                                   - 0: shuffled? 
    623                                   - 1: memcpy'ed? */ 
    624   int32_t nbytes_;            /* number of bytes in source buffer */ 
    625   int32_t nblocks;            /* number of total blocks in buffer */ 
    626   int32_t leftover;           /* extra bytes at end of buffer */ 
    627   int32_t *bstarts;           /* start pointers for each block */ 
    628   int32_t blocksize;          /* length of the block in bytes */ 
    629   int32_t ntbytes = 0;        /* the number of compressed bytes */ 
    630   int32_t *ntbytes_;          /* placeholder for bytes in output buffer */ 
    631   int32_t maxbytes = (int32_t)destsize;  /* maximum size for dest buffer */ 
     988static int initialize_context_compression(struct blosc_context* context, 
     989                          int clevel, 
     990                          int doshuffle, 
     991                          size_t typesize, 
     992                          size_t sourcesize, 
     993                          const void* src, 
     994                          void* dest, 
     995                          size_t destsize, 
     996                          int32_t compressor, 
     997                          int32_t blocksize, 
     998                          int32_t numthreads) 
     999{ 
     1000  /* Set parameters */ 
     1001  context->compress = 1; 
     1002  context->src = (const uint8_t*)src; 
     1003  context->dest = (uint8_t *)(dest); 
     1004  context->num_output_bytes = 0; 
     1005  context->destsize = (int32_t)destsize; 
     1006  context->sourcesize = sourcesize; 
     1007  context->typesize = typesize; 
     1008  context->compcode = compressor; 
     1009  context->numthreads = numthreads; 
     1010  context->end_threads = 0; 
     1011  context->clevel = clevel; 
    6321012 
    6331013  /* Check buffer size limits */ 
    634   if (nbytes > BLOSC_MAX_BUFFERSIZE) { 
     1014  if (sourcesize > BLOSC_MAX_BUFFERSIZE) { 
    6351015    /* If buffer is too large, give up. */ 
    6361016    fprintf(stderr, "Input buffer size cannot exceed %d bytes\n", 
     
    6381018    return -1; 
    6391019  } 
    640  
    641   /* We can safely do this assignation now */ 
    642   nbytes_ = (int32_t)nbytes; 
    6431020 
    6441021  /* Compression level */ 
     
    6501027 
    6511028  /* Shuffle */ 
    652   if (doshuffle != 0 && doshuffle != 1) { 
    653     fprintf(stderr, "`shuffle` parameter must be either 0 or 1!\n"); 
     1029  if (doshuffle != 0 && doshuffle != 1 && doshuffle != 2) { 
     1030    fprintf(stderr, "`shuffle` parameter must be either 0, 1 or 2!\n"); 
    6541031    return -10; 
    6551032  } 
    6561033 
    6571034  /* Check typesize limits */ 
    658   if (typesize > BLOSC_MAX_TYPESIZE) { 
     1035  if (context->typesize > BLOSC_MAX_TYPESIZE) { 
    6591036    /* If typesize is too large, treat buffer as an 1-byte stream. */ 
    660     typesize = 1; 
     1037    context->typesize = 1; 
    6611038  } 
    6621039 
    6631040  /* Get the blocksize */ 
    664   blocksize = compute_blocksize(clevel, (int32_t)typesize, nbytes_); 
     1041  context->blocksize = compute_blocksize(context, clevel, (int32_t)context->typesize, context->sourcesize, blocksize); 
    6651042 
    6661043  /* Compute number of blocks in buffer */ 
    667   nblocks = nbytes_ / blocksize; 
    668   leftover = nbytes_ % blocksize; 
    669   nblocks = (leftover>0)? nblocks+1: nblocks; 
    670  
    671   _dest = (uint8_t *)(dest); 
    672   /* Write header for this block */ 
    673   _dest[0] = BLOSC_VERSION_FORMAT;         /* blosc format version */ 
    674   _dest[1] = BLOSCLZ_VERSION_FORMAT;       /* blosclz format version */ 
    675   flags = _dest+2;                         /* flags */ 
    676   _dest[2] = 0;                            /* zeroes flags */ 
    677   _dest[3] = (uint8_t)typesize;            /* type size */ 
    678   _dest += 4; 
    679   ((int32_t *)_dest)[0] = sw32(nbytes_);  /* size of the buffer */ 
    680   ((int32_t *)_dest)[1] = sw32(blocksize);/* block size */ 
    681   ntbytes_ = (int32_t *)(_dest+8);        /* compressed buffer size */ 
    682   _dest += sizeof(int32_t)*3; 
    683   bstarts = (int32_t *)_dest;             /* starts for every block */ 
    684   _dest += sizeof(int32_t)*nblocks;        /* space for pointers to blocks */ 
    685   ntbytes = (int32_t)(_dest - (uint8_t *)dest); 
    686  
    687   if (clevel == 0) { 
     1044  context->nblocks = context->sourcesize / context->blocksize; 
     1045  context->leftover = context->sourcesize % context->blocksize; 
     1046  context->nblocks = (context->leftover > 0) ? (context->nblocks + 1) : context->nblocks; 
     1047 
     1048  return 1; 
     1049} 
     1050 
     1051static int write_compression_header(struct blosc_context* context, int clevel, int doshuffle) 
     1052{ 
     1053  int32_t compformat; 
     1054 
     1055  /* Write version header for this block */ 
     1056  context->dest[0] = BLOSC_VERSION_FORMAT;              /* blosc format version */ 
     1057 
     1058  /* Write compressor format */ 
     1059  compformat = -1; 
     1060  switch (context->compcode) 
     1061  { 
     1062  case BLOSC_BLOSCLZ: 
     1063    compformat = BLOSC_BLOSCLZ_FORMAT; 
     1064    context->dest[1] = BLOSC_BLOSCLZ_VERSION_FORMAT; /* blosclz format version */ 
     1065    break; 
     1066 
     1067#if defined(HAVE_LZ4) 
     1068  case BLOSC_LZ4: 
     1069    compformat = BLOSC_LZ4_FORMAT; 
     1070    context->dest[1] = BLOSC_LZ4_VERSION_FORMAT;  /* lz4 format version */ 
     1071    break; 
     1072  case BLOSC_LZ4HC: 
     1073    compformat = BLOSC_LZ4HC_FORMAT; 
     1074    context->dest[1] = BLOSC_LZ4HC_VERSION_FORMAT; /* lz4hc is the same as lz4 */ 
     1075    break; 
     1076#endif /* HAVE_LZ4 */ 
     1077 
     1078#if defined(HAVE_SNAPPY) 
     1079  case BLOSC_SNAPPY: 
     1080    compformat = BLOSC_SNAPPY_FORMAT; 
     1081    context->dest[1] = BLOSC_SNAPPY_VERSION_FORMAT;    /* snappy format version */ 
     1082    break; 
     1083#endif /* HAVE_SNAPPY */ 
     1084 
     1085#if defined(HAVE_ZLIB) 
     1086  case BLOSC_ZLIB: 
     1087    compformat = BLOSC_ZLIB_FORMAT; 
     1088    context->dest[1] = BLOSC_ZLIB_VERSION_FORMAT;      /* zlib format version */ 
     1089    break; 
     1090#endif /* HAVE_ZLIB */ 
     1091 
     1092#if defined(HAVE_ZSTD) 
     1093  case BLOSC_ZSTD: 
     1094    compformat = BLOSC_ZSTD_FORMAT; 
     1095    context->dest[1] = BLOSC_ZSTD_VERSION_FORMAT;      /* zstd format version */ 
     1096    break; 
     1097#endif /* HAVE_ZSTD */ 
     1098 
     1099  default: 
     1100  { 
     1101    char *compname; 
     1102    compname = clibcode_to_clibname(compformat); 
     1103    fprintf(stderr, "Blosc has not been compiled with '%s' ", compname); 
     1104    fprintf(stderr, "compression support.  Please use one having it."); 
     1105    return -5;    /* signals no compression support */ 
     1106    break; 
     1107  } 
     1108  } 
     1109 
     1110  context->header_flags = context->dest+2;  /* flags */ 
     1111  context->dest[2] = 0;  /* zeroes flags */ 
     1112  context->dest[3] = (uint8_t)context->typesize;  /* type size */ 
     1113  _sw32(context->dest + 4, context->sourcesize);  /* size of the buffer */ 
     1114  _sw32(context->dest + 8, context->blocksize);  /* block size */ 
     1115  context->bstarts = context->dest + 16;  /* starts for every block */ 
     1116  context->num_output_bytes = 16 + sizeof(int32_t)*context->nblocks;  /* space for header and pointers */ 
     1117 
     1118  if (context->clevel == 0) { 
    6881119    /* Compression level 0 means buffer to be memcpy'ed */ 
    689     *flags |= BLOSC_MEMCPYED; 
    690   } 
    691  
    692   if (nbytes_ < MIN_BUFFERSIZE) { 
     1120    *(context->header_flags) |= BLOSC_MEMCPYED; 
     1121  } 
     1122 
     1123  if (context->sourcesize < MIN_BUFFERSIZE) { 
    6931124    /* Buffer is too small.  Try memcpy'ing. */ 
    694     *flags |= BLOSC_MEMCPYED; 
    695   } 
    696  
    697   if (doshuffle == 1) { 
    698     /* Shuffle is active */ 
    699     *flags |= BLOSC_DOSHUFFLE;              /* bit 0 set to one in flags */ 
    700   } 
    701  
    702   /* Take global lock for the time of compression */ 
    703   pthread_mutex_lock(&global_comp_mutex); 
    704   /* Populate parameters for compression routines */ 
    705   params.compress = 1; 
    706   params.clevel = clevel; 
    707   params.flags = (int32_t)*flags; 
    708   params.typesize = (int32_t)typesize; 
    709   params.blocksize = blocksize; 
    710   params.ntbytes = ntbytes; 
    711   params.nbytes = nbytes_; 
    712   params.maxbytes = maxbytes; 
    713   params.nblocks = nblocks; 
    714   params.leftover = leftover; 
    715   params.bstarts = bstarts; 
    716   params.src = (uint8_t *)src; 
    717   params.dest = (uint8_t *)dest; 
    718  
    719   if (!(*flags & BLOSC_MEMCPYED)) { 
     1125    *(context->header_flags) |= BLOSC_MEMCPYED; 
     1126  } 
     1127 
     1128  if (doshuffle == BLOSC_SHUFFLE) { 
     1129    /* Byte-shuffle is active */ 
     1130    *(context->header_flags) |= BLOSC_DOSHUFFLE;     /* bit 0 set to one in flags */ 
     1131  } 
     1132 
     1133  if (doshuffle == BLOSC_BITSHUFFLE) { 
     1134    /* Bit-shuffle is active */ 
     1135    *(context->header_flags) |= BLOSC_DOBITSHUFFLE;  /* bit 2 set to one in flags */ 
     1136  } 
     1137 
     1138  *(context->header_flags) |= compformat << 5;      /* compressor format start at bit 5 */ 
     1139 
     1140  return 1; 
     1141} 
     1142 
     1143int blosc_compress_context(struct blosc_context* context) 
     1144{ 
     1145  int32_t ntbytes = 0; 
     1146 
     1147  if (!(*(context->header_flags) & BLOSC_MEMCPYED)) { 
    7201148    /* Do the actual compression */ 
    721     ntbytes = do_job(); 
     1149    ntbytes = do_job(context); 
    7221150    if (ntbytes < 0) { 
    7231151      return -1; 
    7241152    } 
    725     if ((ntbytes == 0) && (nbytes_+BLOSC_MAX_OVERHEAD <= maxbytes)) { 
     1153    if ((ntbytes == 0) && (context->sourcesize+BLOSC_MAX_OVERHEAD <= context->destsize)) { 
    7261154      /* Last chance for fitting `src` buffer in `dest`.  Update flags 
    7271155       and do a memcpy later on. */ 
    728       *flags |= BLOSC_MEMCPYED; 
    729       params.flags |= BLOSC_MEMCPYED; 
    730     } 
    731   } 
    732  
    733   if (*flags & BLOSC_MEMCPYED) { 
    734     if (nbytes_+BLOSC_MAX_OVERHEAD > maxbytes) { 
     1156      *(context->header_flags) |= BLOSC_MEMCPYED; 
     1157    } 
     1158  } 
     1159 
     1160  if (*(context->header_flags) & BLOSC_MEMCPYED) { 
     1161    if (context->sourcesize + BLOSC_MAX_OVERHEAD > context->destsize) { 
    7351162      /* We are exceeding maximum output size */ 
    7361163      ntbytes = 0; 
    7371164    } 
    738     else if (((nbytes_ % L1) == 0) || (nthreads > 1)) { 
    739       /* More effective with large buffers that are multiples of the 
    740        cache size or multi-cores */ 
    741       params.ntbytes = BLOSC_MAX_OVERHEAD; 
    742       ntbytes = do_job(); 
    743       if (ntbytes < 0) { 
    744         return -1; 
    745       } 
    746     } 
    7471165    else { 
    748       memcpy((uint8_t *)dest+BLOSC_MAX_OVERHEAD, src, nbytes_); 
    749       ntbytes = nbytes_ + BLOSC_MAX_OVERHEAD; 
     1166      memcpy(context->dest+BLOSC_MAX_OVERHEAD, context->src, 
     1167             context->sourcesize); 
     1168      ntbytes = context->sourcesize + BLOSC_MAX_OVERHEAD; 
    7501169    } 
    7511170  } 
    7521171 
    7531172  /* Set the number of compressed bytes in header */ 
    754   *ntbytes_ = sw32(ntbytes); 
    755  
    756   /* Release global lock */ 
     1173  _sw32(context->dest + 12, ntbytes); 
     1174 
     1175  assert(ntbytes <= context->destsize); 
     1176  return ntbytes; 
     1177} 
     1178 
     1179/* The public routine for compression with context. */ 
     1180int blosc_compress_ctx(int clevel, int doshuffle, size_t typesize, 
     1181                       size_t nbytes, const void* src, void* dest, 
     1182                       size_t destsize, const char* compressor, 
     1183                       size_t blocksize, int numinternalthreads) 
     1184{ 
     1185  int error, result; 
     1186  struct blosc_context context; 
     1187 
     1188  context.threads_started = 0; 
     1189  error = initialize_context_compression(&context, clevel, doshuffle, typesize, 
     1190                                         nbytes, src, dest, destsize, 
     1191                                         blosc_compname_to_compcode(compressor), 
     1192                                         blocksize, numinternalthreads); 
     1193  if (error < 0) { return error; } 
     1194 
     1195  error = write_compression_header(&context, clevel, doshuffle); 
     1196  if (error < 0) { return error; } 
     1197 
     1198  result = blosc_compress_context(&context); 
     1199 
     1200  if (numinternalthreads > 1) 
     1201  { 
     1202    blosc_release_threadpool(&context); 
     1203  } 
     1204 
     1205  return result; 
     1206} 
     1207 
     1208/* The public routine for compression.  See blosc.h for docstrings. */ 
     1209int blosc_compress(int clevel, int doshuffle, size_t typesize, size_t nbytes, 
     1210                   const void *src, void *dest, size_t destsize) 
     1211{ 
     1212  int error; 
     1213  int result; 
     1214  char* envvar; 
     1215 
     1216  /* Check if should initialize */ 
     1217  if (!g_initlib) blosc_init(); 
     1218 
     1219  /* Check for a BLOSC_CLEVEL environment variable */ 
     1220  envvar = getenv("BLOSC_CLEVEL"); 
     1221  if (envvar != NULL) { 
     1222    long value; 
     1223    value = strtol(envvar, NULL, 10); 
     1224    if ((value != EINVAL) && (value >= 0)) { 
     1225      clevel = (int)value; 
     1226    } 
     1227  } 
     1228 
     1229  /* Check for a BLOSC_SHUFFLE environment variable */ 
     1230  envvar = getenv("BLOSC_SHUFFLE"); 
     1231  if (envvar != NULL) { 
     1232    if (strcmp(envvar, "NOSHUFFLE") == 0) { 
     1233      doshuffle = BLOSC_NOSHUFFLE; 
     1234    } 
     1235    if (strcmp(envvar, "SHUFFLE") == 0) { 
     1236      doshuffle = BLOSC_SHUFFLE; 
     1237    } 
     1238    if (strcmp(envvar, "BITSHUFFLE") == 0) { 
     1239      doshuffle = BLOSC_BITSHUFFLE; 
     1240    } 
     1241  } 
     1242 
     1243  /* Check for a BLOSC_TYPESIZE environment variable */ 
     1244  envvar = getenv("BLOSC_TYPESIZE"); 
     1245  if (envvar != NULL) { 
     1246    long value; 
     1247    value = strtol(envvar, NULL, 10); 
     1248    if ((value != EINVAL) && (value > 0)) { 
     1249      typesize = (int)value; 
     1250    } 
     1251  } 
     1252 
     1253  /* Check for a BLOSC_COMPRESSOR environment variable */ 
     1254  envvar = getenv("BLOSC_COMPRESSOR"); 
     1255  if (envvar != NULL) { 
     1256    result = blosc_set_compressor(envvar); 
     1257    if (result < 0) { return result; } 
     1258  } 
     1259 
     1260  /* Check for a BLOSC_COMPRESSOR environment variable */ 
     1261  envvar = getenv("BLOSC_BLOCKSIZE"); 
     1262  if (envvar != NULL) { 
     1263    long blocksize; 
     1264    blocksize = strtol(envvar, NULL, 10); 
     1265    if ((blocksize != EINVAL) && (blocksize > 0)) { 
     1266      blosc_set_blocksize((size_t)blocksize); 
     1267    } 
     1268  } 
     1269 
     1270  /* Check for a BLOSC_NTHREADS environment variable */ 
     1271  envvar = getenv("BLOSC_NTHREADS"); 
     1272  if (envvar != NULL) { 
     1273    long nthreads; 
     1274    nthreads = strtol(envvar, NULL, 10); 
     1275    if ((nthreads != EINVAL) && (nthreads > 0)) { 
     1276      result = blosc_set_nthreads((int)nthreads); 
     1277      if (result < 0) { return result; } 
     1278    } 
     1279  } 
     1280 
     1281  /* Check for a BLOSC_NOLOCK environment variable.  It is important 
     1282     that this should be the last env var so that it can take the 
     1283     previous ones into account */ 
     1284  envvar = getenv("BLOSC_NOLOCK"); 
     1285  if (envvar != NULL) { 
     1286    char *compname; 
     1287    blosc_compcode_to_compname(g_compressor, &compname); 
     1288    result = blosc_compress_ctx(clevel, doshuffle, typesize, 
     1289                                nbytes, src, dest, destsize, 
     1290                                compname, g_force_blocksize, g_threads); 
     1291    return result; 
     1292  } 
     1293 
     1294  pthread_mutex_lock(&global_comp_mutex); 
     1295 
     1296  error = initialize_context_compression(g_global_context, clevel, doshuffle, 
     1297                                         typesize, nbytes, src, dest, destsize, 
     1298                                         g_compressor, g_force_blocksize, 
     1299                                         g_threads); 
     1300  if (error < 0) { return error; } 
     1301 
     1302  error = write_compression_header(g_global_context, clevel, doshuffle); 
     1303  if (error < 0) { return error; } 
     1304 
     1305  result = blosc_compress_context(g_global_context); 
     1306 
    7571307  pthread_mutex_unlock(&global_comp_mutex); 
    758    
    759   assert((int32_t)ntbytes <= (int32_t)maxbytes); 
    760   return ntbytes; 
    761 } 
    762  
    763  
    764 /* The public routine for decompression.  See blosc.h for docstrings. */ 
    765 int blosc_decompress(const void *src, void *dest, size_t destsize) 
    766 { 
    767   uint8_t *_src=NULL;            /* current pos for source buffer */ 
    768   uint8_t version, versionlz;    /* versions for compressed header */ 
    769   uint8_t flags;                 /* flags for header */ 
    770   int32_t ntbytes;               /* the number of uncompressed bytes */ 
    771   int32_t nblocks;              /* number of total blocks in buffer */ 
    772   int32_t leftover;             /* extra bytes at end of buffer */ 
    773   int32_t *bstarts;             /* start pointers for each block */ 
    774   int32_t typesize, blocksize, nbytes, ctbytes; 
    775  
    776   _src = (uint8_t *)(src); 
     1308 
     1309  return result; 
     1310} 
     1311 
     1312int blosc_run_decompression_with_context(struct blosc_context* context, 
     1313                                         const void* src, 
     1314                                         void* dest, 
     1315                                         size_t destsize, 
     1316                                         int numinternalthreads) 
     1317{ 
     1318  uint8_t version; 
     1319  uint8_t versionlz; 
     1320  uint32_t ctbytes; 
     1321  int32_t ntbytes; 
     1322 
     1323  context->compress = 0; 
     1324  context->src = (const uint8_t*)src; 
     1325  context->dest = (uint8_t*)dest; 
     1326  context->destsize = destsize; 
     1327  context->num_output_bytes = 0; 
     1328  context->numthreads = numinternalthreads; 
     1329  context->end_threads = 0; 
    7771330 
    7781331  /* Read the header block */ 
    779   version = _src[0];                         /* blosc format version */ 
    780   versionlz = _src[1];                       /* blosclz format version */ 
    781   flags = _src[2];                           /* flags */ 
    782   typesize = (int32_t)_src[3];              /* typesize */ 
    783   _src += 4; 
    784   nbytes = sw32(((int32_t *)_src)[0]);      /* buffer size */ 
    785   blocksize = sw32(((int32_t *)_src)[1]);   /* block size */ 
    786   ctbytes = sw32(((int32_t *)_src)[2]);     /* compressed buffer size */ 
    787  
     1332  version = context->src[0];                        /* blosc format version */ 
     1333  versionlz = context->src[1];                      /* blosclz format version */ 
     1334 
     1335  context->header_flags = (uint8_t*)(context->src + 2);           /* flags */ 
     1336  context->typesize = (int32_t)context->src[3];      /* typesize */ 
     1337  context->sourcesize = sw32_(context->src + 4);     /* buffer size */ 
     1338  context->blocksize = sw32_(context->src + 8);      /* block size */ 
     1339  ctbytes = sw32_(context->src + 12);               /* compressed buffer size */ 
     1340 
     1341  /* Unused values */ 
    7881342  version += 0;                             /* shut up compiler warning */ 
    7891343  versionlz += 0;                           /* shut up compiler warning */ 
    7901344  ctbytes += 0;                             /* shut up compiler warning */ 
    7911345 
    792   _src += sizeof(int32_t)*3; 
    793   bstarts = (int32_t *)_src; 
     1346  context->bstarts = (uint8_t*)(context->src + 16); 
     1347  /* Compute some params */ 
     1348  /* Total blocks */ 
     1349  context->nblocks = context->sourcesize / context->blocksize; 
     1350  context->leftover = context->sourcesize % context->blocksize; 
     1351  context->nblocks = (context->leftover>0)? context->nblocks+1: context->nblocks; 
     1352 
     1353  /* Check that we have enough space to decompress */ 
     1354  if (context->sourcesize > (int32_t)destsize) { 
     1355    return -1; 
     1356  } 
     1357 
     1358  /* Check whether this buffer is memcpy'ed */ 
     1359  if (*(context->header_flags) & BLOSC_MEMCPYED) { 
     1360      memcpy(dest, (uint8_t *)src+BLOSC_MAX_OVERHEAD, context->sourcesize); 
     1361      ntbytes = context->sourcesize; 
     1362  } 
     1363  else { 
     1364    /* Do the actual decompression */ 
     1365    ntbytes = do_job(context); 
     1366    if (ntbytes < 0) { 
     1367      return -1; 
     1368    } 
     1369  } 
     1370 
     1371  assert(ntbytes <= (int32_t)destsize); 
     1372  return ntbytes; 
     1373} 
     1374 
     1375/* The public routine for decompression with context. */ 
     1376int blosc_decompress_ctx(const void *src, void *dest, size_t destsize, 
     1377                         int numinternalthreads) 
     1378{ 
     1379  int result; 
     1380  struct blosc_context context; 
     1381 
     1382  context.threads_started = 0; 
     1383  result = blosc_run_decompression_with_context(&context, src, dest, destsize, numinternalthreads); 
     1384 
     1385  if (numinternalthreads > 1) 
     1386  { 
     1387    blosc_release_threadpool(&context); 
     1388  } 
     1389 
     1390  return result; 
     1391} 
     1392 
     1393 
     1394/* The public routine for decompression.  See blosc.h for docstrings. */ 
     1395int blosc_decompress(const void *src, void *dest, size_t destsize) 
     1396{ 
     1397  int result; 
     1398  char* envvar; 
     1399  long nthreads; 
     1400 
     1401  /* Check if should initialize */ 
     1402  if (!g_initlib) blosc_init(); 
     1403 
     1404  /* Check for a BLOSC_NTHREADS environment variable */ 
     1405  envvar = getenv("BLOSC_NTHREADS"); 
     1406  if (envvar != NULL) { 
     1407    nthreads = strtol(envvar, NULL, 10); 
     1408    if ((nthreads != EINVAL) && (nthreads > 0)) { 
     1409      result = blosc_set_nthreads((int)nthreads); 
     1410      if (result < 0) { return result; } 
     1411    } 
     1412  } 
     1413 
     1414  /* Check for a BLOSC_NOLOCK environment variable.  It is important 
     1415     that this should be the last env var so that it can take the 
     1416     previous ones into account */ 
     1417  envvar = getenv("BLOSC_NOLOCK"); 
     1418  if (envvar != NULL) { 
     1419    result = blosc_decompress_ctx(src, dest, destsize, g_threads); 
     1420    return result; 
     1421  } 
     1422 
     1423  pthread_mutex_lock(&global_comp_mutex); 
     1424 
     1425  result = blosc_run_decompression_with_context(g_global_context, src, dest, 
     1426                                                destsize, g_threads); 
     1427 
     1428  pthread_mutex_unlock(&global_comp_mutex); 
     1429 
     1430  return result; 
     1431} 
     1432 
     1433 
     1434/* Specific routine optimized for decompression a small number of 
     1435   items out of a compressed chunk.  This does not use threads because 
     1436   it would affect negatively to performance. */ 
     1437int blosc_getitem(const void *src, int start, int nitems, void *dest) 
     1438{ 
     1439  uint8_t *_src=NULL;               /* current pos for source buffer */ 
     1440  uint8_t version, versionlz;       /* versions for compressed header */ 
     1441  uint8_t flags;                    /* flags for header */ 
     1442  int32_t ntbytes = 0;              /* the number of uncompressed bytes */ 
     1443  int32_t nblocks;                  /* number of total blocks in buffer */ 
     1444  int32_t leftover;                 /* extra bytes at end of buffer */ 
     1445  uint8_t *bstarts;                 /* start pointers for each block */ 
     1446  int tmp_init = 0; 
     1447  int32_t typesize, blocksize, nbytes, ctbytes; 
     1448  int32_t j, bsize, bsize2, leftoverblock; 
     1449  int32_t cbytes, startb, stopb; 
     1450  int stop = start + nitems; 
     1451  uint8_t *tmp; 
     1452  uint8_t *tmp2; 
     1453  uint8_t *tmp3; 
     1454  int32_t ebsize; 
     1455 
     1456  _src = (uint8_t *)(src); 
     1457 
     1458  /* Read the header block */ 
     1459  version = _src[0];                        /* blosc format version */ 
     1460  versionlz = _src[1];                      /* blosclz format version */ 
     1461  flags = _src[2];                          /* flags */ 
     1462  typesize = (int32_t)_src[3];              /* typesize */ 
     1463  nbytes = sw32_(_src + 4);                 /* buffer size */ 
     1464  blocksize = sw32_(_src + 8);              /* block size */ 
     1465  ctbytes = sw32_(_src + 12);               /* compressed buffer size */ 
     1466 
     1467  ebsize = blocksize + typesize * (int32_t)sizeof(int32_t); 
     1468  tmp = my_malloc(blocksize + ebsize + blocksize); 
     1469  tmp2 = tmp + blocksize; 
     1470  tmp3 = tmp + blocksize + ebsize; 
     1471 
     1472  version += 0;                             /* shut up compiler warning */ 
     1473  versionlz += 0;                           /* shut up compiler warning */ 
     1474  ctbytes += 0;                             /* shut up compiler warning */ 
     1475 
     1476  _src += 16; 
     1477  bstarts = _src; 
    7941478  /* Compute some params */ 
    7951479  /* Total blocks */ 
     
    7991483  _src += sizeof(int32_t)*nblocks; 
    8001484 
    801   /* Check that we have enough space to decompress */ 
    802   if (nbytes > (int32_t)destsize) { 
    803     return -1; 
    804   } 
    805  
    806   /* Take global lock for the time of decompression */ 
    807   pthread_mutex_lock(&global_comp_mutex); 
    808    
    809   /* Populate parameters for decompression routines */ 
    810   params.compress = 0; 
    811   params.clevel = 0;            /* specific for compression */ 
    812   params.flags = (int32_t)flags; 
    813   params.typesize = typesize; 
    814   params.blocksize = blocksize; 
    815   params.ntbytes = 0; 
    816   params.nbytes = nbytes; 
    817   params.nblocks = nblocks; 
    818   params.leftover = leftover; 
    819   params.bstarts = bstarts; 
    820   params.src = (uint8_t *)src; 
    821   params.dest = (uint8_t *)dest; 
    822  
    823   /* Check whether this buffer is memcpy'ed */ 
    824   if (flags & BLOSC_MEMCPYED) { 
    825     if (((nbytes % L1) == 0) || (nthreads > 1)) { 
    826       /* More effective with large buffers that are multiples of the 
    827        cache size or multi-cores */ 
    828       ntbytes = do_job(); 
    829       if (ntbytes < 0) { 
    830         return -1; 
    831       } 
    832     } 
    833     else { 
    834       memcpy(dest, (uint8_t *)src+BLOSC_MAX_OVERHEAD, nbytes); 
    835       ntbytes = nbytes; 
    836     } 
    837   } 
    838   else { 
    839     /* Do the actual decompression */ 
    840     ntbytes = do_job(); 
    841     if (ntbytes < 0) { 
    842       return -1; 
    843     } 
    844   } 
    845   /* Release global lock */ 
    846   pthread_mutex_unlock(&global_comp_mutex); 
    847    
    848   assert(ntbytes <= (int32_t)destsize); 
    849   return ntbytes; 
    850 } 
    851  
    852  
    853 /* Specific routine optimized for decompression a small number of 
    854    items out of a compressed chunk.  This does not use threads because 
    855    it would affect negatively to performance. */ 
    856 int blosc_getitem(const void *src, int start, int nitems, void *dest) 
    857 { 
    858   uint8_t *_src=NULL;               /* current pos for source buffer */ 
    859   uint8_t version, versionlz;       /* versions for compressed header */ 
    860   uint8_t flags;                    /* flags for header */ 
    861   int32_t ntbytes = 0;              /* the number of uncompressed bytes */ 
    862   int32_t nblocks;                 /* number of total blocks in buffer */ 
    863   int32_t leftover;                /* extra bytes at end of buffer */ 
    864   int32_t *bstarts;                /* start pointers for each block */ 
    865   uint8_t *tmp = params.tmp[0];     /* tmp for thread 0 */ 
    866   uint8_t *tmp2 = params.tmp2[0];   /* tmp2 for thread 0 */ 
    867   int tmp_init = 0; 
    868   int32_t typesize, blocksize, nbytes, ctbytes; 
    869   int32_t j, bsize, bsize2, leftoverblock; 
    870   int32_t cbytes, startb, stopb; 
    871   int stop = start + nitems; 
    872  
    873   _src = (uint8_t *)(src); 
    874  
    875   /* Take global lock  */ 
    876   pthread_mutex_lock(&global_comp_mutex); 
    877    
    878   /* Read the header block */ 
    879   version = _src[0];                         /* blosc format version */ 
    880   versionlz = _src[1];                       /* blosclz format version */ 
    881   flags = _src[2];                           /* flags */ 
    882   typesize = (int32_t)_src[3];              /* typesize */ 
    883   _src += 4; 
    884   nbytes = sw32(((int32_t *)_src)[0]);      /* buffer size */ 
    885   blocksize = sw32(((int32_t *)_src)[1]);   /* block size */ 
    886   ctbytes = sw32(((int32_t *)_src)[2]);     /* compressed buffer size */ 
    887  
    888   version += 0;                             /* shut up compiler warning */ 
    889   versionlz += 0;                           /* shut up compiler warning */ 
    890   ctbytes += 0;                             /* shut up compiler warning */ 
    891  
    892   _src += sizeof(int32_t)*3; 
    893   bstarts = (int32_t *)_src; 
    894   /* Compute some params */ 
    895   /* Total blocks */ 
    896   nblocks = nbytes / blocksize; 
    897   leftover = nbytes % blocksize; 
    898   nblocks = (leftover>0)? nblocks+1: nblocks; 
    899   _src += sizeof(int32_t)*nblocks; 
    900  
    9011485  /* Check region boundaries */ 
    9021486  if ((start < 0) || (start*typesize > nbytes)) { 
    9031487    fprintf(stderr, "`start` out of bounds"); 
    904     return (-1); 
     1488    return -1; 
    9051489  } 
    9061490 
    9071491  if ((stop < 0) || (stop*typesize > nbytes)) { 
    9081492    fprintf(stderr, "`start`+`nitems` out of bounds"); 
    909     return (-1); 
    910   } 
    911  
    912   /* Parameters needed by blosc_d */ 
    913   params.typesize = typesize; 
    914   params.flags = flags; 
    915  
    916   /* Initialize temporaries if needed */ 
    917   if (tmp == NULL || tmp2 == NULL || current_temp.blocksize < blocksize) { 
    918     tmp = my_malloc(blocksize); 
    919     if (tmp == NULL) { 
    920       return -1; 
    921     } 
    922     tmp2 = my_malloc(blocksize); 
    923     if (tmp2 == NULL) { 
    924       return -1; 
    925     } 
    926     tmp_init = 1; 
     1493    return -1; 
    9271494  } 
    9281495 
     
    9581525    } 
    9591526    else { 
     1527      struct blosc_context context; 
     1528      /* blosc_d only uses typesize and flags */ 
     1529      context.typesize = typesize; 
     1530      context.header_flags = &flags; 
     1531 
    9601532      /* Regular decompression.  Put results in tmp2. */ 
    961       cbytes = blosc_d(bsize, leftoverblock, 
    962                        (uint8_t *)src+sw32(bstarts[j]), tmp2, tmp, tmp2); 
     1533      cbytes = blosc_d(&context, bsize, leftoverblock, 
     1534                       (uint8_t *)src + sw32_(bstarts + j * 4), 
     1535                       tmp2, tmp, tmp3); 
    9631536      if (cbytes < 0) { 
    9641537        ntbytes = cbytes; 
     
    9711544    ntbytes += cbytes; 
    9721545  } 
    973    
    974   /* Release global lock */ 
    975   pthread_mutex_unlock(&global_comp_mutex); 
    976  
    977   if (tmp_init) { 
    978     my_free(tmp); 
    979     my_free(tmp2); 
    980   } 
     1546 
     1547  my_free(tmp); 
    9811548 
    9821549  return ntbytes; 
     
    9851552 
    9861553/* Decompress & unshuffle several blocks in a single thread */ 
    987 static int t_blosc(void *tids) 
    988 { 
    989   int32_t tid = *(int32_t *)tids; 
     1554static void *t_blosc(void *ctxt) 
     1555{ 
     1556  struct thread_context* context = (struct thread_context*)ctxt; 
    9901557  int32_t cbytes, ntdest; 
    9911558  int32_t tblocks;              /* number of blocks per thread */ 
     
    10031570  int32_t nblocks; 
    10041571  int32_t leftover; 
    1005   int32_t *bstarts; 
    1006   uint8_t *src; 
     1572  uint8_t *bstarts; 
     1573  const uint8_t *src; 
    10071574  uint8_t *dest; 
    10081575  uint8_t *tmp; 
    10091576  uint8_t *tmp2; 
    1010  
    1011   while (1) { 
    1012  
    1013     init_sentinels_done = 0;     /* sentinels have to be initialised yet */ 
    1014  
     1577  uint8_t *tmp3; 
     1578  int rc; 
     1579 
     1580  while(1) 
     1581  { 
    10151582    /* Synchronization point for all threads (wait for initialization) */ 
    1016     WAIT_INIT; 
    1017  
    1018     /* Check if thread has been asked to return */ 
    1019     if (end_threads) { 
    1020       return(0); 
    1021     } 
    1022  
    1023     pthread_mutex_lock(&count_mutex); 
    1024     if (!init_sentinels_done) { 
    1025       /* Set sentinels and other global variables */ 
    1026       giveup_code = 1;            /* no error code initially */ 
    1027       nblock = -1;                /* block counter */ 
    1028       init_sentinels_done = 1;    /* sentinels have been initialised */ 
    1029     } 
    1030     pthread_mutex_unlock(&count_mutex); 
     1583    WAIT_INIT(NULL, context->parent_context); 
     1584 
     1585    if(context->parent_context->end_threads) 
     1586    { 
     1587      break; 
     1588    } 
    10311589 
    10321590    /* Get parameters for this thread before entering the main loop */ 
    1033     blocksize = params.blocksize; 
    1034     ebsize = blocksize + params.typesize*(int32_t)sizeof(int32_t); 
    1035     compress = params.compress; 
    1036     flags = params.flags; 
    1037     maxbytes = params.maxbytes; 
    1038     nblocks = params.nblocks; 
    1039     leftover = params.leftover; 
    1040     bstarts = params.bstarts; 
    1041     src = params.src; 
    1042     dest = params.dest; 
    1043     tmp = params.tmp[tid]; 
    1044     tmp2 = params.tmp2[tid]; 
     1591    blocksize = context->parent_context->blocksize; 
     1592    ebsize = blocksize + context->parent_context->typesize * (int32_t)sizeof(int32_t); 
     1593    compress = context->parent_context->compress; 
     1594    flags = *(context->parent_context->header_flags); 
     1595    maxbytes = context->parent_context->destsize; 
     1596    nblocks = context->parent_context->nblocks; 
     1597    leftover = context->parent_context->leftover; 
     1598    bstarts = context->parent_context->bstarts; 
     1599    src = context->parent_context->src; 
     1600    dest = context->parent_context->dest; 
     1601 
     1602    if (blocksize > context->tmpblocksize) 
     1603    { 
     1604      my_free(context->tmp); 
     1605      context->tmp = my_malloc(blocksize + ebsize + blocksize); 
     1606      context->tmp2 = context->tmp + blocksize; 
     1607      context->tmp3 = context->tmp + blocksize + ebsize; 
     1608    } 
     1609 
     1610    tmp = context->tmp; 
     1611    tmp2 = context->tmp2; 
     1612    tmp3 = context->tmp3; 
    10451613 
    10461614    ntbytes = 0;                /* only useful for decompression */ 
     
    10481616    if (compress && !(flags & BLOSC_MEMCPYED)) { 
    10491617      /* Compression always has to follow the block order */ 
    1050       pthread_mutex_lock(&count_mutex); 
    1051       nblock++; 
    1052       nblock_ = nblock; 
    1053       pthread_mutex_unlock(&count_mutex); 
     1618      pthread_mutex_lock(&context->parent_context->count_mutex); 
     1619      context->parent_context->thread_nblock++; 
     1620      nblock_ = context->parent_context->thread_nblock; 
     1621      pthread_mutex_unlock(&context->parent_context->count_mutex); 
    10541622      tblock = nblocks; 
    10551623    } 
     
    10591627 
    10601628      /* Blocks per thread */ 
    1061       tblocks = nblocks / nthreads; 
    1062       leftover2 = nblocks % nthreads; 
     1629      tblocks = nblocks / context->parent_context->numthreads; 
     1630      leftover2 = nblocks % context->parent_context->numthreads; 
    10631631      tblocks = (leftover2>0)? tblocks+1: tblocks; 
    10641632 
    1065       nblock_ = tid*tblocks; 
     1633      nblock_ = context->tid*tblocks; 
    10661634      tblock = nblock_ + tblocks; 
    10671635      if (tblock > nblocks) { 
     
    10721640    /* Loop over blocks */ 
    10731641    leftoverblock = 0; 
    1074     while ((nblock_ < tblock) && giveup_code > 0) { 
     1642    while ((nblock_ < tblock) && context->parent_context->thread_giveup_code > 0) { 
    10751643      bsize = blocksize; 
    10761644      if (nblock_ == (nblocks - 1) && (leftover > 0)) { 
     
    10871655        else { 
    10881656          /* Regular compression */ 
    1089           cbytes = blosc_c(bsize, leftoverblock, 0, ebsize, 
    1090                            src+nblock_*blocksize, tmp2, tmp); 
     1657          cbytes = blosc_c(context->parent_context, bsize, leftoverblock, 0, ebsize, 
     1658                           src+nblock_*blocksize, tmp2, tmp, tmp3); 
    10911659        } 
    10921660      } 
     
    10991667        } 
    11001668        else { 
    1101           cbytes = blosc_d(bsize, leftoverblock, 
    1102                            src+sw32(bstarts[nblock_]), dest+nblock_*blocksize, 
     1669          cbytes = blosc_d(context->parent_context, bsize, leftoverblock, 
     1670                           src + sw32_(bstarts + nblock_ * 4), 
     1671                           dest+nblock_*blocksize, 
    11031672                           tmp, tmp2); 
    11041673        } 
     
    11061675 
    11071676      /* Check whether current thread has to giveup */ 
    1108       if (giveup_code <= 0) { 
     1677      if (context->parent_context->thread_giveup_code <= 0) { 
    11091678        break; 
    11101679      } 
     
    11131682      if (cbytes < 0) {            /* compr/decompr failure */ 
    11141683        /* Set giveup_code error */ 
    1115         pthread_mutex_lock(&count_mutex); 
    1116         giveup_code = cbytes; 
    1117         pthread_mutex_unlock(&count_mutex); 
     1684        pthread_mutex_lock(&context->parent_context->count_mutex); 
     1685        context->parent_context->thread_giveup_code = cbytes; 
     1686        pthread_mutex_unlock(&context->parent_context->count_mutex); 
    11181687        break; 
    11191688      } 
     
    11211690      if (compress && !(flags & BLOSC_MEMCPYED)) { 
    11221691        /* Start critical section */ 
    1123         pthread_mutex_lock(&count_mutex); 
    1124         ntdest = params.ntbytes; 
    1125         bstarts[nblock_] = sw32(ntdest);    /* update block start counter */ 
    1126         if ( (cbytes == 0) || (ntdest+cbytes > (int32_t)maxbytes) ) { 
    1127           giveup_code = 0;                  /* uncompressible buffer */ 
    1128           pthread_mutex_unlock(&count_mutex); 
     1692        pthread_mutex_lock(&context->parent_context->count_mutex); 
     1693        ntdest = context->parent_context->num_output_bytes; 
     1694        _sw32(bstarts + nblock_ * 4, ntdest); /* update block start counter */ 
     1695        if ( (cbytes == 0) || (ntdest+cbytes > maxbytes) ) { 
     1696          context->parent_context->thread_giveup_code = 0;  /* uncompressible buffer */ 
     1697          pthread_mutex_unlock(&context->parent_context->count_mutex); 
    11291698          break; 
    11301699        } 
    1131         nblock++; 
    1132         nblock_ = nblock; 
    1133         params.ntbytes += cbytes;           /* update return bytes counter */ 
    1134         pthread_mutex_unlock(&count_mutex); 
     1700        context->parent_context->thread_nblock++; 
     1701        nblock_ = context->parent_context->thread_nblock; 
     1702        context->parent_context->num_output_bytes += cbytes;           /* update return bytes counter */ 
     1703        pthread_mutex_unlock(&context->parent_context->count_mutex); 
    11351704        /* End of critical section */ 
    11361705 
     
    11471716 
    11481717    /* Sum up all the bytes decompressed */ 
    1149     if ((!compress || (flags & BLOSC_MEMCPYED)) && giveup_code > 0) { 
     1718    if ((!compress || (flags & BLOSC_MEMCPYED)) && context->parent_context->thread_giveup_code > 0) { 
    11501719      /* Update global counter for all threads (decompression only) */ 
    1151       pthread_mutex_lock(&count_mutex); 
    1152       params.ntbytes += ntbytes; 
    1153       pthread_mutex_unlock(&count_mutex); 
     1720      pthread_mutex_lock(&context->parent_context->count_mutex); 
     1721      context->parent_context->num_output_bytes += ntbytes; 
     1722      pthread_mutex_unlock(&context->parent_context->count_mutex); 
    11541723    } 
    11551724 
    11561725    /* Meeting point for all threads (wait for finalization) */ 
    1157     WAIT_FINISH; 
    1158  
    1159   }  /* closes while(1) */ 
    1160  
    1161   /* This should never be reached, but anyway */ 
    1162   return(0); 
    1163 } 
    1164  
    1165  
    1166 static int init_threads(void) 
     1726    WAIT_FINISH(NULL, context->parent_context); 
     1727  } 
     1728 
     1729  /* Cleanup our working space and context */ 
     1730  my_free(context->tmp); 
     1731  my_free(context); 
     1732 
     1733  return(NULL); 
     1734} 
     1735 
     1736 
     1737static int init_threads(struct blosc_context* context) 
    11671738{ 
    11681739  int32_t tid; 
    11691740  int rc2; 
     1741  int32_t ebsize; 
     1742  struct thread_context* thread_context; 
    11701743 
    11711744  /* Initialize mutex and condition variable objects */ 
    1172   pthread_mutex_init(&count_mutex, NULL); 
     1745  pthread_mutex_init(&context->count_mutex, NULL); 
     1746 
     1747  /* Set context thread sentinels */ 
     1748  context->thread_giveup_code = 1; 
     1749  context->thread_nblock = -1; 
    11731750 
    11741751  /* Barrier initialization */ 
    11751752#ifdef _POSIX_BARRIERS_MINE 
    1176   pthread_barrier_init(&barr_init, NULL, nthreads+1); 
    1177   pthread_barrier_init(&barr_finish, NULL, nthreads+1); 
     1753  pthread_barrier_init(&context->barr_init, NULL, context->numthreads+1); 
     1754  pthread_barrier_init(&context->barr_finish, NULL, context->numthreads+1); 
    11781755#else 
    1179   pthread_mutex_init(&count_threads_mutex, NULL); 
    1180   pthread_cond_init(&count_threads_cv, NULL); 
    1181   count_threads = 0;      /* Reset threads counter */ 
     1756  pthread_mutex_init(&context->count_threads_mutex, NULL); 
     1757  pthread_cond_init(&context->count_threads_cv, NULL); 
     1758  context->count_threads = 0;      /* Reset threads counter */ 
    11821759#endif 
    11831760 
    11841761#if !defined(_WIN32) 
    11851762  /* Initialize and set thread detached attribute */ 
    1186   pthread_attr_init(&ct_attr); 
    1187   pthread_attr_setdetachstate(&ct_attr, PTHREAD_CREATE_JOINABLE); 
     1763  pthread_attr_init(&context->ct_attr); 
     1764  pthread_attr_setdetachstate(&context->ct_attr, PTHREAD_CREATE_JOINABLE); 
    11881765#endif 
    11891766 
    11901767  /* Finally, create the threads in detached state */ 
    1191   for (tid = 0; tid < nthreads; tid++) { 
    1192     tids[tid] = tid; 
     1768  for (tid = 0; tid < context->numthreads; tid++) { 
     1769    context->tids[tid] = tid; 
     1770 
     1771    /* Create a thread context thread owns context (will destroy when finished) */ 
     1772    thread_context = (struct thread_context*)my_malloc(sizeof(struct thread_context)); 
     1773    thread_context->parent_context = context; 
     1774    thread_context->tid = tid; 
     1775 
     1776    ebsize = context->blocksize + context->typesize * (int32_t)sizeof(int32_t); 
     1777    thread_context->tmp = my_malloc(context->blocksize + ebsize + context->blocksize); 
     1778    thread_context->tmp2 = thread_context->tmp + context->blocksize; 
     1779    thread_context->tmp3 = thread_context->tmp + context->blocksize + ebsize; 
     1780    thread_context->tmpblocksize = context->blocksize; 
     1781 
    11931782#if !defined(_WIN32) 
    1194     rc2 = pthread_create(&threads[tid], &ct_attr, (void*)t_blosc, 
    1195                         (void *)&tids[tid]); 
     1783    rc2 = pthread_create(&context->threads[tid], &context->ct_attr, t_blosc, (void *)thread_context); 
    11961784#else 
    1197     rc2 = pthread_create(&threads[tid], NULL, (void*)t_blosc, 
    1198                         (void *)&tids[tid]); 
     1785    rc2 = pthread_create(&context->threads[tid], NULL, t_blosc, (void *)thread_context); 
    11991786#endif 
    12001787    if (rc2) { 
     
    12051792  } 
    12061793 
    1207   init_threads_done = 1;                 /* Initialization done! */ 
    1208   pid = (int)getpid();                   /* save the PID for this process */ 
    12091794 
    12101795  return(0); 
    12111796} 
    12121797 
    1213 void blosc_init(void) { 
    1214   /* Init global lock  */ 
    1215   pthread_mutex_init(&global_comp_mutex, NULL); 
    1216   init_lib = 1; 
    1217 } 
    1218  
    1219 int blosc_set_nthreads(int nthreads_new)  
    1220 { 
    1221   int ret; 
    1222  
    1223   /* Check if should initialize (implementing previous 1.2.3 behaviour, 
    1224      where calling blosc_set_nthreads was enough) */ 
    1225   if (!init_lib) blosc_init(); 
    1226  
    1227   /* Take global lock  */ 
    1228   pthread_mutex_lock(&global_comp_mutex); 
    1229    
    1230   ret = blosc_set_nthreads_(nthreads_new); 
    1231   /* Release global lock  */ 
    1232   pthread_mutex_unlock(&global_comp_mutex); 
    1233    
     1798int blosc_get_nthreads(void) 
     1799{ 
     1800  int ret = g_threads; 
     1801 
    12341802  return ret; 
    12351803} 
    12361804 
    1237 int blosc_set_nthreads_(int nthreads_new) 
    1238 { 
    1239   int32_t nthreads_old = nthreads; 
    1240   int32_t t; 
    1241   int rc2; 
    1242   void *status; 
    1243  
    1244   if (nthreads_new > BLOSC_MAX_THREADS) { 
     1805int blosc_set_nthreads(int nthreads_new) 
     1806{ 
     1807  int ret = g_threads; 
     1808 
     1809  /* Check if should initialize */ 
     1810  if (!g_initlib) blosc_init(); 
     1811 
     1812  if (nthreads_new != ret){ 
     1813    /* Re-initialize Blosc */ 
     1814    blosc_destroy(); 
     1815    blosc_init(); 
     1816    g_threads = nthreads_new; 
     1817  } 
     1818 
     1819  return ret; 
     1820} 
     1821 
     1822int blosc_set_nthreads_(struct blosc_context* context) 
     1823{ 
     1824  if (context->numthreads > BLOSC_MAX_THREADS) { 
    12451825    fprintf(stderr, 
    12461826            "Error.  nthreads cannot be larger than BLOSC_MAX_THREADS (%d)", 
     
    12481828    return -1; 
    12491829  } 
    1250   else if (nthreads_new <= 0) { 
     1830  else if (context->numthreads <= 0) { 
    12511831    fprintf(stderr, "Error.  nthreads must be a positive integer"); 
    12521832    return -1; 
    12531833  } 
    12541834 
    1255   /* Only join threads if they are not initialized or if our PID is 
    1256      different from that in pid var (probably means that we are a 
    1257      subprocess, and thus threads are non-existent). */ 
    1258   if (nthreads > 1 && init_threads_done && pid == getpid()) { 
    1259       /* Tell all existing threads to finish */ 
    1260       end_threads = 1; 
    1261       /* Synchronization point for all threads (wait for initialization) */ 
    1262       WAIT_INIT; 
    1263       /* Join exiting threads */ 
    1264       for (t=0; t<nthreads; t++) { 
    1265         rc2 = pthread_join(threads[t], &status); 
    1266         if (rc2) { 
    1267           fprintf(stderr, "ERROR; return code from pthread_join() is %d\n", rc2); 
    1268           fprintf(stderr, "\tError detail: %s\n", strerror(rc2)); 
    1269           return(-1); 
    1270         } 
    1271       } 
    1272       init_threads_done = 0; 
    1273       end_threads = 0; 
    1274     } 
    1275  
    1276   /* Launch a new pool of threads (if necessary) */ 
    1277   nthreads = nthreads_new; 
    1278   if (nthreads > 1 && (!init_threads_done || pid != getpid())) { 
    1279     init_threads(); 
    1280   } 
    1281  
    1282   return nthreads_old; 
    1283 } 
    1284  
    1285  
    1286 /* Free possible memory temporaries and thread resources */ 
    1287 int blosc_free_resources(void) 
     1835  /* Launch a new pool of threads */ 
     1836  if (context->numthreads > 1 && context->numthreads != context->threads_started) { 
     1837    blosc_release_threadpool(context); 
     1838    init_threads(context); 
     1839  } 
     1840 
     1841  /* We have now started the threads */ 
     1842  context->threads_started = context->numthreads; 
     1843 
     1844  return context->numthreads; 
     1845} 
     1846 
     1847char* blosc_get_compressor(void) 
     1848{ 
     1849  char* compname; 
     1850  blosc_compcode_to_compname(g_compressor, &compname); 
     1851 
     1852  return compname; 
     1853} 
     1854 
     1855int blosc_set_compressor(const char *compname) 
     1856{ 
     1857  int code = blosc_compname_to_compcode(compname); 
     1858 
     1859  g_compressor = code; 
     1860 
     1861  /* Check if should initialize */ 
     1862  if (!g_initlib) blosc_init(); 
     1863 
     1864  return code; 
     1865} 
     1866 
     1867char* blosc_list_compressors(void) 
     1868{ 
     1869  static int compressors_list_done = 0; 
     1870  static char ret[256]; 
     1871 
     1872  if (compressors_list_done) return ret; 
     1873  ret[0] = '\0'; 
     1874  strcat(ret, BLOSC_BLOSCLZ_COMPNAME); 
     1875#if defined(HAVE_LZ4) 
     1876  strcat(ret, ","); strcat(ret, BLOSC_LZ4_COMPNAME); 
     1877  strcat(ret, ","); strcat(ret, BLOSC_LZ4HC_COMPNAME); 
     1878#endif /* HAVE_LZ4 */ 
     1879#if defined(HAVE_SNAPPY) 
     1880  strcat(ret, ","); strcat(ret, BLOSC_SNAPPY_COMPNAME); 
     1881#endif /* HAVE_SNAPPY */ 
     1882#if defined(HAVE_ZLIB) 
     1883  strcat(ret, ","); strcat(ret, BLOSC_ZLIB_COMPNAME); 
     1884#endif /* HAVE_ZLIB */ 
     1885#if defined(HAVE_ZSTD) 
     1886  strcat(ret, ","); strcat(ret, BLOSC_ZSTD_COMPNAME); 
     1887#endif /* HAVE_ZSTD */ 
     1888  compressors_list_done = 1; 
     1889  return ret; 
     1890} 
     1891 
     1892char* blosc_get_version_string(void) 
     1893{ 
     1894  static char ret[256]; 
     1895  strcpy(ret, BLOSC_VERSION_STRING); 
     1896  return ret; 
     1897} 
     1898 
     1899int blosc_get_complib_info(char *compname, char **complib, char **version) 
     1900{ 
     1901  int clibcode; 
     1902  char *clibname; 
     1903  char *clibversion = "unknown"; 
     1904 
     1905#if (defined(HAVE_LZ4) && defined(LZ4_VERSION_MAJOR)) || (defined(HAVE_SNAPPY) && defined(SNAPPY_VERSION)) || defined(ZSTD_VERSION_MAJOR) 
     1906  char sbuffer[256]; 
     1907#endif 
     1908 
     1909  clibcode = compname_to_clibcode(compname); 
     1910  clibname = clibcode_to_clibname(clibcode); 
     1911 
     1912  /* complib version */ 
     1913  if (clibcode == BLOSC_BLOSCLZ_LIB) { 
     1914    clibversion = BLOSCLZ_VERSION_STRING; 
     1915  } 
     1916#if defined(HAVE_LZ4) 
     1917  else if (clibcode == BLOSC_LZ4_LIB) { 
     1918#if defined(LZ4_VERSION_MAJOR) 
     1919    sprintf(sbuffer, "%d.%d.%d", 
     1920            LZ4_VERSION_MAJOR, LZ4_VERSION_MINOR, LZ4_VERSION_RELEASE); 
     1921    clibversion = sbuffer; 
     1922#endif /* LZ4_VERSION_MAJOR */ 
     1923  } 
     1924#endif /* HAVE_LZ4 */ 
     1925#if defined(HAVE_SNAPPY) 
     1926  else if (clibcode == BLOSC_SNAPPY_LIB) { 
     1927#if defined(SNAPPY_VERSION) 
     1928    sprintf(sbuffer, "%d.%d.%d", SNAPPY_MAJOR, SNAPPY_MINOR, SNAPPY_PATCHLEVEL); 
     1929    clibversion = sbuffer; 
     1930#endif /* SNAPPY_VERSION */ 
     1931  } 
     1932#endif /* HAVE_SNAPPY */ 
     1933#if defined(HAVE_ZLIB) 
     1934  else if (clibcode == BLOSC_ZLIB_LIB) { 
     1935    clibversion = ZLIB_VERSION; 
     1936  } 
     1937#endif /* HAVE_ZLIB */ 
     1938#if defined(HAVE_ZSTD) 
     1939  else if (clibcode == BLOSC_ZSTD_LIB) { 
     1940    sprintf(sbuffer, "%d.%d.%d", 
     1941            ZSTD_VERSION_MAJOR, ZSTD_VERSION_MINOR, ZSTD_VERSION_RELEASE); 
     1942    clibversion = sbuffer; 
     1943  } 
     1944#endif /* HAVE_ZSTD */ 
     1945 
     1946  *complib = strdup(clibname); 
     1947  *version = strdup(clibversion); 
     1948  return clibcode; 
     1949} 
     1950 
     1951/* Return `nbytes`, `cbytes` and `blocksize` from a compressed buffer. */ 
     1952void blosc_cbuffer_sizes(const void *cbuffer, size_t *nbytes, 
     1953                         size_t *cbytes, size_t *blocksize) 
     1954{ 
     1955  uint8_t *_src = (uint8_t *)(cbuffer);    /* current pos for source buffer */ 
     1956  uint8_t version, versionlz;              /* versions for compressed header */ 
     1957 
     1958  /* Read the version info (could be useful in the future) */ 
     1959  version = _src[0];                       /* blosc format version */ 
     1960  versionlz = _src[1];                     /* blosclz format version */ 
     1961 
     1962  version += 0;                            /* shut up compiler warning */ 
     1963  versionlz += 0;                          /* shut up compiler warning */ 
     1964 
     1965  /* Read the interesting values */ 
     1966  *nbytes = (size_t)sw32_(_src + 4);       /* uncompressed buffer size */ 
     1967  *blocksize = (size_t)sw32_(_src + 8);    /* block size */ 
     1968  *cbytes = (size_t)sw32_(_src + 12);      /* compressed buffer size */ 
     1969} 
     1970 
     1971 
     1972/* Return `typesize` and `flags` from a compressed buffer. */ 
     1973void blosc_cbuffer_metainfo(const void *cbuffer, size_t *typesize, 
     1974                            int *flags) 
     1975{ 
     1976  uint8_t *_src = (uint8_t *)(cbuffer);  /* current pos for source buffer */ 
     1977  uint8_t version, versionlz;            /* versions for compressed header */ 
     1978 
     1979  /* Read the version info (could be useful in the future) */ 
     1980  version = _src[0];                     /* blosc format version */ 
     1981  versionlz = _src[1];                   /* blosclz format version */ 
     1982 
     1983  version += 0;                             /* shut up compiler warning */ 
     1984  versionlz += 0;                           /* shut up compiler warning */ 
     1985 
     1986  /* Read the interesting values */ 
     1987  *flags = (int)_src[2];                 /* flags */ 
     1988  *typesize = (size_t)_src[3];           /* typesize */ 
     1989} 
     1990 
     1991 
     1992/* Return version information from a compressed buffer. */ 
     1993void blosc_cbuffer_versions(const void *cbuffer, int *version, 
     1994                            int *versionlz) 
     1995{ 
     1996  uint8_t *_src = (uint8_t *)(cbuffer);  /* current pos for source buffer */ 
     1997 
     1998  /* Read the version info */ 
     1999  *version = (int)_src[0];         /* blosc format version */ 
     2000  *versionlz = (int)_src[1];       /* Lempel-Ziv compressor format version */ 
     2001} 
     2002 
     2003 
     2004/* Return the compressor library/format used in a compressed buffer. */ 
     2005char *blosc_cbuffer_complib(const void *cbuffer) 
     2006{ 
     2007  uint8_t *_src = (uint8_t *)(cbuffer);  /* current pos for source buffer */ 
     2008  int clibcode; 
     2009  char *complib; 
     2010 
     2011  /* Read the compressor format/library info */ 
     2012  clibcode = (_src[2] & 0xe0) >> 5; 
     2013  complib = clibcode_to_clibname(clibcode); 
     2014  return complib; 
     2015} 
     2016 
     2017/* Get the internal blocksize to be used during compression.  0 means 
     2018   that an automatic blocksize is computed internally. */ 
     2019int blosc_get_blocksize(void) 
     2020{ 
     2021  return (int)g_force_blocksize; 
     2022} 
     2023 
     2024/* Force the use of a specific blocksize.  If 0, an automatic 
     2025   blocksize will be used (the default). */ 
     2026void blosc_set_blocksize(size_t size) 
     2027{ 
     2028  g_force_blocksize = (int32_t)size; 
     2029} 
     2030 
     2031void blosc_init(void) 
     2032{ 
     2033  /* Return if we are already initialized */ 
     2034  if (g_initlib) return; 
     2035 
     2036  pthread_mutex_init(&global_comp_mutex, NULL); 
     2037  g_global_context = (struct blosc_context*)my_malloc(sizeof(struct blosc_context)); 
     2038  g_global_context->threads_started = 0; 
     2039  g_initlib = 1; 
     2040} 
     2041 
     2042void blosc_destroy(void) 
     2043{ 
     2044  /* Return if Blosc is not initialized */ 
     2045  if (!g_initlib) return; 
     2046 
     2047  g_initlib = 0; 
     2048  blosc_release_threadpool(g_global_context); 
     2049  my_free(g_global_context); 
     2050  pthread_mutex_destroy(&global_comp_mutex); 
     2051} 
     2052 
     2053int blosc_release_threadpool(struct blosc_context* context) 
    12882054{ 
    12892055  int32_t t; 
     2056  void* status; 
     2057  int rc; 
    12902058  int rc2; 
    1291   void *status; 
    1292   
    1293    /* Take global lock  */ 
    1294   pthread_mutex_lock(&global_comp_mutex); 
    1295  
    1296   /* Release temporaries */ 
    1297   if (init_temps_done) { 
    1298     release_temporaries(); 
    1299   } 
    1300  
    1301   /* Finish the possible thread pool */ 
    1302   if (nthreads > 1 && init_threads_done) { 
     2059 
     2060  if (context->threads_started > 0) 
     2061  { 
    13032062    /* Tell all existing threads to finish */ 
    1304     end_threads = 1; 
    1305     /* Synchronization point for all threads (wait for initialization) */ 
    1306     WAIT_INIT; 
     2063    context->end_threads = 1; 
     2064 
     2065    /* Sync threads */ 
     2066    WAIT_INIT(-1, context); 
     2067 
    13072068    /* Join exiting threads */ 
    1308     for (t=0; t<nthreads; t++) { 
    1309       rc2 = pthread_join(threads[t], &status); 
     2069    for (t=0; t<context->threads_started; t++) { 
     2070      rc2 = pthread_join(context->threads[t], &status); 
    13102071      if (rc2) { 
    13112072        fprintf(stderr, "ERROR; return code from pthread_join() is %d\n", rc2); 
    13122073        fprintf(stderr, "\tError detail: %s\n", strerror(rc2)); 
    1313         return(-1); 
    13142074      } 
    13152075    } 
    13162076 
    13172077    /* Release mutex and condition variable objects */ 
    1318     pthread_mutex_destroy(&count_mutex); 
     2078    pthread_mutex_destroy(&context->count_mutex); 
    13192079 
    13202080    /* Barriers */ 
    1321 #ifdef _POSIX_BARRIERS_MINE 
    1322     pthread_barrier_destroy(&barr_init); 
    1323     pthread_barrier_destroy(&barr_finish); 
    1324 #else 
    1325     pthread_mutex_destroy(&count_threads_mutex); 
    1326     pthread_cond_destroy(&count_threads_cv); 
    1327 #endif 
    1328  
    1329     /* Thread attributes */ 
    1330 #if !defined(_WIN32) 
    1331     pthread_attr_destroy(&ct_attr); 
    1332 #endif 
    1333  
    1334     init_threads_done = 0; 
    1335     end_threads = 0; 
    1336   } 
    1337    /* Release global lock  */ 
    1338   pthread_mutex_unlock(&global_comp_mutex); 
    1339   return(0); 
    1340  
    1341 } 
    1342  
    1343 void blosc_destroy(void) { 
    1344   /* Free the resources */ 
    1345   blosc_free_resources(); 
    1346   /* Destroy global lock */ 
    1347   pthread_mutex_destroy(&global_comp_mutex); 
    1348 } 
    1349  
    1350 /* Return `nbytes`, `cbytes` and `blocksize` from a compressed buffer. */ 
    1351 void blosc_cbuffer_sizes(const void *cbuffer, size_t *nbytes, 
    1352                          size_t *cbytes, size_t *blocksize) 
    1353 { 
    1354   uint8_t *_src = (uint8_t *)(cbuffer);    /* current pos for source buffer */ 
    1355   uint8_t version, versionlz;              /* versions for compressed header */ 
    1356  
    1357   /* Read the version info (could be useful in the future) */ 
    1358   version = _src[0];                         /* blosc format version */ 
    1359   versionlz = _src[1];                       /* blosclz format version */ 
    1360  
    1361   version += 0;                             /* shut up compiler warning */ 
    1362   versionlz += 0;                           /* shut up compiler warning */ 
    1363  
    1364   /* Read the interesting values */ 
    1365   _src += 4; 
    1366   *nbytes = (size_t)sw32(((int32_t *)_src)[0]);  /* uncompressed buffer size */ 
    1367   *blocksize = (size_t)sw32(((int32_t *)_src)[1]);   /* block size */ 
    1368   *cbytes = (size_t)sw32(((int32_t *)_src)[2]);  /* compressed buffer size */ 
    1369 } 
    1370  
    1371  
    1372 /* Return `typesize` and `flags` from a compressed buffer. */ 
    1373 void blosc_cbuffer_metainfo(const void *cbuffer, size_t *typesize, 
    1374                             int *flags) 
    1375 { 
    1376   uint8_t *_src = (uint8_t *)(cbuffer);  /* current pos for source buffer */ 
    1377   uint8_t version, versionlz;            /* versions for compressed header */ 
    1378  
    1379   /* Read the version info (could be useful in the future) */ 
    1380   version = _src[0];                     /* blosc format version */ 
    1381   versionlz = _src[1];                   /* blosclz format version */ 
    1382  
    1383   version += 0;                             /* shut up compiler warning */ 
    1384   versionlz += 0;                           /* shut up compiler warning */ 
    1385  
    1386   /* Read the interesting values */ 
    1387   *flags = (int)_src[2];                 /* flags */ 
    1388   *typesize = (size_t)_src[3];           /* typesize */ 
    1389 } 
    1390  
    1391  
    1392 /* Return version information from a compressed buffer. */ 
    1393 void blosc_cbuffer_versions(const void *cbuffer, int *version, 
    1394                             int *versionlz) 
    1395 { 
    1396   uint8_t *_src = (uint8_t *)(cbuffer);  /* current pos for source buffer */ 
    1397  
    1398   /* Read the version info */ 
    1399   *version = (int)_src[0];             /* blosc format version */ 
    1400   *versionlz = (int)_src[1];           /* blosclz format version */ 
    1401 } 
    1402  
    1403  
    1404 /* Force the use of a specific blocksize.  If 0, an automatic 
    1405    blocksize will be used (the default). */ 
    1406 void blosc_set_blocksize(size_t size) 
    1407 { 
    1408   /* Take global lock  */ 
    1409   pthread_mutex_lock(&global_comp_mutex); 
    1410    
    1411   force_blocksize = (int32_t)size; 
    1412    
    1413    /* Release global lock  */ 
    1414   pthread_mutex_unlock(&global_comp_mutex); 
    1415 } 
     2081  #ifdef _POSIX_BARRIERS_MINE 
     2082      pthread_barrier_destroy(&context->barr_init); 
     2083      pthread_barrier_destroy(&context->barr_finish); 
     2084  #else 
     2085      pthread_mutex_destroy(&context->count_threads_mutex); 
     2086      pthread_cond_destroy(&context->count_threads_cv); 
     2087  #endif 
     2088 
     2089      /* Thread attributes */ 
     2090  #if !defined(_WIN32) 
     2091      pthread_attr_destroy(&context->ct_attr); 
     2092  #endif 
     2093 
     2094  } 
     2095 
     2096  context->threads_started = 0; 
     2097 
     2098  return 0; 
     2099} 
     2100 
     2101int blosc_free_resources(void) 
     2102{ 
     2103  /* Return if Blosc is not initialized */ 
     2104  if (!g_initlib) return -1; 
     2105 
     2106  return blosc_release_threadpool(g_global_context); 
     2107} 
Note: See TracChangeset for help on using the changeset viewer.