1 /* 2 * 3 * Copyright 2015 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 #include <grpc/support/port_platform.h> 20 21 #include "src/core/lib/compression/message_compress.h" 22 23 #include <string.h> 24 25 #include <grpc/support/alloc.h> 26 #include <grpc/support/log.h> 27 28 #include <zlib.h> 29 30 #include "src/core/lib/slice/slice_internal.h" 31 32 #define OUTPUT_BLOCK_SIZE 1024 33 34 static int zlib_body(z_stream* zs, grpc_slice_buffer* input, 35 grpc_slice_buffer* output, 36 int (*flate)(z_stream* zs, int flush)) { 37 int r; 38 int flush; 39 size_t i; 40 grpc_slice outbuf = GRPC_SLICE_MALLOC(OUTPUT_BLOCK_SIZE); 41 const uInt uint_max = ~static_cast<uInt>(0); 42 43 GPR_ASSERT(GRPC_SLICE_LENGTH(outbuf) <= uint_max); 44 zs->avail_out = static_cast<uInt> GRPC_SLICE_LENGTH(outbuf); 45 zs->next_out = GRPC_SLICE_START_PTR(outbuf); 46 flush = Z_NO_FLUSH; 47 for (i = 0; i < input->count; i++) { 48 if (i == input->count - 1) flush = Z_FINISH; 49 GPR_ASSERT(GRPC_SLICE_LENGTH(input->slices[i]) <= uint_max); 50 zs->avail_in = static_cast<uInt> GRPC_SLICE_LENGTH(input->slices[i]); 51 zs->next_in = GRPC_SLICE_START_PTR(input->slices[i]); 52 do { 53 if (zs->avail_out == 0) { 54 grpc_slice_buffer_add_indexed(output, outbuf); 55 outbuf = GRPC_SLICE_MALLOC(OUTPUT_BLOCK_SIZE); 56 GPR_ASSERT(GRPC_SLICE_LENGTH(outbuf) <= uint_max); 57 zs->avail_out = static_cast<uInt> GRPC_SLICE_LENGTH(outbuf); 58 zs->next_out = GRPC_SLICE_START_PTR(outbuf); 59 } 60 r = flate(zs, flush); 61 if (r < 0 && r != Z_BUF_ERROR /* not fatal */) { 62 gpr_log(GPR_INFO, "zlib error (%d)", r); 63 goto error; 64 } 65 } while (zs->avail_out == 0); 66 if (zs->avail_in) { 67 gpr_log(GPR_INFO, "zlib: not all input consumed"); 68 goto error; 69 } 70 } 71 72 GPR_ASSERT(outbuf.refcount); 73 outbuf.data.refcounted.length -= zs->avail_out; 74 grpc_slice_buffer_add_indexed(output, outbuf); 75 76 return 1; 77 78 error: 79 grpc_slice_unref_internal(outbuf); 80 return 0; 81 } 82 83 static void* zalloc_gpr(void* opaque, unsigned int items, unsigned int size) { 84 return gpr_malloc(items * size); 85 } 86 87 static void zfree_gpr(void* opaque, void* address) { gpr_free(address); } 88 89 static int zlib_compress(grpc_slice_buffer* input, grpc_slice_buffer* output, 90 int gzip) { 91 z_stream zs; 92 int r; 93 size_t i; 94 size_t count_before = output->count; 95 size_t length_before = output->length; 96 memset(&zs, 0, sizeof(zs)); 97 zs.zalloc = zalloc_gpr; 98 zs.zfree = zfree_gpr; 99 r = deflateInit2(&zs, Z_DEFAULT_COMPRESSION, Z_DEFLATED, 15 | (gzip ? 16 : 0), 100 8, Z_DEFAULT_STRATEGY); 101 GPR_ASSERT(r == Z_OK); 102 r = zlib_body(&zs, input, output, deflate) && output->length < input->length; 103 if (!r) { 104 for (i = count_before; i < output->count; i++) { 105 grpc_slice_unref_internal(output->slices[i]); 106 } 107 output->count = count_before; 108 output->length = length_before; 109 } 110 deflateEnd(&zs); 111 return r; 112 } 113 114 static int zlib_decompress(grpc_slice_buffer* input, grpc_slice_buffer* output, 115 int gzip) { 116 z_stream zs; 117 int r; 118 size_t i; 119 size_t count_before = output->count; 120 size_t length_before = output->length; 121 memset(&zs, 0, sizeof(zs)); 122 zs.zalloc = zalloc_gpr; 123 zs.zfree = zfree_gpr; 124 r = inflateInit2(&zs, 15 | (gzip ? 16 : 0)); 125 GPR_ASSERT(r == Z_OK); 126 r = zlib_body(&zs, input, output, inflate); 127 if (!r) { 128 for (i = count_before; i < output->count; i++) { 129 grpc_slice_unref_internal(output->slices[i]); 130 } 131 output->count = count_before; 132 output->length = length_before; 133 } 134 inflateEnd(&zs); 135 return r; 136 } 137 138 static int copy(grpc_slice_buffer* input, grpc_slice_buffer* output) { 139 size_t i; 140 for (i = 0; i < input->count; i++) { 141 grpc_slice_buffer_add(output, grpc_slice_ref_internal(input->slices[i])); 142 } 143 return 1; 144 } 145 146 static int compress_inner(grpc_message_compression_algorithm algorithm, 147 grpc_slice_buffer* input, grpc_slice_buffer* output) { 148 switch (algorithm) { 149 case GRPC_MESSAGE_COMPRESS_NONE: 150 /* the fallback path always needs to be send uncompressed: we simply 151 rely on that here */ 152 return 0; 153 case GRPC_MESSAGE_COMPRESS_DEFLATE: 154 return zlib_compress(input, output, 0); 155 case GRPC_MESSAGE_COMPRESS_GZIP: 156 return zlib_compress(input, output, 1); 157 case GRPC_MESSAGE_COMPRESS_ALGORITHMS_COUNT: 158 break; 159 } 160 gpr_log(GPR_ERROR, "invalid compression algorithm %d", algorithm); 161 return 0; 162 } 163 164 int grpc_msg_compress(grpc_message_compression_algorithm algorithm, 165 grpc_slice_buffer* input, grpc_slice_buffer* output) { 166 if (!compress_inner(algorithm, input, output)) { 167 copy(input, output); 168 return 0; 169 } 170 return 1; 171 } 172 173 int grpc_msg_decompress(grpc_message_compression_algorithm algorithm, 174 grpc_slice_buffer* input, grpc_slice_buffer* output) { 175 switch (algorithm) { 176 case GRPC_MESSAGE_COMPRESS_NONE: 177 return copy(input, output); 178 case GRPC_MESSAGE_COMPRESS_DEFLATE: 179 return zlib_decompress(input, output, 0); 180 case GRPC_MESSAGE_COMPRESS_GZIP: 181 return zlib_decompress(input, output, 1); 182 case GRPC_MESSAGE_COMPRESS_ALGORITHMS_COUNT: 183 break; 184 } 185 gpr_log(GPR_ERROR, "invalid compression algorithm %d", algorithm); 186 return 0; 187 } 188