Home | History | Annotate | Download | only in ext2fs
      1 /*
      2 URL: svn://svnanon.samba.org/samba/branches/SAMBA_4_0/source/lib/tdb/common
      3 Rev: 23590
      4 Last Changed Date: 2007-06-22 13:36:10 -0400 (Fri, 22 Jun 2007)
      5 */
      6  /*
      7    trivial database library - standalone version
      8 
      9    Copyright (C) Andrew Tridgell              1999-2005
     10    Copyright (C) Jeremy Allison               2000-2006
     11    Copyright (C) Paul `Rusty' Russell         2000
     12 
     13      ** NOTE! The following LGPL license applies to the tdb
     14      ** library. This does NOT imply that all of Samba is released
     15      ** under the LGPL
     16 
     17    This library is free software; you can redistribute it and/or
     18    modify it under the terms of the GNU Lesser General Public
     19    License as published by the Free Software Foundation; either
     20    version 2 of the License, or (at your option) any later version.
     21 
     22    This library is distributed in the hope that it will be useful,
     23    but WITHOUT ANY WARRANTY; without even the implied warranty of
     24    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
     25    Lesser General Public License for more details.
     26 
     27    You should have received a copy of the GNU Lesser General Public
     28    License along with this library; if not, write to the Free Software
     29    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
     30 */
     31 
     32 #ifdef CONFIG_STAND_ALONE
     33 #define HAVE_MMAP
     34 #define HAVE_STRDUP
     35 #define HAVE_SYS_MMAN_H
     36 #define HAVE_UTIME_H
     37 #define HAVE_UTIME
     38 #endif
     39 #define _XOPEN_SOURCE 600
     40 
     41 #include <unistd.h>
     42 #include <stdio.h>
     43 #include <stdlib.h>
     44 #include <stdarg.h>
     45 #include <stddef.h>
     46 #include <errno.h>
     47 #include <string.h>
     48 #ifdef HAVE_SYS_SELECT_H
     49 #include <sys/select.h>
     50 #endif
     51 #include <sys/time.h>
     52 #include <sys/types.h>
     53 #include <time.h>
     54 #ifdef HAVE_UTIME_H
     55 #include <utime.h>
     56 #endif
     57 #include <sys/stat.h>
     58 #include <sys/file.h>
     59 #include <fcntl.h>
     60 
     61 #ifdef HAVE_SYS_MMAN_H
     62 #include <sys/mman.h>
     63 #endif
     64 
     65 #ifndef MAP_FILE
     66 #define MAP_FILE 0
     67 #endif
     68 
     69 #ifndef MAP_FAILED
     70 #define MAP_FAILED ((void *)-1)
     71 #endif
     72 
     73 #ifndef HAVE_STRDUP
     74 #define strdup rep_strdup
     75 static char *rep_strdup(const char *s)
     76 {
     77 	char *ret;
     78 	int length;
     79 	if (!s)
     80 		return NULL;
     81 
     82 	if (!length)
     83 		length = strlen(s);
     84 
     85 	ret = malloc(length + 1);
     86 	if (ret) {
     87 		strncpy(ret, s, length);
     88 		ret[length] = '\0';
     89 	}
     90 	return ret;
     91 }
     92 #endif
     93 
     94 #ifndef PRINTF_ATTRIBUTE
     95 #if (__GNUC__ >= 3) && (__GNUC_MINOR__ >= 1 )
     96 /** Use gcc attribute to check printf fns.  a1 is the 1-based index of
     97  * the parameter containing the format, and a2 the index of the first
     98  * argument. Note that some gcc 2.x versions don't handle this
     99  * properly **/
    100 #define PRINTF_ATTRIBUTE(a1, a2) __attribute__ ((format (__printf__, a1, a2)))
    101 #else
    102 #define PRINTF_ATTRIBUTE(a1, a2)
    103 #endif
    104 #endif
    105 
    106 typedef int bool;
    107 
    108 #include "tdb.h"
    109 
    110 static TDB_DATA tdb_null;
    111 
    112 #ifndef u32
    113 #define u32 unsigned
    114 #endif
    115 
    116 typedef u32 tdb_len_t;
    117 typedef u32 tdb_off_t;
    118 
    119 #ifndef offsetof
    120 #define offsetof(t,f) ((unsigned int)&((t *)0)->f)
    121 #endif
    122 
    123 #define TDB_MAGIC_FOOD "TDB file\n"
    124 #define TDB_VERSION (0x26011967 + 6)
    125 #define TDB_MAGIC (0x26011999U)
    126 #define TDB_FREE_MAGIC (~TDB_MAGIC)
    127 #define TDB_DEAD_MAGIC (0xFEE1DEAD)
    128 #define TDB_RECOVERY_MAGIC (0xf53bc0e7U)
    129 #define TDB_ALIGNMENT 4
    130 #define MIN_REC_SIZE (2*sizeof(struct list_struct) + TDB_ALIGNMENT)
    131 #define DEFAULT_HASH_SIZE 131
    132 #define FREELIST_TOP (sizeof(struct tdb_header))
    133 #define TDB_ALIGN(x,a) (((x) + (a)-1) & ~((a)-1))
    134 #define TDB_BYTEREV(x) (((((x)&0xff)<<24)|((x)&0xFF00)<<8)|(((x)>>8)&0xFF00)|((x)>>24))
    135 #define TDB_DEAD(r) ((r)->magic == TDB_DEAD_MAGIC)
    136 #define TDB_BAD_MAGIC(r) ((r)->magic != TDB_MAGIC && !TDB_DEAD(r))
    137 #define TDB_HASH_TOP(hash) (FREELIST_TOP + (BUCKET(hash)+1)*sizeof(tdb_off_t))
    138 #define TDB_HASHTABLE_SIZE(tdb) ((tdb->header.hash_size+1)*sizeof(tdb_off_t))
    139 #define TDB_DATA_START(hash_size) TDB_HASH_TOP(hash_size-1)
    140 #define TDB_RECOVERY_HEAD offsetof(struct tdb_header, recovery_start)
    141 #define TDB_SEQNUM_OFS    offsetof(struct tdb_header, sequence_number)
    142 #define TDB_PAD_BYTE 0x42
    143 #define TDB_PAD_U32  0x42424242
    144 
    145 /* NB assumes there is a local variable called "tdb" that is the
    146  * current context, also takes doubly-parenthesized print-style
    147  * argument. */
    148 #define TDB_LOG(x) tdb->log.log_fn x
    149 
    150 /* lock offsets */
    151 #define GLOBAL_LOCK      0
    152 #define ACTIVE_LOCK      4
    153 #define TRANSACTION_LOCK 8
    154 
    155 /* free memory if the pointer is valid and zero the pointer */
    156 #ifndef SAFE_FREE
    157 #define SAFE_FREE(x) do { if ((x) != NULL) {free(x); (x)=NULL;} } while(0)
    158 #endif
    159 
    160 #define BUCKET(hash) ((hash) % tdb->header.hash_size)
    161 
    162 #define DOCONV() (tdb->flags & TDB_CONVERT)
    163 #define CONVERT(x) (DOCONV() ? tdb_convert(&x, sizeof(x)) : &x)
    164 
    165 
/*
 * Record header. The body of the database is made of one list_struct
 * chain for the free space plus a separate chain of records for each
 * hash value. Every member is a u32-based quantity (tdb_off_t and
 * tdb_len_t are typedefs of u32).
 */
struct list_struct {
	tdb_off_t next; /* offset of the next record in the list */
	tdb_len_t rec_len; /* total byte length of record */
	tdb_len_t key_len; /* byte length of key */
	tdb_len_t data_len; /* byte length of data */
	u32 full_hash; /* the full 32 bit hash of the key */
	u32 magic;   /* try to catch errors; compared against TDB_MAGIC and
			TDB_DEAD_MAGIC by TDB_BAD_MAGIC()/TDB_DEAD() */
	/* the following union is implied:
		union {
			char record[rec_len];
			struct {
				char key[key_len];
				char data[data_len];
			}
			u32 totalsize; (tailer)
		}
	*/
};
    186 
    187 
/*
 * This is stored at the front of every database. FREELIST_TOP is defined
 * as sizeof(struct tdb_header), so the free-list head and the hash chain
 * heads follow this structure directly in the file.
 */
struct tdb_header {
	char magic_food[32]; /* for /etc/magic */
	u32 version; /* version of the code */
	u32 hash_size; /* number of hash entries */
	tdb_off_t rwlocks; /* obsolete - kept to detect old formats */
	tdb_off_t recovery_start; /* offset of transaction recovery region
				     (TDB_RECOVERY_HEAD is the offset of this field) */
	tdb_off_t sequence_number; /* used when TDB_SEQNUM is set
				      (TDB_SEQNUM_OFS is the offset of this field) */
	tdb_off_t reserved[29];
};
    198 
/*
 * Bookkeeping entry for a lock held by this context. Used both for the
 * per-chain entries in tdb_context.lockrecs and for the whole-database
 * lock in tdb_context.global_lock.
 */
struct tdb_lock_type {
	int list;	/* hash chain index the lock covers; -1 is the alloc list */
	u32 count;	/* nesting count: how many times the lock has been taken */
	u32 ltype;	/* lock type the entry was taken with (F_RDLCK / F_WRLCK) */
};
    204 
/*
 * Node in the singly linked list of traversal locks; the list head is
 * embedded in the context as tdb_context.travlocks. The record lock
 * helpers walk the list comparing "off" against a record offset.
 */
struct tdb_traverse_lock {
	struct tdb_traverse_lock *next;	/* next node, NULL at the end of the list */
	u32 off;	/* record offset this traversal currently holds */
	u32 hash;	/* presumably the hash chain being traversed - TODO confirm */
	int lock_rw;	/* presumably the lock type used by the traversal - TODO confirm */
};
    211 
    212 
/*
 * Table of low-level I/O and locking operations. The code reaches these
 * through tdb->methods rather than calling the implementations directly.
 */
struct tdb_methods {
	/* read len bytes at an offset into a buffer; last argument asks for
	   byte-order conversion of the result */
	int (*tdb_read)(struct tdb_context *, tdb_off_t , void *, tdb_len_t , int );
	/* write len bytes from a buffer at an offset */
	int (*tdb_write)(struct tdb_context *, tdb_off_t, const void *, tdb_len_t);
	/* NOTE(review): presumably advances to the next non-empty hash chain - confirm */
	void (*next_hash_chain)(struct tdb_context *, u32 *);
	/* out-of-bounds check for a required minimum length; last argument is
	   "probe" (suppresses logging) */
	int (*tdb_oob)(struct tdb_context *, tdb_off_t , int );
	/* NOTE(review): presumably grows the file from a size by an amount - confirm */
	int (*tdb_expand_file)(struct tdb_context *, tdb_off_t , tdb_off_t );
	/* byte range lock: offset, rw_type, lck_type, probe, len */
	int (*tdb_brlock)(struct tdb_context *, tdb_off_t , int, int, int, size_t);
};
    221 
/*
 * Per-open-database state. Nearly every function in this file takes a
 * pointer to one of these as its first argument, and the TDB_LOG, BUCKET
 * and DOCONV macros expect it to be available as a local named "tdb".
 */
struct tdb_context {
	char *name; /* the name of the database */
	void *map_ptr; /* where it is currently mapped; when NULL the I/O
			  routines fall back to pread/pwrite on fd */
	int fd; /* open file descriptor for the database */
	tdb_len_t map_size; /* how much space has been mapped */
	int read_only; /* opened read-only */
	int traverse_read; /* read-only traversal */
	struct tdb_lock_type global_lock; /* whole-database lock state */
	int num_lockrecs; /* number of entries in lockrecs */
	struct tdb_lock_type *lockrecs; /* only real locks, all with count>0 */
	enum TDB_ERROR ecode; /* error code for last tdb error */
	struct tdb_header header; /* a cached copy of the header */
	u32 flags; /* the flags passed to tdb_open */
	struct tdb_traverse_lock travlocks; /* current traversal locks */
	struct tdb_context *next; /* all tdbs to avoid multiple opens */
	dev_t device;	/* uniquely identifies this tdb */
	ino_t inode;	/* uniquely identifies this tdb */
	struct tdb_logging_context log; /* log.log_fn is invoked by TDB_LOG */
	unsigned int (*hash_fn)(TDB_DATA *key); /* key hashing function */
	int open_flags; /* flags used in the open - needed by reopen */
	unsigned int num_locks; /* number of chain locks held */
	const struct tdb_methods *methods; /* low-level I/O and locking operations */
	struct tdb_transaction *transaction;
	int page_size;
	int max_dead_records;
	bool have_transaction_lock; /* set while the TRANSACTION_LOCK byte is held */
};
    249 
    250 
    251 /*
    252   internal prototypes
    253 */
    254 static int tdb_munmap(struct tdb_context *tdb);
    255 static void tdb_mmap(struct tdb_context *tdb);
    256 static int tdb_lock(struct tdb_context *tdb, int list, int ltype);
    257 static int tdb_unlock(struct tdb_context *tdb, int list, int ltype);
    258 static int tdb_brlock(struct tdb_context *tdb, tdb_off_t offset, int rw_type, int lck_type, int probe, size_t len);
    259 static int tdb_transaction_lock(struct tdb_context *tdb, int ltype);
    260 static int tdb_transaction_unlock(struct tdb_context *tdb);
    261 static int tdb_brlock_upgrade(struct tdb_context *tdb, tdb_off_t offset, size_t len);
    262 static int tdb_write_lock_record(struct tdb_context *tdb, tdb_off_t off);
    263 static int tdb_write_unlock_record(struct tdb_context *tdb, tdb_off_t off);
    264 static int tdb_ofs_read(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d);
    265 static int tdb_ofs_write(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d);
    266 static void *tdb_convert(void *buf, u32 size);
    267 static int tdb_free(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec);
    268 static tdb_off_t tdb_allocate(struct tdb_context *tdb, tdb_len_t length, struct list_struct *rec);
    269 static int tdb_ofs_read(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d);
    270 static int tdb_ofs_write(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d);
    271 static int tdb_lock_record(struct tdb_context *tdb, tdb_off_t off);
    272 static int tdb_unlock_record(struct tdb_context *tdb, tdb_off_t off);
    273 static int tdb_rec_read(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec);
    274 static int tdb_rec_write(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec);
    275 static int tdb_do_delete(struct tdb_context *tdb, tdb_off_t rec_ptr, struct list_struct *rec);
    276 static unsigned char *tdb_alloc_read(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t len);
    277 static int tdb_parse_data(struct tdb_context *tdb, TDB_DATA key,
    278 		   tdb_off_t offset, tdb_len_t len,
    279 		   int (*parser)(TDB_DATA key, TDB_DATA data,
    280 				 void *private_data),
    281 		   void *private_data);
    282 static tdb_off_t tdb_find_lock_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash, int locktype,
    283 			   struct list_struct *rec);
    284 static void tdb_io_init(struct tdb_context *tdb);
    285 static int tdb_expand(struct tdb_context *tdb, tdb_off_t size);
    286 static int tdb_rec_free_read(struct tdb_context *tdb, tdb_off_t off,
    287 		      struct list_struct *rec);
    288 
    289 
    290 /* file: error.c */
    291 
    292 enum TDB_ERROR tdb_error(struct tdb_context *tdb)
    293 {
    294 	return tdb->ecode;
    295 }
    296 
    297 static struct tdb_errname {
    298 	enum TDB_ERROR ecode; const char *estring;
    299 } emap[] = { {TDB_SUCCESS, "Success"},
    300 	     {TDB_ERR_CORRUPT, "Corrupt database"},
    301 	     {TDB_ERR_IO, "IO Error"},
    302 	     {TDB_ERR_LOCK, "Locking error"},
    303 	     {TDB_ERR_OOM, "Out of memory"},
    304 	     {TDB_ERR_EXISTS, "Record exists"},
    305 	     {TDB_ERR_NOLOCK, "Lock exists on other keys"},
    306 	     {TDB_ERR_EINVAL, "Invalid parameter"},
    307 	     {TDB_ERR_NOEXIST, "Record does not exist"},
    308 	     {TDB_ERR_RDONLY, "write not permitted"} };
    309 
    310 /* Error string for the last tdb error */
    311 const char *tdb_errorstr(struct tdb_context *tdb)
    312 {
    313 	u32 i;
    314 	for (i = 0; i < sizeof(emap) / sizeof(struct tdb_errname); i++)
    315 		if (tdb->ecode == emap[i].ecode)
    316 			return emap[i].estring;
    317 	return "Invalid error code";
    318 }
    319 
    320 /* file: lock.c */
    321 
/* Flag bit OR'ed into a lock type: the lock routines strip it again and,
 * when it was set, only update the in-memory bookkeeping without issuing
 * the fcntl() byte-range lock. */
#define TDB_MARK_LOCK 0x80000000
    323 
    324 /* a byte range locking function - return 0 on success
    325    this functions locks/unlocks 1 byte at the specified offset.
    326 
    327    On error, errno is also set so that errors are passed back properly
    328    through tdb_open().
    329 
    330    note that a len of zero means lock to end of file
    331 */
    332 int tdb_brlock(struct tdb_context *tdb, tdb_off_t offset,
    333 	       int rw_type, int lck_type, int probe, size_t len)
    334 {
    335 	struct flock fl;
    336 	int ret;
    337 
    338 	if (tdb->flags & TDB_NOLOCK) {
    339 		return 0;
    340 	}
    341 
    342 	if ((rw_type == F_WRLCK) && (tdb->read_only || tdb->traverse_read)) {
    343 		tdb->ecode = TDB_ERR_RDONLY;
    344 		return -1;
    345 	}
    346 
    347 	fl.l_type = rw_type;
    348 	fl.l_whence = SEEK_SET;
    349 	fl.l_start = offset;
    350 	fl.l_len = len;
    351 	fl.l_pid = 0;
    352 
    353 	do {
    354 		ret = fcntl(tdb->fd,lck_type,&fl);
    355 	} while (ret == -1 && errno == EINTR);
    356 
    357 	if (ret == -1) {
    358 		/* Generic lock error. errno set by fcntl.
    359 		 * EAGAIN is an expected return from non-blocking
    360 		 * locks. */
    361 		if (!probe && lck_type != F_SETLK) {
    362 			/* Ensure error code is set for log fun to examine. */
    363 			tdb->ecode = TDB_ERR_LOCK;
    364 			TDB_LOG((tdb, TDB_DEBUG_TRACE,"tdb_brlock failed (fd=%d) at offset %d rw_type=%d lck_type=%d len=%d\n",
    365 				 tdb->fd, offset, rw_type, lck_type, (int)len));
    366 		}
    367 		return TDB_ERRCODE(TDB_ERR_LOCK, -1);
    368 	}
    369 	return 0;
    370 }
    371 
    372 
    373 /*
    374   upgrade a read lock to a write lock. This needs to be handled in a
    375   special way as some OSes (such as solaris) have too conservative
    376   deadlock detection and claim a deadlock when progress can be
    377   made. For those OSes we may loop for a while.
    378 */
    379 int tdb_brlock_upgrade(struct tdb_context *tdb, tdb_off_t offset, size_t len)
    380 {
    381 	int count = 1000;
    382 	while (count--) {
    383 		struct timeval tv;
    384 		if (tdb_brlock(tdb, offset, F_WRLCK, F_SETLKW, 1, len) == 0) {
    385 			return 0;
    386 		}
    387 		if (errno != EDEADLK) {
    388 			break;
    389 		}
    390 		/* sleep for as short a time as we can - more portable than usleep() */
    391 		tv.tv_sec = 0;
    392 		tv.tv_usec = 1;
    393 		select(0, NULL, NULL, NULL, &tv);
    394 	}
    395 	TDB_LOG((tdb, TDB_DEBUG_TRACE,"tdb_brlock_upgrade failed at offset %d\n", offset));
    396 	return -1;
    397 }
    398 
    399 
    400 /* lock a list in the database. list -1 is the alloc list */
    401 static int _tdb_lock(struct tdb_context *tdb, int list, int ltype, int op)
    402 {
    403 	struct tdb_lock_type *new_lck;
    404 	int i;
    405 	bool mark_lock = ((ltype & TDB_MARK_LOCK) == TDB_MARK_LOCK);
    406 
    407 	ltype &= ~TDB_MARK_LOCK;
    408 
    409 	/* a global lock allows us to avoid per chain locks */
    410 	if (tdb->global_lock.count &&
    411 	    (ltype == tdb->global_lock.ltype || ltype == F_RDLCK)) {
    412 		return 0;
    413 	}
    414 
    415 	if (tdb->global_lock.count) {
    416 		return TDB_ERRCODE(TDB_ERR_LOCK, -1);
    417 	}
    418 
    419 	if (list < -1 || list >= (int)tdb->header.hash_size) {
    420 		TDB_LOG((tdb, TDB_DEBUG_ERROR,"tdb_lock: invalid list %d for ltype=%d\n",
    421 			   list, ltype));
    422 		return -1;
    423 	}
    424 	if (tdb->flags & TDB_NOLOCK)
    425 		return 0;
    426 
    427 	for (i=0; i<tdb->num_lockrecs; i++) {
    428 		if (tdb->lockrecs[i].list == list) {
    429 			if (tdb->lockrecs[i].count == 0) {
    430 				/*
    431 				 * Can't happen, see tdb_unlock(). It should
    432 				 * be an assert.
    433 				 */
    434 				TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_lock: "
    435 					 "lck->count == 0 for list %d", list));
    436 			}
    437 			/*
    438 			 * Just increment the in-memory struct, posix locks
    439 			 * don't stack.
    440 			 */
    441 			tdb->lockrecs[i].count++;
    442 			return 0;
    443 		}
    444 	}
    445 
    446 	new_lck = (struct tdb_lock_type *)realloc(
    447 		tdb->lockrecs,
    448 		sizeof(*tdb->lockrecs) * (tdb->num_lockrecs+1));
    449 	if (new_lck == NULL) {
    450 		errno = ENOMEM;
    451 		return -1;
    452 	}
    453 	tdb->lockrecs = new_lck;
    454 
    455 	/* Since fcntl locks don't nest, we do a lock for the first one,
    456 	   and simply bump the count for future ones */
    457 	if (!mark_lock &&
    458 	    tdb->methods->tdb_brlock(tdb,FREELIST_TOP+4*list, ltype, op,
    459 				     0, 1)) {
    460 		return -1;
    461 	}
    462 
    463 	tdb->num_locks++;
    464 
    465 	tdb->lockrecs[tdb->num_lockrecs].list = list;
    466 	tdb->lockrecs[tdb->num_lockrecs].count = 1;
    467 	tdb->lockrecs[tdb->num_lockrecs].ltype = ltype;
    468 	tdb->num_lockrecs += 1;
    469 
    470 	return 0;
    471 }
    472 
    473 /* lock a list in the database. list -1 is the alloc list */
    474 int tdb_lock(struct tdb_context *tdb, int list, int ltype)
    475 {
    476 	int ret;
    477 	ret = _tdb_lock(tdb, list, ltype, F_SETLKW);
    478 	if (ret) {
    479 		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_lock failed on list %d "
    480 			 "ltype=%d (%s)\n",  list, ltype, strerror(errno)));
    481 	}
    482 	return ret;
    483 }
    484 
    485 /* lock a list in the database. list -1 is the alloc list. non-blocking lock */
    486 int tdb_lock_nonblock(struct tdb_context *tdb, int list, int ltype)
    487 {
    488 	return _tdb_lock(tdb, list, ltype, F_SETLK);
    489 }
    490 
    491 
    492 /* unlock the database: returns void because it's too late for errors. */
    493 	/* changed to return int it may be interesting to know there
    494 	   has been an error  --simo */
    495 int tdb_unlock(struct tdb_context *tdb, int list, int ltype)
    496 {
    497 	int ret = -1;
    498 	int i;
    499 	struct tdb_lock_type *lck = NULL;
    500 	bool mark_lock = ((ltype & TDB_MARK_LOCK) == TDB_MARK_LOCK);
    501 
    502 	ltype &= ~TDB_MARK_LOCK;
    503 
    504 	/* a global lock allows us to avoid per chain locks */
    505 	if (tdb->global_lock.count &&
    506 	    (ltype == tdb->global_lock.ltype || ltype == F_RDLCK)) {
    507 		return 0;
    508 	}
    509 
    510 	if (tdb->global_lock.count) {
    511 		return TDB_ERRCODE(TDB_ERR_LOCK, -1);
    512 	}
    513 
    514 	if (tdb->flags & TDB_NOLOCK)
    515 		return 0;
    516 
    517 	/* Sanity checks */
    518 	if (list < -1 || list >= (int)tdb->header.hash_size) {
    519 		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlock: list %d invalid (%d)\n", list, tdb->header.hash_size));
    520 		return ret;
    521 	}
    522 
    523 	for (i=0; i<tdb->num_lockrecs; i++) {
    524 		if (tdb->lockrecs[i].list == list) {
    525 			lck = &tdb->lockrecs[i];
    526 			break;
    527 		}
    528 	}
    529 
    530 	if ((lck == NULL) || (lck->count == 0)) {
    531 		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlock: count is 0\n"));
    532 		return -1;
    533 	}
    534 
    535 	if (lck->count > 1) {
    536 		lck->count--;
    537 		return 0;
    538 	}
    539 
    540 	/*
    541 	 * This lock has count==1 left, so we need to unlock it in the
    542 	 * kernel. We don't bother with decrementing the in-memory array
    543 	 * element, we're about to overwrite it with the last array element
    544 	 * anyway.
    545 	 */
    546 
    547 	if (mark_lock) {
    548 		ret = 0;
    549 	} else {
    550 		ret = tdb->methods->tdb_brlock(tdb, FREELIST_TOP+4*list, F_UNLCK,
    551 					       F_SETLKW, 0, 1);
    552 	}
    553 	tdb->num_locks--;
    554 
    555 	/*
    556 	 * Shrink the array by overwriting the element just unlocked with the
    557 	 * last array element.
    558 	 */
    559 
    560 	if (tdb->num_lockrecs > 1) {
    561 		*lck = tdb->lockrecs[tdb->num_lockrecs-1];
    562 	}
    563 	tdb->num_lockrecs -= 1;
    564 
    565 	/*
    566 	 * We don't bother with realloc when the array shrinks, but if we have
    567 	 * a completely idle tdb we should get rid of the locked array.
    568 	 */
    569 
    570 	if (tdb->num_lockrecs == 0) {
    571 		SAFE_FREE(tdb->lockrecs);
    572 	}
    573 
    574 	if (ret)
    575 		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlock: An error occurred unlocking!\n"));
    576 	return ret;
    577 }
    578 
    579 /*
    580   get the transaction lock
    581  */
    582 int tdb_transaction_lock(struct tdb_context *tdb, int ltype)
    583 {
    584 	if (tdb->have_transaction_lock || tdb->global_lock.count) {
    585 		return 0;
    586 	}
    587 	if (tdb->methods->tdb_brlock(tdb, TRANSACTION_LOCK, ltype,
    588 				     F_SETLKW, 0, 1) == -1) {
    589 		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_lock: failed to get transaction lock\n"));
    590 		tdb->ecode = TDB_ERR_LOCK;
    591 		return -1;
    592 	}
    593 	tdb->have_transaction_lock = 1;
    594 	return 0;
    595 }
    596 
    597 /*
    598   release the transaction lock
    599  */
    600 int tdb_transaction_unlock(struct tdb_context *tdb)
    601 {
    602 	int ret;
    603 	if (!tdb->have_transaction_lock) {
    604 		return 0;
    605 	}
    606 	ret = tdb->methods->tdb_brlock(tdb, TRANSACTION_LOCK, F_UNLCK, F_SETLKW, 0, 1);
    607 	if (ret == 0) {
    608 		tdb->have_transaction_lock = 0;
    609 	}
    610 	return ret;
    611 }
    612 
    613 
    614 
    615 
    616 /* lock/unlock entire database */
    617 static int _tdb_lockall(struct tdb_context *tdb, int ltype, int op)
    618 {
    619 	bool mark_lock = ((ltype & TDB_MARK_LOCK) == TDB_MARK_LOCK);
    620 
    621 	ltype &= ~TDB_MARK_LOCK;
    622 
    623 	/* There are no locks on read-only dbs */
    624 	if (tdb->read_only || tdb->traverse_read)
    625 		return TDB_ERRCODE(TDB_ERR_LOCK, -1);
    626 
    627 	if (tdb->global_lock.count && tdb->global_lock.ltype == ltype) {
    628 		tdb->global_lock.count++;
    629 		return 0;
    630 	}
    631 
    632 	if (tdb->global_lock.count) {
    633 		/* a global lock of a different type exists */
    634 		return TDB_ERRCODE(TDB_ERR_LOCK, -1);
    635 	}
    636 
    637 	if (tdb->num_locks != 0) {
    638 		/* can't combine global and chain locks */
    639 		return TDB_ERRCODE(TDB_ERR_LOCK, -1);
    640 	}
    641 
    642 	if (!mark_lock &&
    643 	    tdb->methods->tdb_brlock(tdb, FREELIST_TOP, ltype, op,
    644 				     0, 4*tdb->header.hash_size)) {
    645 		if (op == F_SETLKW) {
    646 			TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_lockall failed (%s)\n", strerror(errno)));
    647 		}
    648 		return -1;
    649 	}
    650 
    651 	tdb->global_lock.count = 1;
    652 	tdb->global_lock.ltype = ltype;
    653 
    654 	return 0;
    655 }
    656 
    657 
    658 
    659 /* unlock entire db */
    660 static int _tdb_unlockall(struct tdb_context *tdb, int ltype)
    661 {
    662 	bool mark_lock = ((ltype & TDB_MARK_LOCK) == TDB_MARK_LOCK);
    663 
    664 	ltype &= ~TDB_MARK_LOCK;
    665 
    666 	/* There are no locks on read-only dbs */
    667 	if (tdb->read_only || tdb->traverse_read) {
    668 		return TDB_ERRCODE(TDB_ERR_LOCK, -1);
    669 	}
    670 
    671 	if (tdb->global_lock.ltype != ltype || tdb->global_lock.count == 0) {
    672 		return TDB_ERRCODE(TDB_ERR_LOCK, -1);
    673 	}
    674 
    675 	if (tdb->global_lock.count > 1) {
    676 		tdb->global_lock.count--;
    677 		return 0;
    678 	}
    679 
    680 	if (!mark_lock &&
    681 	    tdb->methods->tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW,
    682 				     0, 4*tdb->header.hash_size)) {
    683 		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlockall failed (%s)\n", strerror(errno)));
    684 		return -1;
    685 	}
    686 
    687 	tdb->global_lock.count = 0;
    688 	tdb->global_lock.ltype = 0;
    689 
    690 	return 0;
    691 }
    692 
    693 /* lock entire database with write lock */
    694 int tdb_lockall(struct tdb_context *tdb)
    695 {
    696 	return _tdb_lockall(tdb, F_WRLCK, F_SETLKW);
    697 }
    698 
    699 /* lock entire database with write lock - mark only */
    700 int tdb_lockall_mark(struct tdb_context *tdb)
    701 {
    702 	return _tdb_lockall(tdb, F_WRLCK | TDB_MARK_LOCK, F_SETLKW);
    703 }
    704 
    705 /* unlock entire database with write lock - unmark only */
    706 int tdb_lockall_unmark(struct tdb_context *tdb)
    707 {
    708 	return _tdb_unlockall(tdb, F_WRLCK | TDB_MARK_LOCK);
    709 }
    710 
    711 /* lock entire database with write lock - nonblocking varient */
    712 int tdb_lockall_nonblock(struct tdb_context *tdb)
    713 {
    714 	return _tdb_lockall(tdb, F_WRLCK, F_SETLK);
    715 }
    716 
    717 /* unlock entire database with write lock */
    718 int tdb_unlockall(struct tdb_context *tdb)
    719 {
    720 	return _tdb_unlockall(tdb, F_WRLCK);
    721 }
    722 
    723 /* lock entire database with read lock */
    724 int tdb_lockall_read(struct tdb_context *tdb)
    725 {
    726 	return _tdb_lockall(tdb, F_RDLCK, F_SETLKW);
    727 }
    728 
    729 /* lock entire database with read lock - nonblock varient */
    730 int tdb_lockall_read_nonblock(struct tdb_context *tdb)
    731 {
    732 	return _tdb_lockall(tdb, F_RDLCK, F_SETLK);
    733 }
    734 
    735 /* unlock entire database with read lock */
    736 int tdb_unlockall_read(struct tdb_context *tdb)
    737 {
    738 	return _tdb_unlockall(tdb, F_RDLCK);
    739 }
    740 
    741 /* lock/unlock one hash chain. This is meant to be used to reduce
    742    contention - it cannot guarantee how many records will be locked */
    743 int tdb_chainlock(struct tdb_context *tdb, TDB_DATA key)
    744 {
    745 	return tdb_lock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK);
    746 }
    747 
    748 /* lock/unlock one hash chain, non-blocking. This is meant to be used
    749    to reduce contention - it cannot guarantee how many records will be
    750    locked */
    751 int tdb_chainlock_nonblock(struct tdb_context *tdb, TDB_DATA key)
    752 {
    753 	return tdb_lock_nonblock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK);
    754 }
    755 
    756 /* mark a chain as locked without actually locking it. Warning! use with great caution! */
    757 int tdb_chainlock_mark(struct tdb_context *tdb, TDB_DATA key)
    758 {
    759 	return tdb_lock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK | TDB_MARK_LOCK);
    760 }
    761 
    762 /* unmark a chain as locked without actually locking it. Warning! use with great caution! */
    763 int tdb_chainlock_unmark(struct tdb_context *tdb, TDB_DATA key)
    764 {
    765 	return tdb_unlock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK | TDB_MARK_LOCK);
    766 }
    767 
    768 int tdb_chainunlock(struct tdb_context *tdb, TDB_DATA key)
    769 {
    770 	return tdb_unlock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK);
    771 }
    772 
    773 int tdb_chainlock_read(struct tdb_context *tdb, TDB_DATA key)
    774 {
    775 	return tdb_lock(tdb, BUCKET(tdb->hash_fn(&key)), F_RDLCK);
    776 }
    777 
    778 int tdb_chainunlock_read(struct tdb_context *tdb, TDB_DATA key)
    779 {
    780 	return tdb_unlock(tdb, BUCKET(tdb->hash_fn(&key)), F_RDLCK);
    781 }
    782 
    783 
    784 
    785 /* record lock stops delete underneath */
    786 int tdb_lock_record(struct tdb_context *tdb, tdb_off_t off)
    787 {
    788 	return off ? tdb->methods->tdb_brlock(tdb, off, F_RDLCK, F_SETLKW, 0, 1) : 0;
    789 }
    790 
    791 /*
    792   Write locks override our own fcntl readlocks, so check it here.
    793   Note this is meant to be F_SETLK, *not* F_SETLKW, as it's not
    794   an error to fail to get the lock here.
    795 */
    796 int tdb_write_lock_record(struct tdb_context *tdb, tdb_off_t off)
    797 {
    798 	struct tdb_traverse_lock *i;
    799 	for (i = &tdb->travlocks; i; i = i->next)
    800 		if (i->off == off)
    801 			return -1;
    802 	return tdb->methods->tdb_brlock(tdb, off, F_WRLCK, F_SETLK, 1, 1);
    803 }
    804 
    805 /*
    806   Note this is meant to be F_SETLK, *not* F_SETLKW, as it's not
    807   an error to fail to get the lock here.
    808 */
    809 int tdb_write_unlock_record(struct tdb_context *tdb, tdb_off_t off)
    810 {
    811 	return tdb->methods->tdb_brlock(tdb, off, F_UNLCK, F_SETLK, 0, 1);
    812 }
    813 
    814 /* fcntl locks don't stack: avoid unlocking someone else's */
    815 int tdb_unlock_record(struct tdb_context *tdb, tdb_off_t off)
    816 {
    817 	struct tdb_traverse_lock *i;
    818 	u32 count = 0;
    819 
    820 	if (off == 0)
    821 		return 0;
    822 	for (i = &tdb->travlocks; i; i = i->next)
    823 		if (i->off == off)
    824 			count++;
    825 	return (count == 1 ? tdb->methods->tdb_brlock(tdb, off, F_UNLCK, F_SETLKW, 0, 1) : 0);
    826 }
    827 
    828 /* file: io.c */
    829 
    830 /* check for an out of bounds access - if it is out of bounds then
    831    see if the database has been expanded by someone else and expand
    832    if necessary
    833    note that "len" is the minimum length needed for the db
    834 */
    835 static int tdb_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
    836 {
    837 	struct stat st;
    838 	if (len <= tdb->map_size)
    839 		return 0;
    840 	if (tdb->flags & TDB_INTERNAL) {
    841 		if (!probe) {
    842 			/* Ensure ecode is set for log fn. */
    843 			tdb->ecode = TDB_ERR_IO;
    844 			TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_oob len %d beyond internal malloc size %d\n",
    845 				 (int)len, (int)tdb->map_size));
    846 		}
    847 		return TDB_ERRCODE(TDB_ERR_IO, -1);
    848 	}
    849 
    850 	if (fstat(tdb->fd, &st) == -1) {
    851 		return TDB_ERRCODE(TDB_ERR_IO, -1);
    852 	}
    853 
    854 	if (st.st_size < (size_t)len) {
    855 		if (!probe) {
    856 			/* Ensure ecode is set for log fn. */
    857 			tdb->ecode = TDB_ERR_IO;
    858 			TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_oob len %d beyond eof at %d\n",
    859 				 (int)len, (int)st.st_size));
    860 		}
    861 		return TDB_ERRCODE(TDB_ERR_IO, -1);
    862 	}
    863 
    864 	/* Unmap, update size, remap */
    865 	if (tdb_munmap(tdb) == -1)
    866 		return TDB_ERRCODE(TDB_ERR_IO, -1);
    867 	tdb->map_size = st.st_size;
    868 	tdb_mmap(tdb);
    869 	return 0;
    870 }
    871 
    872 /* write a lump of data at a specified offset */
    873 static int tdb_write(struct tdb_context *tdb, tdb_off_t off,
    874 		     const void *buf, tdb_len_t len)
    875 {
    876 	if (len == 0) {
    877 		return 0;
    878 	}
    879 
    880 	if (tdb->read_only || tdb->traverse_read) {
    881 		tdb->ecode = TDB_ERR_RDONLY;
    882 		return -1;
    883 	}
    884 
    885 	if (tdb->methods->tdb_oob(tdb, off + len, 0) != 0)
    886 		return -1;
    887 
    888 	if (tdb->map_ptr) {
    889 		memcpy(off + (char *)tdb->map_ptr, buf, len);
    890 	} else if (pwrite(tdb->fd, buf, len, off) != (ssize_t)len) {
    891 		/* Ensure ecode is set for log fn. */
    892 		tdb->ecode = TDB_ERR_IO;
    893 		TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_write failed at %d len=%d (%s)\n",
    894 			   off, len, strerror(errno)));
    895 		return TDB_ERRCODE(TDB_ERR_IO, -1);
    896 	}
    897 	return 0;
    898 }
    899 
    900 /* Endian conversion: we only ever deal with 4 byte quantities */
    901 void *tdb_convert(void *buf, u32 size)
    902 {
    903 	u32 i, *p = (u32 *)buf;
    904 	for (i = 0; i < size / 4; i++)
    905 		p[i] = TDB_BYTEREV(p[i]);
    906 	return buf;
    907 }
    908 
    909 
    910 /* read a lump of data at a specified offset, maybe convert */
    911 static int tdb_read(struct tdb_context *tdb, tdb_off_t off, void *buf,
    912 		    tdb_len_t len, int cv)
    913 {
    914 	if (tdb->methods->tdb_oob(tdb, off + len, 0) != 0) {
    915 		return -1;
    916 	}
    917 
    918 	if (tdb->map_ptr) {
    919 		memcpy(buf, off + (char *)tdb->map_ptr, len);
    920 	} else {
    921 		ssize_t ret = pread(tdb->fd, buf, len, off);
    922 		if (ret != (ssize_t)len) {
    923 			/* Ensure ecode is set for log fn. */
    924 			tdb->ecode = TDB_ERR_IO;
    925 			TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_read failed at %d "
    926 				 "len=%d ret=%d (%s) map_size=%d\n",
    927 				 (int)off, (int)len, (int)ret, strerror(errno),
    928 				 (int)tdb->map_size));
    929 			return TDB_ERRCODE(TDB_ERR_IO, -1);
    930 		}
    931 	}
    932 	if (cv) {
    933 		tdb_convert(buf, len);
    934 	}
    935 	return 0;
    936 }
    937 
    938 
    939 
    940 /*
    941   do an unlocked scan of the hash table heads to find the next non-zero head. The value
    942   will then be confirmed with the lock held
    943 */
    944 static void tdb_next_hash_chain(struct tdb_context *tdb, u32 *chain)
    945 {
    946 	u32 h = *chain;
    947 	if (tdb->map_ptr) {
    948 		for (;h < tdb->header.hash_size;h++) {
    949 			if (0 != *(u32 *)(TDB_HASH_TOP(h) + (unsigned char *)tdb->map_ptr)) {
    950 				break;
    951 			}
    952 		}
    953 	} else {
    954 		u32 off=0;
    955 		for (;h < tdb->header.hash_size;h++) {
    956 			if (tdb_ofs_read(tdb, TDB_HASH_TOP(h), &off) != 0 || off != 0) {
    957 				break;
    958 			}
    959 		}
    960 	}
    961 	(*chain) = h;
    962 }
    963 
    964 
    965 int tdb_munmap(struct tdb_context *tdb)
    966 {
    967 	if (tdb->flags & TDB_INTERNAL)
    968 		return 0;
    969 
    970 #ifdef HAVE_MMAP
    971 	if (tdb->map_ptr) {
    972 		int ret = munmap(tdb->map_ptr, tdb->map_size);
    973 		if (ret != 0)
    974 			return ret;
    975 	}
    976 #endif
    977 	tdb->map_ptr = NULL;
    978 	return 0;
    979 }
    980 
    981 void tdb_mmap(struct tdb_context *tdb)
    982 {
    983 	if (tdb->flags & TDB_INTERNAL)
    984 		return;
    985 
    986 #ifdef HAVE_MMAP
    987 	if (!(tdb->flags & TDB_NOMMAP)) {
    988 		tdb->map_ptr = mmap(NULL, tdb->map_size,
    989 				    PROT_READ|(tdb->read_only? 0:PROT_WRITE),
    990 				    MAP_SHARED|MAP_FILE, tdb->fd, 0);
    991 
    992 		/*
    993 		 * NB. When mmap fails it returns MAP_FAILED *NOT* NULL !!!!
    994 		 */
    995 
    996 		if (tdb->map_ptr == MAP_FAILED) {
    997 			tdb->map_ptr = NULL;
    998 			TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_mmap failed for size %d (%s)\n",
    999 				 tdb->map_size, strerror(errno)));
   1000 		}
   1001 	} else {
   1002 		tdb->map_ptr = NULL;
   1003 	}
   1004 #else
   1005 	tdb->map_ptr = NULL;
   1006 #endif
   1007 }
   1008 
   1009 /* expand a file.  we prefer to use ftruncate, as that is what posix
   1010   says to use for mmap expansion */
   1011 static int tdb_expand_file(struct tdb_context *tdb, tdb_off_t size, tdb_off_t addition)
   1012 {
   1013 	char buf[1024];
   1014 
   1015 	if (tdb->read_only || tdb->traverse_read) {
   1016 		tdb->ecode = TDB_ERR_RDONLY;
   1017 		return -1;
   1018 	}
   1019 
   1020 	if (ftruncate(tdb->fd, size+addition) == -1) {
   1021 		char b = 0;
   1022 		if (pwrite(tdb->fd,  &b, 1, (size+addition) - 1) != 1) {
   1023 			TDB_LOG((tdb, TDB_DEBUG_FATAL, "expand_file to %d failed (%s)\n",
   1024 				 size+addition, strerror(errno)));
   1025 			return -1;
   1026 		}
   1027 	}
   1028 
   1029 	/* now fill the file with something. This ensures that the
   1030 	   file isn't sparse, which would be very bad if we ran out of
   1031 	   disk. This must be done with write, not via mmap */
   1032 	memset(buf, TDB_PAD_BYTE, sizeof(buf));
   1033 	while (addition) {
   1034 		int n = addition>sizeof(buf)?sizeof(buf):addition;
   1035 		int ret = pwrite(tdb->fd, buf, n, size);
   1036 		if (ret != n) {
   1037 			TDB_LOG((tdb, TDB_DEBUG_FATAL, "expand_file write of %d failed (%s)\n",
   1038 				   n, strerror(errno)));
   1039 			return -1;
   1040 		}
   1041 		addition -= n;
   1042 		size += n;
   1043 	}
   1044 	return 0;
   1045 }
   1046 
   1047 
   1048 /* expand the database at least size bytes by expanding the underlying
   1049    file and doing the mmap again if necessary */
   1050 int tdb_expand(struct tdb_context *tdb, tdb_off_t size)
   1051 {
   1052 	struct list_struct rec;
   1053 	tdb_off_t offset;
   1054 
   1055 	if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
   1056 		TDB_LOG((tdb, TDB_DEBUG_ERROR, "lock failed in tdb_expand\n"));
   1057 		return -1;
   1058 	}
   1059 
   1060 	/* must know about any previous expansions by another process */
   1061 	tdb->methods->tdb_oob(tdb, tdb->map_size + 1, 1);
   1062 
   1063 	/* always make room for at least 10 more records, and round
   1064            the database up to a multiple of the page size */
   1065 	size = TDB_ALIGN(tdb->map_size + size*10, tdb->page_size) - tdb->map_size;
   1066 
   1067 	if (!(tdb->flags & TDB_INTERNAL))
   1068 		tdb_munmap(tdb);
   1069 
   1070 	/*
   1071 	 * We must ensure the file is unmapped before doing this
   1072 	 * to ensure consistency with systems like OpenBSD where
   1073 	 * writes and mmaps are not consistent.
   1074 	 */
   1075 
   1076 	/* expand the file itself */
   1077 	if (!(tdb->flags & TDB_INTERNAL)) {
   1078 		if (tdb->methods->tdb_expand_file(tdb, tdb->map_size, size) != 0)
   1079 			goto fail;
   1080 	}
   1081 
   1082 	tdb->map_size += size;
   1083 
   1084 	if (tdb->flags & TDB_INTERNAL) {
   1085 		char *new_map_ptr = (char *)realloc(tdb->map_ptr,
   1086 						    tdb->map_size);
   1087 		if (!new_map_ptr) {
   1088 			tdb->map_size -= size;
   1089 			goto fail;
   1090 		}
   1091 		tdb->map_ptr = new_map_ptr;
   1092 	} else {
   1093 		/*
   1094 		 * We must ensure the file is remapped before adding the space
   1095 		 * to ensure consistency with systems like OpenBSD where
   1096 		 * writes and mmaps are not consistent.
   1097 		 */
   1098 
   1099 		/* We're ok if the mmap fails as we'll fallback to read/write */
   1100 		tdb_mmap(tdb);
   1101 	}
   1102 
   1103 	/* form a new freelist record */
   1104 	memset(&rec,'\0',sizeof(rec));
   1105 	rec.rec_len = size - sizeof(rec);
   1106 
   1107 	/* link it into the free list */
   1108 	offset = tdb->map_size - size;
   1109 	if (tdb_free(tdb, offset, &rec) == -1)
   1110 		goto fail;
   1111 
   1112 	tdb_unlock(tdb, -1, F_WRLCK);
   1113 	return 0;
   1114  fail:
   1115 	tdb_unlock(tdb, -1, F_WRLCK);
   1116 	return -1;
   1117 }
   1118 
   1119 /* read/write a tdb_off_t */
   1120 int tdb_ofs_read(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d)
   1121 {
   1122 	return tdb->methods->tdb_read(tdb, offset, (char*)d, sizeof(*d), DOCONV());
   1123 }
   1124 
   1125 int tdb_ofs_write(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d)
   1126 {
   1127 	tdb_off_t off = *d;
   1128 	return tdb->methods->tdb_write(tdb, offset, CONVERT(off), sizeof(*d));
   1129 }
   1130 
   1131 
   1132 /* read a lump of data, allocating the space for it */
   1133 unsigned char *tdb_alloc_read(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t len)
   1134 {
   1135 	unsigned char *buf;
   1136 
   1137 	/* some systems don't like zero length malloc */
   1138 	if (len == 0) {
   1139 		len = 1;
   1140 	}
   1141 
   1142 	if (!(buf = (unsigned char *)malloc(len))) {
   1143 		/* Ensure ecode is set for log fn. */
   1144 		tdb->ecode = TDB_ERR_OOM;
   1145 		TDB_LOG((tdb, TDB_DEBUG_ERROR,"tdb_alloc_read malloc failed len=%d (%s)\n",
   1146 			   len, strerror(errno)));
   1147 		return TDB_ERRCODE(TDB_ERR_OOM, buf);
   1148 	}
   1149 	if (tdb->methods->tdb_read(tdb, offset, buf, len, 0) == -1) {
   1150 		SAFE_FREE(buf);
   1151 		return NULL;
   1152 	}
   1153 	return buf;
   1154 }
   1155 
   1156 /* Give a piece of tdb data to a parser */
   1157 
   1158 int tdb_parse_data(struct tdb_context *tdb, TDB_DATA key,
   1159 		   tdb_off_t offset, tdb_len_t len,
   1160 		   int (*parser)(TDB_DATA key, TDB_DATA data,
   1161 				 void *private_data),
   1162 		   void *private_data)
   1163 {
   1164 	TDB_DATA data;
   1165 	int result;
   1166 
   1167 	data.dsize = len;
   1168 
   1169 	if ((tdb->transaction == NULL) && (tdb->map_ptr != NULL)) {
   1170 		/*
   1171 		 * Optimize by avoiding the malloc/memcpy/free, point the
   1172 		 * parser directly at the mmap area.
   1173 		 */
   1174 		if (tdb->methods->tdb_oob(tdb, offset+len, 0) != 0) {
   1175 			return -1;
   1176 		}
   1177 		data.dptr = offset + (unsigned char *)tdb->map_ptr;
   1178 		return parser(key, data, private_data);
   1179 	}
   1180 
   1181 	if (!(data.dptr = tdb_alloc_read(tdb, offset, len))) {
   1182 		return -1;
   1183 	}
   1184 
   1185 	result = parser(key, data, private_data);
   1186 	free(data.dptr);
   1187 	return result;
   1188 }
   1189 
   1190 /* read/write a record */
   1191 int tdb_rec_read(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec)
   1192 {
   1193 	if (tdb->methods->tdb_read(tdb, offset, rec, sizeof(*rec),DOCONV()) == -1)
   1194 		return -1;
   1195 	if (TDB_BAD_MAGIC(rec)) {
   1196 		/* Ensure ecode is set for log fn. */
   1197 		tdb->ecode = TDB_ERR_CORRUPT;
   1198 		TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_rec_read bad magic 0x%x at offset=%d\n", rec->magic, offset));
   1199 		return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
   1200 	}
   1201 	return tdb->methods->tdb_oob(tdb, rec->next+sizeof(*rec), 0);
   1202 }
   1203 
   1204 int tdb_rec_write(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec)
   1205 {
   1206 	struct list_struct r = *rec;
   1207 	return tdb->methods->tdb_write(tdb, offset, CONVERT(r), sizeof(r));
   1208 }
   1209 
/*
 * Default I/O method table: the implementations that operate on the
 * database itself.  tdb_io_init() installs it as tdb->methods, and
 * tdb_transaction_start() keeps a pointer to the table it replaces so
 * that real I/O remains possible during a transaction.
 *
 * The initializers are positional, so their order has to match the
 * member order of struct tdb_methods (declared elsewhere).
 */
static const struct tdb_methods io_methods = {
	tdb_read,
	tdb_write,
	tdb_next_hash_chain,
	tdb_oob,
	tdb_expand_file,
	tdb_brlock
};
   1218 
   1219 /*
   1220   initialise the default methods table
   1221 */
   1222 void tdb_io_init(struct tdb_context *tdb)
   1223 {
   1224 	tdb->methods = &io_methods;
   1225 }
   1226 
   1227 /* file: transaction.c */
   1228 
   1229 /*
   1230   transaction design:
   1231 
   1232   - only allow a single transaction at a time per database. This makes
   1233     using the transaction API simpler, as otherwise the caller would
   1234     have to cope with temporary failures in transactions that conflict
   1235     with other current transactions
   1236 
   1237   - keep the transaction recovery information in the same file as the
   1238     database, using a special 'transaction recovery' record pointed at
   1239     by the header. This removes the need for extra journal files as
   1240     used by some other databases
   1241 
   1242   - dynamically allocate the transaction recovery record, re-using it
   1243     for subsequent transactions. If a larger record is needed then
   1244     tdb_free() the old record to place it on the normal tdb freelist
   1245     before allocating the new record
   1246 
   1247   - during transactions, keep a linked list of all writes that have
   1248     been performed by intercepting all tdb_write() calls. The hooked
   1249     transaction versions of tdb_read() and tdb_write() check this
   1250     linked list and try to use the elements of the list in preference
   1251     to the real database.
   1252 
   1253   - don't allow any locks to be held when a transaction starts,
   1254     otherwise we can end up with deadlock (plus lack of lock nesting
   1255     in posix locks would mean the lock is lost)
   1256 
   1257   - if the caller gains a lock during the transaction but doesn't
   1258     release it then fail the commit
   1259 
   1260   - allow for nested calls to tdb_transaction_start(), re-using the
   1261     existing transaction record. If the inner transaction is cancelled
   1262     then a subsequent commit will fail
   1263 
   1264   - keep a mirrored copy of the tdb hash chain heads to allow for the
   1265     fast hash heads scan on traverse, updating the mirrored copy in
   1266     the transaction version of tdb_write
   1267 
   1268   - allow callers to mix transaction and non-transaction use of tdb,
   1269     although once a transaction is started then an exclusive lock is
   1270     gained until the transaction is committed or cancelled
   1271 
   1272   - the commit strategy involves first saving away all modified data
   1273     into a linearised buffer in the transaction recovery area, then
   1274     marking the transaction recovery area with a magic value to
   1275     indicate a valid recovery record. In total 4 fsync/msync calls are
   1276     needed per commit to prevent race conditions. It might be possible
   1277     to reduce this to 3 or even 2 with some more work.
   1278 
   1279   - check for a valid recovery record on open of the tdb, while the
   1280     global lock is held. Automatically recover from the transaction
   1281     recovery area if needed, then continue with the open as
   1282     usual. This allows for smooth crash recovery with no administrator
   1283     intervention.
   1284 
   1285   - if TDB_NOSYNC is passed to flags in tdb_open then transactions are
   1286     still available, but no transaction recovery area is used and no
   1287     fsync/msync calls are made.
   1288 
   1289 */
   1290 
   1291 struct tdb_transaction_el {
   1292 	struct tdb_transaction_el *next, *prev;
   1293 	tdb_off_t offset;
   1294 	tdb_len_t length;
   1295 	unsigned char *data;
   1296 };
   1297 
   1298 /*
   1299   hold the context of any current transaction
   1300 */
   1301 struct tdb_transaction {
   1302 	/* we keep a mirrored copy of the tdb hash heads here so
   1303 	   tdb_next_hash_chain() can operate efficiently */
   1304 	u32 *hash_heads;
   1305 
   1306 	/* the original io methods - used to do IOs to the real db */
   1307 	const struct tdb_methods *io_methods;
   1308 
   1309 	/* the list of transaction elements. We use a doubly linked
   1310 	   list with a last pointer to allow us to keep the list
   1311 	   ordered, with first element at the front of the list. It
   1312 	   needs to be doubly linked as the read/write traversals need
   1313 	   to be backwards, while the commit needs to be forwards */
   1314 	struct tdb_transaction_el *elements, *elements_last;
   1315 
   1316 	/* non-zero when an internal transaction error has
   1317 	   occurred. All write operations will then fail until the
   1318 	   transaction is ended */
   1319 	int transaction_error;
   1320 
   1321 	/* when inside a transaction we need to keep track of any
   1322 	   nested tdb_transaction_start() calls, as these are allowed,
   1323 	   but don't create a new transaction */
   1324 	int nesting;
   1325 
   1326 	/* old file size before transaction */
   1327 	tdb_len_t old_map_size;
   1328 };
   1329 
   1330 
   1331 /*
   1332   read while in a transaction. We need to check first if the data is in our list
   1333   of transaction elements, then if not do a real read
   1334 */
   1335 static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf,
   1336 			    tdb_len_t len, int cv)
   1337 {
   1338 	struct tdb_transaction_el *el;
   1339 
   1340 	/* we need to walk the list backwards to get the most recent data */
   1341 	for (el=tdb->transaction->elements_last;el;el=el->prev) {
   1342 		tdb_len_t partial;
   1343 
   1344 		if (off+len <= el->offset) {
   1345 			continue;
   1346 		}
   1347 		if (off >= el->offset + el->length) {
   1348 			continue;
   1349 		}
   1350 
   1351 		/* an overlapping read - needs to be split into up to
   1352 		   2 reads and a memcpy */
   1353 		if (off < el->offset) {
   1354 			partial = el->offset - off;
   1355 			if (transaction_read(tdb, off, buf, partial, cv) != 0) {
   1356 				goto fail;
   1357 			}
   1358 			len -= partial;
   1359 			off += partial;
   1360 			buf = (void *)(partial + (char *)buf);
   1361 		}
   1362 		if (off + len <= el->offset + el->length) {
   1363 			partial = len;
   1364 		} else {
   1365 			partial = el->offset + el->length - off;
   1366 		}
   1367 		memcpy(buf, el->data + (off - el->offset), partial);
   1368 		if (cv) {
   1369 			tdb_convert(buf, len);
   1370 		}
   1371 		len -= partial;
   1372 		off += partial;
   1373 		buf = (void *)(partial + (char *)buf);
   1374 
   1375 		if (len != 0 && transaction_read(tdb, off, buf, len, cv) != 0) {
   1376 			goto fail;
   1377 		}
   1378 
   1379 		return 0;
   1380 	}
   1381 
   1382 	/* its not in the transaction elements - do a real read */
   1383 	return tdb->transaction->io_methods->tdb_read(tdb, off, buf, len, cv);
   1384 
   1385 fail:
   1386 	TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_read: failed at off=%d len=%d\n", off, len));
   1387 	tdb->ecode = TDB_ERR_IO;
   1388 	tdb->transaction->transaction_error = 1;
   1389 	return -1;
   1390 }
   1391 
   1392 
/*
  write while in a transaction

  Nothing is written to the database here: the data is recorded in the
  list of transaction elements.  In order of preference the write is
    - copied over the overlapping part of an existing element (the
      parts in front of and behind that element are handled by
      recursive calls),
    - appended to an existing element that ends exactly at "off", or
    - stored in a new element added at the end of the list.

  "buf" may be NULL on the append and new-element paths, in which case
  the bytes are filled with TDB_PAD_BYTE.
  NOTE(review): the overlap path copies from "buf" unconditionally, so
  a NULL "buf" is only safe when the range overlaps no existing element.

  Returns 0 on success and -1 on failure; failures mark the transaction
  as failed (transaction_error) with ecode TDB_ERR_OOM for allocation
  failures and TDB_ERR_IO when a split write fails.
*/
static int transaction_write(struct tdb_context *tdb, tdb_off_t off,
			     const void *buf, tdb_len_t len)
{
	struct tdb_transaction_el *el, *best_el=NULL;

	if (len == 0) {
		return 0;
	}

	/* if the write is to a hash head, then update the transaction
	   hash heads */
	if (len == sizeof(tdb_off_t) && off >= FREELIST_TOP &&
	    off < FREELIST_TOP+TDB_HASHTABLE_SIZE(tdb)) {
		u32 chain = (off-FREELIST_TOP) / sizeof(tdb_off_t);
		memcpy(&tdb->transaction->hash_heads[chain], buf, len);
	}

	/* first see if we can replace an existing entry */
	for (el=tdb->transaction->elements_last;el;el=el->prev) {
		tdb_len_t partial;

		/* remember the newest element that ends exactly where this
		   write starts - a candidate for the append path below */
		if (best_el == NULL && off == el->offset+el->length) {
			best_el = el;
		}

		/* skip elements that lie entirely after or before the write */
		if (off+len <= el->offset) {
			continue;
		}
		if (off >= el->offset + el->length) {
			continue;
		}

		/* an overlapping write - needs to be split into up to
		   2 writes and a memcpy */
		if (off < el->offset) {
			/* leading part, in front of this element */
			partial = el->offset - off;
			if (transaction_write(tdb, off, buf, partial) != 0) {
				goto fail;
			}
			len -= partial;
			off += partial;
			buf = (const void *)(partial + (const char *)buf);
		}
		if (off + len <= el->offset + el->length) {
			partial = len;
		} else {
			partial = el->offset + el->length - off;
		}
		/* overwrite the overlapping bytes inside the element */
		memcpy(el->data + (off - el->offset), buf, partial);
		len -= partial;
		off += partial;
		buf = (const void *)(partial + (const char *)buf);

		/* trailing part, behind this element */
		if (len != 0 && transaction_write(tdb, off, buf, len) != 0) {
			goto fail;
		}

		return 0;
	}

	/* see if we can append the new entry to an existing entry */
	/* the old_map_size test refuses to append when the write starts
	   exactly at old_map_size, or starts below it and reaches it */
	if (best_el && best_el->offset + best_el->length == off &&
	    (off+len < tdb->transaction->old_map_size ||
	     off > tdb->transaction->old_map_size)) {
		/* keep the old pointer so the element stays valid if the
		   realloc fails */
		unsigned char *data = best_el->data;
		el = best_el;
		el->data = (unsigned char *)realloc(el->data,
						    el->length + len);
		if (el->data == NULL) {
			tdb->ecode = TDB_ERR_OOM;
			tdb->transaction->transaction_error = 1;
			el->data = data;
			return -1;
		}
		if (buf) {
			memcpy(el->data + el->length, buf, len);
		} else {
			memset(el->data + el->length, TDB_PAD_BYTE, len);
		}
		el->length += len;
		return 0;
	}

	/* add a new entry at the end of the list */
	el = (struct tdb_transaction_el *)malloc(sizeof(*el));
	if (el == NULL) {
		tdb->ecode = TDB_ERR_OOM;
		tdb->transaction->transaction_error = 1;
		return -1;
	}
	el->next = NULL;
	el->prev = tdb->transaction->elements_last;
	el->offset = off;
	el->length = len;
	el->data = (unsigned char *)malloc(len);
	if (el->data == NULL) {
		free(el);
		tdb->ecode = TDB_ERR_OOM;
		tdb->transaction->transaction_error = 1;
		return -1;
	}
	if (buf) {
		memcpy(el->data, buf, len);
	} else {
		memset(el->data, TDB_PAD_BYTE, len);
	}
	/* link in behind the current tail, or as the only element */
	if (el->prev) {
		el->prev->next = el;
	} else {
		tdb->transaction->elements = el;
	}
	tdb->transaction->elements_last = el;
	return 0;

fail:
	TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: failed at off=%d len=%d\n", off, len));
	tdb->ecode = TDB_ERR_IO;
	tdb->transaction->transaction_error = 1;
	return -1;
}
   1516 
   1517 /*
   1518   accelerated hash chain head search, using the cached hash heads
   1519 */
   1520 static void transaction_next_hash_chain(struct tdb_context *tdb, u32 *chain)
   1521 {
   1522 	u32 h = *chain;
   1523 	for (;h < tdb->header.hash_size;h++) {
   1524 		/* the +1 takes account of the freelist */
   1525 		if (0 != tdb->transaction->hash_heads[h+1]) {
   1526 			break;
   1527 		}
   1528 	}
   1529 	(*chain) = h;
   1530 }
   1531 
   1532 /*
   1533   out of bounds check during a transaction
   1534 */
   1535 static int transaction_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
   1536 {
   1537 	if (len <= tdb->map_size) {
   1538 		return 0;
   1539 	}
   1540 	return TDB_ERRCODE(TDB_ERR_IO, -1);
   1541 }
   1542 
   1543 /*
   1544   transaction version of tdb_expand().
   1545 */
   1546 static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t size,
   1547 				   tdb_off_t addition)
   1548 {
   1549 	/* add a write to the transaction elements, so subsequent
   1550 	   reads see the zero data */
   1551 	if (transaction_write(tdb, size, NULL, addition) != 0) {
   1552 		return -1;
   1553 	}
   1554 
   1555 	return 0;
   1556 }
   1557 
   1558 /*
   1559   brlock during a transaction - ignore them
   1560 */
   1561 static int transaction_brlock(struct tdb_context *tdb, tdb_off_t offset,
   1562 			      int rw_type, int lck_type, int probe, size_t len)
   1563 {
   1564 	return 0;
   1565 }
   1566 
/*
 * I/O method table used while a transaction is active.
 * tdb_transaction_start() installs it as tdb->methods and
 * tdb_transaction_cancel() puts the saved original table back.
 *
 * The initializers are positional, so their order has to match the
 * member order of struct tdb_methods (declared elsewhere).
 */
static const struct tdb_methods transaction_methods = {
	transaction_read,
	transaction_write,
	transaction_next_hash_chain,
	transaction_oob,
	transaction_expand_file,
	transaction_brlock
};
   1575 
   1576 
/*
  start a tdb transaction. No token is returned, as only a single
  transaction is allowed to be pending per tdb_context

  A call made while a transaction is already active only increments
  the nesting count.  Otherwise the transaction state is allocated, the
  transaction write lock and a read lock from FREELIST_TOP onwards are
  taken, the hash heads are cached and the io methods are replaced by
  the transaction versions.

  Returns 0 on success.  On failure -1 is returned with ecode set on
  the paths handled here (TDB_ERR_EINVAL, TDB_ERR_LOCK, TDB_ERR_OOM,
  TDB_ERR_IO) and no transaction is left active.
*/
int tdb_transaction_start(struct tdb_context *tdb)
{
	/* some sanity checks */
	if (tdb->read_only || (tdb->flags & TDB_INTERNAL) || tdb->traverse_read) {
		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction on a read-only or internal db\n"));
		tdb->ecode = TDB_ERR_EINVAL;
		return -1;
	}

	/* cope with nested tdb_transaction_start() calls */
	if (tdb->transaction != NULL) {
		tdb->transaction->nesting++;
		TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_start: nesting %d\n",
			 tdb->transaction->nesting));
		return 0;
	}

	if (tdb->num_locks != 0 || tdb->global_lock.count) {
		/* the caller must not have any locks when starting a
		   transaction as otherwise we'll be screwed by lack
		   of nested locks in posix */
		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction with locks held\n"));
		tdb->ecode = TDB_ERR_LOCK;
		return -1;
	}

	if (tdb->travlocks.next != NULL) {
		/* you cannot use transactions inside a traverse (although you can use
		   traverse inside a transaction) as otherwise you can end up with
		   deadlock */
		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction within a traverse\n"));
		tdb->ecode = TDB_ERR_LOCK;
		return -1;
	}

	/* zero-filled state: no elements, no error, nesting 0 */
	tdb->transaction = (struct tdb_transaction *)
		calloc(sizeof(struct tdb_transaction), 1);
	if (tdb->transaction == NULL) {
		tdb->ecode = TDB_ERR_OOM;
		return -1;
	}

	/* get the transaction write lock. This is a blocking lock. As
	   discussed with Volker, there are a number of ways we could
	   make this async, which we will probably do in the future */
	if (tdb_transaction_lock(tdb, F_WRLCK) == -1) {
		/* nothing else is held yet, so only the state needs freeing */
		SAFE_FREE(tdb->transaction);
		return -1;
	}

	/* get a read lock from the freelist to the end of file. This
	   is upgraded to a write lock during the commit */
	if (tdb_brlock(tdb, FREELIST_TOP, F_RDLCK, F_SETLKW, 0, 0) == -1) {
		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get hash locks\n"));
		tdb->ecode = TDB_ERR_LOCK;
		goto fail;
	}

	/* setup a copy of the hash table heads so the hash scan in
	   traverse can be fast */
	/* hash_size+1 slots: the freelist head plus one per hash chain */
	tdb->transaction->hash_heads = (u32 *)
		calloc(tdb->header.hash_size+1, sizeof(u32));
	if (tdb->transaction->hash_heads == NULL) {
		tdb->ecode = TDB_ERR_OOM;
		goto fail;
	}
	if (tdb->methods->tdb_read(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
				   TDB_HASHTABLE_SIZE(tdb), 0) != 0) {
		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to read hash heads\n"));
		tdb->ecode = TDB_ERR_IO;
		goto fail;
	}

	/* make sure we know about any file expansions already done by
	   anyone else */
	tdb->methods->tdb_oob(tdb, tdb->map_size + 1, 1);
	tdb->transaction->old_map_size = tdb->map_size;

	/* finally hook the io methods, replacing them with
	   transaction specific methods */
	tdb->transaction->io_methods = tdb->methods;
	tdb->methods = &transaction_methods;

	/* by calling this transaction write here, we ensure that we don't grow the
	   transaction linked list due to hash table updates */
	if (transaction_write(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
			      TDB_HASHTABLE_SIZE(tdb)) != 0) {
		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to prime hash table\n"));
		tdb->ecode = TDB_ERR_IO;
		/* unhook the transaction methods again before bailing out */
		tdb->methods = tdb->transaction->io_methods;
		goto fail;
	}

	return 0;

fail:
	/* drop both locks and release the transaction state */
	tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
	tdb_transaction_unlock(tdb);
	SAFE_FREE(tdb->transaction->hash_heads);
	SAFE_FREE(tdb->transaction);
	return -1;
}
   1683 
   1684 
   1685 /*
   1686   cancel the current transaction
   1687 */
   1688 int tdb_transaction_cancel(struct tdb_context *tdb)
   1689 {
   1690 	if (tdb->transaction == NULL) {
   1691 		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_cancel: no transaction\n"));
   1692 		return -1;
   1693 	}
   1694 
   1695 	if (tdb->transaction->nesting != 0) {
   1696 		tdb->transaction->transaction_error = 1;
   1697 		tdb->transaction->nesting--;
   1698 		return 0;
   1699 	}
   1700 
   1701 	tdb->map_size = tdb->transaction->old_map_size;
   1702 
   1703 	/* free all the transaction elements */
   1704 	while (tdb->transaction->elements) {
   1705 		struct tdb_transaction_el *el = tdb->transaction->elements;
   1706 		tdb->transaction->elements = el->next;
   1707 		free(el->data);
   1708 		free(el);
   1709 	}
   1710 
   1711 	/* remove any global lock created during the transaction */
   1712 	if (tdb->global_lock.count != 0) {
   1713 		tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 4*tdb->header.hash_size);
   1714 		tdb->global_lock.count = 0;
   1715 	}
   1716 
   1717 	/* remove any locks created during the transaction */
   1718 	if (tdb->num_locks != 0) {
   1719 		int i;
   1720 		for (i=0;i<tdb->num_lockrecs;i++) {
   1721 			tdb_brlock(tdb,FREELIST_TOP+4*tdb->lockrecs[i].list,
   1722 				   F_UNLCK,F_SETLKW, 0, 1);
   1723 		}
   1724 		tdb->num_locks = 0;
   1725 		tdb->num_lockrecs = 0;
   1726 		SAFE_FREE(tdb->lockrecs);
   1727 	}
   1728 
   1729 	/* restore the normal io methods */
   1730 	tdb->methods = tdb->transaction->io_methods;
   1731 
   1732 	tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
   1733 	tdb_transaction_unlock(tdb);
   1734 	SAFE_FREE(tdb->transaction->hash_heads);
   1735 	SAFE_FREE(tdb->transaction);
   1736 
   1737 	return 0;
   1738 }
   1739 
   1740 /*
   1741   sync to disk
   1742 */
   1743 static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t length)
   1744 {
   1745 	if (fsync(tdb->fd) != 0) {
   1746 		tdb->ecode = TDB_ERR_IO;
   1747 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: fsync failed\n"));
   1748 		return -1;
   1749 	}
   1750 #if defined(HAVE_MSYNC) && defined(MS_SYNC)
   1751 	if (tdb->map_ptr) {
   1752 		tdb_off_t moffset = offset & ~(tdb->page_size-1);
   1753 		if (msync(moffset + (char *)tdb->map_ptr,
   1754 			  length + (offset - moffset), MS_SYNC) != 0) {
   1755 			tdb->ecode = TDB_ERR_IO;
   1756 			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: msync failed - %s\n",
   1757 				 strerror(errno)));
   1758 			return -1;
   1759 		}
   1760 	}
   1761 #endif
   1762 	return 0;
   1763 }
   1764 
   1765 
   1766 /*
   1767   work out how much space the linearised recovery data will consume
   1768 */
   1769 static tdb_len_t tdb_recovery_size(struct tdb_context *tdb)
   1770 {
   1771 	struct tdb_transaction_el *el;
   1772 	tdb_len_t recovery_size = 0;
   1773 
   1774 	recovery_size = sizeof(u32);
   1775 	for (el=tdb->transaction->elements;el;el=el->next) {
   1776 		if (el->offset >= tdb->transaction->old_map_size) {
   1777 			continue;
   1778 		}
   1779 		recovery_size += 2*sizeof(tdb_off_t) + el->length;
   1780 	}
   1781 
   1782 	return recovery_size;
   1783 }
   1784 
   1785 /*
   1786   allocate the recovery area, or use an existing recovery area if it is
   1787   large enough
   1788 */
   1789 static int tdb_recovery_allocate(struct tdb_context *tdb,
   1790 				 tdb_len_t *recovery_size,
   1791 				 tdb_off_t *recovery_offset,
   1792 				 tdb_len_t *recovery_max_size)
   1793 {
   1794 	struct list_struct rec;
   1795 	const struct tdb_methods *methods = tdb->transaction->io_methods;
   1796 	tdb_off_t recovery_head;
   1797 
   1798 	if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
   1799 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery head\n"));
   1800 		return -1;
   1801 	}
   1802 
   1803 	rec.rec_len = 0;
   1804 
   1805 	if (recovery_head != 0 &&
   1806 	    methods->tdb_read(tdb, recovery_head, &rec, sizeof(rec), DOCONV()) == -1) {
   1807 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery record\n"));
   1808 		return -1;
   1809 	}
   1810 
   1811 	*recovery_size = tdb_recovery_size(tdb);
   1812 
   1813 	if (recovery_head != 0 && *recovery_size <= rec.rec_len) {
   1814 		/* it fits in the existing area */
   1815 		*recovery_max_size = rec.rec_len;
   1816 		*recovery_offset = recovery_head;
   1817 		return 0;
   1818 	}
   1819 
   1820 	/* we need to free up the old recovery area, then allocate a
   1821 	   new one at the end of the file. Note that we cannot use
   1822 	   tdb_allocate() to allocate the new one as that might return
   1823 	   us an area that is being currently used (as of the start of
   1824 	   the transaction) */
   1825 	if (recovery_head != 0) {
   1826 		if (tdb_free(tdb, recovery_head, &rec) == -1) {
   1827 			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to free previous recovery area\n"));
   1828 			return -1;
   1829 		}
   1830 	}
   1831 
   1832 	/* the tdb_free() call might have increased the recovery size */
   1833 	*recovery_size = tdb_recovery_size(tdb);
   1834 
   1835 	/* round up to a multiple of page size */
   1836 	*recovery_max_size = TDB_ALIGN(sizeof(rec) + *recovery_size, tdb->page_size) - sizeof(rec);
   1837 	*recovery_offset = tdb->map_size;
   1838 	recovery_head = *recovery_offset;
   1839 
   1840 	if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
   1841 				     (tdb->map_size - tdb->transaction->old_map_size) +
   1842 				     sizeof(rec) + *recovery_max_size) == -1) {
   1843 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to create recovery area\n"));
   1844 		return -1;
   1845 	}
   1846 
   1847 	/* remap the file (if using mmap) */
   1848 	methods->tdb_oob(tdb, tdb->map_size + 1, 1);
   1849 
   1850 	/* we have to reset the old map size so that we don't try to expand the file
   1851 	   again in the transaction commit, which would destroy the recovery area */
   1852 	tdb->transaction->old_map_size = tdb->map_size;
   1853 
   1854 	/* write the recovery header offset and sync - we can sync without a race here
   1855 	   as the magic ptr in the recovery record has not been set */
   1856 	CONVERT(recovery_head);
   1857 	if (methods->tdb_write(tdb, TDB_RECOVERY_HEAD,
   1858 			       &recovery_head, sizeof(tdb_off_t)) == -1) {
   1859 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
   1860 		return -1;
   1861 	}
   1862 
   1863 	return 0;
   1864 }
   1865 
   1866 
   1867 /*
   1868   setup the recovery data that will be used on a crash during commit
   1869 */
   1870 static int transaction_setup_recovery(struct tdb_context *tdb,
   1871 				      tdb_off_t *magic_offset)
   1872 {
   1873 	struct tdb_transaction_el *el;
   1874 	tdb_len_t recovery_size;
   1875 	unsigned char *data, *p;
   1876 	const struct tdb_methods *methods = tdb->transaction->io_methods;
   1877 	struct list_struct *rec;
   1878 	tdb_off_t recovery_offset, recovery_max_size;
   1879 	tdb_off_t old_map_size = tdb->transaction->old_map_size;
   1880 	u32 magic, tailer;
   1881 
   1882 	/*
   1883 	  check that the recovery area has enough space
   1884 	*/
   1885 	if (tdb_recovery_allocate(tdb, &recovery_size,
   1886 				  &recovery_offset, &recovery_max_size) == -1) {
   1887 		return -1;
   1888 	}
   1889 
   1890 	data = (unsigned char *)malloc(recovery_size + sizeof(*rec));
   1891 	if (data == NULL) {
   1892 		tdb->ecode = TDB_ERR_OOM;
   1893 		return -1;
   1894 	}
   1895 
   1896 	rec = (struct list_struct *)data;
   1897 	memset(rec, 0, sizeof(*rec));
   1898 
   1899 	rec->magic    = 0;
   1900 	rec->data_len = recovery_size;
   1901 	rec->rec_len  = recovery_max_size;
   1902 	rec->key_len  = old_map_size;
   1903 	CONVERT(rec);
   1904 
   1905 	/* build the recovery data into a single blob to allow us to do a single
   1906 	   large write, which should be more efficient */
   1907 	p = data + sizeof(*rec);
   1908 	for (el=tdb->transaction->elements;el;el=el->next) {
   1909 		if (el->offset >= old_map_size) {
   1910 			continue;
   1911 		}
   1912 		if (el->offset + el->length > tdb->transaction->old_map_size) {
   1913 			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: transaction data over new region boundary\n"));
   1914 			free(data);
   1915 			tdb->ecode = TDB_ERR_CORRUPT;
   1916 			return -1;
   1917 		}
   1918 		memcpy(p, &el->offset, 4);
   1919 		memcpy(p+4, &el->length, 4);
   1920 		if (DOCONV()) {
   1921 			tdb_convert(p, 8);
   1922 		}
   1923 		/* the recovery area contains the old data, not the
   1924 		   new data, so we have to call the original tdb_read
   1925 		   method to get it */
   1926 		if (methods->tdb_read(tdb, el->offset, p + 8, el->length, 0) != 0) {
   1927 			free(data);
   1928 			tdb->ecode = TDB_ERR_IO;
   1929 			return -1;
   1930 		}
   1931 		p += 8 + el->length;
   1932 	}
   1933 
   1934 	/* and the tailer */
   1935 	tailer = sizeof(*rec) + recovery_max_size;
   1936 	memcpy(p, &tailer, 4);
   1937 	CONVERT(p);
   1938 
   1939 	/* write the recovery data to the recovery area */
   1940 	if (methods->tdb_write(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
   1941 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery data\n"));
   1942 		free(data);
   1943 		tdb->ecode = TDB_ERR_IO;
   1944 		return -1;
   1945 	}
   1946 
   1947 	/* as we don't have ordered writes, we have to sync the recovery
   1948 	   data before we update the magic to indicate that the recovery
   1949 	   data is present */
   1950 	if (transaction_sync(tdb, recovery_offset, sizeof(*rec) + recovery_size) == -1) {
   1951 		free(data);
   1952 		return -1;
   1953 	}
   1954 
   1955 	free(data);
   1956 
   1957 	magic = TDB_RECOVERY_MAGIC;
   1958 	CONVERT(magic);
   1959 
   1960 	*magic_offset = recovery_offset + offsetof(struct list_struct, magic);
   1961 
   1962 	if (methods->tdb_write(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
   1963 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery magic\n"));
   1964 		tdb->ecode = TDB_ERR_IO;
   1965 		return -1;
   1966 	}
   1967 
   1968 	/* ensure the recovery magic marker is on disk */
   1969 	if (transaction_sync(tdb, *magic_offset, sizeof(magic)) == -1) {
   1970 		return -1;
   1971 	}
   1972 
   1973 	return 0;
   1974 }
   1975 
   1976 /*
   1977   commit the current transaction
   1978 */
   1979 int tdb_transaction_commit(struct tdb_context *tdb)
   1980 {
   1981 	const struct tdb_methods *methods;
   1982 	tdb_off_t magic_offset = 0;
   1983 	u32 zero = 0;
   1984 
   1985 	if (tdb->transaction == NULL) {
   1986 		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: no transaction\n"));
   1987 		return -1;
   1988 	}
   1989 
   1990 	if (tdb->transaction->transaction_error) {
   1991 		tdb->ecode = TDB_ERR_IO;
   1992 		tdb_transaction_cancel(tdb);
   1993 		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: transaction error pending\n"));
   1994 		return -1;
   1995 	}
   1996 
   1997 	if (tdb->transaction->nesting != 0) {
   1998 		tdb->transaction->nesting--;
   1999 		return 0;
   2000 	}
   2001 
   2002 	/* check for a null transaction */
   2003 	if (tdb->transaction->elements == NULL) {
   2004 		tdb_transaction_cancel(tdb);
   2005 		return 0;
   2006 	}
   2007 
   2008 	methods = tdb->transaction->io_methods;
   2009 
   2010 	/* if there are any locks pending then the caller has not
   2011 	   nested their locks properly, so fail the transaction */
   2012 	if (tdb->num_locks || tdb->global_lock.count) {
   2013 		tdb->ecode = TDB_ERR_LOCK;
   2014 		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: locks pending on commit\n"));
   2015 		tdb_transaction_cancel(tdb);
   2016 		return -1;
   2017 	}
   2018 
   2019 	/* upgrade the main transaction lock region to a write lock */
   2020 	if (tdb_brlock_upgrade(tdb, FREELIST_TOP, 0) == -1) {
   2021 		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to upgrade hash locks\n"));
   2022 		tdb->ecode = TDB_ERR_LOCK;
   2023 		tdb_transaction_cancel(tdb);
   2024 		return -1;
   2025 	}
   2026 
   2027 	/* get the global lock - this prevents new users attaching to the database
   2028 	   during the commit */
   2029 	if (tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
   2030 		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: failed to get global lock\n"));
   2031 		tdb->ecode = TDB_ERR_LOCK;
   2032 		tdb_transaction_cancel(tdb);
   2033 		return -1;
   2034 	}
   2035 
   2036 	if (!(tdb->flags & TDB_NOSYNC)) {
   2037 		/* write the recovery data to the end of the file */
   2038 		if (transaction_setup_recovery(tdb, &magic_offset) == -1) {
   2039 			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to setup recovery data\n"));
   2040 			tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
   2041 			tdb_transaction_cancel(tdb);
   2042 			return -1;
   2043 		}
   2044 	}
   2045 
   2046 	/* expand the file to the new size if needed */
   2047 	if (tdb->map_size != tdb->transaction->old_map_size) {
   2048 		if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
   2049 					     tdb->map_size -
   2050 					     tdb->transaction->old_map_size) == -1) {
   2051 			tdb->ecode = TDB_ERR_IO;
   2052 			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: expansion failed\n"));
   2053 			tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
   2054 			tdb_transaction_cancel(tdb);
   2055 			return -1;
   2056 		}
   2057 		tdb->map_size = tdb->transaction->old_map_size;
   2058 		methods->tdb_oob(tdb, tdb->map_size + 1, 1);
   2059 	}
   2060 
   2061 	/* perform all the writes */
   2062 	while (tdb->transaction->elements) {
   2063 		struct tdb_transaction_el *el = tdb->transaction->elements;
   2064 
   2065 		if (methods->tdb_write(tdb, el->offset, el->data, el->length) == -1) {
   2066 			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed during commit\n"));
   2067 
   2068 			/* we've overwritten part of the data and
   2069 			   possibly expanded the file, so we need to
   2070 			   run the crash recovery code */
   2071 			tdb->methods = methods;
   2072 			tdb_transaction_recover(tdb);
   2073 
   2074 			tdb_transaction_cancel(tdb);
   2075 			tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
   2076 
   2077 			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed\n"));
   2078 			return -1;
   2079 		}
   2080 		tdb->transaction->elements = el->next;
   2081 		free(el->data);
   2082 		free(el);
   2083 	}
   2084 
   2085 	if (!(tdb->flags & TDB_NOSYNC)) {
   2086 		/* ensure the new data is on disk */
   2087 		if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
   2088 			return -1;
   2089 		}
   2090 
   2091 		/* remove the recovery marker */
   2092 		if (methods->tdb_write(tdb, magic_offset, &zero, 4) == -1) {
   2093 			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to remove recovery magic\n"));
   2094 			return -1;
   2095 		}
   2096 
   2097 		/* ensure the recovery marker has been removed on disk */
   2098 		if (transaction_sync(tdb, magic_offset, 4) == -1) {
   2099 			return -1;
   2100 		}
   2101 	}
   2102 
   2103 	tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
   2104 
   2105 	/*
   2106 	  TODO: maybe write to some dummy hdr field, or write to magic
   2107 	  offset without mmap, before the last sync, instead of the
   2108 	  utime() call
   2109 	*/
   2110 
   2111 	/* on some systems (like Linux 2.6.x) changes via mmap/msync
   2112 	   don't change the mtime of the file, this means the file may
   2113 	   not be backed up (as tdb rounding to block sizes means that
   2114 	   file size changes are quite rare too). The following forces
   2115 	   mtime changes when a transaction completes */
   2116 #ifdef HAVE_UTIME
   2117 	utime(tdb->name, NULL);
   2118 #endif
   2119 
   2120 	/* use a transaction cancel to free memory and remove the
   2121 	   transaction locks */
   2122 	tdb_transaction_cancel(tdb);
   2123 	return 0;
   2124 }
   2125 
   2126 
   2127 /*
   2128   recover from an aborted transaction. Must be called with exclusive
   2129   database write access already established (including the global
   2130   lock to prevent new processes attaching)
   2131 */
   2132 int tdb_transaction_recover(struct tdb_context *tdb)
   2133 {
   2134 	tdb_off_t recovery_head, recovery_eof;
   2135 	unsigned char *data, *p;
   2136 	u32 zero = 0;
   2137 	struct list_struct rec;
   2138 
   2139 	/* find the recovery area */
   2140 	if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
   2141 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery head\n"));
   2142 		tdb->ecode = TDB_ERR_IO;
   2143 		return -1;
   2144 	}
   2145 
   2146 	if (recovery_head == 0) {
   2147 		/* we have never allocated a recovery record */
   2148 		return 0;
   2149 	}
   2150 
   2151 	/* read the recovery record */
   2152 	if (tdb->methods->tdb_read(tdb, recovery_head, &rec,
   2153 				   sizeof(rec), DOCONV()) == -1) {
   2154 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery record\n"));
   2155 		tdb->ecode = TDB_ERR_IO;
   2156 		return -1;
   2157 	}
   2158 
   2159 	if (rec.magic != TDB_RECOVERY_MAGIC) {
   2160 		/* there is no valid recovery data */
   2161 		return 0;
   2162 	}
   2163 
   2164 	if (tdb->read_only) {
   2165 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: attempt to recover read only database\n"));
   2166 		tdb->ecode = TDB_ERR_CORRUPT;
   2167 		return -1;
   2168 	}
   2169 
   2170 	recovery_eof = rec.key_len;
   2171 
   2172 	data = (unsigned char *)malloc(rec.data_len);
   2173 	if (data == NULL) {
   2174 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to allocate recovery data\n"));
   2175 		tdb->ecode = TDB_ERR_OOM;
   2176 		return -1;
   2177 	}
   2178 
   2179 	/* read the full recovery data */
   2180 	if (tdb->methods->tdb_read(tdb, recovery_head + sizeof(rec), data,
   2181 				   rec.data_len, 0) == -1) {
   2182 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery data\n"));
   2183 		tdb->ecode = TDB_ERR_IO;
   2184 		return -1;
   2185 	}
   2186 
   2187 	/* recover the file data */
   2188 	p = data;
   2189 	while (p+8 < data + rec.data_len) {
   2190 		u32 ofs, len;
   2191 		if (DOCONV()) {
   2192 			tdb_convert(p, 8);
   2193 		}
   2194 		memcpy(&ofs, p, 4);
   2195 		memcpy(&len, p+4, 4);
   2196 
   2197 		if (tdb->methods->tdb_write(tdb, ofs, p+8, len) == -1) {
   2198 			free(data);
   2199 			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to recover %d bytes at offset %d\n", len, ofs));
   2200 			tdb->ecode = TDB_ERR_IO;
   2201 			return -1;
   2202 		}
   2203 		p += 8 + len;
   2204 	}
   2205 
   2206 	free(data);
   2207 
   2208 	if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
   2209 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync recovery\n"));
   2210 		tdb->ecode = TDB_ERR_IO;
   2211 		return -1;
   2212 	}
   2213 
   2214 	/* if the recovery area is after the recovered eof then remove it */
   2215 	if (recovery_eof <= recovery_head) {
   2216 		if (tdb_ofs_write(tdb, TDB_RECOVERY_HEAD, &zero) == -1) {
   2217 			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery head\n"));
   2218 			tdb->ecode = TDB_ERR_IO;
   2219 			return -1;
   2220 		}
   2221 	}
   2222 
   2223 	/* remove the recovery magic */
   2224 	if (tdb_ofs_write(tdb, recovery_head + offsetof(struct list_struct, magic),
   2225 			  &zero) == -1) {
   2226 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery magic\n"));
   2227 		tdb->ecode = TDB_ERR_IO;
   2228 		return -1;
   2229 	}
   2230 
   2231 	/* reduce the file size to the old size */
   2232 	tdb_munmap(tdb);
   2233 	if (ftruncate(tdb->fd, recovery_eof) != 0) {
   2234 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to reduce to recovery size\n"));
   2235 		tdb->ecode = TDB_ERR_IO;
   2236 		return -1;
   2237 	}
   2238 	tdb->map_size = recovery_eof;
   2239 	tdb_mmap(tdb);
   2240 
   2241 	if (transaction_sync(tdb, 0, recovery_eof) == -1) {
   2242 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync2 recovery\n"));
   2243 		tdb->ecode = TDB_ERR_IO;
   2244 		return -1;
   2245 	}
   2246 
   2247 	TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_recover: recovered %d byte database\n",
   2248 		 recovery_eof));
   2249 
   2250 	/* all done */
   2251 	return 0;
   2252 }
   2253 
   2254 /* file: freelist.c */
   2255 
   2256 /* read a freelist record and check for simple errors */
   2257 static int tdb_rec_free_read(struct tdb_context *tdb, tdb_off_t off, struct list_struct *rec)
   2258 {
   2259 	if (tdb->methods->tdb_read(tdb, off, rec, sizeof(*rec),DOCONV()) == -1)
   2260 		return -1;
   2261 
   2262 	if (rec->magic == TDB_MAGIC) {
   2263 		/* this happens when a app is showdown while deleting a record - we should
   2264 		   not completely fail when this happens */
   2265 		TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_rec_free_read non-free magic 0x%x at offset=%d - fixing\n",
   2266 			 rec->magic, off));
   2267 		rec->magic = TDB_FREE_MAGIC;
   2268 		if (tdb->methods->tdb_write(tdb, off, rec, sizeof(*rec)) == -1)
   2269 			return -1;
   2270 	}
   2271 
   2272 	if (rec->magic != TDB_FREE_MAGIC) {
   2273 		/* Ensure ecode is set for log fn. */
   2274 		tdb->ecode = TDB_ERR_CORRUPT;
   2275 		TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_rec_free_read bad magic 0x%x at offset=%d\n",
   2276 			   rec->magic, off));
   2277 		return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
   2278 	}
   2279 	if (tdb->methods->tdb_oob(tdb, rec->next+sizeof(*rec), 0) != 0)
   2280 		return -1;
   2281 	return 0;
   2282 }
   2283 
   2284 
   2285 
   2286 /* Remove an element from the freelist.  Must have alloc lock. */
   2287 static int remove_from_freelist(struct tdb_context *tdb, tdb_off_t off, tdb_off_t next)
   2288 {
   2289 	tdb_off_t last_ptr, i;
   2290 
   2291 	/* read in the freelist top */
   2292 	last_ptr = FREELIST_TOP;
   2293 	while (tdb_ofs_read(tdb, last_ptr, &i) != -1 && i != 0) {
   2294 		if (i == off) {
   2295 			/* We've found it! */
   2296 			return tdb_ofs_write(tdb, last_ptr, &next);
   2297 		}
   2298 		/* Follow chain (next offset is at start of record) */
   2299 		last_ptr = i;
   2300 	}
   2301 	TDB_LOG((tdb, TDB_DEBUG_FATAL,"remove_from_freelist: not on list at off=%d\n", off));
   2302 	return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
   2303 }
   2304 
   2305 
   2306 /* update a record tailer (must hold allocation lock) */
   2307 static int update_tailer(struct tdb_context *tdb, tdb_off_t offset,
   2308 			 const struct list_struct *rec)
   2309 {
   2310 	tdb_off_t totalsize;
   2311 
   2312 	/* Offset of tailer from record header */
   2313 	totalsize = sizeof(*rec) + rec->rec_len;
   2314 	return tdb_ofs_write(tdb, offset + totalsize - sizeof(tdb_off_t),
   2315 			 &totalsize);
   2316 }
   2317 
   2318 /* Add an element into the freelist. Merge adjacent records if
   2319    neccessary. */
   2320 int tdb_free(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec)
   2321 {
   2322 	tdb_off_t right, left;
   2323 
   2324 	/* Allocation and tailer lock */
   2325 	if (tdb_lock(tdb, -1, F_WRLCK) != 0)
   2326 		return -1;
   2327 
   2328 	/* set an initial tailer, so if we fail we don't leave a bogus record */
   2329 	if (update_tailer(tdb, offset, rec) != 0) {
   2330 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: update_tailer failed!\n"));
   2331 		goto fail;
   2332 	}
   2333 
   2334 	/* Look right first (I'm an Australian, dammit) */
   2335 	right = offset + sizeof(*rec) + rec->rec_len;
   2336 	if (right + sizeof(*rec) <= tdb->map_size) {
   2337 		struct list_struct r;
   2338 
   2339 		if (tdb->methods->tdb_read(tdb, right, &r, sizeof(r), DOCONV()) == -1) {
   2340 			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: right read failed at %u\n", right));
   2341 			goto left;
   2342 		}
   2343 
   2344 		/* If it's free, expand to include it. */
   2345 		if (r.magic == TDB_FREE_MAGIC) {
   2346 			if (remove_from_freelist(tdb, right, r.next) == -1) {
   2347 				TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: right free failed at %u\n", right));
   2348 				goto left;
   2349 			}
   2350 			rec->rec_len += sizeof(r) + r.rec_len;
   2351 		}
   2352 	}
   2353 
   2354 left:
   2355 	/* Look left */
   2356 	left = offset - sizeof(tdb_off_t);
   2357 	if (left > TDB_DATA_START(tdb->header.hash_size)) {
   2358 		struct list_struct l;
   2359 		tdb_off_t leftsize;
   2360 
   2361 		/* Read in tailer and jump back to header */
   2362 		if (tdb_ofs_read(tdb, left, &leftsize) == -1) {
   2363 			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: left offset read failed at %u\n", left));
   2364 			goto update;
   2365 		}
   2366 
   2367 		/* it could be uninitialised data */
   2368 		if (leftsize == 0 || leftsize == TDB_PAD_U32) {
   2369 			goto update;
   2370 		}
   2371 
   2372 		left = offset - leftsize;
   2373 
   2374 		/* Now read in record */
   2375 		if (tdb->methods->tdb_read(tdb, left, &l, sizeof(l), DOCONV()) == -1) {
   2376 			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: left read failed at %u (%u)\n", left, leftsize));
   2377 			goto update;
   2378 		}
   2379 
   2380 		/* If it's free, expand to include it. */
   2381 		if (l.magic == TDB_FREE_MAGIC) {
   2382 			if (remove_from_freelist(tdb, left, l.next) == -1) {
   2383 				TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: left free failed at %u\n", left));
   2384 				goto update;
   2385 			} else {
   2386 				offset = left;
   2387 				rec->rec_len += leftsize;
   2388 			}
   2389 		}
   2390 	}
   2391 
   2392 update:
   2393 	if (update_tailer(tdb, offset, rec) == -1) {
   2394 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: update_tailer failed at %u\n", offset));
   2395 		goto fail;
   2396 	}
   2397 
   2398 	/* Now, prepend to free list */
   2399 	rec->magic = TDB_FREE_MAGIC;
   2400 
   2401 	if (tdb_ofs_read(tdb, FREELIST_TOP, &rec->next) == -1 ||
   2402 	    tdb_rec_write(tdb, offset, rec) == -1 ||
   2403 	    tdb_ofs_write(tdb, FREELIST_TOP, &offset) == -1) {
   2404 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free record write failed at offset=%d\n", offset));
   2405 		goto fail;
   2406 	}
   2407 
   2408 	/* And we're done. */
   2409 	tdb_unlock(tdb, -1, F_WRLCK);
   2410 	return 0;
   2411 
   2412  fail:
   2413 	tdb_unlock(tdb, -1, F_WRLCK);
   2414 	return -1;
   2415 }
   2416 
   2417 
   2418 /*
   2419    the core of tdb_allocate - called when we have decided which
   2420    free list entry to use
   2421  */
   2422 static tdb_off_t tdb_allocate_ofs(struct tdb_context *tdb, tdb_len_t length, tdb_off_t rec_ptr,
   2423 				struct list_struct *rec, tdb_off_t last_ptr)
   2424 {
   2425 	struct list_struct newrec;
   2426 	tdb_off_t newrec_ptr;
   2427 
   2428 	memset(&newrec, '\0', sizeof(newrec));
   2429 
   2430 	/* found it - now possibly split it up  */
   2431 	if (rec->rec_len > length + MIN_REC_SIZE) {
   2432 		/* Length of left piece */
   2433 		length = TDB_ALIGN(length, TDB_ALIGNMENT);
   2434 
   2435 		/* Right piece to go on free list */
   2436 		newrec.rec_len = rec->rec_len - (sizeof(*rec) + length);
   2437 		newrec_ptr = rec_ptr + sizeof(*rec) + length;
   2438 
   2439 		/* And left record is shortened */
   2440 		rec->rec_len = length;
   2441 	} else {
   2442 		newrec_ptr = 0;
   2443 	}
   2444 
   2445 	/* Remove allocated record from the free list */
   2446 	if (tdb_ofs_write(tdb, last_ptr, &rec->next) == -1) {
   2447 		return 0;
   2448 	}
   2449 
   2450 	/* Update header: do this before we drop alloc
   2451 	   lock, otherwise tdb_free() might try to
   2452 	   merge with us, thinking we're free.
   2453 	   (Thanks Jeremy Allison). */
   2454 	rec->magic = TDB_MAGIC;
   2455 	if (tdb_rec_write(tdb, rec_ptr, rec) == -1) {
   2456 		return 0;
   2457 	}
   2458 
   2459 	/* Did we create new block? */
   2460 	if (newrec_ptr) {
   2461 		/* Update allocated record tailer (we
   2462 		   shortened it). */
   2463 		if (update_tailer(tdb, rec_ptr, rec) == -1) {
   2464 			return 0;
   2465 		}
   2466 
   2467 		/* Free new record */
   2468 		if (tdb_free(tdb, newrec_ptr, &newrec) == -1) {
   2469 			return 0;
   2470 		}
   2471 	}
   2472 
   2473 	/* all done - return the new record offset */
   2474 	return rec_ptr;
   2475 }
   2476 
   2477 /* allocate some space from the free list. The offset returned points
   2478    to a unconnected list_struct within the database with room for at
   2479    least length bytes of total data
   2480 
   2481    0 is returned if the space could not be allocated
   2482  */
   2483 tdb_off_t tdb_allocate(struct tdb_context *tdb, tdb_len_t length, struct list_struct *rec)
   2484 {
   2485 	tdb_off_t rec_ptr, last_ptr, newrec_ptr;
   2486 	struct {
   2487 		tdb_off_t rec_ptr, last_ptr;
   2488 		tdb_len_t rec_len;
   2489 	} bestfit;
   2490 
   2491 	if (tdb_lock(tdb, -1, F_WRLCK) == -1)
   2492 		return 0;
   2493 
   2494 	/* Extra bytes required for tailer */
   2495 	length += sizeof(tdb_off_t);
   2496 
   2497  again:
   2498 	last_ptr = FREELIST_TOP;
   2499 
   2500 	/* read in the freelist top */
   2501 	if (tdb_ofs_read(tdb, FREELIST_TOP, &rec_ptr) == -1)
   2502 		goto fail;
   2503 
   2504 	bestfit.rec_ptr = 0;
   2505 	bestfit.last_ptr = 0;
   2506 	bestfit.rec_len = 0;
   2507 
   2508 	/*
   2509 	   this is a best fit allocation strategy. Originally we used
   2510 	   a first fit strategy, but it suffered from massive fragmentation
   2511 	   issues when faced with a slowly increasing record size.
   2512 	 */
   2513 	while (rec_ptr) {
   2514 		if (tdb_rec_free_read(tdb, rec_ptr, rec) == -1) {
   2515 			goto fail;
   2516 		}
   2517 
   2518 		if (rec->rec_len >= length) {
   2519 			if (bestfit.rec_ptr == 0 ||
   2520 			    rec->rec_len < bestfit.rec_len) {
   2521 				bestfit.rec_len = rec->rec_len;
   2522 				bestfit.rec_ptr = rec_ptr;
   2523 				bestfit.last_ptr = last_ptr;
   2524 				/* consider a fit to be good enough if
   2525 				   we aren't wasting more than half
   2526 				   the space */
   2527 				if (bestfit.rec_len < 2*length) {
   2528 					break;
   2529 				}
   2530 			}
   2531 		}
   2532 
   2533 		/* move to the next record */
   2534 		last_ptr = rec_ptr;
   2535 		rec_ptr = rec->next;
   2536 	}
   2537 
   2538 	if (bestfit.rec_ptr != 0) {
   2539 		if (tdb_rec_free_read(tdb, bestfit.rec_ptr, rec) == -1) {
   2540 			goto fail;
   2541 		}
   2542 
   2543 		newrec_ptr = tdb_allocate_ofs(tdb, length, bestfit.rec_ptr, rec, bestfit.last_ptr);
   2544 		tdb_unlock(tdb, -1, F_WRLCK);
   2545 		return newrec_ptr;
   2546 	}
   2547 
   2548 	/* we didn't find enough space. See if we can expand the
   2549 	   database and if we can then try again */
   2550 	if (tdb_expand(tdb, length + sizeof(*rec)) == 0)
   2551 		goto again;
   2552  fail:
   2553 	tdb_unlock(tdb, -1, F_WRLCK);
   2554 	return 0;
   2555 }
   2556 
   2557 /* file: freelistcheck.c */
   2558 
   2559 /* Check the freelist is good and contains no loops.
   2560    Very memory intensive - only do this as a consistency
   2561    checker. Heh heh - uses an in memory tdb as the storage
   2562    for the "seen" record list. For some reason this strikes
   2563    me as extremely clever as I don't have to write another tree
   2564    data structure implementation :-).
   2565  */
   2566 
   2567 static int seen_insert(struct tdb_context *mem_tdb, tdb_off_t rec_ptr)
   2568 {
   2569 	TDB_DATA key, data;
   2570 
   2571 	memset(&data, '\0', sizeof(data));
   2572 	key.dptr = (unsigned char *)&rec_ptr;
   2573 	key.dsize = sizeof(rec_ptr);
   2574 	return tdb_store(mem_tdb, key, data, TDB_INSERT);
   2575 }
   2576 
   2577 int tdb_validate_freelist(struct tdb_context *tdb, int *pnum_entries)
   2578 {
   2579 	struct tdb_context *mem_tdb = NULL;
   2580 	struct list_struct rec;
   2581 	tdb_off_t rec_ptr, last_ptr;
   2582 	int ret = -1;
   2583 
   2584 	*pnum_entries = 0;
   2585 
   2586 	mem_tdb = tdb_open("flval", tdb->header.hash_size,
   2587 				TDB_INTERNAL, O_RDWR, 0600);
   2588 	if (!mem_tdb) {
   2589 		return -1;
   2590 	}
   2591 
   2592 	if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
   2593 		tdb_close(mem_tdb);
   2594 		return 0;
   2595 	}
   2596 
   2597 	last_ptr = FREELIST_TOP;
   2598 
   2599 	/* Store the FREELIST_TOP record. */
   2600 	if (seen_insert(mem_tdb, last_ptr) == -1) {
   2601 		ret = TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
   2602 		goto fail;
   2603 	}
   2604 
   2605 	/* read in the freelist top */
   2606 	if (tdb_ofs_read(tdb, FREELIST_TOP, &rec_ptr) == -1) {
   2607 		goto fail;
   2608 	}
   2609 
   2610 	while (rec_ptr) {
   2611 
   2612 		/* If we can't store this record (we've seen it
   2613 		   before) then the free list has a loop and must
   2614 		   be corrupt. */
   2615 
   2616 		if (seen_insert(mem_tdb, rec_ptr)) {
   2617 			ret = TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
   2618 			goto fail;
   2619 		}
   2620 
   2621 		if (tdb_rec_free_read(tdb, rec_ptr, &rec) == -1) {
   2622 			goto fail;
   2623 		}
   2624 
   2625 		/* move to the next record */
   2626 		last_ptr = rec_ptr;
   2627 		rec_ptr = rec.next;
   2628 		*pnum_entries += 1;
   2629 	}
   2630 
   2631 	ret = 0;
   2632 
   2633   fail:
   2634 
   2635 	tdb_close(mem_tdb);
   2636 	tdb_unlock(tdb, -1, F_WRLCK);
   2637 	return ret;
   2638 }
   2639 
   2640 /* file: traverse.c */
   2641 
   2642 /* Uses traverse lock: 0 = finish, -1 = error, other = record offset */
   2643 static int tdb_next_lock(struct tdb_context *tdb, struct tdb_traverse_lock *tlock,
   2644 			 struct list_struct *rec)
   2645 {
   2646 	int want_next = (tlock->off != 0);
   2647 
   2648 	/* Lock each chain from the start one. */
   2649 	for (; tlock->hash < tdb->header.hash_size; tlock->hash++) {
   2650 		if (!tlock->off && tlock->hash != 0) {
   2651 			/* this is an optimisation for the common case where
   2652 			   the hash chain is empty, which is particularly
   2653 			   common for the use of tdb with ldb, where large
   2654 			   hashes are used. In that case we spend most of our
   2655 			   time in tdb_brlock(), locking empty hash chains.
   2656 
   2657 			   To avoid this, we do an unlocked pre-check to see
   2658 			   if the hash chain is empty before starting to look
   2659 			   inside it. If it is empty then we can avoid that
   2660 			   hash chain. If it isn't empty then we can't believe
   2661 			   the value we get back, as we read it without a
   2662 			   lock, so instead we get the lock and re-fetch the
   2663 			   value below.
   2664 
   2665 			   Notice that not doing this optimisation on the
   2666 			   first hash chain is critical. We must guarantee
   2667 			   that we have done at least one fcntl lock at the
   2668 			   start of a search to guarantee that memory is
   2669 			   coherent on SMP systems. If records are added by
   2670 			   others during the search then thats OK, and we
   2671 			   could possibly miss those with this trick, but we
   2672 			   could miss them anyway without this trick, so the
   2673 			   semantics don't change.
   2674 
   2675 			   With a non-indexed ldb search this trick gains us a
   2676 			   factor of around 80 in speed on a linux 2.6.x
   2677 			   system (testing using ldbtest).
   2678 			*/
   2679 			tdb->methods->next_hash_chain(tdb, &tlock->hash);
   2680 			if (tlock->hash == tdb->header.hash_size) {
   2681 				continue;
   2682 			}
   2683 		}
   2684 
   2685 		if (tdb_lock(tdb, tlock->hash, tlock->lock_rw) == -1)
   2686 			return -1;
   2687 
   2688 		/* No previous record?  Start at top of chain. */
   2689 		if (!tlock->off) {
   2690 			if (tdb_ofs_read(tdb, TDB_HASH_TOP(tlock->hash),
   2691 				     &tlock->off) == -1)
   2692 				goto fail;
   2693 		} else {
   2694 			/* Otherwise unlock the previous record. */
   2695 			if (tdb_unlock_record(tdb, tlock->off) != 0)
   2696 				goto fail;
   2697 		}
   2698 
   2699 		if (want_next) {
   2700 			/* We have offset of old record: grab next */
   2701 			if (tdb_rec_read(tdb, tlock->off, rec) == -1)
   2702 				goto fail;
   2703 			tlock->off = rec->next;
   2704 		}
   2705 
   2706 		/* Iterate through chain */
   2707 		while( tlock->off) {
   2708 			tdb_off_t current;
   2709 			if (tdb_rec_read(tdb, tlock->off, rec) == -1)
   2710 				goto fail;
   2711 
   2712 			/* Detect infinite loops. From "Shlomi Yaakobovich" <Shlomi (at) exanet.com>. */
   2713 			if (tlock->off == rec->next) {
   2714 				TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_next_lock: loop detected.\n"));
   2715 				goto fail;
   2716 			}
   2717 
   2718 			if (!TDB_DEAD(rec)) {
   2719 				/* Woohoo: we found one! */
   2720 				if (tdb_lock_record(tdb, tlock->off) != 0)
   2721 					goto fail;
   2722 				return tlock->off;
   2723 			}
   2724 
   2725 			/* Try to clean dead ones from old traverses */
   2726 			current = tlock->off;
   2727 			tlock->off = rec->next;
   2728 			if (!(tdb->read_only || tdb->traverse_read) &&
   2729 			    tdb_do_delete(tdb, current, rec) != 0)
   2730 				goto fail;
   2731 		}
   2732 		tdb_unlock(tdb, tlock->hash, tlock->lock_rw);
   2733 		want_next = 0;
   2734 	}
   2735 	/* We finished iteration without finding anything */
   2736 	return TDB_ERRCODE(TDB_SUCCESS, 0);
   2737 
   2738  fail:
   2739 	tlock->off = 0;
   2740 	if (tdb_unlock(tdb, tlock->hash, tlock->lock_rw) != 0)
   2741 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_next_lock: On error unlock failed!\n"));
   2742 	return -1;
   2743 }
   2744 
   2745 /* traverse the entire database - calling fn(tdb, key, data) on each element.
   2746    return -1 on error or the record count traversed
   2747    if fn is NULL then it is not called
   2748    a non-zero return value from fn() indicates that the traversal should stop
   2749   */
   2750 static int tdb_traverse_internal(struct tdb_context *tdb,
   2751 				 tdb_traverse_func fn, void *private_data,
   2752 				 struct tdb_traverse_lock *tl)
   2753 {
   2754 	TDB_DATA key, dbuf;
   2755 	struct list_struct rec;
   2756 	int ret, count = 0;
   2757 
   2758 	/* This was in the initializaton, above, but the IRIX compiler
   2759 	 * did not like it.  crh
   2760 	 */
   2761 	tl->next = tdb->travlocks.next;
   2762 
   2763 	/* fcntl locks don't stack: beware traverse inside traverse */
   2764 	tdb->travlocks.next = tl;
   2765 
   2766 	/* tdb_next_lock places locks on the record returned, and its chain */
   2767 	while ((ret = tdb_next_lock(tdb, tl, &rec)) > 0) {
   2768 		count++;
   2769 		/* now read the full record */
   2770 		key.dptr = tdb_alloc_read(tdb, tl->off + sizeof(rec),
   2771 					  rec.key_len + rec.data_len);
   2772 		if (!key.dptr) {
   2773 			ret = -1;
   2774 			if (tdb_unlock(tdb, tl->hash, tl->lock_rw) != 0)
   2775 				goto out;
   2776 			if (tdb_unlock_record(tdb, tl->off) != 0)
   2777 				TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_traverse: key.dptr == NULL and unlock_record failed!\n"));
   2778 			goto out;
   2779 		}
   2780 		key.dsize = rec.key_len;
   2781 		dbuf.dptr = key.dptr + rec.key_len;
   2782 		dbuf.dsize = rec.data_len;
   2783 
   2784 		/* Drop chain lock, call out */
   2785 		if (tdb_unlock(tdb, tl->hash, tl->lock_rw) != 0) {
   2786 			ret = -1;
   2787 			SAFE_FREE(key.dptr);
   2788 			goto out;
   2789 		}
   2790 		if (fn && fn(tdb, key, dbuf, private_data)) {
   2791 			/* They want us to terminate traversal */
   2792 			ret = count;
   2793 			if (tdb_unlock_record(tdb, tl->off) != 0) {
   2794 				TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_traverse: unlock_record failed!\n"));;
   2795 				ret = -1;
   2796 			}
   2797 			SAFE_FREE(key.dptr);
   2798 			goto out;
   2799 		}
   2800 		SAFE_FREE(key.dptr);
   2801 	}
   2802 out:
   2803 	tdb->travlocks.next = tl->next;
   2804 	if (ret < 0)
   2805 		return -1;
   2806 	else
   2807 		return count;
   2808 }
   2809 
   2810 
   2811 /*
   2812   a write style traverse - temporarily marks the db read only
   2813 */
   2814 int tdb_traverse_read(struct tdb_context *tdb,
   2815 		      tdb_traverse_func fn, void *private_data)
   2816 {
   2817 	struct tdb_traverse_lock tl = { NULL, 0, 0, F_RDLCK };
   2818 	int ret;
   2819 
   2820 	/* we need to get a read lock on the transaction lock here to
   2821 	   cope with the lock ordering semantics of solaris10 */
   2822 	if (tdb_transaction_lock(tdb, F_RDLCK)) {
   2823 		return -1;
   2824 	}
   2825 
   2826 	tdb->traverse_read++;
   2827 	ret = tdb_traverse_internal(tdb, fn, private_data, &tl);
   2828 	tdb->traverse_read--;
   2829 
   2830 	tdb_transaction_unlock(tdb);
   2831 
   2832 	return ret;
   2833 }
   2834 
   2835 /*
   2836   a write style traverse - needs to get the transaction lock to
   2837   prevent deadlocks
   2838 */
   2839 int tdb_traverse(struct tdb_context *tdb,
   2840 		 tdb_traverse_func fn, void *private_data)
   2841 {
   2842 	struct tdb_traverse_lock tl = { NULL, 0, 0, F_WRLCK };
   2843 	int ret;
   2844 
   2845 	if (tdb->read_only || tdb->traverse_read) {
   2846 		return tdb_traverse_read(tdb, fn, private_data);
   2847 	}
   2848 
   2849 	if (tdb_transaction_lock(tdb, F_WRLCK)) {
   2850 		return -1;
   2851 	}
   2852 
   2853 	ret = tdb_traverse_internal(tdb, fn, private_data, &tl);
   2854 
   2855 	tdb_transaction_unlock(tdb);
   2856 
   2857 	return ret;
   2858 }
   2859 
   2860 
   2861 /* find the first entry in the database and return its key */
   2862 TDB_DATA tdb_firstkey(struct tdb_context *tdb)
   2863 {
   2864 	TDB_DATA key;
   2865 	struct list_struct rec;
   2866 
   2867 	/* release any old lock */
   2868 	if (tdb_unlock_record(tdb, tdb->travlocks.off) != 0)
   2869 		return tdb_null;
   2870 	tdb->travlocks.off = tdb->travlocks.hash = 0;
   2871 	tdb->travlocks.lock_rw = F_RDLCK;
   2872 
   2873 	/* Grab first record: locks chain and returned record. */
   2874 	if (tdb_next_lock(tdb, &tdb->travlocks, &rec) <= 0)
   2875 		return tdb_null;
   2876 	/* now read the key */
   2877 	key.dsize = rec.key_len;
   2878 	key.dptr =tdb_alloc_read(tdb,tdb->travlocks.off+sizeof(rec),key.dsize);
   2879 
   2880 	/* Unlock the hash chain of the record we just read. */
   2881 	if (tdb_unlock(tdb, tdb->travlocks.hash, tdb->travlocks.lock_rw) != 0)
   2882 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_firstkey: error occurred while tdb_unlocking!\n"));
   2883 	return key;
   2884 }
   2885 
   2886 /* find the next entry in the database, returning its key */
   2887 TDB_DATA tdb_nextkey(struct tdb_context *tdb, TDB_DATA oldkey)
   2888 {
   2889 	u32 oldhash;
   2890 	TDB_DATA key = tdb_null;
   2891 	struct list_struct rec;
   2892 	unsigned char *k = NULL;
   2893 
   2894 	/* Is locked key the old key?  If so, traverse will be reliable. */
   2895 	if (tdb->travlocks.off) {
   2896 		if (tdb_lock(tdb,tdb->travlocks.hash,tdb->travlocks.lock_rw))
   2897 			return tdb_null;
   2898 		if (tdb_rec_read(tdb, tdb->travlocks.off, &rec) == -1
   2899 		    || !(k = tdb_alloc_read(tdb,tdb->travlocks.off+sizeof(rec),
   2900 					    rec.key_len))
   2901 		    || memcmp(k, oldkey.dptr, oldkey.dsize) != 0) {
   2902 			/* No, it wasn't: unlock it and start from scratch */
   2903 			if (tdb_unlock_record(tdb, tdb->travlocks.off) != 0) {
   2904 				SAFE_FREE(k);
   2905 				return tdb_null;
   2906 			}
   2907 			if (tdb_unlock(tdb, tdb->travlocks.hash, tdb->travlocks.lock_rw) != 0) {
   2908 				SAFE_FREE(k);
   2909 				return tdb_null;
   2910 			}
   2911 			tdb->travlocks.off = 0;
   2912 		}
   2913 
   2914 		SAFE_FREE(k);
   2915 	}
   2916 
   2917 	if (!tdb->travlocks.off) {
   2918 		/* No previous element: do normal find, and lock record */
   2919 		tdb->travlocks.off = tdb_find_lock_hash(tdb, oldkey, tdb->hash_fn(&oldkey), tdb->travlocks.lock_rw, &rec);
   2920 		if (!tdb->travlocks.off)
   2921 			return tdb_null;
   2922 		tdb->travlocks.hash = BUCKET(rec.full_hash);
   2923 		if (tdb_lock_record(tdb, tdb->travlocks.off) != 0) {
   2924 			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_nextkey: lock_record failed (%s)!\n", strerror(errno)));
   2925 			return tdb_null;
   2926 		}
   2927 	}
   2928 	oldhash = tdb->travlocks.hash;
   2929 
   2930 	/* Grab next record: locks chain and returned record,
   2931 	   unlocks old record */
   2932 	if (tdb_next_lock(tdb, &tdb->travlocks, &rec) > 0) {
   2933 		key.dsize = rec.key_len;
   2934 		key.dptr = tdb_alloc_read(tdb, tdb->travlocks.off+sizeof(rec),
   2935 					  key.dsize);
   2936 		/* Unlock the chain of this new record */
   2937 		if (tdb_unlock(tdb, tdb->travlocks.hash, tdb->travlocks.lock_rw) != 0)
   2938 			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_nextkey: WARNING tdb_unlock failed!\n"));
   2939 	}
   2940 	/* Unlock the chain of old record */
   2941 	if (tdb_unlock(tdb, BUCKET(oldhash), tdb->travlocks.lock_rw) != 0)
   2942 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_nextkey: WARNING tdb_unlock failed!\n"));
   2943 	return key;
   2944 }
   2945 
   2946 /* file: dump.c */
   2947 
   2948 static tdb_off_t tdb_dump_record(struct tdb_context *tdb, int hash,
   2949 				 tdb_off_t offset)
   2950 {
   2951 	struct list_struct rec;
   2952 	tdb_off_t tailer_ofs, tailer;
   2953 
   2954 	if (tdb->methods->tdb_read(tdb, offset, (char *)&rec,
   2955 				   sizeof(rec), DOCONV()) == -1) {
   2956 		printf("ERROR: failed to read record at %u\n", offset);
   2957 		return 0;
   2958 	}
   2959 
   2960 	printf(" rec: hash=%d offset=0x%08x next=0x%08x rec_len=%d "
   2961 	       "key_len=%d data_len=%d full_hash=0x%x magic=0x%x\n",
   2962 	       hash, offset, rec.next, rec.rec_len, rec.key_len, rec.data_len,
   2963 	       rec.full_hash, rec.magic);
   2964 
   2965 	tailer_ofs = offset + sizeof(rec) + rec.rec_len - sizeof(tdb_off_t);
   2966 
   2967 	if (tdb_ofs_read(tdb, tailer_ofs, &tailer) == -1) {
   2968 		printf("ERROR: failed to read tailer at %u\n", tailer_ofs);
   2969 		return rec.next;
   2970 	}
   2971 
   2972 	if (tailer != rec.rec_len + sizeof(rec)) {
   2973 		printf("ERROR: tailer does not match record! tailer=%u totalsize=%u\n",
   2974 				(unsigned int)tailer, (unsigned int)(rec.rec_len + sizeof(rec)));
   2975 	}
   2976 	return rec.next;
   2977 }
   2978 
   2979 static int tdb_dump_chain(struct tdb_context *tdb, int i)
   2980 {
   2981 	tdb_off_t rec_ptr, top;
   2982 
   2983 	top = TDB_HASH_TOP(i);
   2984 
   2985 	if (tdb_lock(tdb, i, F_WRLCK) != 0)
   2986 		return -1;
   2987 
   2988 	if (tdb_ofs_read(tdb, top, &rec_ptr) == -1)
   2989 		return tdb_unlock(tdb, i, F_WRLCK);
   2990 
   2991 	if (rec_ptr)
   2992 		printf("hash=%d\n", i);
   2993 
   2994 	while (rec_ptr) {
   2995 		rec_ptr = tdb_dump_record(tdb, i, rec_ptr);
   2996 	}
   2997 
   2998 	return tdb_unlock(tdb, i, F_WRLCK);
   2999 }
   3000 
   3001 void tdb_dump_all(struct tdb_context *tdb)
   3002 {
   3003 	int i;
   3004 	for (i=0;i<tdb->header.hash_size;i++) {
   3005 		tdb_dump_chain(tdb, i);
   3006 	}
   3007 	printf("freelist:\n");
   3008 	tdb_dump_chain(tdb, -1);
   3009 }
   3010 
   3011 int tdb_printfreelist(struct tdb_context *tdb)
   3012 {
   3013 	int ret;
   3014 	long total_free = 0;
   3015 	tdb_off_t offset, rec_ptr;
   3016 	struct list_struct rec;
   3017 
   3018 	if ((ret = tdb_lock(tdb, -1, F_WRLCK)) != 0)
   3019 		return ret;
   3020 
   3021 	offset = FREELIST_TOP;
   3022 
   3023 	/* read in the freelist top */
   3024 	if (tdb_ofs_read(tdb, offset, &rec_ptr) == -1) {
   3025 		tdb_unlock(tdb, -1, F_WRLCK);
   3026 		return 0;
   3027 	}
   3028 
   3029 	printf("freelist top=[0x%08x]\n", rec_ptr );
   3030 	while (rec_ptr) {
   3031 		if (tdb->methods->tdb_read(tdb, rec_ptr, (char *)&rec,
   3032 					   sizeof(rec), DOCONV()) == -1) {
   3033 			tdb_unlock(tdb, -1, F_WRLCK);
   3034 			return -1;
   3035 		}
   3036 
   3037 		if (rec.magic != TDB_FREE_MAGIC) {
   3038 			printf("bad magic 0x%08x in free list\n", rec.magic);
   3039 			tdb_unlock(tdb, -1, F_WRLCK);
   3040 			return -1;
   3041 		}
   3042 
   3043 		printf("entry offset=[0x%08x], rec.rec_len = [0x%08x (%d)] (end = 0x%08x)\n",
   3044 		       rec_ptr, rec.rec_len, rec.rec_len, rec_ptr + rec.rec_len);
   3045 		total_free += rec.rec_len;
   3046 
   3047 		/* move to the next record */
   3048 		rec_ptr = rec.next;
   3049 	}
   3050 	printf("total rec_len = [0x%08x (%d)]\n", (int)total_free,
   3051                (int)total_free);
   3052 
   3053 	return tdb_unlock(tdb, -1, F_WRLCK);
   3054 }
   3055 
   3056 /* file: tdb.c */
   3057 
   3058 /*
   3059   non-blocking increment of the tdb sequence number if the tdb has been opened using
   3060   the TDB_SEQNUM flag
   3061 */
   3062 void tdb_increment_seqnum_nonblock(struct tdb_context *tdb)
   3063 {
   3064 	tdb_off_t seqnum=0;
   3065 
   3066 	if (!(tdb->flags & TDB_SEQNUM)) {
   3067 		return;
   3068 	}
   3069 
   3070 	/* we ignore errors from this, as we have no sane way of
   3071 	   dealing with them.
   3072 	*/
   3073 	tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
   3074 	seqnum++;
   3075 	tdb_ofs_write(tdb, TDB_SEQNUM_OFS, &seqnum);
   3076 }
   3077 
   3078 /*
   3079   increment the tdb sequence number if the tdb has been opened using
   3080   the TDB_SEQNUM flag
   3081 */
   3082 static void tdb_increment_seqnum(struct tdb_context *tdb)
   3083 {
   3084 	if (!(tdb->flags & TDB_SEQNUM)) {
   3085 		return;
   3086 	}
   3087 
   3088 	if (tdb_brlock(tdb, TDB_SEQNUM_OFS, F_WRLCK, F_SETLKW, 1, 1) != 0) {
   3089 		return;
   3090 	}
   3091 
   3092 	tdb_increment_seqnum_nonblock(tdb);
   3093 
   3094 	tdb_brlock(tdb, TDB_SEQNUM_OFS, F_UNLCK, F_SETLKW, 1, 1);
   3095 }
   3096 
   3097 static int tdb_key_compare(TDB_DATA key, TDB_DATA data, void *private_data)
   3098 {
   3099 	return memcmp(data.dptr, key.dptr, data.dsize);
   3100 }
   3101 
   3102 /* Returns 0 on fail.  On success, return offset of record, and fills
   3103    in rec */
   3104 static tdb_off_t tdb_find(struct tdb_context *tdb, TDB_DATA key, u32 hash,
   3105 			struct list_struct *r)
   3106 {
   3107 	tdb_off_t rec_ptr;
   3108 
   3109 	/* read in the hash top */
   3110 	if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
   3111 		return 0;
   3112 
   3113 	/* keep looking until we find the right record */
   3114 	while (rec_ptr) {
   3115 		if (tdb_rec_read(tdb, rec_ptr, r) == -1)
   3116 			return 0;
   3117 
   3118 		if (!TDB_DEAD(r) && hash==r->full_hash
   3119 		    && key.dsize==r->key_len
   3120 		    && tdb_parse_data(tdb, key, rec_ptr + sizeof(*r),
   3121 				      r->key_len, tdb_key_compare,
   3122 				      NULL) == 0) {
   3123 			return rec_ptr;
   3124 		}
   3125 		rec_ptr = r->next;
   3126 	}
   3127 	return TDB_ERRCODE(TDB_ERR_NOEXIST, 0);
   3128 }
   3129 
   3130 /* As tdb_find, but if you succeed, keep the lock */
   3131 tdb_off_t tdb_find_lock_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash, int locktype,
   3132 			   struct list_struct *rec)
   3133 {
   3134 	u32 rec_ptr;
   3135 
   3136 	if (tdb_lock(tdb, BUCKET(hash), locktype) == -1)
   3137 		return 0;
   3138 	if (!(rec_ptr = tdb_find(tdb, key, hash, rec)))
   3139 		tdb_unlock(tdb, BUCKET(hash), locktype);
   3140 	return rec_ptr;
   3141 }
   3142 
   3143 
   3144 /* update an entry in place - this only works if the new data size
   3145    is <= the old data size and the key exists.
   3146    on failure return -1.
   3147 */
   3148 static int tdb_update_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash, TDB_DATA dbuf)
   3149 {
   3150 	struct list_struct rec;
   3151 	tdb_off_t rec_ptr;
   3152 
   3153 	/* find entry */
   3154 	if (!(rec_ptr = tdb_find(tdb, key, hash, &rec)))
   3155 		return -1;
   3156 
   3157 	/* must be long enough key, data and tailer */
   3158 	if (rec.rec_len < key.dsize + dbuf.dsize + sizeof(tdb_off_t)) {
   3159 		tdb->ecode = TDB_SUCCESS; /* Not really an error */
   3160 		return -1;
   3161 	}
   3162 
   3163 	if (tdb->methods->tdb_write(tdb, rec_ptr + sizeof(rec) + rec.key_len,
   3164 		      dbuf.dptr, dbuf.dsize) == -1)
   3165 		return -1;
   3166 
   3167 	if (dbuf.dsize != rec.data_len) {
   3168 		/* update size */
   3169 		rec.data_len = dbuf.dsize;
   3170 		return tdb_rec_write(tdb, rec_ptr, &rec);
   3171 	}
   3172 
   3173 	return 0;
   3174 }
   3175 
   3176 /* find an entry in the database given a key */
   3177 /* If an entry doesn't exist tdb_err will be set to
   3178  * TDB_ERR_NOEXIST. If a key has no data attached
   3179  * then the TDB_DATA will have zero length but
   3180  * a non-zero pointer
   3181  */
   3182 TDB_DATA tdb_fetch(struct tdb_context *tdb, TDB_DATA key)
   3183 {
   3184 	tdb_off_t rec_ptr;
   3185 	struct list_struct rec;
   3186 	TDB_DATA ret;
   3187 	u32 hash;
   3188 
   3189 	/* find which hash bucket it is in */
   3190 	hash = tdb->hash_fn(&key);
   3191 	if (!(rec_ptr = tdb_find_lock_hash(tdb,key,hash,F_RDLCK,&rec)))
   3192 		return tdb_null;
   3193 
   3194 	ret.dptr = tdb_alloc_read(tdb, rec_ptr + sizeof(rec) + rec.key_len,
   3195 				  rec.data_len);
   3196 	ret.dsize = rec.data_len;
   3197 	tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
   3198 	return ret;
   3199 }
   3200 
   3201 /*
   3202  * Find an entry in the database and hand the record's data to a parsing
   3203  * function. The parsing function is executed under the chain read lock, so it
   3204  * should be fast and should not block on other syscalls.
   3205  *
   3206  * DONT CALL OTHER TDB CALLS FROM THE PARSER, THIS MIGHT LEAD TO SEGFAULTS.
   3207  *
   3208  * For mmapped tdb's that do not have a transaction open it points the parsing
   3209  * function directly at the mmap area, it avoids the malloc/memcpy in this
   3210  * case. If a transaction is open or no mmap is available, it has to do
   3211  * malloc/read/parse/free.
   3212  *
   3213  * This is interesting for all readers of potentially large data structures in
   3214  * the tdb records, ldb indexes being one example.
   3215  */
   3216 
   3217 int tdb_parse_record(struct tdb_context *tdb, TDB_DATA key,
   3218 		     int (*parser)(TDB_DATA key, TDB_DATA data,
   3219 				   void *private_data),
   3220 		     void *private_data)
   3221 {
   3222 	tdb_off_t rec_ptr;
   3223 	struct list_struct rec;
   3224 	int ret;
   3225 	u32 hash;
   3226 
   3227 	/* find which hash bucket it is in */
   3228 	hash = tdb->hash_fn(&key);
   3229 
   3230 	if (!(rec_ptr = tdb_find_lock_hash(tdb,key,hash,F_RDLCK,&rec))) {
   3231 		return TDB_ERRCODE(TDB_ERR_NOEXIST, 0);
   3232 	}
   3233 
   3234 	ret = tdb_parse_data(tdb, key, rec_ptr + sizeof(rec) + rec.key_len,
   3235 			     rec.data_len, parser, private_data);
   3236 
   3237 	tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
   3238 
   3239 	return ret;
   3240 }
   3241 
   3242 /* check if an entry in the database exists
   3243 
   3244    note that 1 is returned if the key is found and 0 is returned if not found
   3245    this doesn't match the conventions in the rest of this module, but is
   3246    compatible with gdbm
   3247 */
   3248 static int tdb_exists_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash)
   3249 {
   3250 	struct list_struct rec;
   3251 
   3252 	if (tdb_find_lock_hash(tdb, key, hash, F_RDLCK, &rec) == 0)
   3253 		return 0;
   3254 	tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
   3255 	return 1;
   3256 }
   3257 
   3258 int tdb_exists(struct tdb_context *tdb, TDB_DATA key)
   3259 {
   3260 	u32 hash = tdb->hash_fn(&key);
   3261 	return tdb_exists_hash(tdb, key, hash);
   3262 }
   3263 
   3264 /* actually delete an entry in the database given the offset */
   3265 int tdb_do_delete(struct tdb_context *tdb, tdb_off_t rec_ptr, struct list_struct*rec)
   3266 {
   3267 	tdb_off_t last_ptr, i;
   3268 	struct list_struct lastrec;
   3269 
   3270 	if (tdb->read_only || tdb->traverse_read) return -1;
   3271 
   3272 	if (tdb_write_lock_record(tdb, rec_ptr) == -1) {
   3273 		/* Someone traversing here: mark it as dead */
   3274 		rec->magic = TDB_DEAD_MAGIC;
   3275 		return tdb_rec_write(tdb, rec_ptr, rec);
   3276 	}
   3277 	if (tdb_write_unlock_record(tdb, rec_ptr) != 0)
   3278 		return -1;
   3279 
   3280 	/* find previous record in hash chain */
   3281 	if (tdb_ofs_read(tdb, TDB_HASH_TOP(rec->full_hash), &i) == -1)
   3282 		return -1;
   3283 	for (last_ptr = 0; i != rec_ptr; last_ptr = i, i = lastrec.next)
   3284 		if (tdb_rec_read(tdb, i, &lastrec) == -1)
   3285 			return -1;
   3286 
   3287 	/* unlink it: next ptr is at start of record. */
   3288 	if (last_ptr == 0)
   3289 		last_ptr = TDB_HASH_TOP(rec->full_hash);
   3290 	if (tdb_ofs_write(tdb, last_ptr, &rec->next) == -1)
   3291 		return -1;
   3292 
   3293 	/* recover the space */
   3294 	if (tdb_free(tdb, rec_ptr, rec) == -1)
   3295 		return -1;
   3296 	return 0;
   3297 }
   3298 
   3299 static int tdb_count_dead(struct tdb_context *tdb, u32 hash)
   3300 {
   3301 	int res = 0;
   3302 	tdb_off_t rec_ptr;
   3303 	struct list_struct rec;
   3304 
   3305 	/* read in the hash top */
   3306 	if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
   3307 		return 0;
   3308 
   3309 	while (rec_ptr) {
   3310 		if (tdb_rec_read(tdb, rec_ptr, &rec) == -1)
   3311 			return 0;
   3312 
   3313 		if (rec.magic == TDB_DEAD_MAGIC) {
   3314 			res += 1;
   3315 		}
   3316 		rec_ptr = rec.next;
   3317 	}
   3318 	return res;
   3319 }
   3320 
   3321 /*
   3322  * Purge all DEAD records from a hash chain
   3323  */
   3324 static int tdb_purge_dead(struct tdb_context *tdb, u32 hash)
   3325 {
   3326 	int res = -1;
   3327 	struct list_struct rec;
   3328 	tdb_off_t rec_ptr;
   3329 
   3330 	if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
   3331 		return -1;
   3332 	}
   3333 
   3334 	/* read in the hash top */
   3335 	if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
   3336 		goto fail;
   3337 
   3338 	while (rec_ptr) {
   3339 		tdb_off_t next;
   3340 
   3341 		if (tdb_rec_read(tdb, rec_ptr, &rec) == -1) {
   3342 			goto fail;
   3343 		}
   3344 
   3345 		next = rec.next;
   3346 
   3347 		if (rec.magic == TDB_DEAD_MAGIC
   3348 		    && tdb_do_delete(tdb, rec_ptr, &rec) == -1) {
   3349 			goto fail;
   3350 		}
   3351 		rec_ptr = next;
   3352 	}
   3353 	res = 0;
   3354  fail:
   3355 	tdb_unlock(tdb, -1, F_WRLCK);
   3356 	return res;
   3357 }
   3358 
   3359 /* delete an entry in the database given a key */
   3360 static int tdb_delete_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash)
   3361 {
   3362 	tdb_off_t rec_ptr;
   3363 	struct list_struct rec;
   3364 	int ret;
   3365 
   3366 	if (tdb->max_dead_records != 0) {
   3367 
   3368 		/*
   3369 		 * Allow for some dead records per hash chain, mainly for
   3370 		 * tdb's with a very high create/delete rate like locking.tdb.
   3371 		 */
   3372 
   3373 		if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
   3374 			return -1;
   3375 
   3376 		if (tdb_count_dead(tdb, hash) >= tdb->max_dead_records) {
   3377 			/*
   3378 			 * Don't let the per-chain freelist grow too large,
   3379 			 * delete all existing dead records
   3380 			 */
   3381 			tdb_purge_dead(tdb, hash);
   3382 		}
   3383 
   3384 		if (!(rec_ptr = tdb_find(tdb, key, hash, &rec))) {
   3385 			tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
   3386 			return -1;
   3387 		}
   3388 
   3389 		/*
   3390 		 * Just mark the record as dead.
   3391 		 */
   3392 		rec.magic = TDB_DEAD_MAGIC;
   3393 		ret = tdb_rec_write(tdb, rec_ptr, &rec);
   3394 	}
   3395 	else {
   3396 		if (!(rec_ptr = tdb_find_lock_hash(tdb, key, hash, F_WRLCK,
   3397 						   &rec)))
   3398 			return -1;
   3399 
   3400 		ret = tdb_do_delete(tdb, rec_ptr, &rec);
   3401 	}
   3402 
   3403 	if (ret == 0) {
   3404 		tdb_increment_seqnum(tdb);
   3405 	}
   3406 
   3407 	if (tdb_unlock(tdb, BUCKET(rec.full_hash), F_WRLCK) != 0)
   3408 		TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_delete: WARNING tdb_unlock failed!\n"));
   3409 	return ret;
   3410 }
   3411 
   3412 int tdb_delete(struct tdb_context *tdb, TDB_DATA key)
   3413 {
   3414 	u32 hash = tdb->hash_fn(&key);
   3415 	return tdb_delete_hash(tdb, key, hash);
   3416 }
   3417 
   3418 /*
   3419  * See if we have a dead record around with enough space
   3420  */
   3421 static tdb_off_t tdb_find_dead(struct tdb_context *tdb, u32 hash,
   3422 			       struct list_struct *r, tdb_len_t length)
   3423 {
   3424 	tdb_off_t rec_ptr;
   3425 
   3426 	/* read in the hash top */
   3427 	if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
   3428 		return 0;
   3429 
   3430 	/* keep looking until we find the right record */
   3431 	while (rec_ptr) {
   3432 		if (tdb_rec_read(tdb, rec_ptr, r) == -1)
   3433 			return 0;
   3434 
   3435 		if (TDB_DEAD(r) && r->rec_len >= length) {
   3436 			/*
   3437 			 * First fit for simple coding, TODO: change to best
   3438 			 * fit
   3439 			 */
   3440 			return rec_ptr;
   3441 		}
   3442 		rec_ptr = r->next;
   3443 	}
   3444 	return 0;
   3445 }
   3446 
   3447 /* store an element in the database, replacing any existing element
   3448    with the same key
   3449 
   3450    return 0 on success, -1 on failure
   3451 */
   3452 int tdb_store(struct tdb_context *tdb, TDB_DATA key, TDB_DATA dbuf, int flag)
   3453 {
   3454 	struct list_struct rec;
   3455 	u32 hash;
   3456 	tdb_off_t rec_ptr;
   3457 	char *p = NULL;
   3458 	int ret = -1;
   3459 
   3460 	if (tdb->read_only || tdb->traverse_read) {
   3461 		tdb->ecode = TDB_ERR_RDONLY;
   3462 		return -1;
   3463 	}
   3464 
   3465 	/* find which hash bucket it is in */
   3466 	hash = tdb->hash_fn(&key);
   3467 	if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
   3468 		return -1;
   3469 
   3470 	/* check for it existing, on insert. */
   3471 	if (flag == TDB_INSERT) {
   3472 		if (tdb_exists_hash(tdb, key, hash)) {
   3473 			tdb->ecode = TDB_ERR_EXISTS;
   3474 			goto fail;
   3475 		}
   3476 	} else {
   3477 		/* first try in-place update, on modify or replace. */
   3478 		if (tdb_update_hash(tdb, key, hash, dbuf) == 0) {
   3479 			goto done;
   3480 		}
   3481 		if (tdb->ecode == TDB_ERR_NOEXIST &&
   3482 		    flag == TDB_MODIFY) {
   3483 			/* if the record doesn't exist and we are in TDB_MODIFY mode then
   3484 			 we should fail the store */
   3485 			goto fail;
   3486 		}
   3487 	}
   3488 	/* reset the error code potentially set by the tdb_update() */
   3489 	tdb->ecode = TDB_SUCCESS;
   3490 
   3491 	/* delete any existing record - if it doesn't exist we don't
   3492            care.  Doing this first reduces fragmentation, and avoids
   3493            coalescing with `allocated' block before it's updated. */
   3494 	if (flag != TDB_INSERT)
   3495 		tdb_delete_hash(tdb, key, hash);
   3496 
   3497 	/* Copy key+value *before* allocating free space in case malloc
   3498 	   fails and we are left with a dead spot in the tdb. */
   3499 
   3500 	if (!(p = (char *)malloc(key.dsize + dbuf.dsize))) {
   3501 		tdb->ecode = TDB_ERR_OOM;
   3502 		goto fail;
   3503 	}
   3504 
   3505 	memcpy(p, key.dptr, key.dsize);
   3506 	if (dbuf.dsize)
   3507 		memcpy(p+key.dsize, dbuf.dptr, dbuf.dsize);
   3508 
   3509 	if (tdb->max_dead_records != 0) {
   3510 		/*
   3511 		 * Allow for some dead records per hash chain, look if we can
   3512 		 * find one that can hold the new record. We need enough space
   3513 		 * for key, data and tailer. If we find one, we don't have to
   3514 		 * consult the central freelist.
   3515 		 */
   3516 		rec_ptr = tdb_find_dead(
   3517 			tdb, hash, &rec,
   3518 			key.dsize + dbuf.dsize + sizeof(tdb_off_t));
   3519 
   3520 		if (rec_ptr != 0) {
   3521 			rec.key_len = key.dsize;
   3522 			rec.data_len = dbuf.dsize;
   3523 			rec.full_hash = hash;
   3524 			rec.magic = TDB_MAGIC;
   3525 			if (tdb_rec_write(tdb, rec_ptr, &rec) == -1
   3526 			    || tdb->methods->tdb_write(
   3527 				    tdb, rec_ptr + sizeof(rec),
   3528 				    p, key.dsize + dbuf.dsize) == -1) {
   3529 				goto fail;
   3530 			}
   3531 			goto done;
   3532 		}
   3533 	}
   3534 
   3535 	/*
   3536 	 * We have to allocate some space from the freelist, so this means we
   3537 	 * have to lock it. Use the chance to purge all the DEAD records from
   3538 	 * the hash chain under the freelist lock.
   3539 	 */
   3540 
   3541 	if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
   3542 		goto fail;
   3543 	}
   3544 
   3545 	if ((tdb->max_dead_records != 0)
   3546 	    && (tdb_purge_dead(tdb, hash) == -1)) {
   3547 		tdb_unlock(tdb, -1, F_WRLCK);
   3548 		goto fail;
   3549 	}
   3550 
   3551 	/* we have to allocate some space */
   3552 	rec_ptr = tdb_allocate(tdb, key.dsize + dbuf.dsize, &rec);
   3553 
   3554 	tdb_unlock(tdb, -1, F_WRLCK);
   3555 
   3556 	if (rec_ptr == 0) {
   3557 		goto fail;
   3558 	}
   3559 
   3560 	/* Read hash top into next ptr */
   3561 	if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec.next) == -1)
   3562 		goto fail;
   3563 
   3564 	rec.key_len = key.dsize;
   3565 	rec.data_len = dbuf.dsize;
   3566 	rec.full_hash = hash;
   3567 	rec.magic = TDB_MAGIC;
   3568 
   3569 	/* write out and point the top of the hash chain at it */
   3570 	if (tdb_rec_write(tdb, rec_ptr, &rec) == -1
   3571 	    || tdb->methods->tdb_write(tdb, rec_ptr+sizeof(rec), p, key.dsize+dbuf.dsize)==-1
   3572 	    || tdb_ofs_write(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1) {
   3573 		/* Need to tdb_unallocate() here */
   3574 		goto fail;
   3575 	}
   3576 
   3577  done:
   3578 	ret = 0;
   3579  fail:
   3580 	if (ret == 0) {
   3581 		tdb_increment_seqnum(tdb);
   3582 	}
   3583 
   3584 	SAFE_FREE(p);
   3585 	tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
   3586 	return ret;
   3587 }
   3588 
   3589 
   3590 /* Append to an entry. Create if not exist. */
   3591 int tdb_append(struct tdb_context *tdb, TDB_DATA key, TDB_DATA new_dbuf)
   3592 {
   3593 	u32 hash;
   3594 	TDB_DATA dbuf;
   3595 	int ret = -1;
   3596 
   3597 	/* find which hash bucket it is in */
   3598 	hash = tdb->hash_fn(&key);
   3599 	if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
   3600 		return -1;
   3601 
   3602 	dbuf = tdb_fetch(tdb, key);
   3603 
   3604 	if (dbuf.dptr == NULL) {
   3605 		dbuf.dptr = (unsigned char *)malloc(new_dbuf.dsize);
   3606 	} else {
   3607 		unsigned char *new_dptr = (unsigned char *)realloc(dbuf.dptr,
   3608 						     dbuf.dsize + new_dbuf.dsize);
   3609 		if (new_dptr == NULL) {
   3610 			free(dbuf.dptr);
   3611 		}
   3612 		dbuf.dptr = new_dptr;
   3613 	}
   3614 
   3615 	if (dbuf.dptr == NULL) {
   3616 		tdb->ecode = TDB_ERR_OOM;
   3617 		goto failed;
   3618 	}
   3619 
   3620 	memcpy(dbuf.dptr + dbuf.dsize, new_dbuf.dptr, new_dbuf.dsize);
   3621 	dbuf.dsize += new_dbuf.dsize;
   3622 
   3623 	ret = tdb_store(tdb, key, dbuf, 0);
   3624 
   3625 failed:
   3626 	tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
   3627 	SAFE_FREE(dbuf.dptr);
   3628 	return ret;
   3629 }
   3630 
   3631 
   3632 /*
   3633   return the name of the current tdb file
   3634   useful for external logging functions
   3635 */
   3636 const char *tdb_name(struct tdb_context *tdb)
   3637 {
   3638 	return tdb->name;
   3639 }
   3640 
   3641 /*
   3642   return the underlying file descriptor being used by tdb, or -1
   3643   useful for external routines that want to check the device/inode
   3644   of the fd
   3645 */
   3646 int tdb_fd(struct tdb_context *tdb)
   3647 {
   3648 	return tdb->fd;
   3649 }
   3650 
   3651 /*
   3652   return the current logging function
   3653   useful for external tdb routines that wish to log tdb errors
   3654 */
   3655 tdb_log_func tdb_log_fn(struct tdb_context *tdb)
   3656 {
   3657 	return tdb->log.log_fn;
   3658 }
   3659 
   3660 
   3661 /*
   3662   get the tdb sequence number. Only makes sense if the writers opened
   3663   with TDB_SEQNUM set. Note that this sequence number will wrap quite
   3664   quickly, so it should only be used for a 'has something changed'
   3665   test, not for code that relies on the count of the number of changes
   3666   made. If you want a counter then use a tdb record.
   3667 
   3668   The aim of this sequence number is to allow for a very lightweight
   3669   test of a possible tdb change.
   3670 */
   3671 int tdb_get_seqnum(struct tdb_context *tdb)
   3672 {
   3673 	tdb_off_t seqnum=0;
   3674 
   3675 	tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
   3676 	return seqnum;
   3677 }
   3678 
   3679 int tdb_hash_size(struct tdb_context *tdb)
   3680 {
   3681 	return tdb->header.hash_size;
   3682 }
   3683 
   3684 size_t tdb_map_size(struct tdb_context *tdb)
   3685 {
   3686 	return tdb->map_size;
   3687 }
   3688 
   3689 int tdb_get_flags(struct tdb_context *tdb)
   3690 {
   3691 	return tdb->flags;
   3692 }
   3693 
   3694 
   3695 /*
   3696   enable sequence number handling on an open tdb
   3697 */
   3698 void tdb_enable_seqnum(struct tdb_context *tdb)
   3699 {
   3700 	tdb->flags |= TDB_SEQNUM;
   3701 }
   3702 
   3703 /* file: open.c */
   3704 
/* Head of the singly linked list (chained through ->next) of all
   contexts opened in this process.  It is consulted on open to ensure
   no double-opens of the same file (fcntl locks don't nest!) */
static struct tdb_context *tdbs = NULL;
   3707 
   3708 
   3709 /* This is from a hash algorithm suggested by Rogier Wolff */
   3710 static unsigned int default_tdb_hash(TDB_DATA *key)
   3711 {
   3712 	u32 value;	/* Used to compute the hash value.  */
   3713 	u32   i;	/* Used to cycle through random values. */
   3714 
   3715 	/* Set the initial value from the key size. */
   3716 	for (value = 0, i=0; i < key->dsize; i++)
   3717 		value = value * 256 + key->dptr[i] + (value >> 24) * 241;
   3718 
   3719 	return value;
   3720 }
   3721 
   3722 
   3723 /* initialise a new database with a specified hash size */
   3724 static int tdb_new_database(struct tdb_context *tdb, int hash_size)
   3725 {
   3726 	struct tdb_header *newdb;
   3727 	int size, ret = -1;
   3728 
   3729 	/* We make it up in memory, then write it out if not internal */
   3730 	size = sizeof(struct tdb_header) + (hash_size+1)*sizeof(tdb_off_t);
   3731 	if (!(newdb = (struct tdb_header *)calloc(size, 1)))
   3732 		return TDB_ERRCODE(TDB_ERR_OOM, -1);
   3733 
   3734 	/* Fill in the header */
   3735 	newdb->version = TDB_VERSION;
   3736 	newdb->hash_size = hash_size;
   3737 	if (tdb->flags & TDB_INTERNAL) {
   3738 		tdb->map_size = size;
   3739 		tdb->map_ptr = (char *)newdb;
   3740 		memcpy(&tdb->header, newdb, sizeof(tdb->header));
   3741 		/* Convert the `ondisk' version if asked. */
   3742 		CONVERT(*newdb);
   3743 		return 0;
   3744 	}
   3745 	if (lseek(tdb->fd, 0, SEEK_SET) == -1)
   3746 		goto fail;
   3747 
   3748 	if (ftruncate(tdb->fd, 0) == -1)
   3749 		goto fail;
   3750 
   3751 	/* This creates an endian-converted header, as if read from disk */
   3752 	CONVERT(*newdb);
   3753 	memcpy(&tdb->header, newdb, sizeof(tdb->header));
   3754 	/* Don't endian-convert the magic food! */
   3755 	memcpy(newdb->magic_food, TDB_MAGIC_FOOD, strlen(TDB_MAGIC_FOOD)+1);
   3756 	if (write(tdb->fd, newdb, size) != size) {
   3757 		ret = -1;
   3758 	} else {
   3759 		ret = 0;
   3760 	}
   3761 
   3762   fail:
   3763 	SAFE_FREE(newdb);
   3764 	return ret;
   3765 }
   3766 
   3767 
   3768 
   3769 static int tdb_already_open(dev_t device,
   3770 			    ino_t ino)
   3771 {
   3772 	struct tdb_context *i;
   3773 
   3774 	for (i = tdbs; i; i = i->next) {
   3775 		if (i->device == device && i->inode == ino) {
   3776 			return 1;
   3777 		}
   3778 	}
   3779 
   3780 	return 0;
   3781 }
   3782 
   3783 /* open the database, creating it if necessary
   3784 
   3785    The open_flags and mode are passed straight to the open call on the
   3786    database file. A flags value of O_WRONLY is invalid. The hash size
   3787    is advisory, use zero for a default value.
   3788 
   3789    Return is NULL on error, in which case errno is also set.  Don't
   3790    try to call tdb_error or tdb_errname, just do strerror(errno).
   3791 
   3792    @param name may be NULL for internal databases. */
   3793 struct tdb_context *tdb_open(const char *name, int hash_size, int tdb_flags,
   3794 		      int open_flags, mode_t mode)
   3795 {
   3796 	return tdb_open_ex(name, hash_size, tdb_flags, open_flags, mode, NULL, NULL);
   3797 }
   3798 
   3799 /* a default logging function */
   3800 static void null_log_fn(struct tdb_context *tdb, enum tdb_debug_level level, const char *fmt, ...) PRINTF_ATTRIBUTE(3, 4);
   3801 static void null_log_fn(struct tdb_context *tdb, enum tdb_debug_level level, const char *fmt, ...)
   3802 {
   3803 }
   3804 
   3805 
   3806 struct tdb_context *tdb_open_ex(const char *name, int hash_size, int tdb_flags,
   3807 				int open_flags, mode_t mode,
   3808 				const struct tdb_logging_context *log_ctx,
   3809 				tdb_hash_func hash_fn)
   3810 {
   3811 	struct tdb_context *tdb;
   3812 	struct stat st;
   3813 	int rev = 0, locked = 0;
   3814 	unsigned char *vp;
   3815 	u32 vertest;
   3816 
   3817 	if (!(tdb = (struct tdb_context *)calloc(1, sizeof *tdb))) {
   3818 		/* Can't log this */
   3819 		errno = ENOMEM;
   3820 		goto fail;
   3821 	}
   3822 	tdb_io_init(tdb);
   3823 	tdb->fd = -1;
   3824 	tdb->name = NULL;
   3825 	tdb->map_ptr = NULL;
   3826 	tdb->flags = tdb_flags;
   3827 	tdb->open_flags = open_flags;
   3828 	if (log_ctx) {
   3829 		tdb->log = *log_ctx;
   3830 	} else {
   3831 		tdb->log.log_fn = null_log_fn;
   3832 		tdb->log.log_private = NULL;
   3833 	}
   3834 	tdb->hash_fn = hash_fn ? hash_fn : default_tdb_hash;
   3835 
   3836 	/* cache the page size */
   3837 	tdb->page_size = sysconf(_SC_PAGESIZE);
   3838 	if (tdb->page_size <= 0) {
   3839 		tdb->page_size = 0x2000;
   3840 	}
   3841 
   3842 	if ((open_flags & O_ACCMODE) == O_WRONLY) {
   3843 		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: can't open tdb %s write-only\n",
   3844 			 name));
   3845 		errno = EINVAL;
   3846 		goto fail;
   3847 	}
   3848 
   3849 	if (hash_size == 0)
   3850 		hash_size = DEFAULT_HASH_SIZE;
   3851 	if ((open_flags & O_ACCMODE) == O_RDONLY) {
   3852 		tdb->read_only = 1;
   3853 		/* read only databases don't do locking or clear if first */
   3854 		tdb->flags |= TDB_NOLOCK;
   3855 		tdb->flags &= ~TDB_CLEAR_IF_FIRST;
   3856 	}
   3857 
   3858 	/* internal databases don't mmap or lock, and start off cleared */
   3859 	if (tdb->flags & TDB_INTERNAL) {
   3860 		tdb->flags |= (TDB_NOLOCK | TDB_NOMMAP);
   3861 		tdb->flags &= ~TDB_CLEAR_IF_FIRST;
   3862 		if (tdb_new_database(tdb, hash_size) != 0) {
   3863 			TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: tdb_new_database failed!"));
   3864 			goto fail;
   3865 		}
   3866 		goto internal;
   3867 	}
   3868 
   3869 	if ((tdb->fd = open(name, open_flags, mode)) == -1) {
   3870 		TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_open_ex: could not open file %s: %s\n",
   3871 			 name, strerror(errno)));
   3872 		goto fail;	/* errno set by open(2) */
   3873 	}
   3874 
   3875 	/* ensure there is only one process initialising at once */
   3876 	if (tdb->methods->tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
   3877 		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: failed to get global lock on %s: %s\n",
   3878 			 name, strerror(errno)));
   3879 		goto fail;	/* errno set by tdb_brlock */
   3880 	}
   3881 
   3882 	/* we need to zero database if we are the only one with it open */
   3883 	if ((tdb_flags & TDB_CLEAR_IF_FIRST) &&
   3884 	    (locked = (tdb->methods->tdb_brlock(tdb, ACTIVE_LOCK, F_WRLCK, F_SETLK, 0, 1) == 0))) {
   3885 		open_flags |= O_CREAT;
   3886 		if (ftruncate(tdb->fd, 0) == -1) {
   3887 			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_open_ex: "
   3888 				 "failed to truncate %s: %s\n",
   3889 				 name, strerror(errno)));
   3890 			goto fail; /* errno set by ftruncate */
   3891 		}
   3892 	}
   3893 
   3894 	if (read(tdb->fd, &tdb->header, sizeof(tdb->header)) != sizeof(tdb->header)
   3895 	    || strcmp(tdb->header.magic_food, TDB_MAGIC_FOOD) != 0
   3896 	    || (tdb->header.version != TDB_VERSION
   3897 		&& !(rev = (tdb->header.version==TDB_BYTEREV(TDB_VERSION))))) {
   3898 		/* its not a valid database - possibly initialise it */
   3899 		if (!(open_flags & O_CREAT) || tdb_new_database(tdb, hash_size) == -1) {
   3900 			errno = EIO; /* ie bad format or something */
   3901 			goto fail;
   3902 		}
   3903 		rev = (tdb->flags & TDB_CONVERT);
   3904 	}
   3905 	vp = (unsigned char *)&tdb->header.version;
   3906 	vertest = (((u32)vp[0]) << 24) | (((u32)vp[1]) << 16) |
   3907 		  (((u32)vp[2]) << 8) | (u32)vp[3];
   3908 	tdb->flags |= (vertest==TDB_VERSION) ? TDB_BIGENDIAN : 0;
   3909 	if (!rev)
   3910 		tdb->flags &= ~TDB_CONVERT;
   3911 	else {
   3912 		tdb->flags |= TDB_CONVERT;
   3913 		tdb_convert(&tdb->header, sizeof(tdb->header));
   3914 	}
   3915 	if (fstat(tdb->fd, &st) == -1)
   3916 		goto fail;
   3917 
   3918 	if (tdb->header.rwlocks != 0) {
   3919 		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: spinlocks no longer supported\n"));
   3920 		goto fail;
   3921 	}
   3922 
   3923 	/* Is it already in the open list?  If so, fail. */
   3924 	if (tdb_already_open(st.st_dev, st.st_ino)) {
   3925 		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: "
   3926 			 "%s (%d,%d) is already open in this process\n",
   3927 			 name, (int)st.st_dev, (int)st.st_ino));
   3928 		errno = EBUSY;
   3929 		goto fail;
   3930 	}
   3931 
   3932 	if (!(tdb->name = (char *)strdup(name))) {
   3933 		errno = ENOMEM;
   3934 		goto fail;
   3935 	}
   3936 
   3937 	tdb->map_size = st.st_size;
   3938 	tdb->device = st.st_dev;
   3939 	tdb->inode = st.st_ino;
   3940 	tdb->max_dead_records = 0;
   3941 	tdb_mmap(tdb);
   3942 	if (locked) {
   3943 		if (tdb->methods->tdb_brlock(tdb, ACTIVE_LOCK, F_UNLCK, F_SETLK, 0, 1) == -1) {
   3944 			TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: "
   3945 				 "failed to take ACTIVE_LOCK on %s: %s\n",
   3946 				 name, strerror(errno)));
   3947 			goto fail;
   3948 		}
   3949 
   3950 	}
   3951 
   3952 	/* We always need to do this if the CLEAR_IF_FIRST flag is set, even if
   3953 	   we didn't get the initial exclusive lock as we need to let all other
   3954 	   users know we're using it. */
   3955 
   3956 	if (tdb_flags & TDB_CLEAR_IF_FIRST) {
   3957 		/* leave this lock in place to indicate it's in use */
   3958 		if (tdb->methods->tdb_brlock(tdb, ACTIVE_LOCK, F_RDLCK, F_SETLKW, 0, 1) == -1)
   3959 			goto fail;
   3960 	}
   3961 
   3962 	/* if needed, run recovery */
   3963 	if (tdb_transaction_recover(tdb) == -1) {
   3964 		goto fail;
   3965 	}
   3966 
   3967  internal:
   3968 	/* Internal (memory-only) databases skip all the code above to
   3969 	 * do with disk files, and resume here by releasing their
   3970 	 * global lock and hooking into the active list. */
   3971 	if (tdb->methods->tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1) == -1)
   3972 		goto fail;
   3973 	tdb->next = tdbs;
   3974 	tdbs = tdb;
   3975 	return tdb;
   3976 
   3977  fail:
   3978 	{ int save_errno = errno;
   3979 
   3980 	if (!tdb)
   3981 		return NULL;
   3982 
   3983 	if (tdb->map_ptr) {
   3984 		if (tdb->flags & TDB_INTERNAL)
   3985 			SAFE_FREE(tdb->map_ptr);
   3986 		else
   3987 			tdb_munmap(tdb);
   3988 	}
   3989 	SAFE_FREE(tdb->name);
   3990 	if (tdb->fd != -1)
   3991 		if (close(tdb->fd) != 0)
   3992 			TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: failed to close tdb->fd on error!\n"));
   3993 	SAFE_FREE(tdb);
   3994 	errno = save_errno;
   3995 	return NULL;
   3996 	}
   3997 }
   3998 
   3999 /*
   4000  * Set the maximum number of dead records per hash chain
   4001  */
   4002 
   4003 void tdb_set_max_dead(struct tdb_context *tdb, int max_dead)
   4004 {
   4005 	tdb->max_dead_records = max_dead;
   4006 }
   4007 
   4008 /**
   4009  * Close a database.
   4010  *
   4011  * @returns -1 for error; 0 for success.
   4012  **/
   4013 int tdb_close(struct tdb_context *tdb)
   4014 {
   4015 	struct tdb_context **i;
   4016 	int ret = 0;
   4017 
   4018 	if (tdb->transaction) {
   4019 		tdb_transaction_cancel(tdb);
   4020 	}
   4021 
   4022 	if (tdb->map_ptr) {
   4023 		if (tdb->flags & TDB_INTERNAL)
   4024 			SAFE_FREE(tdb->map_ptr);
   4025 		else
   4026 			tdb_munmap(tdb);
   4027 	}
   4028 	SAFE_FREE(tdb->name);
   4029 	if (tdb->fd != -1)
   4030 		ret = close(tdb->fd);
   4031 	SAFE_FREE(tdb->lockrecs);
   4032 
   4033 	/* Remove from contexts list */
   4034 	for (i = &tdbs; *i; i = &(*i)->next) {
   4035 		if (*i == tdb) {
   4036 			*i = tdb->next;
   4037 			break;
   4038 		}
   4039 	}
   4040 
   4041 	memset(tdb, 0, sizeof(*tdb));
   4042 	SAFE_FREE(tdb);
   4043 
   4044 	return ret;
   4045 }
   4046 
   4047 /* register a loging function */
   4048 void tdb_set_logging_function(struct tdb_context *tdb,
   4049                               const struct tdb_logging_context *log_ctx)
   4050 {
   4051         tdb->log = *log_ctx;
   4052 }
   4053 
   4054 void *tdb_get_logging_private(struct tdb_context *tdb)
   4055 {
   4056 	return tdb->log.log_private;
   4057 }
   4058 
   4059 /* reopen a tdb - this can be used after a fork to ensure that we have an independent
   4060    seek pointer from our parent and to re-establish locks */
   4061 int tdb_reopen(struct tdb_context *tdb)
   4062 {
   4063 	struct stat st;
   4064 
   4065 	if (tdb->flags & TDB_INTERNAL) {
   4066 		return 0; /* Nothing to do. */
   4067 	}
   4068 
   4069 	if (tdb->num_locks != 0 || tdb->global_lock.count) {
   4070 		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_reopen: reopen not allowed with locks held\n"));
   4071 		goto fail;
   4072 	}
   4073 
   4074 	if (tdb->transaction != 0) {
   4075 		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_reopen: reopen not allowed inside a transaction\n"));
   4076 		goto fail;
   4077 	}
   4078 
   4079 	if (tdb_munmap(tdb) != 0) {
   4080 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: munmap failed (%s)\n", strerror(errno)));
   4081 		goto fail;
   4082 	}
   4083 	if (close(tdb->fd) != 0)
   4084 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: WARNING closing tdb->fd failed!\n"));
   4085 	tdb->fd = open(tdb->name, tdb->open_flags & ~(O_CREAT|O_TRUNC), 0);
   4086 	if (tdb->fd == -1) {
   4087 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: open failed (%s)\n", strerror(errno)));
   4088 		goto fail;
   4089 	}
   4090 	if ((tdb->flags & TDB_CLEAR_IF_FIRST) &&
   4091 	    (tdb->methods->tdb_brlock(tdb, ACTIVE_LOCK, F_RDLCK, F_SETLKW, 0, 1) == -1)) {
   4092 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: failed to obtain active lock\n"));
   4093 		goto fail;
   4094 	}
   4095 	if (fstat(tdb->fd, &st) != 0) {
   4096 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: fstat failed (%s)\n", strerror(errno)));
   4097 		goto fail;
   4098 	}
   4099 	if (st.st_ino != tdb->inode || st.st_dev != tdb->device) {
   4100 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: file dev/inode has changed!\n"));
   4101 		goto fail;
   4102 	}
   4103 	tdb_mmap(tdb);
   4104 
   4105 	return 0;
   4106 
   4107 fail:
   4108 	tdb_close(tdb);
   4109 	return -1;
   4110 }
   4111 
   4112 /* reopen all tdb's */
   4113 int tdb_reopen_all(int parent_longlived)
   4114 {
   4115 	struct tdb_context *tdb;
   4116 
   4117 	for (tdb=tdbs; tdb; tdb = tdb->next) {
   4118 		/*
   4119 		 * If the parent is longlived (ie. a
   4120 		 * parent daemon architecture), we know
   4121 		 * it will keep it's active lock on a
   4122 		 * tdb opened with CLEAR_IF_FIRST. Thus
   4123 		 * for child processes we don't have to
   4124 		 * add an active lock. This is essential
   4125 		 * to improve performance on systems that
   4126 		 * keep POSIX locks as a non-scalable data
   4127 		 * structure in the kernel.
   4128 		 */
   4129 		if (parent_longlived) {
   4130 			/* Ensure no clear-if-first. */
   4131 			tdb->flags &= ~TDB_CLEAR_IF_FIRST;
   4132 		}
   4133 
   4134 		if (tdb_reopen(tdb) != 0)
   4135 			return -1;
   4136 	}
   4137 
   4138 	return 0;
   4139 }
   4140