source: thirdparty/blosc/blosc.c @ 00587dc

Revision 00587dc, 40.8 KB checked in by Hal Finkel <hfinkel@…>, 9 years ago (diff)

Initial Commit (gio-base-20150317)

  • Property mode set to 100644
Line 
1/*********************************************************************
2  Blosc - Blocked Shuffling and Compression Library
3
4  Author: Francesc Alted <[email protected]>
5  Creation date: 2009-05-20
6
7  See LICENSES/BLOSC.txt for details about copyright and rights to use.
8**********************************************************************/
9
10
11#include <stdlib.h>
12#include <stdio.h>
13#include <string.h>
14#include <sys/types.h>
15#include <sys/stat.h>
16#include <assert.h>
17#include "blosc.h"
18#include "blosclz.h"
19#include "shuffle.h"
20
21#if defined(_WIN32) && !defined(__MINGW32__)
22  #include <windows.h>
23  #include "win32/stdint-windows.h"
24  #include <process.h>
25  #define getpid _getpid
26#else
27  #include <stdint.h>
28  #include <unistd.h>
29  #include <inttypes.h>
30#endif  /* _WIN32 */
31
32#if defined(_WIN32)
33  #include "win32/pthread.h"
34  #include "win32/pthread.c"
35#else
36  #include <pthread.h>
37#endif
38
39
40/* Some useful units */
41#define KB 1024
42#define MB (1024*KB)
43
44/* Minimum buffer size to be compressed.  blosc_compress() marks smaller
   buffers as memcpy'ed, and blosc_c()/blosc_d() only split a block when
   blocksize/typesize reaches this value. */
45#define MIN_BUFFERSIZE 128       /* Cannot be smaller than 66 */
46
47/* The maximum number of splits in a block for compression.  A block is
   split into `typesize` parts only when typesize <= MAX_SPLITS. */
48#define MAX_SPLITS 16            /* Cannot be larger than 128 */
49
50/* The size of L1 cache.  32 KB is quite common nowadays.
   compute_blocksize() derives its default block sizes from this value. */
51#define L1 (32*KB)
52
53/* Wrapped function to adjust the number of threads used by blosc.
   Only declared here; parallel_blosc() calls it to (re)start the pool. */
54int blosc_set_nthreads_(int);
55
56/* Global variables for main logic */
57static int32_t init_temps_done = 0;    /* temporaries for compr/decompr allocated? */
58static int32_t force_blocksize = 0;    /* non-zero: blocksize forced in compute_blocksize() */
59static int pid = 0;                    /* the PID for this process */
60static int init_lib = 0;               /* is library initialized? */
61
62/* Global variables for threads */
63static int32_t nthreads = 1;            /* number of desired threads in pool */
64static int32_t init_threads_done = 0;   /* pool of threads initialized? */
65static int32_t end_threads = 0;         /* should existing threads end? */
66static int32_t init_sentinels_done = 0; /* sentinels initialized? */
67static int32_t giveup_code;             /* > 0 while no error; else the error code */
68static int32_t nblock;                  /* block counter */
69static pthread_t threads[BLOSC_MAX_THREADS];  /* opaque structure for threads */
70static int32_t tids[BLOSC_MAX_THREADS];       /* ID per each thread */
71#if !defined(_WIN32)
72static pthread_attr_t ct_attr;          /* creation time attrs for threads */
73#endif
74
75/* Have problems using posix barriers when symbol value is 200112L */
76/* This requires more investigation, but will work for the moment */
/* NOTE(review): the constant 20012L below has one digit fewer than the
   200112L it is compared against on the same line; it looks like a typo,
   confirm before changing.  As written, barriers are selected for any
   _POSIX_BARRIERS value >= 20012 other than exactly 200112. */
77#if defined(_POSIX_BARRIERS) && ( (_POSIX_BARRIERS - 20012L) >= 0 && _POSIX_BARRIERS != 200112L)
78#define _POSIX_BARRIERS_MINE
79#endif
80
81/* Synchronization variables */
82static pthread_mutex_t count_mutex;        /* guards sentinel setup in t_blosc() */
83static pthread_mutex_t global_comp_mutex;  /* held across each public (de)compression call */
84#ifdef _POSIX_BARRIERS_MINE
85static pthread_barrier_t barr_init;        /* used by WAIT_INIT */
86static pthread_barrier_t barr_finish;      /* used by WAIT_FINISH */
87#else
/* Fallback used by WAIT_INIT/WAIT_FINISH when barriers are not selected:
   a counter protected by a mutex, plus a condition variable. */
88static int32_t count_threads;
89static pthread_mutex_t count_threads_mutex;
90static pthread_cond_t count_threads_cv;
91#endif
92
93
94/* Structure for parameters in (de-)compression threads.  A single global
   instance (`params`) is filled in by the public entry points and read by
   the serial and threaded workers. */
95static struct thread_data {
96  int32_t typesize;             /* element size taken from the call/header */
97  int32_t blocksize;            /* length of a full block in bytes */
98  int32_t compress;             /* 1 when compressing, 0 when decompressing */
99  int32_t clevel;               /* level handed to blosclz_compress() */
100  int32_t flags;                /* header flags (BLOSC_DOSHUFFLE, BLOSC_MEMCPYED) */
101  int32_t memcpyed;
102  int32_t ntbytes;              /* running count of bytes produced */
103  int32_t nbytes;               /* size of the uncompressed buffer */
104  int32_t maxbytes;             /* capacity of the destination (compression) */
105  int32_t nblocks;              /* total number of blocks, leftover included */
106  int32_t leftover;             /* nbytes % blocksize */
107  int32_t *bstarts;             /* start pointers for each block */
108  uint8_t *src;
109  uint8_t *dest;
110  uint8_t *tmp[BLOSC_MAX_THREADS];    /* per-thread scratch, blocksize bytes */
111  uint8_t *tmp2[BLOSC_MAX_THREADS];   /* per-thread scratch, extended blocksize */
112} params;
113
114
115/* Structure for parameters meant for keeping track of current temporaries.
   create_temporaries() records the values the buffers were sized for, and
   do_job() compares them with the current ones to decide on a rebuild. */
116static struct temp_data {
117  int32_t nthreads;
118  int32_t typesize;
119  int32_t blocksize;
120} current_temp;
121
122
123/* Macros for synchronization */
/* Both macros expand to several statements, not a single one.  The
   barrier variants contain `return(-1)`, so they can only be used inside
   a function that returns an integer. */
124
125/* Wait until all threads are initialized */
126#ifdef _POSIX_BARRIERS_MINE
127static int rc;
128#define WAIT_INIT \
129  rc = pthread_barrier_wait(&barr_init); \
130  if (rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD) { \
131    printf("Could not wait on barrier (init)\n"); \
132    return(-1); \
133  }
134#else
/* Counter-based variant: callers arriving while count_threads < nthreads
   bump the counter and sleep on the condition variable; the caller that
   finds the counter already at nthreads broadcasts to wake the others.
   NOTE(review): pthread_cond_wait() is called once under an `if`, not in
   a loop that re-checks a predicate. */
135#define WAIT_INIT \
136  pthread_mutex_lock(&count_threads_mutex); \
137  if (count_threads < nthreads) { \
138    count_threads++; \
139    pthread_cond_wait(&count_threads_cv, &count_threads_mutex); \
140  } \
141  else { \
142    pthread_cond_broadcast(&count_threads_cv); \
143  } \
144  pthread_mutex_unlock(&count_threads_mutex);
145#endif
146
147/* Wait for all threads to finish */
148#ifdef _POSIX_BARRIERS_MINE
149#define WAIT_FINISH \
150  rc = pthread_barrier_wait(&barr_finish); \
151  if (rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD) { \
152    printf("Could not wait on barrier (finish)\n"); \
153    return(-1);                                       \
154  }
155#else
/* Mirror image of WAIT_INIT: the counter is decremented back towards
   zero, and the caller that finds it at zero broadcasts. */
156#define WAIT_FINISH \
157  pthread_mutex_lock(&count_threads_mutex); \
158  if (count_threads > 0) { \
159    count_threads--; \
160    pthread_cond_wait(&count_threads_cv, &count_threads_mutex); \
161  } \
162  else { \
163    pthread_cond_broadcast(&count_threads_cv); \
164  } \
165  pthread_mutex_unlock(&count_threads_mutex);
166#endif
167
168
/* Allocate `size` bytes, asking for 16-byte alignment where the platform
   offers an aligned allocator:

     - Windows: _aligned_malloc(size, 16)
     - POSIX.1-2001 / XSI 6 feature sets: posix_memalign(&block, 16, size)
     - everything else (Mac OS X included): plain malloc()

   Returns the new block, or NULL after printing a diagnostic on stderr
   when the allocation fails.  Blocks must be released with my_free(). */
static uint8_t *my_malloc(size_t size)
{
  void *block = NULL;

#if defined(_WIN32)
  /* A (void *) cast needed for avoiding a warning with MINGW :-/ */
  block = (void *)_aligned_malloc(size, 16);
#elif defined __APPLE__
  /* Mac OS X guarantees 16-byte alignment in small allocs */
  block = malloc(size);
#elif _POSIX_C_SOURCE >= 200112L || _XOPEN_SOURCE >= 600
  /* Platform does have an implementation of posix_memalign.  Do not rely
     on what it left in `block` when it reports an error. */
  if (posix_memalign(&block, 16, size) != 0) {
    block = NULL;
  }
#else
  block = malloc(size);
#endif  /* _WIN32 */

  if (block == NULL) {
    /* Diagnostics belong on stderr, like the other messages in this file */
    fprintf(stderr, "Error allocating memory!\n");
    return NULL;
  }

  return (uint8_t *)block;
}
195
196
/* Counterpart of my_malloc(): give `block` back to the allocator family
   it came from (_aligned_free on Windows, free() everywhere else). */
static void my_free(void *block)
{
#if !defined(_WIN32)
  free(block);
#else
  _aligned_free(block);
#endif  /* !_WIN32 */
}
206
207
/* Byte-order helper for 32-bit values.  On a little-endian host `a` is
   returned unchanged; on any other host a copy with its four bytes
   reversed is returned. */
static int32_t sw32(int32_t a)
{
  const int probe = 1;                       /* endianness detector */
  const char *probe_bytes = (const char *)&probe;
  const char *in = (const char *)&a;
  int32_t reversed;
  char *out = (char *)&reversed;
  int k;

  if (probe_bytes[0] == 1) {
    /* little endian: nothing to do */
    return a;
  }

  /* big endian: mirror the bytes */
  for (k = 0; k < 4; k++) {
    out[k] = in[3 - k];
  }
  return reversed;
}
231
232
233/* Shuffle & compress a single block */
234static int blosc_c(int32_t blocksize, int32_t leftoverblock,
235                   int32_t ntbytes, int32_t maxbytes,
236                   uint8_t *src, uint8_t *dest, uint8_t *tmp)
237{
238  int32_t j, neblock, nsplits;
239  int32_t cbytes;                   /* number of compressed bytes in split */
240  int32_t ctbytes = 0;              /* number of compressed bytes in block */
241  int32_t maxout;
242  int32_t typesize = params.typesize;
243  uint8_t *_tmp;
244
245  if ((params.flags & BLOSC_DOSHUFFLE) && (typesize > 1)) {
246    /* Shuffle this block (this makes sense only if typesize > 1) */
247    shuffle(typesize, blocksize, src, tmp);
248    _tmp = tmp;
249  }
250  else {
251    _tmp = src;
252  }
253
254  /* Compress for each shuffled slice split for this block. */
255  /* If typesize is too large, neblock is too small or we are in a
256     leftover block, do not split at all. */
257  if ((typesize <= MAX_SPLITS) && (blocksize/typesize) >= MIN_BUFFERSIZE &&
258      (!leftoverblock)) {
259    nsplits = typesize;
260  }
261  else {
262    nsplits = 1;
263  }
264  neblock = blocksize / nsplits;
265  for (j = 0; j < nsplits; j++) {
266    dest += sizeof(int32_t);
267    ntbytes += (int32_t)sizeof(int32_t);
268    ctbytes += (int32_t)sizeof(int32_t);
269    maxout = neblock;
270    if (ntbytes+maxout > maxbytes) {
271      maxout = maxbytes - ntbytes;   /* avoid buffer overrun */
272      if (maxout <= 0) {
273        return 0;                  /* non-compressible block */
274      }
275    }
276    cbytes = blosclz_compress(params.clevel, _tmp+j*neblock, neblock,
277                              dest, maxout);
278    if (cbytes >= maxout) {
279      /* Buffer overrun caused by blosclz_compress (should never happen) */
280      return -1;
281    }
282    else if (cbytes < 0) {
283      /* cbytes should never be negative */
284      return -2;
285    }
286    else if (cbytes == 0) {
287      /* The compressor has been unable to compress data significantly. */
288      /* Before doing the copy, check that we are not running into a
289         buffer overflow. */
290      if ((ntbytes+neblock) > maxbytes) {
291        return 0;    /* Non-compressible data */
292      }
293      memcpy(dest, _tmp+j*neblock, neblock);
294      cbytes = neblock;
295    }
296    ((int32_t *)(dest))[-1] = sw32(cbytes);
297    dest += cbytes;
298    ntbytes += cbytes;
299    ctbytes += cbytes;
300  }  /* Closes j < nsplits */
301
302  return ctbytes;
303}
304
305
306/* Decompress & unshuffle a single block */
307static int blosc_d(int32_t blocksize, int32_t leftoverblock,
308                   uint8_t *src, uint8_t *dest, uint8_t *tmp, uint8_t *tmp2)
309{
310  int32_t j, neblock, nsplits;
311  int32_t nbytes;                /* number of decompressed bytes in split */
312  int32_t cbytes;                /* number of compressed bytes in split */
313  int32_t ctbytes = 0;           /* number of compressed bytes in block */
314  int32_t ntbytes = 0;           /* number of uncompressed bytes in block */
315  uint8_t *_tmp;
316  int32_t typesize = params.typesize;
317
318  if ((params.flags & BLOSC_DOSHUFFLE) && (typesize > 1)) {
319    _tmp = tmp;
320  }
321  else {
322    _tmp = dest;
323  }
324
325  /* Compress for each shuffled slice split for this block. */
326  if ((typesize <= MAX_SPLITS) && (blocksize/typesize) >= MIN_BUFFERSIZE &&
327      (!leftoverblock)) {
328    nsplits = typesize;
329  }
330  else {
331    nsplits = 1;
332  }
333  neblock = blocksize / nsplits;
334  for (j = 0; j < nsplits; j++) {
335    cbytes = sw32(((int32_t *)(src))[0]);   /* amount of compressed bytes */
336    src += sizeof(int32_t);
337    ctbytes += (int32_t)sizeof(int32_t);
338    /* Uncompress */
339    if (cbytes == neblock) {
340      memcpy(_tmp, src, neblock);
341      nbytes = neblock;
342    }
343    else {
344      nbytes = blosclz_decompress(src, cbytes, _tmp, neblock);
345      if (nbytes != neblock) {
346        return -2;
347      }
348    }
349    src += cbytes;
350    ctbytes += cbytes;
351    _tmp += nbytes;
352    ntbytes += nbytes;
353  } /* Closes j < nsplits */
354
355  if ((params.flags & BLOSC_DOSHUFFLE) && (typesize > 1)) {
356    if ((uintptr_t)dest % 16 == 0) {
357      /* 16-bytes aligned dest.  SSE2 unshuffle will work. */
358      unshuffle(typesize, blocksize, tmp, dest);
359    }
360    else {
361      /* dest is not aligned.  Use tmp2, which is aligned, and copy. */
362      unshuffle(typesize, blocksize, tmp, tmp2);
363      if (tmp2 != dest) {
364        /* Copy only when dest is not tmp2 (e.g. not blosc_getitem())  */
365        memcpy(dest, tmp2, blocksize);
366      }
367    }
368  }
369
370  /* Return the number of uncompressed bytes */
371  return ntbytes;
372}
373
374
375/* Serial version for compression/decompression */
376static int serial_blosc(void)
377{
378  int32_t j, bsize, leftoverblock;
379  int32_t cbytes;
380  int32_t compress = params.compress;
381  int32_t blocksize = params.blocksize;
382  int32_t ntbytes = params.ntbytes;
383  int32_t flags = params.flags;
384  int32_t maxbytes = params.maxbytes;
385  int32_t nblocks = params.nblocks;
386  int32_t leftover = params.nbytes % params.blocksize;
387  int32_t *bstarts = params.bstarts;
388  uint8_t *src = params.src;
389  uint8_t *dest = params.dest;
390  uint8_t *tmp = params.tmp[0];     /* tmp for thread 0 */
391  uint8_t *tmp2 = params.tmp2[0];   /* tmp2 for thread 0 */
392
393  for (j = 0; j < nblocks; j++) {
394    if (compress && !(flags & BLOSC_MEMCPYED)) {
395      bstarts[j] = sw32(ntbytes);
396    }
397    bsize = blocksize;
398    leftoverblock = 0;
399    if ((j == nblocks - 1) && (leftover > 0)) {
400      bsize = leftover;
401      leftoverblock = 1;
402    }
403    if (compress) {
404      if (flags & BLOSC_MEMCPYED) {
405        /* We want to memcpy only */
406        memcpy(dest+BLOSC_MAX_OVERHEAD+j*blocksize, src+j*blocksize, bsize);
407        cbytes = bsize;
408      }
409      else {
410        /* Regular compression */
411        cbytes = blosc_c(bsize, leftoverblock, ntbytes, maxbytes,
412                         src+j*blocksize, dest+ntbytes, tmp);
413        if (cbytes == 0) {
414          ntbytes = 0;              /* uncompressible data */
415          break;
416        }
417      }
418    }
419    else {
420      if (flags & BLOSC_MEMCPYED) {
421        /* We want to memcpy only */
422        memcpy(dest+j*blocksize, src+BLOSC_MAX_OVERHEAD+j*blocksize, bsize);
423        cbytes = bsize;
424      }
425      else {
426        /* Regular decompression */
427        cbytes = blosc_d(bsize, leftoverblock,
428                         src+sw32(bstarts[j]), dest+j*blocksize, tmp, tmp2);
429      }
430    }
431    if (cbytes < 0) {
432      ntbytes = cbytes;         /* error in blosc_c or blosc_d */
433      break;
434    }
435    ntbytes += cbytes;
436  }
437
438  return ntbytes;
439}
440
441
442/* Threaded version for compression/decompression */
443static int parallel_blosc(void)
444{
445
446  /* Check whether we need to restart threads */
447  if (!init_threads_done || pid != getpid()) {
448    blosc_set_nthreads_(nthreads);
449  }
450
451  /* Synchronization point for all threads (wait for initialization) */
452  WAIT_INIT;
453  /* Synchronization point for all threads (wait for finalization) */
454  WAIT_FINISH;
455
456  if (giveup_code > 0) {
457    /* Return the total bytes (de-)compressed in threads */
458    return params.ntbytes;
459  }
460  else {
461    /* Compression/decompression gave up.  Return error code. */
462    return giveup_code;
463  }
464}
465
466
467/* Convenience functions for creating and releasing temporaries */
468static int create_temporaries(void)
469{
470  int32_t tid;
471  int32_t typesize = params.typesize;
472  int32_t blocksize = params.blocksize;
473  /* Extended blocksize for temporary destination.  Extended blocksize
474   is only useful for compression in parallel mode, but it doesn't
475   hurt serial mode either. */
476  int32_t ebsize = blocksize + typesize*(int32_t)sizeof(int32_t);
477
478  /* Create temporary area for each thread */
479  for (tid = 0; tid < nthreads; tid++) {
480    uint8_t *tmp = my_malloc(blocksize);
481    uint8_t *tmp2;
482    if (tmp == NULL) {
483      return -1;
484    }
485    params.tmp[tid] = tmp;
486    tmp2 = my_malloc(ebsize);
487    if (tmp2 == NULL) {
488      return -1;
489    }
490    params.tmp2[tid] = tmp2;
491  }
492
493  init_temps_done = 1;
494  /* Update params for current temporaries */
495  current_temp.nthreads = nthreads;
496  current_temp.typesize = typesize;
497  current_temp.blocksize = blocksize;
498  return 0;
499}
500
501
502static void release_temporaries(void)
503{
504  int32_t tid;
505
506  /* Release buffers */
507  for (tid = 0; tid < nthreads; tid++) {
508    my_free(params.tmp[tid]);
509    my_free(params.tmp2[tid]);
510  }
511
512  init_temps_done = 0;
513}
514
515
516/* Do the compression or decompression of the buffer depending on the
517   global params. */
518static int do_job(void)
519{
520  int32_t ntbytes;
521
522  /* Initialize/reset temporaries if needed */
523  if (!init_temps_done) {
524    int ret;
525    ret = create_temporaries();
526    if (ret < 0) {
527      return -1;
528    }
529  }
530  else if (current_temp.nthreads != nthreads ||
531           current_temp.typesize != params.typesize ||
532           current_temp.blocksize != params.blocksize) {
533    int ret;
534    release_temporaries();
535    ret = create_temporaries();
536    if (ret < 0) {
537      return -1;
538    }
539  }
540
541  /* Run the serial version when nthreads is 1 or when the buffers are
542     not much larger than blocksize */
543  if (nthreads == 1 || (params.nbytes / params.blocksize) <= 1) {
544    ntbytes = serial_blosc();
545  }
546  else {
547    ntbytes = parallel_blosc();
548  }
549
550  return ntbytes;
551}
552
553
554static int32_t compute_blocksize(int32_t clevel, int32_t typesize,
555                                 int32_t nbytes)
556{
557  int32_t blocksize;
558
559  /* Protection against very small buffers */
560  if (nbytes < (int32_t)typesize) {
561    return 1;
562  }
563
564  blocksize = nbytes;           /* Start by a whole buffer as blocksize */
565
566  if (force_blocksize) {
567    blocksize = force_blocksize;
568    /* Check that forced blocksize is not too small nor too large */
569    if (blocksize < MIN_BUFFERSIZE) {
570      blocksize = MIN_BUFFERSIZE;
571    }
572  }
573  else if (nbytes >= L1*4) {
574    blocksize = L1 * 4;
575    if (clevel == 0) {
576      blocksize /= 16;
577    }
578    else if (clevel <= 3) {
579      blocksize /= 8;
580    }
581    else if (clevel <= 5) {
582      blocksize /= 4;
583    }
584    else if (clevel <= 6) {
585      blocksize /= 2;
586    }
587    else if (clevel < 9) {
588      blocksize *= 1;
589    }
590    else {
591      blocksize *= 2;
592    }
593  }
594
595  /* Check that blocksize is not too large */
596  if (blocksize > (int32_t)nbytes) {
597    blocksize = nbytes;
598  }
599
600  /* blocksize must be a multiple of the typesize */
601  if (blocksize > typesize) {
602    blocksize = blocksize / typesize * typesize;
603  }
604
605  /* blocksize must not exceed (64 KB * typesize) in order to allow
606     BloscLZ to achieve better compression ratios (the ultimate reason
607     for this is that hash_log in BloscLZ cannot be larger than 15) */
608  if ((blocksize / typesize) > 64*KB) {
609    blocksize = 64 * KB * typesize;
610  }
611
612  return blocksize;
613}
614
615
616/* The public routine for compression.  See blosc.h for docstrings. */
617int blosc_compress(int clevel, int doshuffle, size_t typesize, size_t nbytes,
618      const void *src, void *dest, size_t destsize)
619{
620  uint8_t *_dest=NULL;         /* current pos for destination buffer */
621  uint8_t *flags;              /* flags for header.  Currently booked:
622                                  - 0: shuffled?
623                                  - 1: memcpy'ed? */
624  int32_t nbytes_;            /* number of bytes in source buffer */
625  int32_t nblocks;            /* number of total blocks in buffer */
626  int32_t leftover;           /* extra bytes at end of buffer */
627  int32_t *bstarts;           /* start pointers for each block */
628  int32_t blocksize;          /* length of the block in bytes */
629  int32_t ntbytes = 0;        /* the number of compressed bytes */
630  int32_t *ntbytes_;          /* placeholder for bytes in output buffer */
631  int32_t maxbytes = (int32_t)destsize;  /* maximum size for dest buffer */
632
633  /* Check buffer size limits */
634  if (nbytes > BLOSC_MAX_BUFFERSIZE) {
635    /* If buffer is too large, give up. */
636    fprintf(stderr, "Input buffer size cannot exceed %d bytes\n",
637            BLOSC_MAX_BUFFERSIZE);
638    return -1;
639  }
640
641  /* We can safely do this assignation now */
642  nbytes_ = (int32_t)nbytes;
643
644  /* Compression level */
645  if (clevel < 0 || clevel > 9) {
646    /* If clevel not in 0..9, print an error */
647    fprintf(stderr, "`clevel` parameter must be between 0 and 9!\n");
648    return -10;
649  }
650
651  /* Shuffle */
652  if (doshuffle != 0 && doshuffle != 1) {
653    fprintf(stderr, "`shuffle` parameter must be either 0 or 1!\n");
654    return -10;
655  }
656
657  /* Check typesize limits */
658  if (typesize > BLOSC_MAX_TYPESIZE) {
659    /* If typesize is too large, treat buffer as an 1-byte stream. */
660    typesize = 1;
661  }
662
663  /* Get the blocksize */
664  blocksize = compute_blocksize(clevel, (int32_t)typesize, nbytes_);
665
666  /* Compute number of blocks in buffer */
667  nblocks = nbytes_ / blocksize;
668  leftover = nbytes_ % blocksize;
669  nblocks = (leftover>0)? nblocks+1: nblocks;
670
671  _dest = (uint8_t *)(dest);
672  /* Write header for this block */
673  _dest[0] = BLOSC_VERSION_FORMAT;         /* blosc format version */
674  _dest[1] = BLOSCLZ_VERSION_FORMAT;       /* blosclz format version */
675  flags = _dest+2;                         /* flags */
676  _dest[2] = 0;                            /* zeroes flags */
677  _dest[3] = (uint8_t)typesize;            /* type size */
678  _dest += 4;
679  ((int32_t *)_dest)[0] = sw32(nbytes_);  /* size of the buffer */
680  ((int32_t *)_dest)[1] = sw32(blocksize);/* block size */
681  ntbytes_ = (int32_t *)(_dest+8);        /* compressed buffer size */
682  _dest += sizeof(int32_t)*3;
683  bstarts = (int32_t *)_dest;             /* starts for every block */
684  _dest += sizeof(int32_t)*nblocks;        /* space for pointers to blocks */
685  ntbytes = (int32_t)(_dest - (uint8_t *)dest);
686
687  if (clevel == 0) {
688    /* Compression level 0 means buffer to be memcpy'ed */
689    *flags |= BLOSC_MEMCPYED;
690  }
691
692  if (nbytes_ < MIN_BUFFERSIZE) {
693    /* Buffer is too small.  Try memcpy'ing. */
694    *flags |= BLOSC_MEMCPYED;
695  }
696
697  if (doshuffle == 1) {
698    /* Shuffle is active */
699    *flags |= BLOSC_DOSHUFFLE;              /* bit 0 set to one in flags */
700  }
701
702  /* Take global lock for the time of compression */
703  pthread_mutex_lock(&global_comp_mutex);
704  /* Populate parameters for compression routines */
705  params.compress = 1;
706  params.clevel = clevel;
707  params.flags = (int32_t)*flags;
708  params.typesize = (int32_t)typesize;
709  params.blocksize = blocksize;
710  params.ntbytes = ntbytes;
711  params.nbytes = nbytes_;
712  params.maxbytes = maxbytes;
713  params.nblocks = nblocks;
714  params.leftover = leftover;
715  params.bstarts = bstarts;
716  params.src = (uint8_t *)src;
717  params.dest = (uint8_t *)dest;
718
719  if (!(*flags & BLOSC_MEMCPYED)) {
720    /* Do the actual compression */
721    ntbytes = do_job();
722    if (ntbytes < 0) {
723      return -1;
724    }
725    if ((ntbytes == 0) && (nbytes_+BLOSC_MAX_OVERHEAD <= maxbytes)) {
726      /* Last chance for fitting `src` buffer in `dest`.  Update flags
727       and do a memcpy later on. */
728      *flags |= BLOSC_MEMCPYED;
729      params.flags |= BLOSC_MEMCPYED;
730    }
731  }
732
733  if (*flags & BLOSC_MEMCPYED) {
734    if (nbytes_+BLOSC_MAX_OVERHEAD > maxbytes) {
735      /* We are exceeding maximum output size */
736      ntbytes = 0;
737    }
738    else if (((nbytes_ % L1) == 0) || (nthreads > 1)) {
739      /* More effective with large buffers that are multiples of the
740       cache size or multi-cores */
741      params.ntbytes = BLOSC_MAX_OVERHEAD;
742      ntbytes = do_job();
743      if (ntbytes < 0) {
744        return -1;
745      }
746    }
747    else {
748      memcpy((uint8_t *)dest+BLOSC_MAX_OVERHEAD, src, nbytes_);
749      ntbytes = nbytes_ + BLOSC_MAX_OVERHEAD;
750    }
751  }
752
753  /* Set the number of compressed bytes in header */
754  *ntbytes_ = sw32(ntbytes);
755
756  /* Release global lock */
757  pthread_mutex_unlock(&global_comp_mutex);
758 
759  assert((int32_t)ntbytes <= (int32_t)maxbytes);
760  return ntbytes;
761}
762
763
764/* The public routine for decompression.  See blosc.h for docstrings. */
765int blosc_decompress(const void *src, void *dest, size_t destsize)
766{
767  uint8_t *_src=NULL;            /* current pos for source buffer */
768  uint8_t version, versionlz;    /* versions for compressed header */
769  uint8_t flags;                 /* flags for header */
770  int32_t ntbytes;               /* the number of uncompressed bytes */
771  int32_t nblocks;              /* number of total blocks in buffer */
772  int32_t leftover;             /* extra bytes at end of buffer */
773  int32_t *bstarts;             /* start pointers for each block */
774  int32_t typesize, blocksize, nbytes, ctbytes;
775
776  _src = (uint8_t *)(src);
777
778  /* Read the header block */
779  version = _src[0];                         /* blosc format version */
780  versionlz = _src[1];                       /* blosclz format version */
781  flags = _src[2];                           /* flags */
782  typesize = (int32_t)_src[3];              /* typesize */
783  _src += 4;
784  nbytes = sw32(((int32_t *)_src)[0]);      /* buffer size */
785  blocksize = sw32(((int32_t *)_src)[1]);   /* block size */
786  ctbytes = sw32(((int32_t *)_src)[2]);     /* compressed buffer size */
787
788  version += 0;                             /* shut up compiler warning */
789  versionlz += 0;                           /* shut up compiler warning */
790  ctbytes += 0;                             /* shut up compiler warning */
791
792  _src += sizeof(int32_t)*3;
793  bstarts = (int32_t *)_src;
794  /* Compute some params */
795  /* Total blocks */
796  nblocks = nbytes / blocksize;
797  leftover = nbytes % blocksize;
798  nblocks = (leftover>0)? nblocks+1: nblocks;
799  _src += sizeof(int32_t)*nblocks;
800
801  /* Check that we have enough space to decompress */
802  if (nbytes > (int32_t)destsize) {
803    return -1;
804  }
805
806  /* Take global lock for the time of decompression */
807  pthread_mutex_lock(&global_comp_mutex);
808 
809  /* Populate parameters for decompression routines */
810  params.compress = 0;
811  params.clevel = 0;            /* specific for compression */
812  params.flags = (int32_t)flags;
813  params.typesize = typesize;
814  params.blocksize = blocksize;
815  params.ntbytes = 0;
816  params.nbytes = nbytes;
817  params.nblocks = nblocks;
818  params.leftover = leftover;
819  params.bstarts = bstarts;
820  params.src = (uint8_t *)src;
821  params.dest = (uint8_t *)dest;
822
823  /* Check whether this buffer is memcpy'ed */
824  if (flags & BLOSC_MEMCPYED) {
825    if (((nbytes % L1) == 0) || (nthreads > 1)) {
826      /* More effective with large buffers that are multiples of the
827       cache size or multi-cores */
828      ntbytes = do_job();
829      if (ntbytes < 0) {
830        return -1;
831      }
832    }
833    else {
834      memcpy(dest, (uint8_t *)src+BLOSC_MAX_OVERHEAD, nbytes);
835      ntbytes = nbytes;
836    }
837  }
838  else {
839    /* Do the actual decompression */
840    ntbytes = do_job();
841    if (ntbytes < 0) {
842      return -1;
843    }
844  }
845  /* Release global lock */
846  pthread_mutex_unlock(&global_comp_mutex);
847 
848  assert(ntbytes <= (int32_t)destsize);
849  return ntbytes;
850}
851
852
853/* Specific routine optimized for decompression a small number of
854   items out of a compressed chunk.  This does not use threads because
855   it would affect negatively to performance. */
856int blosc_getitem(const void *src, int start, int nitems, void *dest)
857{
858  uint8_t *_src=NULL;               /* current pos for source buffer */
859  uint8_t version, versionlz;       /* versions for compressed header */
860  uint8_t flags;                    /* flags for header */
861  int32_t ntbytes = 0;              /* the number of uncompressed bytes */
862  int32_t nblocks;                 /* number of total blocks in buffer */
863  int32_t leftover;                /* extra bytes at end of buffer */
864  int32_t *bstarts;                /* start pointers for each block */
865  uint8_t *tmp = params.tmp[0];     /* tmp for thread 0 */
866  uint8_t *tmp2 = params.tmp2[0];   /* tmp2 for thread 0 */
867  int tmp_init = 0;
868  int32_t typesize, blocksize, nbytes, ctbytes;
869  int32_t j, bsize, bsize2, leftoverblock;
870  int32_t cbytes, startb, stopb;
871  int stop = start + nitems;
872
873  _src = (uint8_t *)(src);
874
875  /* Take global lock  */
876  pthread_mutex_lock(&global_comp_mutex);
877 
878  /* Read the header block */
879  version = _src[0];                         /* blosc format version */
880  versionlz = _src[1];                       /* blosclz format version */
881  flags = _src[2];                           /* flags */
882  typesize = (int32_t)_src[3];              /* typesize */
883  _src += 4;
884  nbytes = sw32(((int32_t *)_src)[0]);      /* buffer size */
885  blocksize = sw32(((int32_t *)_src)[1]);   /* block size */
886  ctbytes = sw32(((int32_t *)_src)[2]);     /* compressed buffer size */
887
888  version += 0;                             /* shut up compiler warning */
889  versionlz += 0;                           /* shut up compiler warning */
890  ctbytes += 0;                             /* shut up compiler warning */
891
892  _src += sizeof(int32_t)*3;
893  bstarts = (int32_t *)_src;
894  /* Compute some params */
895  /* Total blocks */
896  nblocks = nbytes / blocksize;
897  leftover = nbytes % blocksize;
898  nblocks = (leftover>0)? nblocks+1: nblocks;
899  _src += sizeof(int32_t)*nblocks;
900
901  /* Check region boundaries */
902  if ((start < 0) || (start*typesize > nbytes)) {
903    fprintf(stderr, "`start` out of bounds");
904    return (-1);
905  }
906
907  if ((stop < 0) || (stop*typesize > nbytes)) {
908    fprintf(stderr, "`start`+`nitems` out of bounds");
909    return (-1);
910  }
911
912  /* Parameters needed by blosc_d */
913  params.typesize = typesize;
914  params.flags = flags;
915
916  /* Initialize temporaries if needed */
917  if (tmp == NULL || tmp2 == NULL || current_temp.blocksize < blocksize) {
918    tmp = my_malloc(blocksize);
919    if (tmp == NULL) {
920      return -1;
921    }
922    tmp2 = my_malloc(blocksize);
923    if (tmp2 == NULL) {
924      return -1;
925    }
926    tmp_init = 1;
927  }
928
929  for (j = 0; j < nblocks; j++) {
930    bsize = blocksize;
931    leftoverblock = 0;
932    if ((j == nblocks - 1) && (leftover > 0)) {
933      bsize = leftover;
934      leftoverblock = 1;
935    }
936
937    /* Compute start & stop for each block */
938    startb = start * typesize - j * blocksize;
939    stopb = stop * typesize - j * blocksize;
940    if ((startb >= (int)blocksize) || (stopb <= 0)) {
941      continue;
942    }
943    if (startb < 0) {
944      startb = 0;
945    }
946    if (stopb > (int)blocksize) {
947      stopb = blocksize;
948    }
949    bsize2 = stopb - startb;
950
951    /* Do the actual data copy */
952    if (flags & BLOSC_MEMCPYED) {
953      /* We want to memcpy only */
954      memcpy((uint8_t *)dest + ntbytes,
955          (uint8_t *)src + BLOSC_MAX_OVERHEAD + j*blocksize + startb,
956             bsize2);
957      cbytes = bsize2;
958    }
959    else {
960      /* Regular decompression.  Put results in tmp2. */
961      cbytes = blosc_d(bsize, leftoverblock,
962                       (uint8_t *)src+sw32(bstarts[j]), tmp2, tmp, tmp2);
963      if (cbytes < 0) {
964        ntbytes = cbytes;
965        break;
966      }
967      /* Copy to destination */
968      memcpy((uint8_t *)dest + ntbytes, tmp2 + startb, bsize2);
969      cbytes = bsize2;
970    }
971    ntbytes += cbytes;
972  }
973 
974  /* Release global lock */
975  pthread_mutex_unlock(&global_comp_mutex);
976
977  if (tmp_init) {
978    my_free(tmp);
979    my_free(tmp2);
980  }
981
982  return ntbytes;
983}
984
985
986/* Decompress & unshuffle several blocks in a single thread */
987static int t_blosc(void *tids)
988{
989  int32_t tid = *(int32_t *)tids;
990  int32_t cbytes, ntdest;
991  int32_t tblocks;              /* number of blocks per thread */
992  int32_t leftover2;
993  int32_t tblock;               /* limit block on a thread */
994  int32_t nblock_;              /* private copy of nblock */
995  int32_t bsize, leftoverblock;
996  /* Parameters for threads */
997  int32_t blocksize;
998  int32_t ebsize;
999  int32_t compress;
1000  int32_t maxbytes;
1001  int32_t ntbytes;
1002  int32_t flags;
1003  int32_t nblocks;
1004  int32_t leftover;
1005  int32_t *bstarts;
1006  uint8_t *src;
1007  uint8_t *dest;
1008  uint8_t *tmp;
1009  uint8_t *tmp2;
1010
1011  while (1) {
1012
1013    init_sentinels_done = 0;     /* sentinels have to be initialised yet */
1014
1015    /* Synchronization point for all threads (wait for initialization) */
1016    WAIT_INIT;
1017
1018    /* Check if thread has been asked to return */
1019    if (end_threads) {
1020      return(0);
1021    }
1022
1023    pthread_mutex_lock(&count_mutex);
1024    if (!init_sentinels_done) {
1025      /* Set sentinels and other global variables */
1026      giveup_code = 1;            /* no error code initially */
1027      nblock = -1;                /* block counter */
1028      init_sentinels_done = 1;    /* sentinels have been initialised */
1029    }
1030    pthread_mutex_unlock(&count_mutex);
1031
1032    /* Get parameters for this thread before entering the main loop */
1033    blocksize = params.blocksize;
1034    ebsize = blocksize + params.typesize*(int32_t)sizeof(int32_t);
1035    compress = params.compress;
1036    flags = params.flags;
1037    maxbytes = params.maxbytes;
1038    nblocks = params.nblocks;
1039    leftover = params.leftover;
1040    bstarts = params.bstarts;
1041    src = params.src;
1042    dest = params.dest;
1043    tmp = params.tmp[tid];
1044    tmp2 = params.tmp2[tid];
1045
1046    ntbytes = 0;                /* only useful for decompression */
1047
1048    if (compress && !(flags & BLOSC_MEMCPYED)) {
1049      /* Compression always has to follow the block order */
1050      pthread_mutex_lock(&count_mutex);
1051      nblock++;
1052      nblock_ = nblock;
1053      pthread_mutex_unlock(&count_mutex);
1054      tblock = nblocks;
1055    }
1056    else {
1057      /* Decompression can happen using any order.  We choose
1058       sequential block order on each thread */
1059
1060      /* Blocks per thread */
1061      tblocks = nblocks / nthreads;
1062      leftover2 = nblocks % nthreads;
1063      tblocks = (leftover2>0)? tblocks+1: tblocks;
1064
1065      nblock_ = tid*tblocks;
1066      tblock = nblock_ + tblocks;
1067      if (tblock > nblocks) {
1068        tblock = nblocks;
1069      }
1070    }
1071
1072    /* Loop over blocks */
1073    leftoverblock = 0;
1074    while ((nblock_ < tblock) && giveup_code > 0) {
1075      bsize = blocksize;
1076      if (nblock_ == (nblocks - 1) && (leftover > 0)) {
1077        bsize = leftover;
1078        leftoverblock = 1;
1079      }
1080      if (compress) {
1081        if (flags & BLOSC_MEMCPYED) {
1082          /* We want to memcpy only */
1083          memcpy(dest+BLOSC_MAX_OVERHEAD+nblock_*blocksize,
1084                 src+nblock_*blocksize, bsize);
1085          cbytes = bsize;
1086        }
1087        else {
1088          /* Regular compression */
1089          cbytes = blosc_c(bsize, leftoverblock, 0, ebsize,
1090                           src+nblock_*blocksize, tmp2, tmp);
1091        }
1092      }
1093      else {
1094        if (flags & BLOSC_MEMCPYED) {
1095          /* We want to memcpy only */
1096          memcpy(dest+nblock_*blocksize,
1097                 src+BLOSC_MAX_OVERHEAD+nblock_*blocksize, bsize);
1098          cbytes = bsize;
1099        }
1100        else {
1101          cbytes = blosc_d(bsize, leftoverblock,
1102                           src+sw32(bstarts[nblock_]), dest+nblock_*blocksize,
1103                           tmp, tmp2);
1104        }
1105      }
1106
1107      /* Check whether current thread has to giveup */
1108      if (giveup_code <= 0) {
1109        break;
1110      }
1111
1112      /* Check results for the compressed/decompressed block */
1113      if (cbytes < 0) {            /* compr/decompr failure */
1114        /* Set giveup_code error */
1115        pthread_mutex_lock(&count_mutex);
1116        giveup_code = cbytes;
1117        pthread_mutex_unlock(&count_mutex);
1118        break;
1119      }
1120
1121      if (compress && !(flags & BLOSC_MEMCPYED)) {
1122        /* Start critical section */
1123        pthread_mutex_lock(&count_mutex);
1124        ntdest = params.ntbytes;
1125        bstarts[nblock_] = sw32(ntdest);    /* update block start counter */
1126        if ( (cbytes == 0) || (ntdest+cbytes > (int32_t)maxbytes) ) {
1127          giveup_code = 0;                  /* uncompressible buffer */
1128          pthread_mutex_unlock(&count_mutex);
1129          break;
1130        }
1131        nblock++;
1132        nblock_ = nblock;
1133        params.ntbytes += cbytes;           /* update return bytes counter */
1134        pthread_mutex_unlock(&count_mutex);
1135        /* End of critical section */
1136
1137        /* Copy the compressed buffer to destination */
1138        memcpy(dest+ntdest, tmp2, cbytes);
1139      }
1140      else {
1141        nblock_++;
1142        /* Update counter for this thread */
1143        ntbytes += cbytes;
1144      }
1145
1146    } /* closes while (nblock_) */
1147
1148    /* Sum up all the bytes decompressed */
1149    if ((!compress || (flags & BLOSC_MEMCPYED)) && giveup_code > 0) {
1150      /* Update global counter for all threads (decompression only) */
1151      pthread_mutex_lock(&count_mutex);
1152      params.ntbytes += ntbytes;
1153      pthread_mutex_unlock(&count_mutex);
1154    }
1155
1156    /* Meeting point for all threads (wait for finalization) */
1157    WAIT_FINISH;
1158
1159  }  /* closes while(1) */
1160
1161  /* This should never be reached, but anyway */
1162  return(0);
1163}
1164
1165
1166static int init_threads(void)
1167{
1168  int32_t tid;
1169  int rc2;
1170
1171  /* Initialize mutex and condition variable objects */
1172  pthread_mutex_init(&count_mutex, NULL);
1173
1174  /* Barrier initialization */
1175#ifdef _POSIX_BARRIERS_MINE
1176  pthread_barrier_init(&barr_init, NULL, nthreads+1);
1177  pthread_barrier_init(&barr_finish, NULL, nthreads+1);
1178#else
1179  pthread_mutex_init(&count_threads_mutex, NULL);
1180  pthread_cond_init(&count_threads_cv, NULL);
1181  count_threads = 0;      /* Reset threads counter */
1182#endif
1183
1184#if !defined(_WIN32)
1185  /* Initialize and set thread detached attribute */
1186  pthread_attr_init(&ct_attr);
1187  pthread_attr_setdetachstate(&ct_attr, PTHREAD_CREATE_JOINABLE);
1188#endif
1189
1190  /* Finally, create the threads in detached state */
1191  for (tid = 0; tid < nthreads; tid++) {
1192    tids[tid] = tid;
1193#if !defined(_WIN32)
1194    rc2 = pthread_create(&threads[tid], &ct_attr, (void*)t_blosc,
1195                        (void *)&tids[tid]);
1196#else
1197    rc2 = pthread_create(&threads[tid], NULL, (void*)t_blosc,
1198                        (void *)&tids[tid]);
1199#endif
1200    if (rc2) {
1201      fprintf(stderr, "ERROR; return code from pthread_create() is %d\n", rc2);
1202      fprintf(stderr, "\tError detail: %s\n", strerror(rc2));
1203      return(-1);
1204    }
1205  }
1206
1207  init_threads_done = 1;                 /* Initialization done! */
1208  pid = (int)getpid();                   /* save the PID for this process */
1209
1210  return(0);
1211}
1212
1213void blosc_init(void) {
1214  /* Init global lock  */
1215  pthread_mutex_init(&global_comp_mutex, NULL);
1216  init_lib = 1;
1217}
1218
1219int blosc_set_nthreads(int nthreads_new) 
1220{
1221  int ret;
1222
1223  /* Check if should initialize (implementing previous 1.2.3 behaviour,
1224     where calling blosc_set_nthreads was enough) */
1225  if (!init_lib) blosc_init();
1226
1227  /* Take global lock  */
1228  pthread_mutex_lock(&global_comp_mutex);
1229 
1230  ret = blosc_set_nthreads_(nthreads_new);
1231  /* Release global lock  */
1232  pthread_mutex_unlock(&global_comp_mutex);
1233 
1234  return ret;
1235}
1236
1237int blosc_set_nthreads_(int nthreads_new)
1238{
1239  int32_t nthreads_old = nthreads;
1240  int32_t t;
1241  int rc2;
1242  void *status;
1243
1244  if (nthreads_new > BLOSC_MAX_THREADS) {
1245    fprintf(stderr,
1246            "Error.  nthreads cannot be larger than BLOSC_MAX_THREADS (%d)",
1247            BLOSC_MAX_THREADS);
1248    return -1;
1249  }
1250  else if (nthreads_new <= 0) {
1251    fprintf(stderr, "Error.  nthreads must be a positive integer");
1252    return -1;
1253  }
1254
1255  /* Only join threads if they are not initialized or if our PID is
1256     different from that in pid var (probably means that we are a
1257     subprocess, and thus threads are non-existent). */
1258  if (nthreads > 1 && init_threads_done && pid == getpid()) {
1259      /* Tell all existing threads to finish */
1260      end_threads = 1;
1261      /* Synchronization point for all threads (wait for initialization) */
1262      WAIT_INIT;
1263      /* Join exiting threads */
1264      for (t=0; t<nthreads; t++) {
1265        rc2 = pthread_join(threads[t], &status);
1266        if (rc2) {
1267          fprintf(stderr, "ERROR; return code from pthread_join() is %d\n", rc2);
1268          fprintf(stderr, "\tError detail: %s\n", strerror(rc2));
1269          return(-1);
1270        }
1271      }
1272      init_threads_done = 0;
1273      end_threads = 0;
1274    }
1275
1276  /* Launch a new pool of threads (if necessary) */
1277  nthreads = nthreads_new;
1278  if (nthreads > 1 && (!init_threads_done || pid != getpid())) {
1279    init_threads();
1280  }
1281
1282  return nthreads_old;
1283}
1284
1285
1286/* Free possible memory temporaries and thread resources */
1287int blosc_free_resources(void)
1288{
1289  int32_t t;
1290  int rc2;
1291  void *status;
1292 
1293   /* Take global lock  */
1294  pthread_mutex_lock(&global_comp_mutex);
1295
1296  /* Release temporaries */
1297  if (init_temps_done) {
1298    release_temporaries();
1299  }
1300
1301  /* Finish the possible thread pool */
1302  if (nthreads > 1 && init_threads_done) {
1303    /* Tell all existing threads to finish */
1304    end_threads = 1;
1305    /* Synchronization point for all threads (wait for initialization) */
1306    WAIT_INIT;
1307    /* Join exiting threads */
1308    for (t=0; t<nthreads; t++) {
1309      rc2 = pthread_join(threads[t], &status);
1310      if (rc2) {
1311        fprintf(stderr, "ERROR; return code from pthread_join() is %d\n", rc2);
1312        fprintf(stderr, "\tError detail: %s\n", strerror(rc2));
1313        return(-1);
1314      }
1315    }
1316
1317    /* Release mutex and condition variable objects */
1318    pthread_mutex_destroy(&count_mutex);
1319
1320    /* Barriers */
1321#ifdef _POSIX_BARRIERS_MINE
1322    pthread_barrier_destroy(&barr_init);
1323    pthread_barrier_destroy(&barr_finish);
1324#else
1325    pthread_mutex_destroy(&count_threads_mutex);
1326    pthread_cond_destroy(&count_threads_cv);
1327#endif
1328
1329    /* Thread attributes */
1330#if !defined(_WIN32)
1331    pthread_attr_destroy(&ct_attr);
1332#endif
1333
1334    init_threads_done = 0;
1335    end_threads = 0;
1336  }
1337   /* Release global lock  */
1338  pthread_mutex_unlock(&global_comp_mutex);
1339  return(0);
1340
1341}
1342
1343void blosc_destroy(void) {
1344  /* Free the resources */
1345  blosc_free_resources();
1346  /* Destroy global lock */
1347  pthread_mutex_destroy(&global_comp_mutex);
1348}
1349
/* Return `nbytes`, `cbytes` and `blocksize` from a compressed buffer.

   The three values are stored as consecutive 32-bit integers starting at
   byte 4 of the header (bytes 0-3 hold the two version bytes, the flags
   and the typesize).  They are copied out with memcpy() so that
   `cbuffer` needs no particular alignment, and each one is passed
   through sw32() before being widened to size_t. */
void blosc_cbuffer_sizes(const void *cbuffer, size_t *nbytes,
                         size_t *cbytes, size_t *blocksize)
{
  const uint8_t *header = (const uint8_t *)cbuffer;
  int32_t fields[3];

  memcpy(fields, header + 4, sizeof(fields));

  *nbytes = (size_t)sw32(fields[0]);      /* uncompressed buffer size */
  *blocksize = (size_t)sw32(fields[1]);   /* block size */
  *cbytes = (size_t)sw32(fields[2]);      /* compressed buffer size */
}
1370
1371
/* Return `typesize` and `flags` from a compressed buffer.

   Both are single bytes of the header: the flags live in byte 2 and the
   typesize in byte 3.  Bytes 0 and 1 (the format versions) are not
   needed here. */
void blosc_cbuffer_metainfo(const void *cbuffer, size_t *typesize,
                            int *flags)
{
  const uint8_t *header = (const uint8_t *)cbuffer;

  *flags = (int)header[2];               /* flags */
  *typesize = (size_t)header[3];         /* typesize */
}
1390
1391
/* Extract the two format version numbers from a compressed buffer:
   byte 0 is the blosc format version, byte 1 the blosclz one. */
void blosc_cbuffer_versions(const void *cbuffer, int *version,
                            int *versionlz)
{
  const uint8_t *header = (const uint8_t *)cbuffer;
  const uint8_t blosc_format = header[0];
  const uint8_t blosclz_format = header[1];

  *version = (int)blosc_format;
  *versionlz = (int)blosclz_format;
}
1402
1403
1404/* Force the use of a specific blocksize.  If 0, an automatic
1405   blocksize will be used (the default). */
1406void blosc_set_blocksize(size_t size)
1407{
1408  /* Take global lock  */
1409  pthread_mutex_lock(&global_comp_mutex);
1410 
1411  force_blocksize = (int32_t)size;
1412 
1413   /* Release global lock  */
1414  pthread_mutex_unlock(&global_comp_mutex);
1415}
Note: See TracBrowser for help on using the repository browser.