Home | History | Annotate | Download | only in squashfs-tools
      1 /*
      2  * Create a squashfs filesystem.  This is a highly compressed read only
      3  * filesystem.
      4  *
      5  * Copyright (c) 2014
      6  * Phillip Lougher <phillip (at) squashfs.org.uk>
      7  *
      8  * This program is free software; you can redistribute it and/or
      9  * modify it under the terms of the GNU General Public License
     10  * as published by the Free Software Foundation; either version 2,
     11  * or (at your option) any later version.
     12  *
     13  * This program is distributed in the hope that it will be useful,
     14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
     15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
     16  * GNU General Public License for more details.
     17  *
     18  * You should have received a copy of the GNU General Public License
     19  * along with this program; if not, write to the Free Software
     20  * Foundation, 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
     21  *
     22  * process_fragments.c
     23  */
     24 
     25 #include <pthread.h>
     26 #include <sys/ioctl.h>
     27 #include <unistd.h>
     28 #include <signal.h>
     29 #include <sys/time.h>
     30 #include <string.h>
     31 #include <stdio.h>
     32 #include <math.h>
     33 #include <stdarg.h>
     34 #include <errno.h>
     35 #include <stdlib.h>
     36 #include <dirent.h>
     37 #include <sys/types.h>
     38 #include <sys/stat.h>
     39 #include <fcntl.h>
     40 
     41 #include "caches-queues-lists.h"
     42 #include "squashfs_fs.h"
     43 #include "mksquashfs.h"
     44 #include "error.h"
     45 #include "progressbar.h"
     46 #include "info.h"
     47 #include "compressor.h"
     48 #include "process_fragments.h"
     49 
     50 #define FALSE 0
     51 #define TRUE 1
     52 
     53 extern struct queue *to_process_frag;
     54 extern struct seq_queue *to_main;
     55 extern int sparse_files;
     56 
     57 /*
     58  * Compute 16 bit BSD checksum over the data, and check for sparseness
     59  */
     60 static int checksum_sparse(struct file_buffer *file_buffer)
     61 {
     62 	unsigned char *b = (unsigned char *) file_buffer->data;
     63 	unsigned short chksum = 0;
     64 	int bytes = file_buffer->size, sparse = TRUE, value;
     65 
     66 	while(bytes --) {
     67 		chksum = (chksum & 1) ? (chksum >> 1) | 0x8000 : chksum >> 1;
     68 		value = *b++;
     69 		if(value) {
     70 			sparse = FALSE;
     71 			chksum += value;
     72 		}
     73 	}
     74 
     75 	file_buffer->checksum = chksum;
     76 	return sparse;
     77 }
     78 
     79 
     80 static int read_filesystem(int fd, long long byte, int bytes, void *buff)
     81 {
     82 	off_t off = byte;
     83 
     84 	TRACE("read_filesystem: reading from position 0x%llx, bytes %d\n",
     85 		byte, bytes);
     86 
     87 	if(lseek(fd, off, SEEK_SET) == -1) {
     88 		ERROR("read_filesystem: Lseek on destination failed because %s, "
     89 			"offset=0x%llx\n", strerror(errno), off);
     90 		return 0;
     91 	} else if(read_bytes(fd, buff, bytes) < bytes) {
     92 		ERROR("Read on destination failed\n");
     93 		return 0;
     94 	}
     95 
     96 	return 1;
     97 }
     98 
     99 
    100 static struct file_buffer *get_fragment(struct fragment *fragment,
    101 	char *data_buffer, int fd)
    102 {
    103 	struct squashfs_fragment_entry *disk_fragment;
    104 	struct file_buffer *buffer, *compressed_buffer;
    105 	long long start_block;
    106 	int res, size, index = fragment->index;
    107 	char locked;
    108 
    109 	/*
    110 	 * Lookup fragment block in cache.
    111 	 * If the fragment block doesn't exist, then get the compressed version
    112 	 * from the writer cache or off disk, and decompress it.
    113 	 *
    114 	 * This routine has two things which complicate the code:
    115 	 *
    116 	 *	1. Multiple threads can simultaneously lookup/create the
    117 	 *	   same buffer.  This means a buffer needs to be "locked"
    118 	 *	   when it is being filled in, to prevent other threads from
    119 	 *	   using it when it is not ready.  This is because we now do
    120 	 *	   fragment duplicate checking in parallel.
    121 	 *	2. We have two caches which need to be checked for the
    122 	 *	   presence of fragment blocks: the normal fragment cache
    123 	 *	   and a "reserve" cache.  The reserve cache is used to
    124 	 *	   prevent an unnecessary pipeline stall when the fragment cache
    125 	 *	   is full of fragments waiting to be compressed.
    126 	 */
    127 	pthread_cleanup_push((void *) pthread_mutex_unlock, &dup_mutex);
    128 	pthread_mutex_lock(&dup_mutex);
    129 
    130 again:
    131 	buffer = cache_lookup_nowait(fragment_buffer, index, &locked);
    132 	if(buffer) {
    133 		pthread_mutex_unlock(&dup_mutex);
    134 		if(locked)
    135 			/* got a buffer being filled in.  Wait for it */
    136 			cache_wait_unlock(buffer);
    137 		goto finished;
    138 	}
    139 
    140 	/* not in fragment cache, is it in the reserve cache? */
    141 	buffer = cache_lookup_nowait(reserve_cache, index, &locked);
    142 	if(buffer) {
    143 		pthread_mutex_unlock(&dup_mutex);
    144 		if(locked)
    145 			/* got a buffer being filled in.  Wait for it */
    146 			cache_wait_unlock(buffer);
    147 		goto finished;
    148 	}
    149 
    150 	/* in neither cache, try to get it from the fragment cache */
    151 	buffer = cache_get_nowait(fragment_buffer, index);
    152 	if(!buffer) {
    153 		/*
    154 		 * no room, get it from the reserve cache, this is
    155 		 * dimensioned so it will always have space (no more than
    156 		 * processors + 1 can have an outstanding reserve buffer)
    157 		 */
    158 		buffer = cache_get_nowait(reserve_cache, index);
    159 		if(!buffer) {
    160 			/* failsafe */
    161 			ERROR("no space in reserve cache\n");
    162 			goto again;
    163 		}
    164 	}
    165 
    166 	pthread_mutex_unlock(&dup_mutex);
    167 
    168 	compressed_buffer = cache_lookup(fwriter_buffer, index);
    169 
    170 	pthread_cleanup_push((void *) pthread_mutex_unlock, &fragment_mutex);
    171 	pthread_mutex_lock(&fragment_mutex);
    172 	disk_fragment = &fragment_table[index];
    173 	size = SQUASHFS_COMPRESSED_SIZE_BLOCK(disk_fragment->size);
    174 	start_block = disk_fragment->start_block;
    175 	pthread_cleanup_pop(1);
    176 
    177 	if(SQUASHFS_COMPRESSED_BLOCK(disk_fragment->size)) {
    178 		int error;
    179 		char *data;
    180 
    181 		if(compressed_buffer)
    182 			data = compressed_buffer->data;
    183 		else {
    184 			res = read_filesystem(fd, start_block, size, data_buffer);
    185 			if(res == 0) {
    186 				ERROR("Failed to read fragment from output"
    187 					" filesystem\n");
    188 				BAD_ERROR("Output filesystem corrupted?\n");
    189 			}
    190 			data = data_buffer;
    191 		}
    192 
    193 		res = compressor_uncompress(comp, buffer->data, data, size,
    194 			block_size, &error);
    195 		if(res == -1)
    196 			BAD_ERROR("%s uncompress failed with error code %d\n",
    197 				comp->name, error);
    198 	} else if(compressed_buffer)
    199 		memcpy(buffer->data, compressed_buffer->data, size);
    200 	else {
    201 		res = read_filesystem(fd, start_block, size, buffer->data);
    202 		if(res == 0) {
    203 			ERROR("Failed to read fragment from output "
    204 				"filesystem\n");
    205 			BAD_ERROR("Output filesystem corrupted?\n");
    206 		}
    207 	}
    208 
    209 	cache_unlock(buffer);
    210 	cache_block_put(compressed_buffer);
    211 
    212 finished:
    213 	pthread_cleanup_pop(0);
    214 
    215 	return buffer;
    216 }
    217 
    218 
    219 struct file_buffer *get_fragment_cksum(struct file_info *file,
    220 	char *data_buffer, int fd, unsigned short *checksum)
    221 {
    222 	struct file_buffer *frag_buffer;
    223 	struct append_file *append;
    224 	int index = file->fragment->index;
    225 
    226 	frag_buffer = get_fragment(file->fragment, data_buffer, fd);
    227 
    228 	pthread_cleanup_push((void *) pthread_mutex_unlock, &dup_mutex);
    229 
    230 	for(append = file_mapping[index]; append; append = append->next) {
    231 		int offset = append->file->fragment->offset;
    232 		int size = append->file->fragment->size;
    233 		char *data = frag_buffer->data + offset;
    234 		unsigned short cksum = get_checksum_mem(data, size);
    235 
    236 		if(file == append->file)
    237 			*checksum = cksum;
    238 
    239 		pthread_mutex_lock(&dup_mutex);
    240 		append->file->fragment_checksum = cksum;
    241 		append->file->have_frag_checksum = TRUE;
    242 		pthread_mutex_unlock(&dup_mutex);
    243 	}
    244 
    245 	pthread_cleanup_pop(0);
    246 
    247 	return frag_buffer;
    248 }
    249 
    250 
    251 void *frag_thrd(void *destination_file)
    252 {
    253 	sigset_t sigmask, old_mask;
    254 	char *data_buffer;
    255 	int fd;
    256 
    257 	sigemptyset(&sigmask);
    258 	sigaddset(&sigmask, SIGINT);
    259 	sigaddset(&sigmask, SIGTERM);
    260 	sigaddset(&sigmask, SIGUSR1);
    261 	pthread_sigmask(SIG_BLOCK, &sigmask, &old_mask);
    262 
    263 	fd = open(destination_file, O_RDONLY);
    264 	if(fd == -1)
    265 		BAD_ERROR("frag_thrd: can't open destination for reading\n");
    266 
    267 	data_buffer = malloc(SQUASHFS_FILE_MAX_SIZE);
    268 	if(data_buffer == NULL)
    269 		MEM_ERROR();
    270 
    271 	pthread_cleanup_push((void *) pthread_mutex_unlock, &dup_mutex);
    272 
    273 	while(1) {
    274 		struct file_buffer *file_buffer = queue_get(to_process_frag);
    275 		struct file_buffer *buffer;
    276 		int sparse = checksum_sparse(file_buffer);
    277 		struct file_info *dupl_ptr;
    278 		long long file_size;
    279 		unsigned short checksum;
    280 		char flag;
    281 		int res;
    282 
    283 		if(sparse_files && sparse) {
    284 			file_buffer->c_byte = 0;
    285 			file_buffer->fragment = FALSE;
    286 		} else
    287 			file_buffer->c_byte = file_buffer->size;
    288 
    289 		/*
    290 		 * Specutively pull into the fragment cache any fragment blocks
    291 		 * which contain fragments which *this* fragment may be
    292 		 * be a duplicate.
    293 		 *
    294 		 * By ensuring the fragment block is in cache ahead of time
    295 		 * should eliminate the parallelisation stall when the
    296 		 * main thread needs to read the fragment block to do a
    297 		 * duplicate check on it.
    298 		 *
    299 		 * If this is a fragment belonging to a larger file
    300 		 * (with additional blocks) then ignore it.  Here we're
    301 		 * interested in the "low hanging fruit" of files which
    302 		 * consist of only a fragment
    303 		 */
    304 		if(file_buffer->file_size != file_buffer->size) {
    305 			seq_queue_put(to_main, file_buffer);
    306 			continue;
    307 		}
    308 
    309 		file_size = file_buffer->file_size;
    310 
    311 		pthread_mutex_lock(&dup_mutex);
    312 		dupl_ptr = dupl[DUP_HASH(file_size)];
    313 		pthread_mutex_unlock(&dup_mutex);
    314 
    315 		file_buffer->dupl_start = dupl_ptr;
    316 		file_buffer->duplicate = FALSE;
    317 
    318 		for(; dupl_ptr; dupl_ptr = dupl_ptr->next) {
    319 			if(file_size != dupl_ptr->file_size ||
    320 					file_size != dupl_ptr->fragment->size)
    321 				continue;
    322 
    323 			pthread_mutex_lock(&dup_mutex);
    324 			flag = dupl_ptr->have_frag_checksum;
    325 			checksum = dupl_ptr->fragment_checksum;
    326 			pthread_mutex_unlock(&dup_mutex);
    327 
    328 			/*
    329 			 * If we have the checksum and it matches then
    330 			 * read in the fragment block.
    331 			 *
    332 			 * If we *don't* have the checksum, then we are
    333 			 * appending, and the fragment block is on the
    334 			 * "old" filesystem.  Read it in and checksum
    335 			 * the entire fragment buffer
    336 			 */
    337 			if(!flag) {
    338 				buffer = get_fragment_cksum(dupl_ptr,
    339 					data_buffer, fd, &checksum);
    340 				if(checksum != file_buffer->checksum) {
    341 					cache_block_put(buffer);
    342 					continue;
    343 				}
    344 			} else if(checksum == file_buffer->checksum)
    345 				buffer = get_fragment(dupl_ptr->fragment,
    346 					data_buffer, fd);
    347 			else
    348 				continue;
    349 
    350 			res = memcmp(file_buffer->data, buffer->data +
    351 				dupl_ptr->fragment->offset, file_size);
    352 			cache_block_put(buffer);
    353 			if(res == 0) {
    354 				struct file_buffer *dup = malloc(sizeof(*dup));
    355 				if(dup == NULL)
    356 					MEM_ERROR();
    357 				memcpy(dup, file_buffer, sizeof(*dup));
    358 				cache_block_put(file_buffer);
    359 				dup->dupl_start = dupl_ptr;
    360 				dup->duplicate = TRUE;
    361 				file_buffer = dup;
    362 				break;
    363 			}
    364 		}
    365 
    366 		seq_queue_put(to_main, file_buffer);
    367 	}
    368 
    369 	pthread_cleanup_pop(0);
    370 }
    371