Changeset 981e22c for thirdparty/blosc/blosc.c
- Timestamp:
- 08/26/16 19:35:26 (8 years ago)
- Branches:
- master, pympi
- Children:
- 8ebc79b
- Parents:
- cda87e9
- git-author:
- Hal Finkel <hfinkel@…> (08/26/16 19:35:26)
- git-committer:
- Hal Finkel <hfinkel@…> (08/26/16 19:35:26)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
thirdparty/blosc/blosc.c
r00587dc r981e22c 1 1 /********************************************************************* 2 Blosc - Blocked S uffling and Compression Library3 4 Author: Francesc Alted <f [email protected]>2 Blosc - Blocked Shuffling and Compression Library 3 4 Author: Francesc Alted <f[email protected]> 5 5 Creation date: 2009-05-20 6 6 … … 9 9 10 10 11 #include <stdio.h> 11 12 #include <stdlib.h> 12 #include < stdio.h>13 #include <errno.h> 13 14 #include <string.h> 14 15 #include <sys/types.h> 15 16 #include <sys/stat.h> 16 17 #include <assert.h> 18 #if defined(USING_CMAKE) 19 #include "config.h" 20 #endif /* USING_CMAKE */ 17 21 #include "blosc.h" 22 #include "shuffle.h" 18 23 #include "blosclz.h" 19 #include "shuffle.h" 24 #if defined(HAVE_LZ4) 25 #include "lz4.h" 26 #include "lz4hc.h" 27 #endif /* HAVE_LZ4 */ 28 #if defined(HAVE_SNAPPY) 29 #include "snappy-c.h" 30 #endif /* HAVE_SNAPPY */ 31 #if defined(HAVE_ZLIB) 32 #include "zlib.h" 33 #endif /* HAVE_ZLIB */ 34 #if defined(HAVE_ZSTD) 35 #include "zstd.h" 36 #endif /* HAVE_ZSTD */ 20 37 21 38 #if defined(_WIN32) && !defined(__MINGW32__) 22 39 #include <windows.h> 23 #include "win32/stdint-windows.h" 40 #include <malloc.h> 41 42 /* stdint.h only available in VS2010 (VC++ 16.0) and newer */ 43 #if defined(_MSC_VER) && _MSC_VER < 1600 44 #include "win32/stdint-windows.h" 45 #else 46 #include <stdint.h> 47 #endif 48 24 49 #include <process.h> 25 50 #define getpid _getpid … … 30 55 #endif /* _WIN32 */ 31 56 32 #if defined(_WIN32) 57 #if defined(_WIN32) && !defined(__GNUC__) 33 58 #include "win32/pthread.h" 34 59 #include "win32/pthread.c" … … 37 62 #endif 38 63 64 /* If C11 is supported, use it's built-in aligned allocation. */ 65 #if __STDC_VERSION__ >= 201112L 66 #include <stdalign.h> 67 #endif 68 39 69 40 70 /* Some useful units */ … … 50 80 /* The size of L1 cache. 32 KB is quite common nowadays. */ 51 81 #define L1 (32*KB) 52 53 /* Wrapped function to adjust the number of threads used by blosc */54 int blosc_set_nthreads_(int);55 56 /* Global variables for main logic */57 static int32_t init_temps_done = 0; /* temp for compr/decompr initialized? */58 static int32_t force_blocksize = 0; /* force the use of a blocksize? */59 static int pid = 0; /* the PID for this process */60 static int init_lib = 0; /* is library initalized? */61 62 /* Global variables for threads */63 static int32_t nthreads = 1; /* number of desired threads in pool */64 static int32_t init_threads_done = 0; /* pool of threads initialized? */65 static int32_t end_threads = 0; /* should exisiting threads end? */66 static int32_t init_sentinels_done = 0; /* sentinels initialized? */67 static int32_t giveup_code; /* error code when give up */68 static int32_t nblock; /* block counter */69 static pthread_t threads[BLOSC_MAX_THREADS]; /* opaque structure for threads */70 static int32_t tids[BLOSC_MAX_THREADS]; /* ID per each thread */71 #if !defined(_WIN32)72 static pthread_attr_t ct_attr; /* creation time attrs for threads */73 #endif74 82 75 83 /* Have problems using posix barriers when symbol value is 200112L */ … … 78 86 #define _POSIX_BARRIERS_MINE 79 87 #endif 80 81 88 /* Synchronization variables */ 82 static pthread_mutex_t count_mutex; 89 90 91 struct blosc_context { 92 int32_t compress; /* 1 if we are doing compression 0 if decompress */ 93 94 const uint8_t* src; 95 uint8_t* dest; /* The current pos in the destination buffer */ 96 uint8_t* header_flags; /* Flags for header. Currently booked: 97 - 0: byte-shuffled? 98 - 1: memcpy'ed? 99 - 2: bit-shuffled? */ 100 int32_t sourcesize; /* Number of bytes in source buffer (or uncompressed bytes in compressed file) */ 101 int32_t nblocks; /* Number of total blocks in buffer */ 102 int32_t leftover; /* Extra bytes at end of buffer */ 103 int32_t blocksize; /* Length of the block in bytes */ 104 int32_t typesize; /* Type size */ 105 int32_t num_output_bytes; /* Counter for the number of output bytes */ 106 int32_t destsize; /* Maximum size for destination buffer */ 107 uint8_t* bstarts; /* Start of the buffer past header info */ 108 int32_t compcode; /* Compressor code to use */ 109 int clevel; /* Compression level (1-9) */ 110 111 /* Threading */ 112 int32_t numthreads; 113 int32_t threads_started; 114 int32_t end_threads; 115 pthread_t threads[BLOSC_MAX_THREADS]; 116 int32_t tids[BLOSC_MAX_THREADS]; 117 pthread_mutex_t count_mutex; 118 #ifdef _POSIX_BARRIERS_MINE 119 pthread_barrier_t barr_init; 120 pthread_barrier_t barr_finish; 121 #else 122 int32_t count_threads; 123 pthread_mutex_t count_threads_mutex; 124 pthread_cond_t count_threads_cv; 125 #endif 126 #if !defined(_WIN32) 127 pthread_attr_t ct_attr; /* creation time attrs for threads */ 128 #endif 129 int32_t thread_giveup_code; /* error code when give up */ 130 int32_t thread_nblock; /* block counter */ 131 }; 132 133 struct thread_context { 134 struct blosc_context* parent_context; 135 int32_t tid; 136 uint8_t* tmp; 137 uint8_t* tmp2; 138 uint8_t* tmp3; 139 int32_t tmpblocksize; /* Used to keep track of how big the temporary buffers are */ 140 }; 141 142 /* Global context for non-contextual API */ 143 static struct blosc_context* g_global_context; 83 144 static pthread_mutex_t global_comp_mutex; 84 #ifdef _POSIX_BARRIERS_MINE 85 static pthread_barrier_t barr_init; 86 static pthread_barrier_t barr_finish; 87 #else 88 static int32_t count_threads; 89 static pthread_mutex_t count_threads_mutex; 90 static pthread_cond_t count_threads_cv; 91 #endif 92 93 94 /* Structure for parameters in (de-)compression threads */ 95 static struct thread_data { 96 int32_t typesize; 97 int32_t blocksize; 98 int32_t compress; 99 int32_t clevel; 100 int32_t flags; 101 int32_t memcpyed; 102 int32_t ntbytes; 103 int32_t nbytes; 104 int32_t maxbytes; 105 int32_t nblocks; 106 int32_t leftover; 107 int32_t *bstarts; /* start pointers for each block */ 108 uint8_t *src; 109 uint8_t *dest; 110 uint8_t *tmp[BLOSC_MAX_THREADS]; 111 uint8_t *tmp2[BLOSC_MAX_THREADS]; 112 } params; 113 114 115 /* Structure for parameters meant for keeping track of current temporaries */ 116 static struct temp_data { 117 int32_t nthreads; 118 int32_t typesize; 119 int32_t blocksize; 120 } current_temp; 121 145 static int32_t g_compressor = BLOSC_BLOSCLZ; /* the compressor to use by default */ 146 static int32_t g_threads = 1; 147 static int32_t g_force_blocksize = 0; 148 static int32_t g_initlib = 0; 149 150 151 152 /* Wrapped function to adjust the number of threads used by blosc */ 153 int blosc_set_nthreads_(struct blosc_context*); 154 155 /* Releases the global threadpool */ 156 int blosc_release_threadpool(struct blosc_context* context); 122 157 123 158 /* Macros for synchronization */ … … 125 160 /* Wait until all threads are initialized */ 126 161 #ifdef _POSIX_BARRIERS_MINE 127 static int rc; 128 #define WAIT_INIT \ 129 rc = pthread_barrier_wait(&barr_init); \ 162 #define WAIT_INIT(RET_VAL, CONTEXT_PTR) \ 163 rc = pthread_barrier_wait(&CONTEXT_PTR->barr_init); \ 130 164 if (rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD) { \ 131 printf("Could not wait on barrier (init) \n"); \132 return( -1);\165 printf("Could not wait on barrier (init): %d\n", rc); \ 166 return((RET_VAL)); \ 133 167 } 134 168 #else 135 #define WAIT_INIT \136 pthread_mutex_lock(& count_threads_mutex); \137 if ( count_threads < nthreads) { \138 count_threads++;\139 pthread_cond_wait(& count_threads_cv, &count_threads_mutex); \169 #define WAIT_INIT(RET_VAL, CONTEXT_PTR) \ 170 pthread_mutex_lock(&CONTEXT_PTR->count_threads_mutex); \ 171 if (CONTEXT_PTR->count_threads < CONTEXT_PTR->numthreads) { \ 172 CONTEXT_PTR->count_threads++; \ 173 pthread_cond_wait(&CONTEXT_PTR->count_threads_cv, &CONTEXT_PTR->count_threads_mutex); \ 140 174 } \ 141 175 else { \ 142 pthread_cond_broadcast(& count_threads_cv); \176 pthread_cond_broadcast(&CONTEXT_PTR->count_threads_cv); \ 143 177 } \ 144 pthread_mutex_unlock(& count_threads_mutex);178 pthread_mutex_unlock(&CONTEXT_PTR->count_threads_mutex); 145 179 #endif 146 180 147 181 /* Wait for all threads to finish */ 148 182 #ifdef _POSIX_BARRIERS_MINE 149 #define WAIT_FINISH \150 rc = pthread_barrier_wait(& barr_finish); \183 #define WAIT_FINISH(RET_VAL, CONTEXT_PTR) \ 184 rc = pthread_barrier_wait(&CONTEXT_PTR->barr_finish); \ 151 185 if (rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD) { \ 152 186 printf("Could not wait on barrier (finish)\n"); \ 153 return( -1);\187 return((RET_VAL)); \ 154 188 } 155 189 #else 156 #define WAIT_FINISH \157 pthread_mutex_lock(& count_threads_mutex); \158 if ( count_threads > 0) { \159 count_threads--; \160 pthread_cond_wait(& count_threads_cv, &count_threads_mutex); \190 #define WAIT_FINISH(RET_VAL, CONTEXT_PTR) \ 191 pthread_mutex_lock(&CONTEXT_PTR->count_threads_mutex); \ 192 if (CONTEXT_PTR->count_threads > 0) { \ 193 CONTEXT_PTR->count_threads--; \ 194 pthread_cond_wait(&CONTEXT_PTR->count_threads_cv, &CONTEXT_PTR->count_threads_mutex); \ 161 195 } \ 162 196 else { \ 163 pthread_cond_broadcast(& count_threads_cv); \197 pthread_cond_broadcast(&CONTEXT_PTR->count_threads_cv); \ 164 198 } \ 165 pthread_mutex_unlock(& count_threads_mutex);199 pthread_mutex_unlock(&CONTEXT_PTR->count_threads_mutex); 166 200 #endif 167 201 … … 173 207 int res = 0; 174 208 175 #if defined(_WIN32) 209 /* Do an alignment to 32 bytes because AVX2 is supported */ 210 #if _ISOC11_SOURCE 211 /* C11 aligned allocation. 'size' must be a multiple of the alignment. */ 212 block = aligned_alloc(32, size); 213 #elif defined(_WIN32) 176 214 /* A (void *) cast needed for avoiding a warning with MINGW :-/ */ 177 block = (void *)_aligned_malloc(size, 16);215 block = (void *)_aligned_malloc(size, 32); 178 216 #elif defined __APPLE__ 179 217 /* Mac OS X guarantees 16-byte alignment in small allocs */ … … 181 219 #elif _POSIX_C_SOURCE >= 200112L || _XOPEN_SOURCE >= 600 182 220 /* Platform does have an implementation of posix_memalign */ 183 res = posix_memalign(&block, 16, size);221 res = posix_memalign(&block, 32, size); 184 222 #else 185 223 block = malloc(size); … … 206 244 207 245 208 /* If `a` is little-endian, return it as-is. If not, return a copy, 209 with the endianness changed */ 210 static int32_t sw32(int32_t a) 211 { 212 int32_t tmp; 213 char *pa = (char *)&a; 214 char *ptmp = (char *)&tmp; 246 /* Copy 4 bytes from `*pa` to int32_t, changing endianness if necessary. */ 247 static int32_t sw32_(const uint8_t *pa) 248 { 249 int32_t idest; 250 uint8_t *dest = (uint8_t *)&idest; 215 251 int i = 1; /* for big/little endian detection */ 216 252 char *p = (char *)&i; … … 218 254 if (p[0] != 1) { 219 255 /* big endian */ 220 ptmp[0] = pa[3]; 221 ptmp[1] = pa[2]; 222 ptmp[2] = pa[1]; 223 ptmp[3] = pa[0]; 224 return tmp; 256 dest[0] = pa[3]; 257 dest[1] = pa[2]; 258 dest[2] = pa[1]; 259 dest[3] = pa[0]; 225 260 } 226 261 else { 227 262 /* little endian */ 228 return a; 229 } 230 } 231 263 dest[0] = pa[0]; 264 dest[1] = pa[1]; 265 dest[2] = pa[2]; 266 dest[3] = pa[3]; 267 } 268 return idest; 269 } 270 271 272 /* Copy 4 bytes from `*pa` to `*dest`, changing endianness if necessary. */ 273 static void _sw32(uint8_t* dest, int32_t a) 274 { 275 uint8_t *pa = (uint8_t *)&a; 276 int i = 1; /* for big/little endian detection */ 277 char *p = (char *)&i; 278 279 if (p[0] != 1) { 280 /* big endian */ 281 dest[0] = pa[3]; 282 dest[1] = pa[2]; 283 dest[2] = pa[1]; 284 dest[3] = pa[0]; 285 } 286 else { 287 /* little endian */ 288 dest[0] = pa[0]; 289 dest[1] = pa[1]; 290 dest[2] = pa[2]; 291 dest[3] = pa[3]; 292 } 293 } 294 295 296 /* 297 * Conversion routines between compressor and compression libraries 298 */ 299 300 /* Return the library code associated with the compressor name */ 301 static int compname_to_clibcode(const char *compname) 302 { 303 if (strcmp(compname, BLOSC_BLOSCLZ_COMPNAME) == 0) 304 return BLOSC_BLOSCLZ_LIB; 305 if (strcmp(compname, BLOSC_LZ4_COMPNAME) == 0) 306 return BLOSC_LZ4_LIB; 307 if (strcmp(compname, BLOSC_LZ4HC_COMPNAME) == 0) 308 return BLOSC_LZ4_LIB; 309 if (strcmp(compname, BLOSC_SNAPPY_COMPNAME) == 0) 310 return BLOSC_SNAPPY_LIB; 311 if (strcmp(compname, BLOSC_ZLIB_COMPNAME) == 0) 312 return BLOSC_ZLIB_LIB; 313 if (strcmp(compname, BLOSC_ZSTD_COMPNAME) == 0) 314 return BLOSC_ZSTD_LIB; 315 return -1; 316 } 317 318 /* Return the library name associated with the compressor code */ 319 static char *clibcode_to_clibname(int clibcode) 320 { 321 if (clibcode == BLOSC_BLOSCLZ_LIB) return BLOSC_BLOSCLZ_LIBNAME; 322 if (clibcode == BLOSC_LZ4_LIB) return BLOSC_LZ4_LIBNAME; 323 if (clibcode == BLOSC_SNAPPY_LIB) return BLOSC_SNAPPY_LIBNAME; 324 if (clibcode == BLOSC_ZLIB_LIB) return BLOSC_ZLIB_LIBNAME; 325 if (clibcode == BLOSC_ZSTD_LIB) return BLOSC_ZSTD_LIBNAME; 326 return NULL; /* should never happen */ 327 } 328 329 330 /* 331 * Conversion routines between compressor names and compressor codes 332 */ 333 334 /* Get the compressor name associated with the compressor code */ 335 int blosc_compcode_to_compname(int compcode, char **compname) 336 { 337 int code = -1; /* -1 means non-existent compressor code */ 338 char *name = NULL; 339 340 /* Map the compressor code */ 341 if (compcode == BLOSC_BLOSCLZ) 342 name = BLOSC_BLOSCLZ_COMPNAME; 343 else if (compcode == BLOSC_LZ4) 344 name = BLOSC_LZ4_COMPNAME; 345 else if (compcode == BLOSC_LZ4HC) 346 name = BLOSC_LZ4HC_COMPNAME; 347 else if (compcode == BLOSC_SNAPPY) 348 name = BLOSC_SNAPPY_COMPNAME; 349 else if (compcode == BLOSC_ZLIB) 350 name = BLOSC_ZLIB_COMPNAME; 351 else if (compcode == BLOSC_ZSTD) 352 name = BLOSC_ZSTD_COMPNAME; 353 354 *compname = name; 355 356 /* Guess if there is support for this code */ 357 if (compcode == BLOSC_BLOSCLZ) 358 code = BLOSC_BLOSCLZ; 359 #if defined(HAVE_LZ4) 360 else if (compcode == BLOSC_LZ4) 361 code = BLOSC_LZ4; 362 else if (compcode == BLOSC_LZ4HC) 363 code = BLOSC_LZ4HC; 364 #endif /* HAVE_LZ4 */ 365 #if defined(HAVE_SNAPPY) 366 else if (compcode == BLOSC_SNAPPY) 367 code = BLOSC_SNAPPY; 368 #endif /* HAVE_SNAPPY */ 369 #if defined(HAVE_ZLIB) 370 else if (compcode == BLOSC_ZLIB) 371 code = BLOSC_ZLIB; 372 #endif /* HAVE_ZLIB */ 373 #if defined(HAVE_ZSTD) 374 else if (compcode == BLOSC_ZSTD) 375 code = BLOSC_ZSTD; 376 #endif /* HAVE_ZSTD */ 377 378 return code; 379 } 380 381 /* Get the compressor code for the compressor name. -1 if it is not available */ 382 int blosc_compname_to_compcode(const char *compname) 383 { 384 int code = -1; /* -1 means non-existent compressor code */ 385 386 if (strcmp(compname, BLOSC_BLOSCLZ_COMPNAME) == 0) { 387 code = BLOSC_BLOSCLZ; 388 } 389 #if defined(HAVE_LZ4) 390 else if (strcmp(compname, BLOSC_LZ4_COMPNAME) == 0) { 391 code = BLOSC_LZ4; 392 } 393 else if (strcmp(compname, BLOSC_LZ4HC_COMPNAME) == 0) { 394 code = BLOSC_LZ4HC; 395 } 396 #endif /* HAVE_LZ4 */ 397 #if defined(HAVE_SNAPPY) 398 else if (strcmp(compname, BLOSC_SNAPPY_COMPNAME) == 0) { 399 code = BLOSC_SNAPPY; 400 } 401 #endif /* HAVE_SNAPPY */ 402 #if defined(HAVE_ZLIB) 403 else if (strcmp(compname, BLOSC_ZLIB_COMPNAME) == 0) { 404 code = BLOSC_ZLIB; 405 } 406 #endif /* HAVE_ZLIB */ 407 #if defined(HAVE_ZSTD) 408 else if (strcmp(compname, BLOSC_ZSTD_COMPNAME) == 0) { 409 code = BLOSC_ZSTD; 410 } 411 #endif /* HAVE_ZSTD */ 412 413 return code; 414 } 415 416 417 #if defined(HAVE_LZ4) 418 static int lz4_wrap_compress(const char* input, size_t input_length, 419 char* output, size_t maxout, int accel) 420 { 421 int cbytes; 422 cbytes = LZ4_compress_fast(input, output, (int)input_length, (int)maxout, 423 accel); 424 return cbytes; 425 } 426 427 static int lz4hc_wrap_compress(const char* input, size_t input_length, 428 char* output, size_t maxout, int clevel) 429 { 430 int cbytes; 431 if (input_length > (size_t)(2<<30)) 432 return -1; /* input larger than 1 GB is not supported */ 433 /* clevel for lz4hc goes up to 16, at least in LZ4 1.1.3 */ 434 cbytes = LZ4_compressHC2_limitedOutput(input, output, (int)input_length, 435 (int)maxout, clevel*2-1); 436 return cbytes; 437 } 438 439 static int lz4_wrap_decompress(const char* input, size_t compressed_length, 440 char* output, size_t maxout) 441 { 442 size_t cbytes; 443 cbytes = LZ4_decompress_fast(input, output, (int)maxout); 444 if (cbytes != compressed_length) { 445 return 0; 446 } 447 return (int)maxout; 448 } 449 450 #endif /* HAVE_LZ4 */ 451 452 #if defined(HAVE_SNAPPY) 453 static int snappy_wrap_compress(const char* input, size_t input_length, 454 char* output, size_t maxout) 455 { 456 snappy_status status; 457 size_t cl = maxout; 458 status = snappy_compress(input, input_length, output, &cl); 459 if (status != SNAPPY_OK){ 460 return 0; 461 } 462 return (int)cl; 463 } 464 465 static int snappy_wrap_decompress(const char* input, size_t compressed_length, 466 char* output, size_t maxout) 467 { 468 snappy_status status; 469 size_t ul = maxout; 470 status = snappy_uncompress(input, compressed_length, output, &ul); 471 if (status != SNAPPY_OK){ 472 return 0; 473 } 474 return (int)ul; 475 } 476 #endif /* HAVE_SNAPPY */ 477 478 #if defined(HAVE_ZLIB) 479 /* zlib is not very respectful with sharing name space with others. 480 Fortunately, its names do not collide with those already in blosc. */ 481 static int zlib_wrap_compress(const char* input, size_t input_length, 482 char* output, size_t maxout, int clevel) 483 { 484 int status; 485 uLongf cl = maxout; 486 status = compress2( 487 (Bytef*)output, &cl, (Bytef*)input, (uLong)input_length, clevel); 488 if (status != Z_OK){ 489 return 0; 490 } 491 return (int)cl; 492 } 493 494 static int zlib_wrap_decompress(const char* input, size_t compressed_length, 495 char* output, size_t maxout) 496 { 497 int status; 498 uLongf ul = maxout; 499 status = uncompress( 500 (Bytef*)output, &ul, (Bytef*)input, (uLong)compressed_length); 501 if (status != Z_OK){ 502 return 0; 503 } 504 return (int)ul; 505 } 506 #endif /* HAVE_ZLIB */ 507 508 #if defined(HAVE_ZSTD) 509 static int zstd_wrap_compress(const char* input, size_t input_length, 510 char* output, size_t maxout, int clevel) { 511 size_t code; 512 // clevel = (clevel < 9) ? clevel * 2 - 1 : ZSTD_maxCLevel(); // see zstd#254 513 clevel = (clevel < 9) ? clevel * 2 - 1 : 22; 514 code = ZSTD_compress( 515 (void*)output, maxout, (void*)input, input_length, clevel); 516 if (ZSTD_isError(code)) { 517 return 0; 518 } 519 return (int)code; 520 } 521 522 static int zstd_wrap_decompress(const char* input, size_t compressed_length, 523 char* output, size_t maxout) { 524 size_t code; 525 code = ZSTD_decompress( 526 (void*)output, maxout, (void*)input, compressed_length); 527 if (ZSTD_isError(code)) { 528 fprintf(stderr, "error decompressing with Zstd: %s \n", ZSTD_getErrorName(code)); 529 return 0; 530 } 531 return (int)code; 532 } 533 #endif /* HAVE_ZSTD */ 534 535 /* Compute acceleration for blosclz */ 536 static int get_accel(const struct blosc_context* context) { 537 int32_t clevel = context->clevel; 538 int32_t typesize = context->typesize; 539 540 if (clevel == 9) { 541 return 1; 542 } 543 if (context->compcode == BLOSC_BLOSCLZ) { 544 /* Compute the power of 2. See: 545 * http://www.exploringbinary.com/ten-ways-to-check-if-an-integer-is-a-power-of-two-in-c/ 546 */ 547 int32_t tspow2 = ((typesize != 0) && !(typesize & (typesize - 1))); 548 if (tspow2 && typesize < 32) { 549 return 32; 550 } 551 } 552 else if (context->compcode == BLOSC_LZ4) { 553 /* This acceleration setting based on discussions held in: 554 * https://groups.google.com/forum/#!topic/lz4c/zosy90P8MQw 555 */ 556 return (10 - clevel); 557 } 558 return 1; 559 } 232 560 233 561 /* Shuffle & compress a single block */ 234 static int blosc_c(int32_t blocksize, int32_t leftoverblock, 235 int32_t ntbytes, int32_t maxbytes, 236 uint8_t *src, uint8_t *dest, uint8_t *tmp) 562 static int blosc_c(const struct blosc_context* context, int32_t blocksize, 563 int32_t leftoverblock, int32_t ntbytes, int32_t maxbytes, 564 const uint8_t *src, uint8_t *dest, uint8_t *tmp, 565 uint8_t *tmp2) 237 566 { 238 567 int32_t j, neblock, nsplits; … … 240 569 int32_t ctbytes = 0; /* number of compressed bytes in block */ 241 570 int32_t maxout; 242 int32_t typesize = params.typesize; 243 uint8_t *_tmp; 244 245 if ((params.flags & BLOSC_DOSHUFFLE) && (typesize > 1)) { 246 /* Shuffle this block (this makes sense only if typesize > 1) */ 571 int32_t typesize = context->typesize; 572 const uint8_t *_tmp = src; 573 char *compname; 574 int accel; 575 int bscount; 576 577 if (*(context->header_flags) & BLOSC_DOSHUFFLE & (typesize > 1)) { 578 /* Byte shuffling only makes sense if typesize > 1 */ 247 579 shuffle(typesize, blocksize, src, tmp); 248 580 _tmp = tmp; 249 581 } 250 else { 251 _tmp = src; 252 } 582 /* We don't allow more than 1 filter at the same time (yet) */ 583 else if (*(context->header_flags) & BLOSC_DOBITSHUFFLE) { 584 bscount = bitshuffle(typesize, blocksize, src, tmp, tmp2); 585 if (bscount < 0) 586 return bscount; 587 _tmp = tmp; 588 } 589 590 /* Calculate acceleration for different compressors */ 591 accel = get_accel(context); 253 592 254 593 /* Compress for each shuffled slice split for this block. */ … … 268 607 ctbytes += (int32_t)sizeof(int32_t); 269 608 maxout = neblock; 609 #if defined(HAVE_SNAPPY) 610 if (context->compcode == BLOSC_SNAPPY) { 611 /* TODO perhaps refactor this to keep the value stashed somewhere */ 612 maxout = snappy_max_compressed_length(neblock); 613 } 614 #endif /* HAVE_SNAPPY */ 270 615 if (ntbytes+maxout > maxbytes) { 271 616 maxout = maxbytes - ntbytes; /* avoid buffer overrun */ … … 274 619 } 275 620 } 276 cbytes = blosclz_compress(params.clevel, _tmp+j*neblock, neblock, 277 dest, maxout); 278 if (cbytes >= maxout) { 279 /* Buffer overrun caused by blosclz_compress (should never happen) */ 621 if (context->compcode == BLOSC_BLOSCLZ) { 622 cbytes = blosclz_compress(context->clevel, _tmp+j*neblock, neblock, 623 dest, maxout, accel); 624 } 625 #if defined(HAVE_LZ4) 626 else if (context->compcode == BLOSC_LZ4) { 627 cbytes = lz4_wrap_compress((char *)_tmp+j*neblock, (size_t)neblock, 628 (char *)dest, (size_t)maxout, accel); 629 } 630 else if (context->compcode == BLOSC_LZ4HC) { 631 cbytes = lz4hc_wrap_compress((char *)_tmp+j*neblock, (size_t)neblock, 632 (char *)dest, (size_t)maxout, 633 context->clevel); 634 } 635 #endif /* HAVE_LZ4 */ 636 #if defined(HAVE_SNAPPY) 637 else if (context->compcode == BLOSC_SNAPPY) { 638 cbytes = snappy_wrap_compress((char *)_tmp+j*neblock, (size_t)neblock, 639 (char *)dest, (size_t)maxout); 640 } 641 #endif /* HAVE_SNAPPY */ 642 #if defined(HAVE_ZLIB) 643 else if (context->compcode == BLOSC_ZLIB) { 644 cbytes = zlib_wrap_compress((char *)_tmp+j*neblock, (size_t)neblock, 645 (char *)dest, (size_t)maxout, 646 context->clevel); 647 } 648 #endif /* HAVE_ZLIB */ 649 #if defined(HAVE_ZSTD) 650 else if (context->compcode == BLOSC_ZSTD) { 651 cbytes = zstd_wrap_compress((char*)_tmp + j * neblock, (size_t)neblock, 652 (char*)dest, (size_t)maxout, context->clevel); 653 } 654 #endif /* HAVE_ZSTD */ 655 656 else { 657 blosc_compcode_to_compname(context->compcode, &compname); 658 fprintf(stderr, "Blosc has not been compiled with '%s' ", compname); 659 fprintf(stderr, "compression support. Please use one having it."); 660 return -5; /* signals no compression support */ 661 } 662 663 if (cbytes > maxout) { 664 /* Buffer overrun caused by compression (should never happen) */ 280 665 return -1; 281 666 } … … 284 669 return -2; 285 670 } 286 else if (cbytes == 0 ) {287 /* The compressor has been unable to compress data significantly. */671 else if (cbytes == 0 || cbytes == neblock) { 672 /* The compressor has been unable to compress data at all. */ 288 673 /* Before doing the copy, check that we are not running into a 289 674 buffer overflow. */ … … 294 679 cbytes = neblock; 295 680 } 296 ((int32_t *)(dest))[-1] = sw32(cbytes);681 _sw32(dest - 4, cbytes); 297 682 dest += cbytes; 298 683 ntbytes += cbytes; … … 303 688 } 304 689 305 306 690 /* Decompress & unshuffle a single block */ 307 static int blosc_d( int32_t blocksize, int32_t leftoverblock,308 uint8_t *src, uint8_t *dest, uint8_t *tmp, uint8_t *tmp2)691 static int blosc_d(struct blosc_context* context, int32_t blocksize, int32_t leftoverblock, 692 const uint8_t *src, uint8_t *dest, uint8_t *tmp, uint8_t *tmp2) 309 693 { 310 694 int32_t j, neblock, nsplits; … … 313 697 int32_t ctbytes = 0; /* number of compressed bytes in block */ 314 698 int32_t ntbytes = 0; /* number of uncompressed bytes in block */ 315 uint8_t *_tmp; 316 int32_t typesize = params.typesize; 317 318 if ((params.flags & BLOSC_DOSHUFFLE) && (typesize > 1)) { 699 uint8_t *_tmp = dest; 700 int32_t typesize = context->typesize; 701 int32_t compformat; 702 char *compname; 703 int bscount; 704 705 if ((*(context->header_flags) & BLOSC_DOSHUFFLE & (typesize > 1)) || \ 706 (*(context->header_flags) & BLOSC_DOBITSHUFFLE)) { 319 707 _tmp = tmp; 320 708 } 321 else { 322 _tmp = dest; 323 } 709 710 compformat = (*(context->header_flags) & 0xe0) >> 5; 324 711 325 712 /* Compress for each shuffled slice split for this block. */ … … 333 720 neblock = blocksize / nsplits; 334 721 for (j = 0; j < nsplits; j++) { 335 cbytes = sw32 (((int32_t *)(src))[0]);/* amount of compressed bytes */722 cbytes = sw32_(src); /* amount of compressed bytes */ 336 723 src += sizeof(int32_t); 337 724 ctbytes += (int32_t)sizeof(int32_t); … … 342 729 } 343 730 else { 344 nbytes = blosclz_decompress(src, cbytes, _tmp, neblock); 731 if (compformat == BLOSC_BLOSCLZ_FORMAT) { 732 nbytes = blosclz_decompress(src, cbytes, _tmp, neblock); 733 } 734 #if defined(HAVE_LZ4) 735 else if (compformat == BLOSC_LZ4_FORMAT) { 736 nbytes = lz4_wrap_decompress((char *)src, (size_t)cbytes, 737 (char*)_tmp, (size_t)neblock); 738 } 739 #endif /* HAVE_LZ4 */ 740 #if defined(HAVE_SNAPPY) 741 else if (compformat == BLOSC_SNAPPY_FORMAT) { 742 nbytes = snappy_wrap_decompress((char *)src, (size_t)cbytes, 743 (char*)_tmp, (size_t)neblock); 744 } 745 #endif /* HAVE_SNAPPY */ 746 #if defined(HAVE_ZLIB) 747 else if (compformat == BLOSC_ZLIB_FORMAT) { 748 nbytes = zlib_wrap_decompress((char *)src, (size_t)cbytes, 749 (char*)_tmp, (size_t)neblock); 750 } 751 #endif /* HAVE_ZLIB */ 752 #if defined(HAVE_ZSTD) 753 else if (compformat == BLOSC_ZSTD_FORMAT) { 754 nbytes = zstd_wrap_decompress((char*)src, (size_t)cbytes, 755 (char*)_tmp, (size_t)neblock); 756 } 757 #endif /* HAVE_ZSTD */ 758 else { 759 compname = clibcode_to_clibname(compformat); 760 fprintf(stderr, 761 "Blosc has not been compiled with decompression " 762 "support for '%s' format. ", compname); 763 fprintf(stderr, "Please recompile for adding this support.\n"); 764 return -5; /* signals no decompression support */ 765 } 766 767 /* Check that decompressed bytes number is correct */ 345 768 if (nbytes != neblock) { 346 return -2; 347 } 769 return -2; 770 } 771 348 772 } 349 773 src += cbytes; … … 353 777 } /* Closes j < nsplits */ 354 778 355 if ((params.flags & BLOSC_DOSHUFFLE) && (typesize > 1)) { 356 if ((uintptr_t)dest % 16 == 0) { 357 /* 16-bytes aligned dest. SSE2 unshuffle will work. */ 358 unshuffle(typesize, blocksize, tmp, dest); 359 } 360 else { 361 /* dest is not aligned. Use tmp2, which is aligned, and copy. */ 362 unshuffle(typesize, blocksize, tmp, tmp2); 363 if (tmp2 != dest) { 364 /* Copy only when dest is not tmp2 (e.g. not blosc_getitem()) */ 365 memcpy(dest, tmp2, blocksize); 366 } 367 } 779 if (*(context->header_flags) & BLOSC_DOSHUFFLE & (typesize > 1)) { 780 unshuffle(typesize, blocksize, tmp, dest); 781 } 782 else if (*(context->header_flags) & BLOSC_DOBITSHUFFLE) { 783 bscount = bitunshuffle(typesize, blocksize, tmp, dest, tmp2); 784 if (bscount < 0) 785 return bscount; 368 786 } 369 787 … … 374 792 375 793 /* Serial version for compression/decompression */ 376 static int serial_blosc( void)794 static int serial_blosc(struct blosc_context* context) 377 795 { 378 796 int32_t j, bsize, leftoverblock; 379 797 int32_t cbytes; 380 int32_t compress = params.compress; 381 int32_t blocksize = params.blocksize; 382 int32_t ntbytes = params.ntbytes; 383 int32_t flags = params.flags; 384 int32_t maxbytes = params.maxbytes; 385 int32_t nblocks = params.nblocks; 386 int32_t leftover = params.nbytes % params.blocksize; 387 int32_t *bstarts = params.bstarts; 388 uint8_t *src = params.src; 389 uint8_t *dest = params.dest; 390 uint8_t *tmp = params.tmp[0]; /* tmp for thread 0 */ 391 uint8_t *tmp2 = params.tmp2[0]; /* tmp2 for thread 0 */ 392 393 for (j = 0; j < nblocks; j++) { 394 if (compress && !(flags & BLOSC_MEMCPYED)) { 395 bstarts[j] = sw32(ntbytes); 396 } 397 bsize = blocksize; 798 799 int32_t ebsize = context->blocksize + context->typesize * (int32_t)sizeof(int32_t); 800 int32_t ntbytes = context->num_output_bytes; 801 802 uint8_t *tmp = my_malloc(context->blocksize + ebsize); 803 uint8_t *tmp2 = tmp + context->blocksize; 804 805 for (j = 0; j < context->nblocks; j++) { 806 if (context->compress && !(*(context->header_flags) & BLOSC_MEMCPYED)) { 807 _sw32(context->bstarts + j * 4, ntbytes); 808 } 809 bsize = context->blocksize; 398 810 leftoverblock = 0; 399 if ((j == nblocks - 1) && (leftover > 0)) {400 bsize = leftover;811 if ((j == context->nblocks - 1) && (context->leftover > 0)) { 812 bsize = context->leftover; 401 813 leftoverblock = 1; 402 814 } 403 if (co mpress) {404 if ( flags& BLOSC_MEMCPYED) {815 if (context->compress) { 816 if (*(context->header_flags) & BLOSC_MEMCPYED) { 405 817 /* We want to memcpy only */ 406 memcpy(dest+BLOSC_MAX_OVERHEAD+j*blocksize, src+j*blocksize, bsize); 818 memcpy(context->dest+BLOSC_MAX_OVERHEAD+j*context->blocksize, 819 context->src+j*context->blocksize, 820 bsize); 407 821 cbytes = bsize; 408 822 } 409 823 else { 410 824 /* Regular compression */ 411 cbytes = blosc_c(bsize, leftoverblock, ntbytes, maxbytes, 412 src+j*blocksize, dest+ntbytes, tmp); 825 cbytes = blosc_c(context, bsize, leftoverblock, ntbytes, 826 context->destsize, context->src+j*context->blocksize, 827 context->dest+ntbytes, tmp, tmp2); 413 828 if (cbytes == 0) { 414 829 ntbytes = 0; /* uncompressible data */ … … 418 833 } 419 834 else { 420 if ( flags& BLOSC_MEMCPYED) {835 if (*(context->header_flags) & BLOSC_MEMCPYED) { 421 836 /* We want to memcpy only */ 422 memcpy(dest+j*blocksize, src+BLOSC_MAX_OVERHEAD+j*blocksize, bsize); 837 memcpy(context->dest+j*context->blocksize, 838 context->src+BLOSC_MAX_OVERHEAD+j*context->blocksize, 839 bsize); 423 840 cbytes = bsize; 424 841 } 425 842 else { 426 843 /* Regular decompression */ 427 cbytes = blosc_d(bsize, leftoverblock, 428 src+sw32(bstarts[j]), dest+j*blocksize, tmp, tmp2); 844 cbytes = blosc_d(context, bsize, leftoverblock, 845 context->src + sw32_(context->bstarts + j * 4), 846 context->dest+j*context->blocksize, tmp, tmp2); 429 847 } 430 848 } … … 436 854 } 437 855 856 // Free temporaries 857 my_free(tmp); 858 438 859 return ntbytes; 439 860 } … … 441 862 442 863 /* Threaded version for compression/decompression */ 443 static int parallel_blosc(void) 444 { 864 static int parallel_blosc(struct blosc_context* context) 865 { 866 int rc; 445 867 446 868 /* Check whether we need to restart threads */ 447 if (!init_threads_done || pid != getpid()) { 448 blosc_set_nthreads_(nthreads); 449 } 869 blosc_set_nthreads_(context); 870 871 /* Set sentinels */ 872 context->thread_giveup_code = 1; 873 context->thread_nblock = -1; 450 874 451 875 /* Synchronization point for all threads (wait for initialization) */ 452 WAIT_INIT; 876 WAIT_INIT(-1, context); 877 453 878 /* Synchronization point for all threads (wait for finalization) */ 454 WAIT_FINISH ;455 456 if ( giveup_code > 0) {879 WAIT_FINISH(-1, context); 880 881 if (context->thread_giveup_code > 0) { 457 882 /* Return the total bytes (de-)compressed in threads */ 458 return params.ntbytes;883 return context->num_output_bytes; 459 884 } 460 885 else { 461 886 /* Compression/decompression gave up. Return error code. */ 462 return giveup_code; 463 } 464 } 465 466 467 /* Convenience functions for creating and releasing temporaries */ 468 static int create_temporaries(void) 469 { 470 int32_t tid; 471 int32_t typesize = params.typesize; 472 int32_t blocksize = params.blocksize; 473 /* Extended blocksize for temporary destination. Extended blocksize 474 is only useful for compression in parallel mode, but it doesn't 475 hurt serial mode either. */ 476 int32_t ebsize = blocksize + typesize*(int32_t)sizeof(int32_t); 477 478 /* Create temporary area for each thread */ 479 for (tid = 0; tid < nthreads; tid++) { 480 uint8_t *tmp = my_malloc(blocksize); 481 uint8_t *tmp2; 482 if (tmp == NULL) { 483 return -1; 484 } 485 params.tmp[tid] = tmp; 486 tmp2 = my_malloc(ebsize); 487 if (tmp2 == NULL) { 488 return -1; 489 } 490 params.tmp2[tid] = tmp2; 491 } 492 493 init_temps_done = 1; 494 /* Update params for current temporaries */ 495 current_temp.nthreads = nthreads; 496 current_temp.typesize = typesize; 497 current_temp.blocksize = blocksize; 498 return 0; 499 } 500 501 502 static void release_temporaries(void) 503 { 504 int32_t tid; 505 506 /* Release buffers */ 507 for (tid = 0; tid < nthreads; tid++) { 508 my_free(params.tmp[tid]); 509 my_free(params.tmp2[tid]); 510 } 511 512 init_temps_done = 0; 887 return context->thread_giveup_code; 888 } 513 889 } 514 890 … … 516 892 /* Do the compression or decompression of the buffer depending on the 517 893 global params. */ 518 static int do_job( void)894 static int do_job(struct blosc_context* context) 519 895 { 520 896 int32_t ntbytes; 521 522 /* Initialize/reset temporaries if needed */523 if (!init_temps_done) {524 int ret;525 ret = create_temporaries();526 if (ret < 0) {527 return -1;528 }529 }530 else if (current_temp.nthreads != nthreads ||531 current_temp.typesize != params.typesize ||532 current_temp.blocksize != params.blocksize) {533 int ret;534 release_temporaries();535 ret = create_temporaries();536 if (ret < 0) {537 return -1;538 }539 }540 897 541 898 /* Run the serial version when nthreads is 1 or when the buffers are 542 899 not much larger than blocksize */ 543 if ( nthreads == 1 || (params.nbytes / params.blocksize) <= 1) {544 ntbytes = serial_blosc( );900 if (context->numthreads == 1 || (context->sourcesize / context->blocksize) <= 1) { 901 ntbytes = serial_blosc(context); 545 902 } 546 903 else { 547 ntbytes = parallel_blosc( );904 ntbytes = parallel_blosc(context); 548 905 } 549 906 … … 552 909 553 910 554 static int32_t compute_blocksize(int32_t clevel, int32_t typesize, 555 int32_t nbytes) 911 static int32_t compute_blocksize(struct blosc_context* context, int32_t clevel, 912 int32_t typesize, int32_t nbytes, 913 int32_t forced_blocksize) 556 914 { 557 915 int32_t blocksize; … … 564 922 blocksize = nbytes; /* Start by a whole buffer as blocksize */ 565 923 566 if (force _blocksize) {567 blocksize = force _blocksize;568 /* Check that forced blocksize is not too small nor too large*/924 if (forced_blocksize) { 925 blocksize = forced_blocksize; 926 /* Check that forced blocksize is not too small */ 569 927 if (blocksize < MIN_BUFFERSIZE) { 570 928 blocksize = MIN_BUFFERSIZE; 571 929 } 572 930 } 573 else if (nbytes >= L1*4) { 574 blocksize = L1 * 4; 931 else if (nbytes >= L1) { 932 blocksize = L1; 933 934 /* For LZ4HC, increase the block sizes by a factor of 8 because it 935 is meant for compressing large blocks (it shows a big overhead 936 when compressing small ones). */ 937 if (context->compcode == BLOSC_LZ4HC) { 938 blocksize *= 8; 939 } 940 941 /* For Zlib, increase the block sizes by a factor of 8 because it 942 is meant for compressing large blocks (it shows a big overhead 943 when compressing small ones). */ 944 if (context->compcode == BLOSC_ZLIB) { 945 blocksize *= 8; 946 } 947 948 /* For Zstd, increase the block sizes by a factor of 8 because it 949 is meant for compressing large blocks (it shows a big overhead 950 when compressing small ones). */ 951 if (context->compcode == BLOSC_ZSTD) { 952 blocksize *= 8; 953 } 954 575 955 if (clevel == 0) { 576 blocksize /= 16;956 blocksize /= 4; 577 957 } 578 958 else if (clevel <= 3) { 579 blocksize /= 8;959 blocksize /= 2; 580 960 } 581 961 else if (clevel <= 5) { 582 blocksize /= 4;962 blocksize *= 1; 583 963 } 584 964 else if (clevel <= 6) { 585 blocksize /= 2;965 blocksize *= 2; 586 966 } 587 967 else if (clevel < 9) { 588 blocksize *= 1;968 blocksize *= 4; 589 969 } 590 970 else { 591 blocksize *= 2;971 blocksize *= 16; 592 972 } 593 973 } … … 598 978 } 599 979 600 /* blocksize mustbe a multiple of the typesize */980 /* blocksize *must absolutely* be a multiple of the typesize */ 601 981 if (blocksize > typesize) { 602 982 blocksize = blocksize / typesize * typesize; 603 983 } 604 984 605 /* blocksize must not exceed (64 KB * typesize) in order to allow606 BloscLZ to achieve better compression ratios (the ultimate reason607 for this is that hash_log in BloscLZ cannot be larger than 15) */608 if ((blocksize / typesize) > 64*KB) {609 blocksize = 64 * KB * typesize;610 }611 612 985 return blocksize; 613 986 } 614 987 615 616 /* The public routine for compression. See blosc.h for docstrings. */ 617 int blosc_compress(int clevel, int doshuffle, size_t typesize, size_t nbytes, 618 const void *src, void *dest, size_t destsize) 619 { 620 uint8_t *_dest=NULL; /* current pos for destination buffer */ 621 uint8_t *flags; /* flags for header. Currently booked: 622 - 0: shuffled? 623 - 1: memcpy'ed? */ 624 int32_t nbytes_; /* number of bytes in source buffer */ 625 int32_t nblocks; /* number of total blocks in buffer */ 626 int32_t leftover; /* extra bytes at end of buffer */ 627 int32_t *bstarts; /* start pointers for each block */ 628 int32_t blocksize; /* length of the block in bytes */ 629 int32_t ntbytes = 0; /* the number of compressed bytes */ 630 int32_t *ntbytes_; /* placeholder for bytes in output buffer */ 631 int32_t maxbytes = (int32_t)destsize; /* maximum size for dest buffer */ 988 static int initialize_context_compression(struct blosc_context* context, 989 int clevel, 990 int doshuffle, 991 size_t typesize, 992 size_t sourcesize, 993 const void* src, 994 void* dest, 995 size_t destsize, 996 int32_t compressor, 997 int32_t blocksize, 998 int32_t numthreads) 999 { 1000 /* Set parameters */ 1001 context->compress = 1; 1002 context->src = (const uint8_t*)src; 1003 context->dest = (uint8_t *)(dest); 1004 context->num_output_bytes = 0; 1005 context->destsize = (int32_t)destsize; 1006 context->sourcesize = sourcesize; 1007 context->typesize = typesize; 1008 context->compcode = compressor; 1009 context->numthreads = numthreads; 1010 context->end_threads = 0; 1011 context->clevel = clevel; 632 1012 633 1013 /* Check buffer size limits */ 634 if ( nbytes> BLOSC_MAX_BUFFERSIZE) {1014 if (sourcesize > BLOSC_MAX_BUFFERSIZE) { 635 1015 /* If buffer is too large, give up. */ 636 1016 fprintf(stderr, "Input buffer size cannot exceed %d bytes\n", … … 638 1018 return -1; 639 1019 } 640 641 /* We can safely do this assignation now */642 nbytes_ = (int32_t)nbytes;643 1020 644 1021 /* Compression level */ … … 650 1027 651 1028 /* Shuffle */ 652 if (doshuffle != 0 && doshuffle != 1 ) {653 fprintf(stderr, "`shuffle` parameter must be either 0 or 1!\n");1029 if (doshuffle != 0 && doshuffle != 1 && doshuffle != 2) { 1030 fprintf(stderr, "`shuffle` parameter must be either 0, 1 or 2!\n"); 654 1031 return -10; 655 1032 } 656 1033 657 1034 /* Check typesize limits */ 658 if ( typesize > BLOSC_MAX_TYPESIZE) {1035 if (context->typesize > BLOSC_MAX_TYPESIZE) { 659 1036 /* If typesize is too large, treat buffer as an 1-byte stream. */ 660 typesize = 1;1037 context->typesize = 1; 661 1038 } 662 1039 663 1040 /* Get the blocksize */ 664 blocksize = compute_blocksize(clevel, (int32_t)typesize, nbytes_);1041 context->blocksize = compute_blocksize(context, clevel, (int32_t)context->typesize, context->sourcesize, blocksize); 665 1042 666 1043 /* Compute number of blocks in buffer */ 667 nblocks = nbytes_ / blocksize; 668 leftover = nbytes_ % blocksize; 669 nblocks = (leftover>0)? nblocks+1: nblocks; 670 671 _dest = (uint8_t *)(dest); 672 /* Write header for this block */ 673 _dest[0] = BLOSC_VERSION_FORMAT; /* blosc format version */ 674 _dest[1] = BLOSCLZ_VERSION_FORMAT; /* blosclz format version */ 675 flags = _dest+2; /* flags */ 676 _dest[2] = 0; /* zeroes flags */ 677 _dest[3] = (uint8_t)typesize; /* type size */ 678 _dest += 4; 679 ((int32_t *)_dest)[0] = sw32(nbytes_); /* size of the buffer */ 680 ((int32_t *)_dest)[1] = sw32(blocksize);/* block size */ 681 ntbytes_ = (int32_t *)(_dest+8); /* compressed buffer size */ 682 _dest += sizeof(int32_t)*3; 683 bstarts = (int32_t *)_dest; /* starts for every block */ 684 _dest += sizeof(int32_t)*nblocks; /* space for pointers to blocks */ 685 ntbytes = (int32_t)(_dest - (uint8_t *)dest); 686 687 if (clevel == 0) { 1044 context->nblocks = context->sourcesize / context->blocksize; 1045 context->leftover = context->sourcesize % context->blocksize; 1046 context->nblocks = (context->leftover > 0) ? (context->nblocks + 1) : context->nblocks; 1047 1048 return 1; 1049 } 1050 1051 static int write_compression_header(struct blosc_context* context, int clevel, int doshuffle) 1052 { 1053 int32_t compformat; 1054 1055 /* Write version header for this block */ 1056 context->dest[0] = BLOSC_VERSION_FORMAT; /* blosc format version */ 1057 1058 /* Write compressor format */ 1059 compformat = -1; 1060 switch (context->compcode) 1061 { 1062 case BLOSC_BLOSCLZ: 1063 compformat = BLOSC_BLOSCLZ_FORMAT; 1064 context->dest[1] = BLOSC_BLOSCLZ_VERSION_FORMAT; /* blosclz format version */ 1065 break; 1066 1067 #if defined(HAVE_LZ4) 1068 case BLOSC_LZ4: 1069 compformat = BLOSC_LZ4_FORMAT; 1070 context->dest[1] = BLOSC_LZ4_VERSION_FORMAT; /* lz4 format version */ 1071 break; 1072 case BLOSC_LZ4HC: 1073 compformat = BLOSC_LZ4HC_FORMAT; 1074 context->dest[1] = BLOSC_LZ4HC_VERSION_FORMAT; /* lz4hc is the same as lz4 */ 1075 break; 1076 #endif /* HAVE_LZ4 */ 1077 1078 #if defined(HAVE_SNAPPY) 1079 case BLOSC_SNAPPY: 1080 compformat = BLOSC_SNAPPY_FORMAT; 1081 context->dest[1] = BLOSC_SNAPPY_VERSION_FORMAT; /* snappy format version */ 1082 break; 1083 #endif /* HAVE_SNAPPY */ 1084 1085 #if defined(HAVE_ZLIB) 1086 case BLOSC_ZLIB: 1087 compformat = BLOSC_ZLIB_FORMAT; 1088 context->dest[1] = BLOSC_ZLIB_VERSION_FORMAT; /* zlib format version */ 1089 break; 1090 #endif /* HAVE_ZLIB */ 1091 1092 #if defined(HAVE_ZSTD) 1093 case BLOSC_ZSTD: 1094 compformat = BLOSC_ZSTD_FORMAT; 1095 context->dest[1] = BLOSC_ZSTD_VERSION_FORMAT; /* zstd format version */ 1096 break; 1097 #endif /* HAVE_ZSTD */ 1098 1099 default: 1100 { 1101 char *compname; 1102 compname = clibcode_to_clibname(compformat); 1103 fprintf(stderr, "Blosc has not been compiled with '%s' ", compname); 1104 fprintf(stderr, "compression support. Please use one having it."); 1105 return -5; /* signals no compression support */ 1106 break; 1107 } 1108 } 1109 1110 context->header_flags = context->dest+2; /* flags */ 1111 context->dest[2] = 0; /* zeroes flags */ 1112 context->dest[3] = (uint8_t)context->typesize; /* type size */ 1113 _sw32(context->dest + 4, context->sourcesize); /* size of the buffer */ 1114 _sw32(context->dest + 8, context->blocksize); /* block size */ 1115 context->bstarts = context->dest + 16; /* starts for every block */ 1116 context->num_output_bytes = 16 + sizeof(int32_t)*context->nblocks; /* space for header and pointers */ 1117 1118 if (context->clevel == 0) { 688 1119 /* Compression level 0 means buffer to be memcpy'ed */ 689 * flags|= BLOSC_MEMCPYED;690 } 691 692 if ( nbytes_< MIN_BUFFERSIZE) {1120 *(context->header_flags) |= BLOSC_MEMCPYED; 1121 } 1122 1123 if (context->sourcesize < MIN_BUFFERSIZE) { 693 1124 /* Buffer is too small. Try memcpy'ing. */ 694 *flags |= BLOSC_MEMCPYED; 695 } 696 697 if (doshuffle == 1) { 698 /* Shuffle is active */ 699 *flags |= BLOSC_DOSHUFFLE; /* bit 0 set to one in flags */ 700 } 701 702 /* Take global lock for the time of compression */ 703 pthread_mutex_lock(&global_comp_mutex); 704 /* Populate parameters for compression routines */ 705 params.compress = 1; 706 params.clevel = clevel; 707 params.flags = (int32_t)*flags; 708 params.typesize = (int32_t)typesize; 709 params.blocksize = blocksize; 710 params.ntbytes = ntbytes; 711 params.nbytes = nbytes_; 712 params.maxbytes = maxbytes; 713 params.nblocks = nblocks; 714 params.leftover = leftover; 715 params.bstarts = bstarts; 716 params.src = (uint8_t *)src; 717 params.dest = (uint8_t *)dest; 718 719 if (!(*flags & BLOSC_MEMCPYED)) { 1125 *(context->header_flags) |= BLOSC_MEMCPYED; 1126 } 1127 1128 if (doshuffle == BLOSC_SHUFFLE) { 1129 /* Byte-shuffle is active */ 1130 *(context->header_flags) |= BLOSC_DOSHUFFLE; /* bit 0 set to one in flags */ 1131 } 1132 1133 if (doshuffle == BLOSC_BITSHUFFLE) { 1134 /* Bit-shuffle is active */ 1135 *(context->header_flags) |= BLOSC_DOBITSHUFFLE; /* bit 2 set to one in flags */ 1136 } 1137 1138 *(context->header_flags) |= compformat << 5; /* compressor format start at bit 5 */ 1139 1140 return 1; 1141 } 1142 1143 int blosc_compress_context(struct blosc_context* context) 1144 { 1145 int32_t ntbytes = 0; 1146 1147 if (!(*(context->header_flags) & BLOSC_MEMCPYED)) { 720 1148 /* Do the actual compression */ 721 ntbytes = do_job( );1149 ntbytes = do_job(context); 722 1150 if (ntbytes < 0) { 723 1151 return -1; 724 1152 } 725 if ((ntbytes == 0) && ( nbytes_+BLOSC_MAX_OVERHEAD <= maxbytes)) {1153 if ((ntbytes == 0) && (context->sourcesize+BLOSC_MAX_OVERHEAD <= context->destsize)) { 726 1154 /* Last chance for fitting `src` buffer in `dest`. Update flags 727 1155 and do a memcpy later on. */ 728 *flags |= BLOSC_MEMCPYED; 729 params.flags |= BLOSC_MEMCPYED; 730 } 731 } 732 733 if (*flags & BLOSC_MEMCPYED) { 734 if (nbytes_+BLOSC_MAX_OVERHEAD > maxbytes) { 1156 *(context->header_flags) |= BLOSC_MEMCPYED; 1157 } 1158 } 1159 1160 if (*(context->header_flags) & BLOSC_MEMCPYED) { 1161 if (context->sourcesize + BLOSC_MAX_OVERHEAD > context->destsize) { 735 1162 /* We are exceeding maximum output size */ 736 1163 ntbytes = 0; 737 1164 } 738 else if (((nbytes_ % L1) == 0) || (nthreads > 1)) {739 /* More effective with large buffers that are multiples of the740 cache size or multi-cores */741 params.ntbytes = BLOSC_MAX_OVERHEAD;742 ntbytes = do_job();743 if (ntbytes < 0) {744 return -1;745 }746 }747 1165 else { 748 memcpy((uint8_t *)dest+BLOSC_MAX_OVERHEAD, src, nbytes_); 749 ntbytes = nbytes_ + BLOSC_MAX_OVERHEAD; 1166 memcpy(context->dest+BLOSC_MAX_OVERHEAD, context->src, 1167 context->sourcesize); 1168 ntbytes = context->sourcesize + BLOSC_MAX_OVERHEAD; 750 1169 } 751 1170 } 752 1171 753 1172 /* Set the number of compressed bytes in header */ 754 *ntbytes_ = sw32(ntbytes); 755 756 /* Release global lock */ 1173 _sw32(context->dest + 12, ntbytes); 1174 1175 assert(ntbytes <= context->destsize); 1176 return ntbytes; 1177 } 1178 1179 /* The public routine for compression with context. */ 1180 int blosc_compress_ctx(int clevel, int doshuffle, size_t typesize, 1181 size_t nbytes, const void* src, void* dest, 1182 size_t destsize, const char* compressor, 1183 size_t blocksize, int numinternalthreads) 1184 { 1185 int error, result; 1186 struct blosc_context context; 1187 1188 context.threads_started = 0; 1189 error = initialize_context_compression(&context, clevel, doshuffle, typesize, 1190 nbytes, src, dest, destsize, 1191 blosc_compname_to_compcode(compressor), 1192 blocksize, numinternalthreads); 1193 if (error < 0) { return error; } 1194 1195 error = write_compression_header(&context, clevel, doshuffle); 1196 if (error < 0) { return error; } 1197 1198 result = blosc_compress_context(&context); 1199 1200 if (numinternalthreads > 1) 1201 { 1202 blosc_release_threadpool(&context); 1203 } 1204 1205 return result; 1206 } 1207 1208 /* The public routine for compression. See blosc.h for docstrings. */ 1209 int blosc_compress(int clevel, int doshuffle, size_t typesize, size_t nbytes, 1210 const void *src, void *dest, size_t destsize) 1211 { 1212 int error; 1213 int result; 1214 char* envvar; 1215 1216 /* Check if should initialize */ 1217 if (!g_initlib) blosc_init(); 1218 1219 /* Check for a BLOSC_CLEVEL environment variable */ 1220 envvar = getenv("BLOSC_CLEVEL"); 1221 if (envvar != NULL) { 1222 long value; 1223 value = strtol(envvar, NULL, 10); 1224 if ((value != EINVAL) && (value >= 0)) { 1225 clevel = (int)value; 1226 } 1227 } 1228 1229 /* Check for a BLOSC_SHUFFLE environment variable */ 1230 envvar = getenv("BLOSC_SHUFFLE"); 1231 if (envvar != NULL) { 1232 if (strcmp(envvar, "NOSHUFFLE") == 0) { 1233 doshuffle = BLOSC_NOSHUFFLE; 1234 } 1235 if (strcmp(envvar, "SHUFFLE") == 0) { 1236 doshuffle = BLOSC_SHUFFLE; 1237 } 1238 if (strcmp(envvar, "BITSHUFFLE") == 0) { 1239 doshuffle = BLOSC_BITSHUFFLE; 1240 } 1241 } 1242 1243 /* Check for a BLOSC_TYPESIZE environment variable */ 1244 envvar = getenv("BLOSC_TYPESIZE"); 1245 if (envvar != NULL) { 1246 long value; 1247 value = strtol(envvar, NULL, 10); 1248 if ((value != EINVAL) && (value > 0)) { 1249 typesize = (int)value; 1250 } 1251 } 1252 1253 /* Check for a BLOSC_COMPRESSOR environment variable */ 1254 envvar = getenv("BLOSC_COMPRESSOR"); 1255 if (envvar != NULL) { 1256 result = blosc_set_compressor(envvar); 1257 if (result < 0) { return result; } 1258 } 1259 1260 /* Check for a BLOSC_COMPRESSOR environment variable */ 1261 envvar = getenv("BLOSC_BLOCKSIZE"); 1262 if (envvar != NULL) { 1263 long blocksize; 1264 blocksize = strtol(envvar, NULL, 10); 1265 if ((blocksize != EINVAL) && (blocksize > 0)) { 1266 blosc_set_blocksize((size_t)blocksize); 1267 } 1268 } 1269 1270 /* Check for a BLOSC_NTHREADS environment variable */ 1271 envvar = getenv("BLOSC_NTHREADS"); 1272 if (envvar != NULL) { 1273 long nthreads; 1274 nthreads = strtol(envvar, NULL, 10); 1275 if ((nthreads != EINVAL) && (nthreads > 0)) { 1276 result = blosc_set_nthreads((int)nthreads); 1277 if (result < 0) { return result; } 1278 } 1279 } 1280 1281 /* Check for a BLOSC_NOLOCK environment variable. It is important 1282 that this should be the last env var so that it can take the 1283 previous ones into account */ 1284 envvar = getenv("BLOSC_NOLOCK"); 1285 if (envvar != NULL) { 1286 char *compname; 1287 blosc_compcode_to_compname(g_compressor, &compname); 1288 result = blosc_compress_ctx(clevel, doshuffle, typesize, 1289 nbytes, src, dest, destsize, 1290 compname, g_force_blocksize, g_threads); 1291 return result; 1292 } 1293 1294 pthread_mutex_lock(&global_comp_mutex); 1295 1296 error = initialize_context_compression(g_global_context, clevel, doshuffle, 1297 typesize, nbytes, src, dest, destsize, 1298 g_compressor, g_force_blocksize, 1299 g_threads); 1300 if (error < 0) { return error; } 1301 1302 error = write_compression_header(g_global_context, clevel, doshuffle); 1303 if (error < 0) { return error; } 1304 1305 result = blosc_compress_context(g_global_context); 1306 757 1307 pthread_mutex_unlock(&global_comp_mutex); 758 759 assert((int32_t)ntbytes <= (int32_t)maxbytes); 760 return ntbytes; 761 } 762 763 764 /* The public routine for decompression. See blosc.h for docstrings. */ 765 int blosc_decompress(const void *src, void *dest, size_t destsize) 766 { 767 uint8_t *_src=NULL; /* current pos for source buffer */ 768 uint8_t version, versionlz; /* versions for compressed header */ 769 uint8_t flags; /* flags for header */ 770 int32_t ntbytes; /* the number of uncompressed bytes */ 771 int32_t nblocks; /* number of total blocks in buffer */ 772 int32_t leftover; /* extra bytes at end of buffer */ 773 int32_t *bstarts; /* start pointers for each block */ 774 int32_t typesize, blocksize, nbytes, ctbytes; 775 776 _src = (uint8_t *)(src); 1308 1309 return result; 1310 } 1311 1312 int blosc_run_decompression_with_context(struct blosc_context* context, 1313 const void* src, 1314 void* dest, 1315 size_t destsize, 1316 int numinternalthreads) 1317 { 1318 uint8_t version; 1319 uint8_t versionlz; 1320 uint32_t ctbytes; 1321 int32_t ntbytes; 1322 1323 context->compress = 0; 1324 context->src = (const uint8_t*)src; 1325 context->dest = (uint8_t*)dest; 1326 context->destsize = destsize; 1327 context->num_output_bytes = 0; 1328 context->numthreads = numinternalthreads; 1329 context->end_threads = 0; 777 1330 778 1331 /* Read the header block */ 779 version = _src[0]; /* blosc format version */ 780 versionlz = _src[1]; /* blosclz format version */ 781 flags = _src[2]; /* flags */ 782 typesize = (int32_t)_src[3]; /* typesize */ 783 _src += 4; 784 nbytes = sw32(((int32_t *)_src)[0]); /* buffer size */ 785 blocksize = sw32(((int32_t *)_src)[1]); /* block size */ 786 ctbytes = sw32(((int32_t *)_src)[2]); /* compressed buffer size */ 787 1332 version = context->src[0]; /* blosc format version */ 1333 versionlz = context->src[1]; /* blosclz format version */ 1334 1335 context->header_flags = (uint8_t*)(context->src + 2); /* flags */ 1336 context->typesize = (int32_t)context->src[3]; /* typesize */ 1337 context->sourcesize = sw32_(context->src + 4); /* buffer size */ 1338 context->blocksize = sw32_(context->src + 8); /* block size */ 1339 ctbytes = sw32_(context->src + 12); /* compressed buffer size */ 1340 1341 /* Unused values */ 788 1342 version += 0; /* shut up compiler warning */ 789 1343 versionlz += 0; /* shut up compiler warning */ 790 1344 ctbytes += 0; /* shut up compiler warning */ 791 1345 792 _src += sizeof(int32_t)*3; 793 bstarts = (int32_t *)_src; 1346 context->bstarts = (uint8_t*)(context->src + 16); 1347 /* Compute some params */ 1348 /* Total blocks */ 1349 context->nblocks = context->sourcesize / context->blocksize; 1350 context->leftover = context->sourcesize % context->blocksize; 1351 context->nblocks = (context->leftover>0)? context->nblocks+1: context->nblocks; 1352 1353 /* Check that we have enough space to decompress */ 1354 if (context->sourcesize > (int32_t)destsize) { 1355 return -1; 1356 } 1357 1358 /* Check whether this buffer is memcpy'ed */ 1359 if (*(context->header_flags) & BLOSC_MEMCPYED) { 1360 memcpy(dest, (uint8_t *)src+BLOSC_MAX_OVERHEAD, context->sourcesize); 1361 ntbytes = context->sourcesize; 1362 } 1363 else { 1364 /* Do the actual decompression */ 1365 ntbytes = do_job(context); 1366 if (ntbytes < 0) { 1367 return -1; 1368 } 1369 } 1370 1371 assert(ntbytes <= (int32_t)destsize); 1372 return ntbytes; 1373 } 1374 1375 /* The public routine for decompression with context. */ 1376 int blosc_decompress_ctx(const void *src, void *dest, size_t destsize, 1377 int numinternalthreads) 1378 { 1379 int result; 1380 struct blosc_context context; 1381 1382 context.threads_started = 0; 1383 result = blosc_run_decompression_with_context(&context, src, dest, destsize, numinternalthreads); 1384 1385 if (numinternalthreads > 1) 1386 { 1387 blosc_release_threadpool(&context); 1388 } 1389 1390 return result; 1391 } 1392 1393 1394 /* The public routine for decompression. See blosc.h for docstrings. */ 1395 int blosc_decompress(const void *src, void *dest, size_t destsize) 1396 { 1397 int result; 1398 char* envvar; 1399 long nthreads; 1400 1401 /* Check if should initialize */ 1402 if (!g_initlib) blosc_init(); 1403 1404 /* Check for a BLOSC_NTHREADS environment variable */ 1405 envvar = getenv("BLOSC_NTHREADS"); 1406 if (envvar != NULL) { 1407 nthreads = strtol(envvar, NULL, 10); 1408 if ((nthreads != EINVAL) && (nthreads > 0)) { 1409 result = blosc_set_nthreads((int)nthreads); 1410 if (result < 0) { return result; } 1411 } 1412 } 1413 1414 /* Check for a BLOSC_NOLOCK environment variable. It is important 1415 that this should be the last env var so that it can take the 1416 previous ones into account */ 1417 envvar = getenv("BLOSC_NOLOCK"); 1418 if (envvar != NULL) { 1419 result = blosc_decompress_ctx(src, dest, destsize, g_threads); 1420 return result; 1421 } 1422 1423 pthread_mutex_lock(&global_comp_mutex); 1424 1425 result = blosc_run_decompression_with_context(g_global_context, src, dest, 1426 destsize, g_threads); 1427 1428 pthread_mutex_unlock(&global_comp_mutex); 1429 1430 return result; 1431 } 1432 1433 1434 /* Specific routine optimized for decompression a small number of 1435 items out of a compressed chunk. This does not use threads because 1436 it would affect negatively to performance. */ 1437 int blosc_getitem(const void *src, int start, int nitems, void *dest) 1438 { 1439 uint8_t *_src=NULL; /* current pos for source buffer */ 1440 uint8_t version, versionlz; /* versions for compressed header */ 1441 uint8_t flags; /* flags for header */ 1442 int32_t ntbytes = 0; /* the number of uncompressed bytes */ 1443 int32_t nblocks; /* number of total blocks in buffer */ 1444 int32_t leftover; /* extra bytes at end of buffer */ 1445 uint8_t *bstarts; /* start pointers for each block */ 1446 int tmp_init = 0; 1447 int32_t typesize, blocksize, nbytes, ctbytes; 1448 int32_t j, bsize, bsize2, leftoverblock; 1449 int32_t cbytes, startb, stopb; 1450 int stop = start + nitems; 1451 uint8_t *tmp; 1452 uint8_t *tmp2; 1453 uint8_t *tmp3; 1454 int32_t ebsize; 1455 1456 _src = (uint8_t *)(src); 1457 1458 /* Read the header block */ 1459 version = _src[0]; /* blosc format version */ 1460 versionlz = _src[1]; /* blosclz format version */ 1461 flags = _src[2]; /* flags */ 1462 typesize = (int32_t)_src[3]; /* typesize */ 1463 nbytes = sw32_(_src + 4); /* buffer size */ 1464 blocksize = sw32_(_src + 8); /* block size */ 1465 ctbytes = sw32_(_src + 12); /* compressed buffer size */ 1466 1467 ebsize = blocksize + typesize * (int32_t)sizeof(int32_t); 1468 tmp = my_malloc(blocksize + ebsize + blocksize); 1469 tmp2 = tmp + blocksize; 1470 tmp3 = tmp + blocksize + ebsize; 1471 1472 version += 0; /* shut up compiler warning */ 1473 versionlz += 0; /* shut up compiler warning */ 1474 ctbytes += 0; /* shut up compiler warning */ 1475 1476 _src += 16; 1477 bstarts = _src; 794 1478 /* Compute some params */ 795 1479 /* Total blocks */ … … 799 1483 _src += sizeof(int32_t)*nblocks; 800 1484 801 /* Check that we have enough space to decompress */802 if (nbytes > (int32_t)destsize) {803 return -1;804 }805 806 /* Take global lock for the time of decompression */807 pthread_mutex_lock(&global_comp_mutex);808 809 /* Populate parameters for decompression routines */810 params.compress = 0;811 params.clevel = 0; /* specific for compression */812 params.flags = (int32_t)flags;813 params.typesize = typesize;814 params.blocksize = blocksize;815 params.ntbytes = 0;816 params.nbytes = nbytes;817 params.nblocks = nblocks;818 params.leftover = leftover;819 params.bstarts = bstarts;820 params.src = (uint8_t *)src;821 params.dest = (uint8_t *)dest;822 823 /* Check whether this buffer is memcpy'ed */824 if (flags & BLOSC_MEMCPYED) {825 if (((nbytes % L1) == 0) || (nthreads > 1)) {826 /* More effective with large buffers that are multiples of the827 cache size or multi-cores */828 ntbytes = do_job();829 if (ntbytes < 0) {830 return -1;831 }832 }833 else {834 memcpy(dest, (uint8_t *)src+BLOSC_MAX_OVERHEAD, nbytes);835 ntbytes = nbytes;836 }837 }838 else {839 /* Do the actual decompression */840 ntbytes = do_job();841 if (ntbytes < 0) {842 return -1;843 }844 }845 /* Release global lock */846 pthread_mutex_unlock(&global_comp_mutex);847 848 assert(ntbytes <= (int32_t)destsize);849 return ntbytes;850 }851 852 853 /* Specific routine optimized for decompression a small number of854 items out of a compressed chunk. This does not use threads because855 it would affect negatively to performance. */856 int blosc_getitem(const void *src, int start, int nitems, void *dest)857 {858 uint8_t *_src=NULL; /* current pos for source buffer */859 uint8_t version, versionlz; /* versions for compressed header */860 uint8_t flags; /* flags for header */861 int32_t ntbytes = 0; /* the number of uncompressed bytes */862 int32_t nblocks; /* number of total blocks in buffer */863 int32_t leftover; /* extra bytes at end of buffer */864 int32_t *bstarts; /* start pointers for each block */865 uint8_t *tmp = params.tmp[0]; /* tmp for thread 0 */866 uint8_t *tmp2 = params.tmp2[0]; /* tmp2 for thread 0 */867 int tmp_init = 0;868 int32_t typesize, blocksize, nbytes, ctbytes;869 int32_t j, bsize, bsize2, leftoverblock;870 int32_t cbytes, startb, stopb;871 int stop = start + nitems;872 873 _src = (uint8_t *)(src);874 875 /* Take global lock */876 pthread_mutex_lock(&global_comp_mutex);877 878 /* Read the header block */879 version = _src[0]; /* blosc format version */880 versionlz = _src[1]; /* blosclz format version */881 flags = _src[2]; /* flags */882 typesize = (int32_t)_src[3]; /* typesize */883 _src += 4;884 nbytes = sw32(((int32_t *)_src)[0]); /* buffer size */885 blocksize = sw32(((int32_t *)_src)[1]); /* block size */886 ctbytes = sw32(((int32_t *)_src)[2]); /* compressed buffer size */887 888 version += 0; /* shut up compiler warning */889 versionlz += 0; /* shut up compiler warning */890 ctbytes += 0; /* shut up compiler warning */891 892 _src += sizeof(int32_t)*3;893 bstarts = (int32_t *)_src;894 /* Compute some params */895 /* Total blocks */896 nblocks = nbytes / blocksize;897 leftover = nbytes % blocksize;898 nblocks = (leftover>0)? nblocks+1: nblocks;899 _src += sizeof(int32_t)*nblocks;900 901 1485 /* Check region boundaries */ 902 1486 if ((start < 0) || (start*typesize > nbytes)) { 903 1487 fprintf(stderr, "`start` out of bounds"); 904 return (-1);1488 return -1; 905 1489 } 906 1490 907 1491 if ((stop < 0) || (stop*typesize > nbytes)) { 908 1492 fprintf(stderr, "`start`+`nitems` out of bounds"); 909 return (-1); 910 } 911 912 /* Parameters needed by blosc_d */ 913 params.typesize = typesize; 914 params.flags = flags; 915 916 /* Initialize temporaries if needed */ 917 if (tmp == NULL || tmp2 == NULL || current_temp.blocksize < blocksize) { 918 tmp = my_malloc(blocksize); 919 if (tmp == NULL) { 920 return -1; 921 } 922 tmp2 = my_malloc(blocksize); 923 if (tmp2 == NULL) { 924 return -1; 925 } 926 tmp_init = 1; 1493 return -1; 927 1494 } 928 1495 … … 958 1525 } 959 1526 else { 1527 struct blosc_context context; 1528 /* blosc_d only uses typesize and flags */ 1529 context.typesize = typesize; 1530 context.header_flags = &flags; 1531 960 1532 /* Regular decompression. Put results in tmp2. */ 961 cbytes = blosc_d(bsize, leftoverblock, 962 (uint8_t *)src+sw32(bstarts[j]), tmp2, tmp, tmp2); 1533 cbytes = blosc_d(&context, bsize, leftoverblock, 1534 (uint8_t *)src + sw32_(bstarts + j * 4), 1535 tmp2, tmp, tmp3); 963 1536 if (cbytes < 0) { 964 1537 ntbytes = cbytes; … … 971 1544 ntbytes += cbytes; 972 1545 } 973 974 /* Release global lock */ 975 pthread_mutex_unlock(&global_comp_mutex); 976 977 if (tmp_init) { 978 my_free(tmp); 979 my_free(tmp2); 980 } 1546 1547 my_free(tmp); 981 1548 982 1549 return ntbytes; … … 985 1552 986 1553 /* Decompress & unshuffle several blocks in a single thread */ 987 static int t_blosc(void *tids)988 { 989 int32_t tid = *(int32_t *)tids;1554 static void *t_blosc(void *ctxt) 1555 { 1556 struct thread_context* context = (struct thread_context*)ctxt; 990 1557 int32_t cbytes, ntdest; 991 1558 int32_t tblocks; /* number of blocks per thread */ … … 1003 1570 int32_t nblocks; 1004 1571 int32_t leftover; 1005 int32_t *bstarts;1006 uint8_t *src;1572 uint8_t *bstarts; 1573 const uint8_t *src; 1007 1574 uint8_t *dest; 1008 1575 uint8_t *tmp; 1009 1576 uint8_t *tmp2; 1010 1011 while (1) {1012 1013 init_sentinels_done = 0; /* sentinels have to be initialised yet */1014 1577 uint8_t *tmp3; 1578 int rc; 1579 1580 while(1) 1581 { 1015 1582 /* Synchronization point for all threads (wait for initialization) */ 1016 WAIT_INIT; 1017 1018 /* Check if thread has been asked to return */ 1019 if (end_threads) { 1020 return(0); 1021 } 1022 1023 pthread_mutex_lock(&count_mutex); 1024 if (!init_sentinels_done) { 1025 /* Set sentinels and other global variables */ 1026 giveup_code = 1; /* no error code initially */ 1027 nblock = -1; /* block counter */ 1028 init_sentinels_done = 1; /* sentinels have been initialised */ 1029 } 1030 pthread_mutex_unlock(&count_mutex); 1583 WAIT_INIT(NULL, context->parent_context); 1584 1585 if(context->parent_context->end_threads) 1586 { 1587 break; 1588 } 1031 1589 1032 1590 /* Get parameters for this thread before entering the main loop */ 1033 blocksize = params.blocksize; 1034 ebsize = blocksize + params.typesize*(int32_t)sizeof(int32_t); 1035 compress = params.compress; 1036 flags = params.flags; 1037 maxbytes = params.maxbytes; 1038 nblocks = params.nblocks; 1039 leftover = params.leftover; 1040 bstarts = params.bstarts; 1041 src = params.src; 1042 dest = params.dest; 1043 tmp = params.tmp[tid]; 1044 tmp2 = params.tmp2[tid]; 1591 blocksize = context->parent_context->blocksize; 1592 ebsize = blocksize + context->parent_context->typesize * (int32_t)sizeof(int32_t); 1593 compress = context->parent_context->compress; 1594 flags = *(context->parent_context->header_flags); 1595 maxbytes = context->parent_context->destsize; 1596 nblocks = context->parent_context->nblocks; 1597 leftover = context->parent_context->leftover; 1598 bstarts = context->parent_context->bstarts; 1599 src = context->parent_context->src; 1600 dest = context->parent_context->dest; 1601 1602 if (blocksize > context->tmpblocksize) 1603 { 1604 my_free(context->tmp); 1605 context->tmp = my_malloc(blocksize + ebsize + blocksize); 1606 context->tmp2 = context->tmp + blocksize; 1607 context->tmp3 = context->tmp + blocksize + ebsize; 1608 } 1609 1610 tmp = context->tmp; 1611 tmp2 = context->tmp2; 1612 tmp3 = context->tmp3; 1045 1613 1046 1614 ntbytes = 0; /* only useful for decompression */ … … 1048 1616 if (compress && !(flags & BLOSC_MEMCPYED)) { 1049 1617 /* Compression always has to follow the block order */ 1050 pthread_mutex_lock(&co unt_mutex);1051 nblock++;1052 nblock_ = nblock;1053 pthread_mutex_unlock(&co unt_mutex);1618 pthread_mutex_lock(&context->parent_context->count_mutex); 1619 context->parent_context->thread_nblock++; 1620 nblock_ = context->parent_context->thread_nblock; 1621 pthread_mutex_unlock(&context->parent_context->count_mutex); 1054 1622 tblock = nblocks; 1055 1623 } … … 1059 1627 1060 1628 /* Blocks per thread */ 1061 tblocks = nblocks / nthreads;1062 leftover2 = nblocks % nthreads;1629 tblocks = nblocks / context->parent_context->numthreads; 1630 leftover2 = nblocks % context->parent_context->numthreads; 1063 1631 tblocks = (leftover2>0)? tblocks+1: tblocks; 1064 1632 1065 nblock_ = tid*tblocks;1633 nblock_ = context->tid*tblocks; 1066 1634 tblock = nblock_ + tblocks; 1067 1635 if (tblock > nblocks) { … … 1072 1640 /* Loop over blocks */ 1073 1641 leftoverblock = 0; 1074 while ((nblock_ < tblock) && giveup_code > 0) {1642 while ((nblock_ < tblock) && context->parent_context->thread_giveup_code > 0) { 1075 1643 bsize = blocksize; 1076 1644 if (nblock_ == (nblocks - 1) && (leftover > 0)) { … … 1087 1655 else { 1088 1656 /* Regular compression */ 1089 cbytes = blosc_c( bsize, leftoverblock, 0, ebsize,1090 src+nblock_*blocksize, tmp2, tmp );1657 cbytes = blosc_c(context->parent_context, bsize, leftoverblock, 0, ebsize, 1658 src+nblock_*blocksize, tmp2, tmp, tmp3); 1091 1659 } 1092 1660 } … … 1099 1667 } 1100 1668 else { 1101 cbytes = blosc_d(bsize, leftoverblock, 1102 src+sw32(bstarts[nblock_]), dest+nblock_*blocksize, 1669 cbytes = blosc_d(context->parent_context, bsize, leftoverblock, 1670 src + sw32_(bstarts + nblock_ * 4), 1671 dest+nblock_*blocksize, 1103 1672 tmp, tmp2); 1104 1673 } … … 1106 1675 1107 1676 /* Check whether current thread has to giveup */ 1108 if ( giveup_code <= 0) {1677 if (context->parent_context->thread_giveup_code <= 0) { 1109 1678 break; 1110 1679 } … … 1113 1682 if (cbytes < 0) { /* compr/decompr failure */ 1114 1683 /* Set giveup_code error */ 1115 pthread_mutex_lock(&co unt_mutex);1116 giveup_code = cbytes;1117 pthread_mutex_unlock(&co unt_mutex);1684 pthread_mutex_lock(&context->parent_context->count_mutex); 1685 context->parent_context->thread_giveup_code = cbytes; 1686 pthread_mutex_unlock(&context->parent_context->count_mutex); 1118 1687 break; 1119 1688 } … … 1121 1690 if (compress && !(flags & BLOSC_MEMCPYED)) { 1122 1691 /* Start critical section */ 1123 pthread_mutex_lock(&co unt_mutex);1124 ntdest = params.ntbytes;1125 bstarts[nblock_] = sw32(ntdest);/* update block start counter */1126 if ( (cbytes == 0) || (ntdest+cbytes > (int32_t)maxbytes) ) {1127 giveup_code = 0;/* uncompressible buffer */1128 pthread_mutex_unlock(&co unt_mutex);1692 pthread_mutex_lock(&context->parent_context->count_mutex); 1693 ntdest = context->parent_context->num_output_bytes; 1694 _sw32(bstarts + nblock_ * 4, ntdest); /* update block start counter */ 1695 if ( (cbytes == 0) || (ntdest+cbytes > maxbytes) ) { 1696 context->parent_context->thread_giveup_code = 0; /* uncompressible buffer */ 1697 pthread_mutex_unlock(&context->parent_context->count_mutex); 1129 1698 break; 1130 1699 } 1131 nblock++;1132 nblock_ = nblock;1133 params.ntbytes += cbytes; /* update return bytes counter */1134 pthread_mutex_unlock(&co unt_mutex);1700 context->parent_context->thread_nblock++; 1701 nblock_ = context->parent_context->thread_nblock; 1702 context->parent_context->num_output_bytes += cbytes; /* update return bytes counter */ 1703 pthread_mutex_unlock(&context->parent_context->count_mutex); 1135 1704 /* End of critical section */ 1136 1705 … … 1147 1716 1148 1717 /* Sum up all the bytes decompressed */ 1149 if ((!compress || (flags & BLOSC_MEMCPYED)) && giveup_code > 0) {1718 if ((!compress || (flags & BLOSC_MEMCPYED)) && context->parent_context->thread_giveup_code > 0) { 1150 1719 /* Update global counter for all threads (decompression only) */ 1151 pthread_mutex_lock(&co unt_mutex);1152 params.ntbytes += ntbytes;1153 pthread_mutex_unlock(&co unt_mutex);1720 pthread_mutex_lock(&context->parent_context->count_mutex); 1721 context->parent_context->num_output_bytes += ntbytes; 1722 pthread_mutex_unlock(&context->parent_context->count_mutex); 1154 1723 } 1155 1724 1156 1725 /* Meeting point for all threads (wait for finalization) */ 1157 WAIT_FINISH; 1158 1159 } /* closes while(1) */ 1160 1161 /* This should never be reached, but anyway */ 1162 return(0); 1163 } 1164 1165 1166 static int init_threads(void) 1726 WAIT_FINISH(NULL, context->parent_context); 1727 } 1728 1729 /* Cleanup our working space and context */ 1730 my_free(context->tmp); 1731 my_free(context); 1732 1733 return(NULL); 1734 } 1735 1736 1737 static int init_threads(struct blosc_context* context) 1167 1738 { 1168 1739 int32_t tid; 1169 1740 int rc2; 1741 int32_t ebsize; 1742 struct thread_context* thread_context; 1170 1743 1171 1744 /* Initialize mutex and condition variable objects */ 1172 pthread_mutex_init(&count_mutex, NULL); 1745 pthread_mutex_init(&context->count_mutex, NULL); 1746 1747 /* Set context thread sentinels */ 1748 context->thread_giveup_code = 1; 1749 context->thread_nblock = -1; 1173 1750 1174 1751 /* Barrier initialization */ 1175 1752 #ifdef _POSIX_BARRIERS_MINE 1176 pthread_barrier_init(& barr_init, NULL, nthreads+1);1177 pthread_barrier_init(& barr_finish, NULL, nthreads+1);1753 pthread_barrier_init(&context->barr_init, NULL, context->numthreads+1); 1754 pthread_barrier_init(&context->barr_finish, NULL, context->numthreads+1); 1178 1755 #else 1179 pthread_mutex_init(&co unt_threads_mutex, NULL);1180 pthread_cond_init(&co unt_threads_cv, NULL);1181 co unt_threads = 0; /* Reset threads counter */1756 pthread_mutex_init(&context->count_threads_mutex, NULL); 1757 pthread_cond_init(&context->count_threads_cv, NULL); 1758 context->count_threads = 0; /* Reset threads counter */ 1182 1759 #endif 1183 1760 1184 1761 #if !defined(_WIN32) 1185 1762 /* Initialize and set thread detached attribute */ 1186 pthread_attr_init(&c t_attr);1187 pthread_attr_setdetachstate(&c t_attr, PTHREAD_CREATE_JOINABLE);1763 pthread_attr_init(&context->ct_attr); 1764 pthread_attr_setdetachstate(&context->ct_attr, PTHREAD_CREATE_JOINABLE); 1188 1765 #endif 1189 1766 1190 1767 /* Finally, create the threads in detached state */ 1191 for (tid = 0; tid < nthreads; tid++) { 1192 tids[tid] = tid; 1768 for (tid = 0; tid < context->numthreads; tid++) { 1769 context->tids[tid] = tid; 1770 1771 /* Create a thread context thread owns context (will destroy when finished) */ 1772 thread_context = (struct thread_context*)my_malloc(sizeof(struct thread_context)); 1773 thread_context->parent_context = context; 1774 thread_context->tid = tid; 1775 1776 ebsize = context->blocksize + context->typesize * (int32_t)sizeof(int32_t); 1777 thread_context->tmp = my_malloc(context->blocksize + ebsize + context->blocksize); 1778 thread_context->tmp2 = thread_context->tmp + context->blocksize; 1779 thread_context->tmp3 = thread_context->tmp + context->blocksize + ebsize; 1780 thread_context->tmpblocksize = context->blocksize; 1781 1193 1782 #if !defined(_WIN32) 1194 rc2 = pthread_create(&threads[tid], &ct_attr, (void*)t_blosc, 1195 (void *)&tids[tid]); 1783 rc2 = pthread_create(&context->threads[tid], &context->ct_attr, t_blosc, (void *)thread_context); 1196 1784 #else 1197 rc2 = pthread_create(&threads[tid], NULL, (void*)t_blosc, 1198 (void *)&tids[tid]); 1785 rc2 = pthread_create(&context->threads[tid], NULL, t_blosc, (void *)thread_context); 1199 1786 #endif 1200 1787 if (rc2) { … … 1205 1792 } 1206 1793 1207 init_threads_done = 1; /* Initialization done! */1208 pid = (int)getpid(); /* save the PID for this process */1209 1794 1210 1795 return(0); 1211 1796 } 1212 1797 1213 void blosc_init(void) { 1214 /* Init global lock */ 1215 pthread_mutex_init(&global_comp_mutex, NULL); 1216 init_lib = 1; 1217 } 1218 1219 int blosc_set_nthreads(int nthreads_new) 1220 { 1221 int ret; 1222 1223 /* Check if should initialize (implementing previous 1.2.3 behaviour, 1224 where calling blosc_set_nthreads was enough) */ 1225 if (!init_lib) blosc_init(); 1226 1227 /* Take global lock */ 1228 pthread_mutex_lock(&global_comp_mutex); 1229 1230 ret = blosc_set_nthreads_(nthreads_new); 1231 /* Release global lock */ 1232 pthread_mutex_unlock(&global_comp_mutex); 1233 1798 int blosc_get_nthreads(void) 1799 { 1800 int ret = g_threads; 1801 1234 1802 return ret; 1235 1803 } 1236 1804 1237 int blosc_set_nthreads_(int nthreads_new) 1238 { 1239 int32_t nthreads_old = nthreads; 1240 int32_t t; 1241 int rc2; 1242 void *status; 1243 1244 if (nthreads_new > BLOSC_MAX_THREADS) { 1805 int blosc_set_nthreads(int nthreads_new) 1806 { 1807 int ret = g_threads; 1808 1809 /* Check if should initialize */ 1810 if (!g_initlib) blosc_init(); 1811 1812 if (nthreads_new != ret){ 1813 /* Re-initialize Blosc */ 1814 blosc_destroy(); 1815 blosc_init(); 1816 g_threads = nthreads_new; 1817 } 1818 1819 return ret; 1820 } 1821 1822 int blosc_set_nthreads_(struct blosc_context* context) 1823 { 1824 if (context->numthreads > BLOSC_MAX_THREADS) { 1245 1825 fprintf(stderr, 1246 1826 "Error. nthreads cannot be larger than BLOSC_MAX_THREADS (%d)", … … 1248 1828 return -1; 1249 1829 } 1250 else if ( nthreads_new<= 0) {1830 else if (context->numthreads <= 0) { 1251 1831 fprintf(stderr, "Error. nthreads must be a positive integer"); 1252 1832 return -1; 1253 1833 } 1254 1834 1255 /* Only join threads if they are not initialized or if our PID is 1256 different from that in pid var (probably means that we are a 1257 subprocess, and thus threads are non-existent). */ 1258 if (nthreads > 1 && init_threads_done && pid == getpid()) { 1259 /* Tell all existing threads to finish */ 1260 end_threads = 1; 1261 /* Synchronization point for all threads (wait for initialization) */ 1262 WAIT_INIT; 1263 /* Join exiting threads */ 1264 for (t=0; t<nthreads; t++) { 1265 rc2 = pthread_join(threads[t], &status); 1266 if (rc2) { 1267 fprintf(stderr, "ERROR; return code from pthread_join() is %d\n", rc2); 1268 fprintf(stderr, "\tError detail: %s\n", strerror(rc2)); 1269 return(-1); 1270 } 1271 } 1272 init_threads_done = 0; 1273 end_threads = 0; 1274 } 1275 1276 /* Launch a new pool of threads (if necessary) */ 1277 nthreads = nthreads_new; 1278 if (nthreads > 1 && (!init_threads_done || pid != getpid())) { 1279 init_threads(); 1280 } 1281 1282 return nthreads_old; 1283 } 1284 1285 1286 /* Free possible memory temporaries and thread resources */ 1287 int blosc_free_resources(void) 1835 /* Launch a new pool of threads */ 1836 if (context->numthreads > 1 && context->numthreads != context->threads_started) { 1837 blosc_release_threadpool(context); 1838 init_threads(context); 1839 } 1840 1841 /* We have now started the threads */ 1842 context->threads_started = context->numthreads; 1843 1844 return context->numthreads; 1845 } 1846 1847 char* blosc_get_compressor(void) 1848 { 1849 char* compname; 1850 blosc_compcode_to_compname(g_compressor, &compname); 1851 1852 return compname; 1853 } 1854 1855 int blosc_set_compressor(const char *compname) 1856 { 1857 int code = blosc_compname_to_compcode(compname); 1858 1859 g_compressor = code; 1860 1861 /* Check if should initialize */ 1862 if (!g_initlib) blosc_init(); 1863 1864 return code; 1865 } 1866 1867 char* blosc_list_compressors(void) 1868 { 1869 static int compressors_list_done = 0; 1870 static char ret[256]; 1871 1872 if (compressors_list_done) return ret; 1873 ret[0] = '\0'; 1874 strcat(ret, BLOSC_BLOSCLZ_COMPNAME); 1875 #if defined(HAVE_LZ4) 1876 strcat(ret, ","); strcat(ret, BLOSC_LZ4_COMPNAME); 1877 strcat(ret, ","); strcat(ret, BLOSC_LZ4HC_COMPNAME); 1878 #endif /* HAVE_LZ4 */ 1879 #if defined(HAVE_SNAPPY) 1880 strcat(ret, ","); strcat(ret, BLOSC_SNAPPY_COMPNAME); 1881 #endif /* HAVE_SNAPPY */ 1882 #if defined(HAVE_ZLIB) 1883 strcat(ret, ","); strcat(ret, BLOSC_ZLIB_COMPNAME); 1884 #endif /* HAVE_ZLIB */ 1885 #if defined(HAVE_ZSTD) 1886 strcat(ret, ","); strcat(ret, BLOSC_ZSTD_COMPNAME); 1887 #endif /* HAVE_ZSTD */ 1888 compressors_list_done = 1; 1889 return ret; 1890 } 1891 1892 char* blosc_get_version_string(void) 1893 { 1894 static char ret[256]; 1895 strcpy(ret, BLOSC_VERSION_STRING); 1896 return ret; 1897 } 1898 1899 int blosc_get_complib_info(char *compname, char **complib, char **version) 1900 { 1901 int clibcode; 1902 char *clibname; 1903 char *clibversion = "unknown"; 1904 1905 #if (defined(HAVE_LZ4) && defined(LZ4_VERSION_MAJOR)) || (defined(HAVE_SNAPPY) && defined(SNAPPY_VERSION)) || defined(ZSTD_VERSION_MAJOR) 1906 char sbuffer[256]; 1907 #endif 1908 1909 clibcode = compname_to_clibcode(compname); 1910 clibname = clibcode_to_clibname(clibcode); 1911 1912 /* complib version */ 1913 if (clibcode == BLOSC_BLOSCLZ_LIB) { 1914 clibversion = BLOSCLZ_VERSION_STRING; 1915 } 1916 #if defined(HAVE_LZ4) 1917 else if (clibcode == BLOSC_LZ4_LIB) { 1918 #if defined(LZ4_VERSION_MAJOR) 1919 sprintf(sbuffer, "%d.%d.%d", 1920 LZ4_VERSION_MAJOR, LZ4_VERSION_MINOR, LZ4_VERSION_RELEASE); 1921 clibversion = sbuffer; 1922 #endif /* LZ4_VERSION_MAJOR */ 1923 } 1924 #endif /* HAVE_LZ4 */ 1925 #if defined(HAVE_SNAPPY) 1926 else if (clibcode == BLOSC_SNAPPY_LIB) { 1927 #if defined(SNAPPY_VERSION) 1928 sprintf(sbuffer, "%d.%d.%d", SNAPPY_MAJOR, SNAPPY_MINOR, SNAPPY_PATCHLEVEL); 1929 clibversion = sbuffer; 1930 #endif /* SNAPPY_VERSION */ 1931 } 1932 #endif /* HAVE_SNAPPY */ 1933 #if defined(HAVE_ZLIB) 1934 else if (clibcode == BLOSC_ZLIB_LIB) { 1935 clibversion = ZLIB_VERSION; 1936 } 1937 #endif /* HAVE_ZLIB */ 1938 #if defined(HAVE_ZSTD) 1939 else if (clibcode == BLOSC_ZSTD_LIB) { 1940 sprintf(sbuffer, "%d.%d.%d", 1941 ZSTD_VERSION_MAJOR, ZSTD_VERSION_MINOR, ZSTD_VERSION_RELEASE); 1942 clibversion = sbuffer; 1943 } 1944 #endif /* HAVE_ZSTD */ 1945 1946 *complib = strdup(clibname); 1947 *version = strdup(clibversion); 1948 return clibcode; 1949 } 1950 1951 /* Return `nbytes`, `cbytes` and `blocksize` from a compressed buffer. */ 1952 void blosc_cbuffer_sizes(const void *cbuffer, size_t *nbytes, 1953 size_t *cbytes, size_t *blocksize) 1954 { 1955 uint8_t *_src = (uint8_t *)(cbuffer); /* current pos for source buffer */ 1956 uint8_t version, versionlz; /* versions for compressed header */ 1957 1958 /* Read the version info (could be useful in the future) */ 1959 version = _src[0]; /* blosc format version */ 1960 versionlz = _src[1]; /* blosclz format version */ 1961 1962 version += 0; /* shut up compiler warning */ 1963 versionlz += 0; /* shut up compiler warning */ 1964 1965 /* Read the interesting values */ 1966 *nbytes = (size_t)sw32_(_src + 4); /* uncompressed buffer size */ 1967 *blocksize = (size_t)sw32_(_src + 8); /* block size */ 1968 *cbytes = (size_t)sw32_(_src + 12); /* compressed buffer size */ 1969 } 1970 1971 1972 /* Return `typesize` and `flags` from a compressed buffer. */ 1973 void blosc_cbuffer_metainfo(const void *cbuffer, size_t *typesize, 1974 int *flags) 1975 { 1976 uint8_t *_src = (uint8_t *)(cbuffer); /* current pos for source buffer */ 1977 uint8_t version, versionlz; /* versions for compressed header */ 1978 1979 /* Read the version info (could be useful in the future) */ 1980 version = _src[0]; /* blosc format version */ 1981 versionlz = _src[1]; /* blosclz format version */ 1982 1983 version += 0; /* shut up compiler warning */ 1984 versionlz += 0; /* shut up compiler warning */ 1985 1986 /* Read the interesting values */ 1987 *flags = (int)_src[2]; /* flags */ 1988 *typesize = (size_t)_src[3]; /* typesize */ 1989 } 1990 1991 1992 /* Return version information from a compressed buffer. */ 1993 void blosc_cbuffer_versions(const void *cbuffer, int *version, 1994 int *versionlz) 1995 { 1996 uint8_t *_src = (uint8_t *)(cbuffer); /* current pos for source buffer */ 1997 1998 /* Read the version info */ 1999 *version = (int)_src[0]; /* blosc format version */ 2000 *versionlz = (int)_src[1]; /* Lempel-Ziv compressor format version */ 2001 } 2002 2003 2004 /* Return the compressor library/format used in a compressed buffer. */ 2005 char *blosc_cbuffer_complib(const void *cbuffer) 2006 { 2007 uint8_t *_src = (uint8_t *)(cbuffer); /* current pos for source buffer */ 2008 int clibcode; 2009 char *complib; 2010 2011 /* Read the compressor format/library info */ 2012 clibcode = (_src[2] & 0xe0) >> 5; 2013 complib = clibcode_to_clibname(clibcode); 2014 return complib; 2015 } 2016 2017 /* Get the internal blocksize to be used during compression. 0 means 2018 that an automatic blocksize is computed internally. */ 2019 int blosc_get_blocksize(void) 2020 { 2021 return (int)g_force_blocksize; 2022 } 2023 2024 /* Force the use of a specific blocksize. If 0, an automatic 2025 blocksize will be used (the default). */ 2026 void blosc_set_blocksize(size_t size) 2027 { 2028 g_force_blocksize = (int32_t)size; 2029 } 2030 2031 void blosc_init(void) 2032 { 2033 /* Return if we are already initialized */ 2034 if (g_initlib) return; 2035 2036 pthread_mutex_init(&global_comp_mutex, NULL); 2037 g_global_context = (struct blosc_context*)my_malloc(sizeof(struct blosc_context)); 2038 g_global_context->threads_started = 0; 2039 g_initlib = 1; 2040 } 2041 2042 void blosc_destroy(void) 2043 { 2044 /* Return if Blosc is not initialized */ 2045 if (!g_initlib) return; 2046 2047 g_initlib = 0; 2048 blosc_release_threadpool(g_global_context); 2049 my_free(g_global_context); 2050 pthread_mutex_destroy(&global_comp_mutex); 2051 } 2052 2053 int blosc_release_threadpool(struct blosc_context* context) 1288 2054 { 1289 2055 int32_t t; 2056 void* status; 2057 int rc; 1290 2058 int rc2; 1291 void *status; 1292 1293 /* Take global lock */ 1294 pthread_mutex_lock(&global_comp_mutex); 1295 1296 /* Release temporaries */ 1297 if (init_temps_done) { 1298 release_temporaries(); 1299 } 1300 1301 /* Finish the possible thread pool */ 1302 if (nthreads > 1 && init_threads_done) { 2059 2060 if (context->threads_started > 0) 2061 { 1303 2062 /* Tell all existing threads to finish */ 1304 end_threads = 1; 1305 /* Synchronization point for all threads (wait for initialization) */ 1306 WAIT_INIT; 2063 context->end_threads = 1; 2064 2065 /* Sync threads */ 2066 WAIT_INIT(-1, context); 2067 1307 2068 /* Join exiting threads */ 1308 for (t=0; t< nthreads; t++) {1309 rc2 = pthread_join( threads[t], &status);2069 for (t=0; t<context->threads_started; t++) { 2070 rc2 = pthread_join(context->threads[t], &status); 1310 2071 if (rc2) { 1311 2072 fprintf(stderr, "ERROR; return code from pthread_join() is %d\n", rc2); 1312 2073 fprintf(stderr, "\tError detail: %s\n", strerror(rc2)); 1313 return(-1);1314 2074 } 1315 2075 } 1316 2076 1317 2077 /* Release mutex and condition variable objects */ 1318 pthread_mutex_destroy(&co unt_mutex);2078 pthread_mutex_destroy(&context->count_mutex); 1319 2079 1320 2080 /* Barriers */ 1321 #ifdef _POSIX_BARRIERS_MINE 1322 pthread_barrier_destroy(&barr_init); 1323 pthread_barrier_destroy(&barr_finish); 1324 #else 1325 pthread_mutex_destroy(&count_threads_mutex); 1326 pthread_cond_destroy(&count_threads_cv); 1327 #endif 1328 1329 /* Thread attributes */ 1330 #if !defined(_WIN32) 1331 pthread_attr_destroy(&ct_attr); 1332 #endif 1333 1334 init_threads_done = 0; 1335 end_threads = 0; 1336 } 1337 /* Release global lock */ 1338 pthread_mutex_unlock(&global_comp_mutex); 1339 return(0); 1340 1341 } 1342 1343 void blosc_destroy(void) { 1344 /* Free the resources */ 1345 blosc_free_resources(); 1346 /* Destroy global lock */ 1347 pthread_mutex_destroy(&global_comp_mutex); 1348 } 1349 1350 /* Return `nbytes`, `cbytes` and `blocksize` from a compressed buffer. */ 1351 void blosc_cbuffer_sizes(const void *cbuffer, size_t *nbytes, 1352 size_t *cbytes, size_t *blocksize) 1353 { 1354 uint8_t *_src = (uint8_t *)(cbuffer); /* current pos for source buffer */ 1355 uint8_t version, versionlz; /* versions for compressed header */ 1356 1357 /* Read the version info (could be useful in the future) */ 1358 version = _src[0]; /* blosc format version */ 1359 versionlz = _src[1]; /* blosclz format version */ 1360 1361 version += 0; /* shut up compiler warning */ 1362 versionlz += 0; /* shut up compiler warning */ 1363 1364 /* Read the interesting values */ 1365 _src += 4; 1366 *nbytes = (size_t)sw32(((int32_t *)_src)[0]); /* uncompressed buffer size */ 1367 *blocksize = (size_t)sw32(((int32_t *)_src)[1]); /* block size */ 1368 *cbytes = (size_t)sw32(((int32_t *)_src)[2]); /* compressed buffer size */ 1369 } 1370 1371 1372 /* Return `typesize` and `flags` from a compressed buffer. */ 1373 void blosc_cbuffer_metainfo(const void *cbuffer, size_t *typesize, 1374 int *flags) 1375 { 1376 uint8_t *_src = (uint8_t *)(cbuffer); /* current pos for source buffer */ 1377 uint8_t version, versionlz; /* versions for compressed header */ 1378 1379 /* Read the version info (could be useful in the future) */ 1380 version = _src[0]; /* blosc format version */ 1381 versionlz = _src[1]; /* blosclz format version */ 1382 1383 version += 0; /* shut up compiler warning */ 1384 versionlz += 0; /* shut up compiler warning */ 1385 1386 /* Read the interesting values */ 1387 *flags = (int)_src[2]; /* flags */ 1388 *typesize = (size_t)_src[3]; /* typesize */ 1389 } 1390 1391 1392 /* Return version information from a compressed buffer. */ 1393 void blosc_cbuffer_versions(const void *cbuffer, int *version, 1394 int *versionlz) 1395 { 1396 uint8_t *_src = (uint8_t *)(cbuffer); /* current pos for source buffer */ 1397 1398 /* Read the version info */ 1399 *version = (int)_src[0]; /* blosc format version */ 1400 *versionlz = (int)_src[1]; /* blosclz format version */ 1401 } 1402 1403 1404 /* Force the use of a specific blocksize. If 0, an automatic 1405 blocksize will be used (the default). */ 1406 void blosc_set_blocksize(size_t size) 1407 { 1408 /* Take global lock */ 1409 pthread_mutex_lock(&global_comp_mutex); 1410 1411 force_blocksize = (int32_t)size; 1412 1413 /* Release global lock */ 1414 pthread_mutex_unlock(&global_comp_mutex); 1415 } 2081 #ifdef _POSIX_BARRIERS_MINE 2082 pthread_barrier_destroy(&context->barr_init); 2083 pthread_barrier_destroy(&context->barr_finish); 2084 #else 2085 pthread_mutex_destroy(&context->count_threads_mutex); 2086 pthread_cond_destroy(&context->count_threads_cv); 2087 #endif 2088 2089 /* Thread attributes */ 2090 #if !defined(_WIN32) 2091 pthread_attr_destroy(&context->ct_attr); 2092 #endif 2093 2094 } 2095 2096 context->threads_started = 0; 2097 2098 return 0; 2099 } 2100 2101 int blosc_free_resources(void) 2102 { 2103 /* Return if Blosc is not initialized */ 2104 if (!g_initlib) return -1; 2105 2106 return blosc_release_threadpool(g_global_context); 2107 }
Note: See TracChangeset
for help on using the changeset viewer.