Home | History | Annotate | Download | only in ext2fs
      1 /*
      2 URL: svn://svnanon.samba.org/samba/branches/SAMBA_4_0/source/lib/tdb/common
      3 Rev: 23590
      4 Last Changed Date: 2007-06-22 13:36:10 -0400 (Fri, 22 Jun 2007)
      5 */
      6  /*
      7    trivial database library - standalone version
      8 
      9    Copyright (C) Andrew Tridgell              1999-2005
     10    Copyright (C) Jeremy Allison               2000-2006
     11    Copyright (C) Paul `Rusty' Russell         2000
     12 
     13      ** NOTE! The following LGPL license applies to the tdb
     14      ** library. This does NOT imply that all of Samba is released
     15      ** under the LGPL
     16 
     17    This library is free software; you can redistribute it and/or
     18    modify it under the terms of the GNU Lesser General Public
     19    License as published by the Free Software Foundation; either
     20    version 2 of the License, or (at your option) any later version.
     21 
     22    This library is distributed in the hope that it will be useful,
     23    but WITHOUT ANY WARRANTY; without even the implied warranty of
     24    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
     25    Lesser General Public License for more details.
     26 
     27    You should have received a copy of the GNU Lesser General Public
     28    License along with this library; if not, write to the Free Software
     29    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
     30 */
     31 
     32 #ifdef CONFIG_STAND_ALONE
     33 #define HAVE_MMAP
     34 #define HAVE_STRDUP
     35 #define HAVE_SYS_MMAN_H
     36 #define HAVE_UTIME_H
     37 #define HAVE_UTIME
     38 #endif
     39 #define _XOPEN_SOURCE 500
     40 
     41 #include <unistd.h>
     42 #include <stdio.h>
     43 #include <stdlib.h>
     44 #include <stdarg.h>
     45 #include <stddef.h>
     46 #include <errno.h>
     47 #include <string.h>
     48 #ifdef HAVE_SYS_SELECT_H
     49 #include <sys/select.h>
     50 #endif
     51 #include <sys/time.h>
     52 #include <sys/types.h>
     53 #include <time.h>
     54 #ifdef HAVE_UTIME_H
     55 #include <utime.h>
     56 #endif
     57 #include <sys/stat.h>
     58 #include <sys/file.h>
     59 #include <fcntl.h>
     60 
     61 #ifdef HAVE_SYS_MMAN_H
     62 #include <sys/mman.h>
     63 #endif
     64 
     65 #ifndef MAP_FILE
     66 #define MAP_FILE 0
     67 #endif
     68 
     69 #ifndef MAP_FAILED
     70 #define MAP_FAILED ((void *)-1)
     71 #endif
     72 
     73 #ifndef HAVE_STRDUP
     74 #define strdup rep_strdup
     75 static char *rep_strdup(const char *s)
     76 {
     77 	char *ret;
     78 	int length;
     79 	if (!s)
     80 		return NULL;
     81 
     82 	if (!length)
     83 		length = strlen(s);
     84 
     85 	ret = malloc(length + 1);
     86 	if (ret) {
     87 		strncpy(ret, s, length);
     88 		ret[length] = '\0';
     89 	}
     90 	return ret;
     91 }
     92 #endif
     93 
     94 #ifndef PRINTF_ATTRIBUTE
     95 #if (__GNUC__ >= 3) && (__GNUC_MINOR__ >= 1 )
     96 /** Use gcc attribute to check printf fns.  a1 is the 1-based index of
     97  * the parameter containing the format, and a2 the index of the first
     98  * argument. Note that some gcc 2.x versions don't handle this
     99  * properly **/
    100 #define PRINTF_ATTRIBUTE(a1, a2) __attribute__ ((format (__printf__, a1, a2)))
    101 #else
    102 #define PRINTF_ATTRIBUTE(a1, a2)
    103 #endif
    104 #endif
    105 
    106 typedef int bool;
    107 
    108 #include "tdb.h"
    109 
/* Unsigned word type used for every field of the on-disk structures below.
 * NOTE(review): this relies on plain `unsigned` being exactly 32 bits wide
 * on the target - confirm for unusual ABIs. */
#ifndef u32
#define u32 unsigned
#endif

/* Fallback page size (0x2000 = 8 KiB) for platforms without getpagesize(). */
#ifndef HAVE_GETPAGESIZE
#define getpagesize() 0x2000
#endif

typedef u32 tdb_len_t; /* a byte length inside the database */
typedef u32 tdb_off_t; /* a byte offset inside the database */
    120 
    121 #ifndef offsetof
    122 #define offsetof(t,f) ((unsigned int)&((t *)0)->f)
    123 #endif
    124 
/*
 * File-format constants and layout helpers.
 *
 * Several of the function-like macros below (TDB_HASH_TOP, TDB_DATA_START,
 * and BUCKET which they use) expand to expressions that reference a
 * variable named "tdb", so they may only be used where a
 * struct tdb_context pointer of that name is in scope.
 */
/* identification string; presumably stored in tdb_header.magic_food */
#define TDB_MAGIC_FOOD "TDB file\n"
#define TDB_VERSION (0x26011967 + 6)
/* record magic values, compared against list_struct.magic */
#define TDB_MAGIC (0x26011999U)
/* bitwise complement of TDB_MAGIC */
#define TDB_FREE_MAGIC (~TDB_MAGIC)
#define TDB_DEAD_MAGIC (0xFEE1DEAD)
#define TDB_RECOVERY_MAGIC (0xf53bc0e7U)
#define TDB_ALIGNMENT 4
#define MIN_REC_SIZE (2*sizeof(struct list_struct) + TDB_ALIGNMENT)
#define DEFAULT_HASH_SIZE 131
/* the first offset slot sits immediately after the file header */
#define FREELIST_TOP (sizeof(struct tdb_header))
/* round x up to a multiple of a; the mask trick requires a to be a power of two */
#define TDB_ALIGN(x,a) (((x) + (a)-1) & ~((a)-1))
/* reverse the byte order of a 32-bit value (x is evaluated several times) */
#define TDB_BYTEREV(x) (((((x)&0xff)<<24)|((x)&0xFF00)<<8)|(((x)>>8)&0xFF00)|((x)>>24))
/* true when record header r carries the dead-record magic */
#define TDB_DEAD(r) ((r)->magic == TDB_DEAD_MAGIC)
/* true when r's magic is neither TDB_MAGIC nor TDB_DEAD_MAGIC */
#define TDB_BAD_MAGIC(r) ((r)->magic != TDB_MAGIC && !TDB_DEAD(r))
/* file offset of the chain-head slot for hash: slot BUCKET(hash)+1 of the
 * tdb_off_t array that starts at FREELIST_TOP */
#define TDB_HASH_TOP(hash) (FREELIST_TOP + (BUCKET(hash)+1)*sizeof(tdb_off_t))
/* size in bytes of that array: hash_size chain heads plus one extra slot */
#define TDB_HASHTABLE_SIZE(tdb) ((tdb->header.hash_size+1)*sizeof(tdb_off_t))
#define TDB_DATA_START(hash_size) TDB_HASH_TOP(hash_size-1)
/* offsets of individual header fields within the file */
#define TDB_RECOVERY_HEAD offsetof(struct tdb_header, recovery_start)
#define TDB_SEQNUM_OFS    offsetof(struct tdb_header, sequence_number)
/* fill patterns: one byte, and the same byte repeated across 32 bits */
#define TDB_PAD_BYTE 0x42
#define TDB_PAD_U32  0x42424242
    146 
/* Logging helper.  NB: assumes there is a local variable called "tdb"
 * that is the current context, and takes a doubly-parenthesized
 * printf-style argument list, e.g. TDB_LOG((tdb, level, "fmt", ...)). */
#define TDB_LOG(x) tdb->log.log_fn x

/* lock offsets: fixed byte offsets in the file used for byte-range locks */
#define GLOBAL_LOCK      0
#define ACTIVE_LOCK      4
#define TRANSACTION_LOCK 8

/* free memory if the pointer is valid and zero the pointer */
#ifndef SAFE_FREE
#define SAFE_FREE(x) do { if ((x) != NULL) {free(x); (x)=NULL;} } while(0)
#endif

/* map a full hash value to a hash chain index (needs a local "tdb") */
#define BUCKET(hash) ((hash) % tdb->header.hash_size)

/* DOCONV() is non-zero when the context has TDB_CONVERT set.  CONVERT(x)
 * then byte-swaps x in place via tdb_convert(); either way it yields the
 * address of x.  Both need a local "tdb"; x must be an lvalue. */
#define DOCONV() (tdb->flags & TDB_CONVERT)
#define CONVERT(x) (DOCONV() ? tdb_convert(&x, sizeof(x)) : &x)
    166 
    167 
/* Fixed-size header that starts every record.  The body of the database
   is made of one list_struct chain for the free space plus a separate
   chain of records for each hash value; the variable-length payload
   described by the implied union below follows the header. */
struct list_struct {
	tdb_off_t next; /* offset of the next record in the list */
	tdb_len_t rec_len; /* total byte length of record */
	tdb_len_t key_len; /* byte length of key */
	tdb_len_t data_len; /* byte length of data */
	u32 full_hash; /* the full 32 bit hash of the key */
	u32 magic;   /* try to catch errors: checked by TDB_DEAD / TDB_BAD_MAGIC */
	/* the following union is implied:
		union {
			char record[rec_len];
			struct {
				char key[key_len];
				char data[data_len];
			}
			u32 totalsize; (tailer)
		}
	*/
};
    188 
    189 
/* This is stored at the front of every database.  FREELIST_TOP is
   defined as sizeof(struct tdb_header), so changing the size of this
   structure moves everything that is addressed relative to it. */
struct tdb_header {
	char magic_food[32]; /* for /etc/magic */
	u32 version; /* version of the code */
	u32 hash_size; /* number of hash entries */
	tdb_off_t rwlocks; /* obsolete - kept to detect old formats */
	tdb_off_t recovery_start; /* offset of transaction recovery region */
	tdb_off_t sequence_number; /* used when TDB_SEQNUM is set */
	tdb_off_t reserved[29]; /* unused padding words */
};
    200 
/* In-memory bookkeeping for one held lock.  Used both for the entries of
   tdb_context.lockrecs (one per locked list) and for
   tdb_context.global_lock. */
struct tdb_lock_type {
	int list; /* list (hash chain) index; -1 is the alloc list */
	u32 count; /* nesting depth: how many times this lock has been taken */
	u32 ltype; /* lock type it was taken with (F_RDLCK or F_WRLCK) */
};
    206 
/* One node of the singly linked list of traversal locks that hangs off
   tdb_context.travlocks.  The record-lock helpers walk this list and
   compare "off" against the record offset they are asked about. */
struct tdb_traverse_lock {
	struct tdb_traverse_lock *next; /* next node, NULL at the end */
	u32 off; /* record offset this traversal currently holds */
	u32 hash; /* presumably the hash chain being traversed - TODO confirm */
	int lock_rw; /* presumably the lock type of the traversal - TODO confirm */
};
    213 
    214 
/* Table of low-level operations reached through tdb_context.methods, so
   that callers go through function pointers rather than calling the
   implementations directly.  Parameter meanings below follow the
   same-named functions in this file where one is visible. */
struct tdb_methods {
	/* (tdb, off, buf, len, cv): read len bytes at off, byte-swap if cv */
	int (*tdb_read)(struct tdb_context *, tdb_off_t , void *, tdb_len_t , int );
	/* (tdb, off, buf, len): write len bytes at off */
	int (*tdb_write)(struct tdb_context *, tdb_off_t, const void *, tdb_len_t);
	/* NOTE(review): semantics not visible in this part of the file */
	void (*next_hash_chain)(struct tdb_context *, u32 *);
	/* (tdb, len, probe): check that the db is at least len bytes long */
	int (*tdb_oob)(struct tdb_context *, tdb_off_t , int );
	/* NOTE(review): semantics not visible in this part of the file */
	int (*tdb_expand_file)(struct tdb_context *, tdb_off_t , tdb_off_t );
	/* (tdb, offset, rw_type, lck_type, probe, len): byte-range lock */
	int (*tdb_brlock)(struct tdb_context *, tdb_off_t , int, int, int, size_t);
};
    223 
/* Per-handle state of an open database.  Most macros in this file
   (TDB_LOG, BUCKET, TDB_ERRCODE users, ...) expect a pointer to this
   structure to be available under the name "tdb". */
struct tdb_context {
	char *name; /* the name of the database */
	void *map_ptr; /* where it is currently mapped */
	int fd; /* open file descriptor for the database */
	tdb_len_t map_size; /* how much space has been mapped */
	int read_only; /* opened read-only */
	int traverse_read; /* read-only traversal */
	struct tdb_lock_type global_lock; /* whole-database lock; count==0 means not held */
	int num_lockrecs; /* number of valid entries in lockrecs */
	struct tdb_lock_type *lockrecs; /* only real locks, all with count>0 */
	enum TDB_ERROR ecode; /* error code for last tdb error */
	struct tdb_header header; /* a cached copy of the header */
	u32 flags; /* the flags passed to tdb_open */
	struct tdb_traverse_lock travlocks; /* current traversal locks */
	struct tdb_context *next; /* all tdbs to avoid multiple opens */
	dev_t device;	/* uniquely identifies this tdb */
	ino_t inode;	/* uniquely identifies this tdb */
	struct tdb_logging_context log; /* log.log_fn is what TDB_LOG calls */
	unsigned int (*hash_fn)(TDB_DATA *key); /* key -> full hash value */
	int open_flags; /* flags used in the open - needed by reopen */
	unsigned int num_locks; /* number of chain locks held */
	const struct tdb_methods *methods; /* low-level I/O and locking operations */
	struct tdb_transaction *transaction; /* defined elsewhere; presumably the active transaction, if any */
	int page_size; /* NOTE(review): not used in the visible code */
	int max_dead_records; /* NOTE(review): not used in the visible code */
	bool have_transaction_lock; /* set while the TRANSACTION_LOCK byte is held */
};
    251 
    252 
/*
  internal prototypes

  Every function below is declared static here.  Several definitions
  further down omit the keyword; because a static declaration is already
  visible at that point they still have internal linkage.

  tdb_ofs_read and tdb_ofs_write are declared twice; the repetition is
  harmless.
*/
static int tdb_munmap(struct tdb_context *tdb);
static void tdb_mmap(struct tdb_context *tdb);
static int tdb_lock(struct tdb_context *tdb, int list, int ltype);
static int tdb_unlock(struct tdb_context *tdb, int list, int ltype);
static int tdb_brlock(struct tdb_context *tdb, tdb_off_t offset, int rw_type, int lck_type, int probe, size_t len);
static int tdb_transaction_lock(struct tdb_context *tdb, int ltype);
static int tdb_transaction_unlock(struct tdb_context *tdb);
static int tdb_brlock_upgrade(struct tdb_context *tdb, tdb_off_t offset, size_t len);
static int tdb_write_lock_record(struct tdb_context *tdb, tdb_off_t off);
static int tdb_write_unlock_record(struct tdb_context *tdb, tdb_off_t off);
static int tdb_ofs_read(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d);
static int tdb_ofs_write(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d);
static void *tdb_convert(void *buf, u32 size);
static int tdb_free(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec);
static tdb_off_t tdb_allocate(struct tdb_context *tdb, tdb_len_t length, struct list_struct *rec);
static int tdb_ofs_read(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d);
static int tdb_ofs_write(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d);
static int tdb_lock_record(struct tdb_context *tdb, tdb_off_t off);
static int tdb_unlock_record(struct tdb_context *tdb, tdb_off_t off);
static int tdb_rec_read(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec);
static int tdb_rec_write(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec);
static int tdb_do_delete(struct tdb_context *tdb, tdb_off_t rec_ptr, struct list_struct *rec);
static unsigned char *tdb_alloc_read(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t len);
static int tdb_parse_data(struct tdb_context *tdb, TDB_DATA key,
		   tdb_off_t offset, tdb_len_t len,
		   int (*parser)(TDB_DATA key, TDB_DATA data,
				 void *private_data),
		   void *private_data);
static tdb_off_t tdb_find_lock_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash, int locktype,
			   struct list_struct *rec);
static void tdb_io_init(struct tdb_context *tdb);
static int tdb_expand(struct tdb_context *tdb, tdb_off_t size);
static int tdb_rec_free_read(struct tdb_context *tdb, tdb_off_t off,
		      struct list_struct *rec);
    290 
    291 
    292 /* file: error.c */
    293 
    294 enum TDB_ERROR tdb_error(struct tdb_context *tdb)
    295 {
    296 	return tdb->ecode;
    297 }
    298 
    299 static struct tdb_errname {
    300 	enum TDB_ERROR ecode; const char *estring;
    301 } emap[] = { {TDB_SUCCESS, "Success"},
    302 	     {TDB_ERR_CORRUPT, "Corrupt database"},
    303 	     {TDB_ERR_IO, "IO Error"},
    304 	     {TDB_ERR_LOCK, "Locking error"},
    305 	     {TDB_ERR_OOM, "Out of memory"},
    306 	     {TDB_ERR_EXISTS, "Record exists"},
    307 	     {TDB_ERR_NOLOCK, "Lock exists on other keys"},
    308 	     {TDB_ERR_EINVAL, "Invalid parameter"},
    309 	     {TDB_ERR_NOEXIST, "Record does not exist"},
    310 	     {TDB_ERR_RDONLY, "write not permitted"} };
    311 
    312 /* Error string for the last tdb error */
    313 const char *tdb_errorstr(struct tdb_context *tdb)
    314 {
    315 	u32 i;
    316 	for (i = 0; i < sizeof(emap) / sizeof(struct tdb_errname); i++)
    317 		if (tdb->ecode == emap[i].ecode)
    318 			return emap[i].estring;
    319 	return "Invalid error code";
    320 }
    321 
    322 /* file: lock.c */
    323 
/* Flag OR-ed into a lock type (ltype) to request "mark only": the lock
 * and unlock functions below strip it, update their in-memory
 * bookkeeping, and skip the actual byte-range lock call. */
#define TDB_MARK_LOCK 0x80000000
    325 
    326 /* a byte range locking function - return 0 on success
    327    this functions locks/unlocks 1 byte at the specified offset.
    328 
    329    On error, errno is also set so that errors are passed back properly
    330    through tdb_open().
    331 
    332    note that a len of zero means lock to end of file
    333 */
    334 int tdb_brlock(struct tdb_context *tdb, tdb_off_t offset,
    335 	       int rw_type, int lck_type, int probe, size_t len)
    336 {
    337 	struct flock fl;
    338 	int ret;
    339 
    340 	if (tdb->flags & TDB_NOLOCK) {
    341 		return 0;
    342 	}
    343 
    344 	if ((rw_type == F_WRLCK) && (tdb->read_only || tdb->traverse_read)) {
    345 		tdb->ecode = TDB_ERR_RDONLY;
    346 		return -1;
    347 	}
    348 
    349 	fl.l_type = rw_type;
    350 	fl.l_whence = SEEK_SET;
    351 	fl.l_start = offset;
    352 	fl.l_len = len;
    353 	fl.l_pid = 0;
    354 
    355 	do {
    356 		ret = fcntl(tdb->fd,lck_type,&fl);
    357 	} while (ret == -1 && errno == EINTR);
    358 
    359 	if (ret == -1) {
    360 		/* Generic lock error. errno set by fcntl.
    361 		 * EAGAIN is an expected return from non-blocking
    362 		 * locks. */
    363 		if (!probe && lck_type != F_SETLK) {
    364 			/* Ensure error code is set for log fun to examine. */
    365 			tdb->ecode = TDB_ERR_LOCK;
    366 			TDB_LOG((tdb, TDB_DEBUG_TRACE,"tdb_brlock failed (fd=%d) at offset %d rw_type=%d lck_type=%d len=%d\n",
    367 				 tdb->fd, offset, rw_type, lck_type, (int)len));
    368 		}
    369 		return TDB_ERRCODE(TDB_ERR_LOCK, -1);
    370 	}
    371 	return 0;
    372 }
    373 
    374 
    375 /*
    376   upgrade a read lock to a write lock. This needs to be handled in a
    377   special way as some OSes (such as solaris) have too conservative
    378   deadlock detection and claim a deadlock when progress can be
    379   made. For those OSes we may loop for a while.
    380 */
    381 int tdb_brlock_upgrade(struct tdb_context *tdb, tdb_off_t offset, size_t len)
    382 {
    383 	int count = 1000;
    384 	while (count--) {
    385 		struct timeval tv;
    386 		if (tdb_brlock(tdb, offset, F_WRLCK, F_SETLKW, 1, len) == 0) {
    387 			return 0;
    388 		}
    389 		if (errno != EDEADLK) {
    390 			break;
    391 		}
    392 		/* sleep for as short a time as we can - more portable than usleep() */
    393 		tv.tv_sec = 0;
    394 		tv.tv_usec = 1;
    395 		select(0, NULL, NULL, NULL, &tv);
    396 	}
    397 	TDB_LOG((tdb, TDB_DEBUG_TRACE,"tdb_brlock_upgrade failed at offset %d\n", offset));
    398 	return -1;
    399 }
    400 
    401 
    402 /* lock a list in the database. list -1 is the alloc list */
    403 static int _tdb_lock(struct tdb_context *tdb, int list, int ltype, int op)
    404 {
    405 	struct tdb_lock_type *new_lck;
    406 	int i;
    407 	bool mark_lock = ((ltype & TDB_MARK_LOCK) == TDB_MARK_LOCK);
    408 
    409 	ltype &= ~TDB_MARK_LOCK;
    410 
    411 	/* a global lock allows us to avoid per chain locks */
    412 	if (tdb->global_lock.count &&
    413 	    (ltype == tdb->global_lock.ltype || ltype == F_RDLCK)) {
    414 		return 0;
    415 	}
    416 
    417 	if (tdb->global_lock.count) {
    418 		return TDB_ERRCODE(TDB_ERR_LOCK, -1);
    419 	}
    420 
    421 	if (list < -1 || list >= (int)tdb->header.hash_size) {
    422 		TDB_LOG((tdb, TDB_DEBUG_ERROR,"tdb_lock: invalid list %d for ltype=%d\n",
    423 			   list, ltype));
    424 		return -1;
    425 	}
    426 	if (tdb->flags & TDB_NOLOCK)
    427 		return 0;
    428 
    429 	for (i=0; i<tdb->num_lockrecs; i++) {
    430 		if (tdb->lockrecs[i].list == list) {
    431 			if (tdb->lockrecs[i].count == 0) {
    432 				/*
    433 				 * Can't happen, see tdb_unlock(). It should
    434 				 * be an assert.
    435 				 */
    436 				TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_lock: "
    437 					 "lck->count == 0 for list %d", list));
    438 			}
    439 			/*
    440 			 * Just increment the in-memory struct, posix locks
    441 			 * don't stack.
    442 			 */
    443 			tdb->lockrecs[i].count++;
    444 			return 0;
    445 		}
    446 	}
    447 
    448 	new_lck = (struct tdb_lock_type *)realloc(
    449 		tdb->lockrecs,
    450 		sizeof(*tdb->lockrecs) * (tdb->num_lockrecs+1));
    451 	if (new_lck == NULL) {
    452 		errno = ENOMEM;
    453 		return -1;
    454 	}
    455 	tdb->lockrecs = new_lck;
    456 
    457 	/* Since fcntl locks don't nest, we do a lock for the first one,
    458 	   and simply bump the count for future ones */
    459 	if (!mark_lock &&
    460 	    tdb->methods->tdb_brlock(tdb,FREELIST_TOP+4*list, ltype, op,
    461 				     0, 1)) {
    462 		return -1;
    463 	}
    464 
    465 	tdb->num_locks++;
    466 
    467 	tdb->lockrecs[tdb->num_lockrecs].list = list;
    468 	tdb->lockrecs[tdb->num_lockrecs].count = 1;
    469 	tdb->lockrecs[tdb->num_lockrecs].ltype = ltype;
    470 	tdb->num_lockrecs += 1;
    471 
    472 	return 0;
    473 }
    474 
    475 /* lock a list in the database. list -1 is the alloc list */
    476 int tdb_lock(struct tdb_context *tdb, int list, int ltype)
    477 {
    478 	int ret;
    479 	ret = _tdb_lock(tdb, list, ltype, F_SETLKW);
    480 	if (ret) {
    481 		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_lock failed on list %d "
    482 			 "ltype=%d (%s)\n",  list, ltype, strerror(errno)));
    483 	}
    484 	return ret;
    485 }
    486 
    487 /* lock a list in the database. list -1 is the alloc list. non-blocking lock */
    488 int tdb_lock_nonblock(struct tdb_context *tdb, int list, int ltype)
    489 {
    490 	return _tdb_lock(tdb, list, ltype, F_SETLK);
    491 }
    492 
    493 
    494 /* unlock the database: returns void because it's too late for errors. */
    495 	/* changed to return int it may be interesting to know there
    496 	   has been an error  --simo */
    497 int tdb_unlock(struct tdb_context *tdb, int list, int ltype)
    498 {
    499 	int ret = -1;
    500 	int i;
    501 	struct tdb_lock_type *lck = NULL;
    502 	bool mark_lock = ((ltype & TDB_MARK_LOCK) == TDB_MARK_LOCK);
    503 
    504 	ltype &= ~TDB_MARK_LOCK;
    505 
    506 	/* a global lock allows us to avoid per chain locks */
    507 	if (tdb->global_lock.count &&
    508 	    (ltype == tdb->global_lock.ltype || ltype == F_RDLCK)) {
    509 		return 0;
    510 	}
    511 
    512 	if (tdb->global_lock.count) {
    513 		return TDB_ERRCODE(TDB_ERR_LOCK, -1);
    514 	}
    515 
    516 	if (tdb->flags & TDB_NOLOCK)
    517 		return 0;
    518 
    519 	/* Sanity checks */
    520 	if (list < -1 || list >= (int)tdb->header.hash_size) {
    521 		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlock: list %d invalid (%d)\n", list, tdb->header.hash_size));
    522 		return ret;
    523 	}
    524 
    525 	for (i=0; i<tdb->num_lockrecs; i++) {
    526 		if (tdb->lockrecs[i].list == list) {
    527 			lck = &tdb->lockrecs[i];
    528 			break;
    529 		}
    530 	}
    531 
    532 	if ((lck == NULL) || (lck->count == 0)) {
    533 		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlock: count is 0\n"));
    534 		return -1;
    535 	}
    536 
    537 	if (lck->count > 1) {
    538 		lck->count--;
    539 		return 0;
    540 	}
    541 
    542 	/*
    543 	 * This lock has count==1 left, so we need to unlock it in the
    544 	 * kernel. We don't bother with decrementing the in-memory array
    545 	 * element, we're about to overwrite it with the last array element
    546 	 * anyway.
    547 	 */
    548 
    549 	if (mark_lock) {
    550 		ret = 0;
    551 	} else {
    552 		ret = tdb->methods->tdb_brlock(tdb, FREELIST_TOP+4*list, F_UNLCK,
    553 					       F_SETLKW, 0, 1);
    554 	}
    555 	tdb->num_locks--;
    556 
    557 	/*
    558 	 * Shrink the array by overwriting the element just unlocked with the
    559 	 * last array element.
    560 	 */
    561 
    562 	if (tdb->num_lockrecs > 1) {
    563 		*lck = tdb->lockrecs[tdb->num_lockrecs-1];
    564 	}
    565 	tdb->num_lockrecs -= 1;
    566 
    567 	/*
    568 	 * We don't bother with realloc when the array shrinks, but if we have
    569 	 * a completely idle tdb we should get rid of the locked array.
    570 	 */
    571 
    572 	if (tdb->num_lockrecs == 0) {
    573 		SAFE_FREE(tdb->lockrecs);
    574 	}
    575 
    576 	if (ret)
    577 		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlock: An error occurred unlocking!\n"));
    578 	return ret;
    579 }
    580 
    581 /*
    582   get the transaction lock
    583  */
    584 int tdb_transaction_lock(struct tdb_context *tdb, int ltype)
    585 {
    586 	if (tdb->have_transaction_lock || tdb->global_lock.count) {
    587 		return 0;
    588 	}
    589 	if (tdb->methods->tdb_brlock(tdb, TRANSACTION_LOCK, ltype,
    590 				     F_SETLKW, 0, 1) == -1) {
    591 		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_lock: failed to get transaction lock\n"));
    592 		tdb->ecode = TDB_ERR_LOCK;
    593 		return -1;
    594 	}
    595 	tdb->have_transaction_lock = 1;
    596 	return 0;
    597 }
    598 
    599 /*
    600   release the transaction lock
    601  */
    602 int tdb_transaction_unlock(struct tdb_context *tdb)
    603 {
    604 	int ret;
    605 	if (!tdb->have_transaction_lock) {
    606 		return 0;
    607 	}
    608 	ret = tdb->methods->tdb_brlock(tdb, TRANSACTION_LOCK, F_UNLCK, F_SETLKW, 0, 1);
    609 	if (ret == 0) {
    610 		tdb->have_transaction_lock = 0;
    611 	}
    612 	return ret;
    613 }
    614 
    615 
    616 
    617 
    618 /* lock/unlock entire database */
    619 static int _tdb_lockall(struct tdb_context *tdb, int ltype, int op)
    620 {
    621 	bool mark_lock = ((ltype & TDB_MARK_LOCK) == TDB_MARK_LOCK);
    622 
    623 	ltype &= ~TDB_MARK_LOCK;
    624 
    625 	/* There are no locks on read-only dbs */
    626 	if (tdb->read_only || tdb->traverse_read)
    627 		return TDB_ERRCODE(TDB_ERR_LOCK, -1);
    628 
    629 	if (tdb->global_lock.count && tdb->global_lock.ltype == ltype) {
    630 		tdb->global_lock.count++;
    631 		return 0;
    632 	}
    633 
    634 	if (tdb->global_lock.count) {
    635 		/* a global lock of a different type exists */
    636 		return TDB_ERRCODE(TDB_ERR_LOCK, -1);
    637 	}
    638 
    639 	if (tdb->num_locks != 0) {
    640 		/* can't combine global and chain locks */
    641 		return TDB_ERRCODE(TDB_ERR_LOCK, -1);
    642 	}
    643 
    644 	if (!mark_lock &&
    645 	    tdb->methods->tdb_brlock(tdb, FREELIST_TOP, ltype, op,
    646 				     0, 4*tdb->header.hash_size)) {
    647 		if (op == F_SETLKW) {
    648 			TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_lockall failed (%s)\n", strerror(errno)));
    649 		}
    650 		return -1;
    651 	}
    652 
    653 	tdb->global_lock.count = 1;
    654 	tdb->global_lock.ltype = ltype;
    655 
    656 	return 0;
    657 }
    658 
    659 
    660 
    661 /* unlock entire db */
    662 static int _tdb_unlockall(struct tdb_context *tdb, int ltype)
    663 {
    664 	bool mark_lock = ((ltype & TDB_MARK_LOCK) == TDB_MARK_LOCK);
    665 
    666 	ltype &= ~TDB_MARK_LOCK;
    667 
    668 	/* There are no locks on read-only dbs */
    669 	if (tdb->read_only || tdb->traverse_read) {
    670 		return TDB_ERRCODE(TDB_ERR_LOCK, -1);
    671 	}
    672 
    673 	if (tdb->global_lock.ltype != ltype || tdb->global_lock.count == 0) {
    674 		return TDB_ERRCODE(TDB_ERR_LOCK, -1);
    675 	}
    676 
    677 	if (tdb->global_lock.count > 1) {
    678 		tdb->global_lock.count--;
    679 		return 0;
    680 	}
    681 
    682 	if (!mark_lock &&
    683 	    tdb->methods->tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW,
    684 				     0, 4*tdb->header.hash_size)) {
    685 		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlockall failed (%s)\n", strerror(errno)));
    686 		return -1;
    687 	}
    688 
    689 	tdb->global_lock.count = 0;
    690 	tdb->global_lock.ltype = 0;
    691 
    692 	return 0;
    693 }
    694 
    695 /* lock entire database with write lock */
    696 int tdb_lockall(struct tdb_context *tdb)
    697 {
    698 	return _tdb_lockall(tdb, F_WRLCK, F_SETLKW);
    699 }
    700 
    701 /* lock entire database with write lock - mark only */
    702 int tdb_lockall_mark(struct tdb_context *tdb)
    703 {
    704 	return _tdb_lockall(tdb, F_WRLCK | TDB_MARK_LOCK, F_SETLKW);
    705 }
    706 
    707 /* unlock entire database with write lock - unmark only */
    708 int tdb_lockall_unmark(struct tdb_context *tdb)
    709 {
    710 	return _tdb_unlockall(tdb, F_WRLCK | TDB_MARK_LOCK);
    711 }
    712 
    713 /* lock entire database with write lock - nonblocking varient */
    714 int tdb_lockall_nonblock(struct tdb_context *tdb)
    715 {
    716 	return _tdb_lockall(tdb, F_WRLCK, F_SETLK);
    717 }
    718 
    719 /* unlock entire database with write lock */
    720 int tdb_unlockall(struct tdb_context *tdb)
    721 {
    722 	return _tdb_unlockall(tdb, F_WRLCK);
    723 }
    724 
    725 /* lock entire database with read lock */
    726 int tdb_lockall_read(struct tdb_context *tdb)
    727 {
    728 	return _tdb_lockall(tdb, F_RDLCK, F_SETLKW);
    729 }
    730 
    731 /* lock entire database with read lock - nonblock varient */
    732 int tdb_lockall_read_nonblock(struct tdb_context *tdb)
    733 {
    734 	return _tdb_lockall(tdb, F_RDLCK, F_SETLK);
    735 }
    736 
    737 /* unlock entire database with read lock */
    738 int tdb_unlockall_read(struct tdb_context *tdb)
    739 {
    740 	return _tdb_unlockall(tdb, F_RDLCK);
    741 }
    742 
    743 /* lock/unlock one hash chain. This is meant to be used to reduce
    744    contention - it cannot guarantee how many records will be locked */
    745 int tdb_chainlock(struct tdb_context *tdb, TDB_DATA key)
    746 {
    747 	return tdb_lock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK);
    748 }
    749 
    750 /* lock/unlock one hash chain, non-blocking. This is meant to be used
    751    to reduce contention - it cannot guarantee how many records will be
    752    locked */
    753 int tdb_chainlock_nonblock(struct tdb_context *tdb, TDB_DATA key)
    754 {
    755 	return tdb_lock_nonblock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK);
    756 }
    757 
    758 /* mark a chain as locked without actually locking it. Warning! use with great caution! */
    759 int tdb_chainlock_mark(struct tdb_context *tdb, TDB_DATA key)
    760 {
    761 	return tdb_lock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK | TDB_MARK_LOCK);
    762 }
    763 
    764 /* unmark a chain as locked without actually locking it. Warning! use with great caution! */
    765 int tdb_chainlock_unmark(struct tdb_context *tdb, TDB_DATA key)
    766 {
    767 	return tdb_unlock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK | TDB_MARK_LOCK);
    768 }
    769 
    770 int tdb_chainunlock(struct tdb_context *tdb, TDB_DATA key)
    771 {
    772 	return tdb_unlock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK);
    773 }
    774 
    775 int tdb_chainlock_read(struct tdb_context *tdb, TDB_DATA key)
    776 {
    777 	return tdb_lock(tdb, BUCKET(tdb->hash_fn(&key)), F_RDLCK);
    778 }
    779 
    780 int tdb_chainunlock_read(struct tdb_context *tdb, TDB_DATA key)
    781 {
    782 	return tdb_unlock(tdb, BUCKET(tdb->hash_fn(&key)), F_RDLCK);
    783 }
    784 
    785 
    786 
    787 /* record lock stops delete underneath */
    788 int tdb_lock_record(struct tdb_context *tdb, tdb_off_t off)
    789 {
    790 	return off ? tdb->methods->tdb_brlock(tdb, off, F_RDLCK, F_SETLKW, 0, 1) : 0;
    791 }
    792 
    793 /*
    794   Write locks override our own fcntl readlocks, so check it here.
    795   Note this is meant to be F_SETLK, *not* F_SETLKW, as it's not
    796   an error to fail to get the lock here.
    797 */
    798 int tdb_write_lock_record(struct tdb_context *tdb, tdb_off_t off)
    799 {
    800 	struct tdb_traverse_lock *i;
    801 	for (i = &tdb->travlocks; i; i = i->next)
    802 		if (i->off == off)
    803 			return -1;
    804 	return tdb->methods->tdb_brlock(tdb, off, F_WRLCK, F_SETLK, 1, 1);
    805 }
    806 
    807 /*
    808   Note this is meant to be F_SETLK, *not* F_SETLKW, as it's not
    809   an error to fail to get the lock here.
    810 */
    811 int tdb_write_unlock_record(struct tdb_context *tdb, tdb_off_t off)
    812 {
    813 	return tdb->methods->tdb_brlock(tdb, off, F_UNLCK, F_SETLK, 0, 1);
    814 }
    815 
    816 /* fcntl locks don't stack: avoid unlocking someone else's */
    817 int tdb_unlock_record(struct tdb_context *tdb, tdb_off_t off)
    818 {
    819 	struct tdb_traverse_lock *i;
    820 	u32 count = 0;
    821 
    822 	if (off == 0)
    823 		return 0;
    824 	for (i = &tdb->travlocks; i; i = i->next)
    825 		if (i->off == off)
    826 			count++;
    827 	return (count == 1 ? tdb->methods->tdb_brlock(tdb, off, F_UNLCK, F_SETLKW, 0, 1) : 0);
    828 }
    829 
    830 /* file: io.c */
    831 
    832 /* check for an out of bounds access - if it is out of bounds then
    833    see if the database has been expanded by someone else and expand
    834    if necessary
    835    note that "len" is the minimum length needed for the db
    836 */
    837 static int tdb_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
    838 {
    839 	struct stat st;
    840 	if (len <= tdb->map_size)
    841 		return 0;
    842 	if (tdb->flags & TDB_INTERNAL) {
    843 		if (!probe) {
    844 			/* Ensure ecode is set for log fn. */
    845 			tdb->ecode = TDB_ERR_IO;
    846 			TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_oob len %d beyond internal malloc size %d\n",
    847 				 (int)len, (int)tdb->map_size));
    848 		}
    849 		return TDB_ERRCODE(TDB_ERR_IO, -1);
    850 	}
    851 
    852 	if (fstat(tdb->fd, &st) == -1) {
    853 		return TDB_ERRCODE(TDB_ERR_IO, -1);
    854 	}
    855 
    856 	if (st.st_size < (size_t)len) {
    857 		if (!probe) {
    858 			/* Ensure ecode is set for log fn. */
    859 			tdb->ecode = TDB_ERR_IO;
    860 			TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_oob len %d beyond eof at %d\n",
    861 				 (int)len, (int)st.st_size));
    862 		}
    863 		return TDB_ERRCODE(TDB_ERR_IO, -1);
    864 	}
    865 
    866 	/* Unmap, update size, remap */
    867 	if (tdb_munmap(tdb) == -1)
    868 		return TDB_ERRCODE(TDB_ERR_IO, -1);
    869 	tdb->map_size = st.st_size;
    870 	tdb_mmap(tdb);
    871 	return 0;
    872 }
    873 
    874 /* write a lump of data at a specified offset */
    875 static int tdb_write(struct tdb_context *tdb, tdb_off_t off,
    876 		     const void *buf, tdb_len_t len)
    877 {
    878 	if (len == 0) {
    879 		return 0;
    880 	}
    881 
    882 	if (tdb->read_only || tdb->traverse_read) {
    883 		tdb->ecode = TDB_ERR_RDONLY;
    884 		return -1;
    885 	}
    886 
    887 	if (tdb->methods->tdb_oob(tdb, off + len, 0) != 0)
    888 		return -1;
    889 
    890 	if (tdb->map_ptr) {
    891 		memcpy(off + (char *)tdb->map_ptr, buf, len);
    892 	} else if (pwrite(tdb->fd, buf, len, off) != (ssize_t)len) {
    893 		/* Ensure ecode is set for log fn. */
    894 		tdb->ecode = TDB_ERR_IO;
    895 		TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_write failed at %d len=%d (%s)\n",
    896 			   off, len, strerror(errno)));
    897 		return TDB_ERRCODE(TDB_ERR_IO, -1);
    898 	}
    899 	return 0;
    900 }
    901 
    902 /* Endian conversion: we only ever deal with 4 byte quantities */
    903 void *tdb_convert(void *buf, u32 size)
    904 {
    905 	u32 i, *p = (u32 *)buf;
    906 	for (i = 0; i < size / 4; i++)
    907 		p[i] = TDB_BYTEREV(p[i]);
    908 	return buf;
    909 }
    910 
    911 
    912 /* read a lump of data at a specified offset, maybe convert */
    913 static int tdb_read(struct tdb_context *tdb, tdb_off_t off, void *buf,
    914 		    tdb_len_t len, int cv)
    915 {
    916 	if (tdb->methods->tdb_oob(tdb, off + len, 0) != 0) {
    917 		return -1;
    918 	}
    919 
    920 	if (tdb->map_ptr) {
    921 		memcpy(buf, off + (char *)tdb->map_ptr, len);
    922 	} else {
    923 		ssize_t ret = pread(tdb->fd, buf, len, off);
    924 		if (ret != (ssize_t)len) {
    925 			/* Ensure ecode is set for log fn. */
    926 			tdb->ecode = TDB_ERR_IO;
    927 			TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_read failed at %d "
    928 				 "len=%d ret=%d (%s) map_size=%d\n",
    929 				 (int)off, (int)len, (int)ret, strerror(errno),
    930 				 (int)tdb->map_size));
    931 			return TDB_ERRCODE(TDB_ERR_IO, -1);
    932 		}
    933 	}
    934 	if (cv) {
    935 		tdb_convert(buf, len);
    936 	}
    937 	return 0;
    938 }
    939 
    940 
    941 
    942 /*
    943   do an unlocked scan of the hash table heads to find the next non-zero head. The value
    944   will then be confirmed with the lock held
    945 */
    946 static void tdb_next_hash_chain(struct tdb_context *tdb, u32 *chain)
    947 {
    948 	u32 h = *chain;
    949 	if (tdb->map_ptr) {
    950 		for (;h < tdb->header.hash_size;h++) {
    951 			if (0 != *(u32 *)(TDB_HASH_TOP(h) + (unsigned char *)tdb->map_ptr)) {
    952 				break;
    953 			}
    954 		}
    955 	} else {
    956 		u32 off=0;
    957 		for (;h < tdb->header.hash_size;h++) {
    958 			if (tdb_ofs_read(tdb, TDB_HASH_TOP(h), &off) != 0 || off != 0) {
    959 				break;
    960 			}
    961 		}
    962 	}
    963 	(*chain) = h;
    964 }
    965 
    966 
    967 int tdb_munmap(struct tdb_context *tdb)
    968 {
    969 	if (tdb->flags & TDB_INTERNAL)
    970 		return 0;
    971 
    972 #ifdef HAVE_MMAP
    973 	if (tdb->map_ptr) {
    974 		int ret = munmap(tdb->map_ptr, tdb->map_size);
    975 		if (ret != 0)
    976 			return ret;
    977 	}
    978 #endif
    979 	tdb->map_ptr = NULL;
    980 	return 0;
    981 }
    982 
    983 void tdb_mmap(struct tdb_context *tdb)
    984 {
    985 	if (tdb->flags & TDB_INTERNAL)
    986 		return;
    987 
    988 #ifdef HAVE_MMAP
    989 	if (!(tdb->flags & TDB_NOMMAP)) {
    990 		tdb->map_ptr = mmap(NULL, tdb->map_size,
    991 				    PROT_READ|(tdb->read_only? 0:PROT_WRITE),
    992 				    MAP_SHARED|MAP_FILE, tdb->fd, 0);
    993 
    994 		/*
    995 		 * NB. When mmap fails it returns MAP_FAILED *NOT* NULL !!!!
    996 		 */
    997 
    998 		if (tdb->map_ptr == MAP_FAILED) {
    999 			tdb->map_ptr = NULL;
   1000 			TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_mmap failed for size %d (%s)\n",
   1001 				 tdb->map_size, strerror(errno)));
   1002 		}
   1003 	} else {
   1004 		tdb->map_ptr = NULL;
   1005 	}
   1006 #else
   1007 	tdb->map_ptr = NULL;
   1008 #endif
   1009 }
   1010 
   1011 /* expand a file.  we prefer to use ftruncate, as that is what posix
   1012   says to use for mmap expansion */
   1013 static int tdb_expand_file(struct tdb_context *tdb, tdb_off_t size, tdb_off_t addition)
   1014 {
   1015 	char buf[1024];
   1016 
   1017 	if (tdb->read_only || tdb->traverse_read) {
   1018 		tdb->ecode = TDB_ERR_RDONLY;
   1019 		return -1;
   1020 	}
   1021 
   1022 	if (ftruncate(tdb->fd, size+addition) == -1) {
   1023 		char b = 0;
   1024 		if (pwrite(tdb->fd,  &b, 1, (size+addition) - 1) != 1) {
   1025 			TDB_LOG((tdb, TDB_DEBUG_FATAL, "expand_file to %d failed (%s)\n",
   1026 				 size+addition, strerror(errno)));
   1027 			return -1;
   1028 		}
   1029 	}
   1030 
   1031 	/* now fill the file with something. This ensures that the
   1032 	   file isn't sparse, which would be very bad if we ran out of
   1033 	   disk. This must be done with write, not via mmap */
   1034 	memset(buf, TDB_PAD_BYTE, sizeof(buf));
   1035 	while (addition) {
   1036 		int n = addition>sizeof(buf)?sizeof(buf):addition;
   1037 		int ret = pwrite(tdb->fd, buf, n, size);
   1038 		if (ret != n) {
   1039 			TDB_LOG((tdb, TDB_DEBUG_FATAL, "expand_file write of %d failed (%s)\n",
   1040 				   n, strerror(errno)));
   1041 			return -1;
   1042 		}
   1043 		addition -= n;
   1044 		size += n;
   1045 	}
   1046 	return 0;
   1047 }
   1048 
   1049 
   1050 /* expand the database at least size bytes by expanding the underlying
   1051    file and doing the mmap again if necessary */
   1052 int tdb_expand(struct tdb_context *tdb, tdb_off_t size)
   1053 {
   1054 	struct list_struct rec;
   1055 	tdb_off_t offset;
   1056 
   1057 	if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
   1058 		TDB_LOG((tdb, TDB_DEBUG_ERROR, "lock failed in tdb_expand\n"));
   1059 		return -1;
   1060 	}
   1061 
   1062 	/* must know about any previous expansions by another process */
   1063 	tdb->methods->tdb_oob(tdb, tdb->map_size + 1, 1);
   1064 
   1065 	/* always make room for at least 10 more records, and round
   1066            the database up to a multiple of the page size */
   1067 	size = TDB_ALIGN(tdb->map_size + size*10, tdb->page_size) - tdb->map_size;
   1068 
   1069 	if (!(tdb->flags & TDB_INTERNAL))
   1070 		tdb_munmap(tdb);
   1071 
   1072 	/*
   1073 	 * We must ensure the file is unmapped before doing this
   1074 	 * to ensure consistency with systems like OpenBSD where
   1075 	 * writes and mmaps are not consistent.
   1076 	 */
   1077 
   1078 	/* expand the file itself */
   1079 	if (!(tdb->flags & TDB_INTERNAL)) {
   1080 		if (tdb->methods->tdb_expand_file(tdb, tdb->map_size, size) != 0)
   1081 			goto fail;
   1082 	}
   1083 
   1084 	tdb->map_size += size;
   1085 
   1086 	if (tdb->flags & TDB_INTERNAL) {
   1087 		char *new_map_ptr = (char *)realloc(tdb->map_ptr,
   1088 						    tdb->map_size);
   1089 		if (!new_map_ptr) {
   1090 			tdb->map_size -= size;
   1091 			goto fail;
   1092 		}
   1093 		tdb->map_ptr = new_map_ptr;
   1094 	} else {
   1095 		/*
   1096 		 * We must ensure the file is remapped before adding the space
   1097 		 * to ensure consistency with systems like OpenBSD where
   1098 		 * writes and mmaps are not consistent.
   1099 		 */
   1100 
   1101 		/* We're ok if the mmap fails as we'll fallback to read/write */
   1102 		tdb_mmap(tdb);
   1103 	}
   1104 
   1105 	/* form a new freelist record */
   1106 	memset(&rec,'\0',sizeof(rec));
   1107 	rec.rec_len = size - sizeof(rec);
   1108 
   1109 	/* link it into the free list */
   1110 	offset = tdb->map_size - size;
   1111 	if (tdb_free(tdb, offset, &rec) == -1)
   1112 		goto fail;
   1113 
   1114 	tdb_unlock(tdb, -1, F_WRLCK);
   1115 	return 0;
   1116  fail:
   1117 	tdb_unlock(tdb, -1, F_WRLCK);
   1118 	return -1;
   1119 }
   1120 
   1121 /* read/write a tdb_off_t */
   1122 int tdb_ofs_read(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d)
   1123 {
   1124 	return tdb->methods->tdb_read(tdb, offset, (char*)d, sizeof(*d), DOCONV());
   1125 }
   1126 
   1127 int tdb_ofs_write(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d)
   1128 {
   1129 	tdb_off_t off = *d;
   1130 	return tdb->methods->tdb_write(tdb, offset, CONVERT(off), sizeof(*d));
   1131 }
   1132 
   1133 
   1134 /* read a lump of data, allocating the space for it */
   1135 unsigned char *tdb_alloc_read(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t len)
   1136 {
   1137 	unsigned char *buf;
   1138 
   1139 	/* some systems don't like zero length malloc */
   1140 	if (len == 0) {
   1141 		len = 1;
   1142 	}
   1143 
   1144 	if (!(buf = (unsigned char *)malloc(len))) {
   1145 		/* Ensure ecode is set for log fn. */
   1146 		tdb->ecode = TDB_ERR_OOM;
   1147 		TDB_LOG((tdb, TDB_DEBUG_ERROR,"tdb_alloc_read malloc failed len=%d (%s)\n",
   1148 			   len, strerror(errno)));
   1149 		return TDB_ERRCODE(TDB_ERR_OOM, buf);
   1150 	}
   1151 	if (tdb->methods->tdb_read(tdb, offset, buf, len, 0) == -1) {
   1152 		SAFE_FREE(buf);
   1153 		return NULL;
   1154 	}
   1155 	return buf;
   1156 }
   1157 
   1158 /* Give a piece of tdb data to a parser */
   1159 
   1160 int tdb_parse_data(struct tdb_context *tdb, TDB_DATA key,
   1161 		   tdb_off_t offset, tdb_len_t len,
   1162 		   int (*parser)(TDB_DATA key, TDB_DATA data,
   1163 				 void *private_data),
   1164 		   void *private_data)
   1165 {
   1166 	TDB_DATA data;
   1167 	int result;
   1168 
   1169 	data.dsize = len;
   1170 
   1171 	if ((tdb->transaction == NULL) && (tdb->map_ptr != NULL)) {
   1172 		/*
   1173 		 * Optimize by avoiding the malloc/memcpy/free, point the
   1174 		 * parser directly at the mmap area.
   1175 		 */
   1176 		if (tdb->methods->tdb_oob(tdb, offset+len, 0) != 0) {
   1177 			return -1;
   1178 		}
   1179 		data.dptr = offset + (unsigned char *)tdb->map_ptr;
   1180 		return parser(key, data, private_data);
   1181 	}
   1182 
   1183 	if (!(data.dptr = tdb_alloc_read(tdb, offset, len))) {
   1184 		return -1;
   1185 	}
   1186 
   1187 	result = parser(key, data, private_data);
   1188 	free(data.dptr);
   1189 	return result;
   1190 }
   1191 
   1192 /* read/write a record */
   1193 int tdb_rec_read(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec)
   1194 {
   1195 	if (tdb->methods->tdb_read(tdb, offset, rec, sizeof(*rec),DOCONV()) == -1)
   1196 		return -1;
   1197 	if (TDB_BAD_MAGIC(rec)) {
   1198 		/* Ensure ecode is set for log fn. */
   1199 		tdb->ecode = TDB_ERR_CORRUPT;
   1200 		TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_rec_read bad magic 0x%x at offset=%d\n", rec->magic, offset));
   1201 		return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
   1202 	}
   1203 	return tdb->methods->tdb_oob(tdb, rec->next+sizeof(*rec), 0);
   1204 }
   1205 
   1206 int tdb_rec_write(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec)
   1207 {
   1208 	struct list_struct r = *rec;
   1209 	return tdb->methods->tdb_write(tdb, offset, CONVERT(r), sizeof(r));
   1210 }
   1211 
   1212 static const struct tdb_methods io_methods = {
   1213 	tdb_read,
   1214 	tdb_write,
   1215 	tdb_next_hash_chain,
   1216 	tdb_oob,
   1217 	tdb_expand_file,
   1218 	tdb_brlock
   1219 };
   1220 
   1221 /*
   1222   initialise the default methods table
   1223 */
   1224 void tdb_io_init(struct tdb_context *tdb)
   1225 {
   1226 	tdb->methods = &io_methods;
   1227 }
   1228 
   1229 /* file: transaction.c */
   1230 
   1231 /*
   1232   transaction design:
   1233 
   1234   - only allow a single transaction at a time per database. This makes
   1235     using the transaction API simpler, as otherwise the caller would
   1236     have to cope with temporary failures in transactions that conflict
   1237     with other current transactions
   1238 
   1239   - keep the transaction recovery information in the same file as the
   1240     database, using a special 'transaction recovery' record pointed at
   1241     by the header. This removes the need for extra journal files as
   1242     used by some other databases
   1243 
   1244   - dynamically allocate the transaction recovery record, re-using it
   1245     for subsequent transactions. If a larger record is needed then
   1246     tdb_free() the old record to place it on the normal tdb freelist
   1247     before allocating the new record
   1248 
   1249   - during transactions, keep a linked list of all writes that have
   1250     been performed by intercepting all tdb_write() calls. The hooked
   1251     transaction versions of tdb_read() and tdb_write() check this
   1252     linked list and try to use the elements of the list in preference
   1253     to the real database.
   1254 
   1255   - don't allow any locks to be held when a transaction starts,
   1256     otherwise we can end up with deadlock (plus lack of lock nesting
   1257     in posix locks would mean the lock is lost)
   1258 
   1259   - if the caller gains a lock during the transaction but doesn't
   1260     release it then fail the commit
   1261 
   1262   - allow for nested calls to tdb_transaction_start(), re-using the
   1263     existing transaction record. If the inner transaction is cancelled
   1264     then a subsequent commit will fail
   1265 
   1266   - keep a mirrored copy of the tdb hash chain heads to allow for the
   1267     fast hash heads scan on traverse, updating the mirrored copy in
   1268     the transaction version of tdb_write
   1269 
   1270   - allow callers to mix transaction and non-transaction use of tdb,
   1271     although once a transaction is started then an exclusive lock is
   1272     gained until the transaction is committed or cancelled
   1273 
   1274   - the commit strategy involves first saving away all modified data
   1275     into a linearised buffer in the transaction recovery area, then
   1276     marking the transaction recovery area with a magic value to
   1277     indicate a valid recovery record. In total 4 fsync/msync calls are
   1278     needed per commit to prevent race conditions. It might be possible
   1279     to reduce this to 3 or even 2 with some more work.
   1280 
   1281   - check for a valid recovery record on open of the tdb, while the
   1282     global lock is held. Automatically recover from the transaction
   1283     recovery area if needed, then continue with the open as
   1284     usual. This allows for smooth crash recovery with no administrator
   1285     intervention.
   1286 
   1287   - if TDB_NOSYNC is passed to flags in tdb_open then transactions are
   1288     still available, but no transaction recovery area is used and no
   1289     fsync/msync calls are made.
   1290 
   1291 */
   1292 
   1293 struct tdb_transaction_el {
   1294 	struct tdb_transaction_el *next, *prev;
   1295 	tdb_off_t offset;
   1296 	tdb_len_t length;
   1297 	unsigned char *data;
   1298 };
   1299 
   1300 /*
   1301   hold the context of any current transaction
   1302 */
   1303 struct tdb_transaction {
   1304 	/* we keep a mirrored copy of the tdb hash heads here so
   1305 	   tdb_next_hash_chain() can operate efficiently */
   1306 	u32 *hash_heads;
   1307 
   1308 	/* the original io methods - used to do IOs to the real db */
   1309 	const struct tdb_methods *io_methods;
   1310 
   1311 	/* the list of transaction elements. We use a doubly linked
   1312 	   list with a last pointer to allow us to keep the list
   1313 	   ordered, with first element at the front of the list. It
   1314 	   needs to be doubly linked as the read/write traversals need
   1315 	   to be backwards, while the commit needs to be forwards */
   1316 	struct tdb_transaction_el *elements, *elements_last;
   1317 
   1318 	/* non-zero when an internal transaction error has
   1319 	   occurred. All write operations will then fail until the
   1320 	   transaction is ended */
   1321 	int transaction_error;
   1322 
   1323 	/* when inside a transaction we need to keep track of any
   1324 	   nested tdb_transaction_start() calls, as these are allowed,
   1325 	   but don't create a new transaction */
   1326 	int nesting;
   1327 
   1328 	/* old file size before transaction */
   1329 	tdb_len_t old_map_size;
   1330 };
   1331 
   1332 
   1333 /*
   1334   read while in a transaction. We need to check first if the data is in our list
   1335   of transaction elements, then if not do a real read
   1336 */
   1337 static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf,
   1338 			    tdb_len_t len, int cv)
   1339 {
   1340 	struct tdb_transaction_el *el;
   1341 
   1342 	/* we need to walk the list backwards to get the most recent data */
   1343 	for (el=tdb->transaction->elements_last;el;el=el->prev) {
   1344 		tdb_len_t partial;
   1345 
   1346 		if (off+len <= el->offset) {
   1347 			continue;
   1348 		}
   1349 		if (off >= el->offset + el->length) {
   1350 			continue;
   1351 		}
   1352 
   1353 		/* an overlapping read - needs to be split into up to
   1354 		   2 reads and a memcpy */
   1355 		if (off < el->offset) {
   1356 			partial = el->offset - off;
   1357 			if (transaction_read(tdb, off, buf, partial, cv) != 0) {
   1358 				goto fail;
   1359 			}
   1360 			len -= partial;
   1361 			off += partial;
   1362 			buf = (void *)(partial + (char *)buf);
   1363 		}
   1364 		if (off + len <= el->offset + el->length) {
   1365 			partial = len;
   1366 		} else {
   1367 			partial = el->offset + el->length - off;
   1368 		}
   1369 		memcpy(buf, el->data + (off - el->offset), partial);
   1370 		if (cv) {
   1371 			tdb_convert(buf, len);
   1372 		}
   1373 		len -= partial;
   1374 		off += partial;
   1375 		buf = (void *)(partial + (char *)buf);
   1376 
   1377 		if (len != 0 && transaction_read(tdb, off, buf, len, cv) != 0) {
   1378 			goto fail;
   1379 		}
   1380 
   1381 		return 0;
   1382 	}
   1383 
   1384 	/* its not in the transaction elements - do a real read */
   1385 	return tdb->transaction->io_methods->tdb_read(tdb, off, buf, len, cv);
   1386 
   1387 fail:
   1388 	TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_read: failed at off=%d len=%d\n", off, len));
   1389 	tdb->ecode = TDB_ERR_IO;
   1390 	tdb->transaction->transaction_error = 1;
   1391 	return -1;
   1392 }
   1393 
   1394 
   1395 /*
   1396   write while in a transaction
   1397 */
   1398 static int transaction_write(struct tdb_context *tdb, tdb_off_t off,
   1399 			     const void *buf, tdb_len_t len)
   1400 {
   1401 	struct tdb_transaction_el *el, *best_el=NULL;
   1402 
   1403 	if (len == 0) {
   1404 		return 0;
   1405 	}
   1406 
   1407 	/* if the write is to a hash head, then update the transaction
   1408 	   hash heads */
   1409 	if (len == sizeof(tdb_off_t) && off >= FREELIST_TOP &&
   1410 	    off < FREELIST_TOP+TDB_HASHTABLE_SIZE(tdb)) {
   1411 		u32 chain = (off-FREELIST_TOP) / sizeof(tdb_off_t);
   1412 		memcpy(&tdb->transaction->hash_heads[chain], buf, len);
   1413 	}
   1414 
   1415 	/* first see if we can replace an existing entry */
   1416 	for (el=tdb->transaction->elements_last;el;el=el->prev) {
   1417 		tdb_len_t partial;
   1418 
   1419 		if (best_el == NULL && off == el->offset+el->length) {
   1420 			best_el = el;
   1421 		}
   1422 
   1423 		if (off+len <= el->offset) {
   1424 			continue;
   1425 		}
   1426 		if (off >= el->offset + el->length) {
   1427 			continue;
   1428 		}
   1429 
   1430 		/* an overlapping write - needs to be split into up to
   1431 		   2 writes and a memcpy */
   1432 		if (off < el->offset) {
   1433 			partial = el->offset - off;
   1434 			if (transaction_write(tdb, off, buf, partial) != 0) {
   1435 				goto fail;
   1436 			}
   1437 			len -= partial;
   1438 			off += partial;
   1439 			buf = (const void *)(partial + (const char *)buf);
   1440 		}
   1441 		if (off + len <= el->offset + el->length) {
   1442 			partial = len;
   1443 		} else {
   1444 			partial = el->offset + el->length - off;
   1445 		}
   1446 		memcpy(el->data + (off - el->offset), buf, partial);
   1447 		len -= partial;
   1448 		off += partial;
   1449 		buf = (const void *)(partial + (const char *)buf);
   1450 
   1451 		if (len != 0 && transaction_write(tdb, off, buf, len) != 0) {
   1452 			goto fail;
   1453 		}
   1454 
   1455 		return 0;
   1456 	}
   1457 
   1458 	/* see if we can append the new entry to an existing entry */
   1459 	if (best_el && best_el->offset + best_el->length == off &&
   1460 	    (off+len < tdb->transaction->old_map_size ||
   1461 	     off > tdb->transaction->old_map_size)) {
   1462 		unsigned char *data = best_el->data;
   1463 		el = best_el;
   1464 		el->data = (unsigned char *)realloc(el->data,
   1465 						    el->length + len);
   1466 		if (el->data == NULL) {
   1467 			tdb->ecode = TDB_ERR_OOM;
   1468 			tdb->transaction->transaction_error = 1;
   1469 			el->data = data;
   1470 			return -1;
   1471 		}
   1472 		if (buf) {
   1473 			memcpy(el->data + el->length, buf, len);
   1474 		} else {
   1475 			memset(el->data + el->length, TDB_PAD_BYTE, len);
   1476 		}
   1477 		el->length += len;
   1478 		return 0;
   1479 	}
   1480 
   1481 	/* add a new entry at the end of the list */
   1482 	el = (struct tdb_transaction_el *)malloc(sizeof(*el));
   1483 	if (el == NULL) {
   1484 		tdb->ecode = TDB_ERR_OOM;
   1485 		tdb->transaction->transaction_error = 1;
   1486 		return -1;
   1487 	}
   1488 	el->next = NULL;
   1489 	el->prev = tdb->transaction->elements_last;
   1490 	el->offset = off;
   1491 	el->length = len;
   1492 	el->data = (unsigned char *)malloc(len);
   1493 	if (el->data == NULL) {
   1494 		free(el);
   1495 		tdb->ecode = TDB_ERR_OOM;
   1496 		tdb->transaction->transaction_error = 1;
   1497 		return -1;
   1498 	}
   1499 	if (buf) {
   1500 		memcpy(el->data, buf, len);
   1501 	} else {
   1502 		memset(el->data, TDB_PAD_BYTE, len);
   1503 	}
   1504 	if (el->prev) {
   1505 		el->prev->next = el;
   1506 	} else {
   1507 		tdb->transaction->elements = el;
   1508 	}
   1509 	tdb->transaction->elements_last = el;
   1510 	return 0;
   1511 
   1512 fail:
   1513 	TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: failed at off=%d len=%d\n", off, len));
   1514 	tdb->ecode = TDB_ERR_IO;
   1515 	tdb->transaction->transaction_error = 1;
   1516 	return -1;
   1517 }
   1518 
   1519 /*
   1520   accelerated hash chain head search, using the cached hash heads
   1521 */
   1522 static void transaction_next_hash_chain(struct tdb_context *tdb, u32 *chain)
   1523 {
   1524 	u32 h = *chain;
   1525 	for (;h < tdb->header.hash_size;h++) {
   1526 		/* the +1 takes account of the freelist */
   1527 		if (0 != tdb->transaction->hash_heads[h+1]) {
   1528 			break;
   1529 		}
   1530 	}
   1531 	(*chain) = h;
   1532 }
   1533 
   1534 /*
   1535   out of bounds check during a transaction
   1536 */
   1537 static int transaction_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
   1538 {
   1539 	if (len <= tdb->map_size) {
   1540 		return 0;
   1541 	}
   1542 	return TDB_ERRCODE(TDB_ERR_IO, -1);
   1543 }
   1544 
   1545 /*
   1546   transaction version of tdb_expand().
   1547 */
   1548 static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t size,
   1549 				   tdb_off_t addition)
   1550 {
   1551 	/* add a write to the transaction elements, so subsequent
   1552 	   reads see the zero data */
   1553 	if (transaction_write(tdb, size, NULL, addition) != 0) {
   1554 		return -1;
   1555 	}
   1556 
   1557 	return 0;
   1558 }
   1559 
   1560 /*
   1561   brlock during a transaction - ignore them
   1562 */
   1563 static int transaction_brlock(struct tdb_context *tdb, tdb_off_t offset,
   1564 			      int rw_type, int lck_type, int probe, size_t len)
   1565 {
   1566 	return 0;
   1567 }
   1568 
   1569 static const struct tdb_methods transaction_methods = {
   1570 	transaction_read,
   1571 	transaction_write,
   1572 	transaction_next_hash_chain,
   1573 	transaction_oob,
   1574 	transaction_expand_file,
   1575 	transaction_brlock
   1576 };
   1577 
   1578 
   1579 /*
   1580   start a tdb transaction. No token is returned, as only a single
   1581   transaction is allowed to be pending per tdb_context
   1582 */
   1583 int tdb_transaction_start(struct tdb_context *tdb)
   1584 {
   1585 	/* some sanity checks */
   1586 	if (tdb->read_only || (tdb->flags & TDB_INTERNAL) || tdb->traverse_read) {
   1587 		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction on a read-only or internal db\n"));
   1588 		tdb->ecode = TDB_ERR_EINVAL;
   1589 		return -1;
   1590 	}
   1591 
   1592 	/* cope with nested tdb_transaction_start() calls */
   1593 	if (tdb->transaction != NULL) {
   1594 		tdb->transaction->nesting++;
   1595 		TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_start: nesting %d\n",
   1596 			 tdb->transaction->nesting));
   1597 		return 0;
   1598 	}
   1599 
   1600 	if (tdb->num_locks != 0 || tdb->global_lock.count) {
   1601 		/* the caller must not have any locks when starting a
   1602 		   transaction as otherwise we'll be screwed by lack
   1603 		   of nested locks in posix */
   1604 		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction with locks held\n"));
   1605 		tdb->ecode = TDB_ERR_LOCK;
   1606 		return -1;
   1607 	}
   1608 
   1609 	if (tdb->travlocks.next != NULL) {
   1610 		/* you cannot use transactions inside a traverse (although you can use
   1611 		   traverse inside a transaction) as otherwise you can end up with
   1612 		   deadlock */
   1613 		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction within a traverse\n"));
   1614 		tdb->ecode = TDB_ERR_LOCK;
   1615 		return -1;
   1616 	}
   1617 
   1618 	tdb->transaction = (struct tdb_transaction *)
   1619 		calloc(sizeof(struct tdb_transaction), 1);
   1620 	if (tdb->transaction == NULL) {
   1621 		tdb->ecode = TDB_ERR_OOM;
   1622 		return -1;
   1623 	}
   1624 
   1625 	/* get the transaction write lock. This is a blocking lock. As
   1626 	   discussed with Volker, there are a number of ways we could
   1627 	   make this async, which we will probably do in the future */
   1628 	if (tdb_transaction_lock(tdb, F_WRLCK) == -1) {
   1629 		SAFE_FREE(tdb->transaction);
   1630 		return -1;
   1631 	}
   1632 
   1633 	/* get a read lock from the freelist to the end of file. This
   1634 	   is upgraded to a write lock during the commit */
   1635 	if (tdb_brlock(tdb, FREELIST_TOP, F_RDLCK, F_SETLKW, 0, 0) == -1) {
   1636 		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get hash locks\n"));
   1637 		tdb->ecode = TDB_ERR_LOCK;
   1638 		goto fail;
   1639 	}
   1640 
   1641 	/* setup a copy of the hash table heads so the hash scan in
   1642 	   traverse can be fast */
   1643 	tdb->transaction->hash_heads = (u32 *)
   1644 		calloc(tdb->header.hash_size+1, sizeof(u32));
   1645 	if (tdb->transaction->hash_heads == NULL) {
   1646 		tdb->ecode = TDB_ERR_OOM;
   1647 		goto fail;
   1648 	}
   1649 	if (tdb->methods->tdb_read(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
   1650 				   TDB_HASHTABLE_SIZE(tdb), 0) != 0) {
   1651 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to read hash heads\n"));
   1652 		tdb->ecode = TDB_ERR_IO;
   1653 		goto fail;
   1654 	}
   1655 
   1656 	/* make sure we know about any file expansions already done by
   1657 	   anyone else */
   1658 	tdb->methods->tdb_oob(tdb, tdb->map_size + 1, 1);
   1659 	tdb->transaction->old_map_size = tdb->map_size;
   1660 
   1661 	/* finally hook the io methods, replacing them with
   1662 	   transaction specific methods */
   1663 	tdb->transaction->io_methods = tdb->methods;
   1664 	tdb->methods = &transaction_methods;
   1665 
   1666 	/* by calling this transaction write here, we ensure that we don't grow the
   1667 	   transaction linked list due to hash table updates */
   1668 	if (transaction_write(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
   1669 			      TDB_HASHTABLE_SIZE(tdb)) != 0) {
   1670 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to prime hash table\n"));
   1671 		tdb->ecode = TDB_ERR_IO;
   1672 		tdb->methods = tdb->transaction->io_methods;
   1673 		goto fail;
   1674 	}
   1675 
   1676 	return 0;
   1677 
   1678 fail:
   1679 	tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
   1680 	tdb_transaction_unlock(tdb);
   1681 	SAFE_FREE(tdb->transaction->hash_heads);
   1682 	SAFE_FREE(tdb->transaction);
   1683 	return -1;
   1684 }
   1685 
   1686 
   1687 /*
   1688   cancel the current transaction
   1689 */
   1690 int tdb_transaction_cancel(struct tdb_context *tdb)
   1691 {
   1692 	if (tdb->transaction == NULL) {
   1693 		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_cancel: no transaction\n"));
   1694 		return -1;
   1695 	}
   1696 
   1697 	if (tdb->transaction->nesting != 0) {
   1698 		tdb->transaction->transaction_error = 1;
   1699 		tdb->transaction->nesting--;
   1700 		return 0;
   1701 	}
   1702 
   1703 	tdb->map_size = tdb->transaction->old_map_size;
   1704 
   1705 	/* free all the transaction elements */
   1706 	while (tdb->transaction->elements) {
   1707 		struct tdb_transaction_el *el = tdb->transaction->elements;
   1708 		tdb->transaction->elements = el->next;
   1709 		free(el->data);
   1710 		free(el);
   1711 	}
   1712 
   1713 	/* remove any global lock created during the transaction */
   1714 	if (tdb->global_lock.count != 0) {
   1715 		tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 4*tdb->header.hash_size);
   1716 		tdb->global_lock.count = 0;
   1717 	}
   1718 
   1719 	/* remove any locks created during the transaction */
   1720 	if (tdb->num_locks != 0) {
   1721 		int i;
   1722 		for (i=0;i<tdb->num_lockrecs;i++) {
   1723 			tdb_brlock(tdb,FREELIST_TOP+4*tdb->lockrecs[i].list,
   1724 				   F_UNLCK,F_SETLKW, 0, 1);
   1725 		}
   1726 		tdb->num_locks = 0;
   1727 		tdb->num_lockrecs = 0;
   1728 		SAFE_FREE(tdb->lockrecs);
   1729 	}
   1730 
   1731 	/* restore the normal io methods */
   1732 	tdb->methods = tdb->transaction->io_methods;
   1733 
   1734 	tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
   1735 	tdb_transaction_unlock(tdb);
   1736 	SAFE_FREE(tdb->transaction->hash_heads);
   1737 	SAFE_FREE(tdb->transaction);
   1738 
   1739 	return 0;
   1740 }
   1741 
   1742 /*
   1743   sync to disk
   1744 */
   1745 static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t length)
   1746 {
   1747 	if (fsync(tdb->fd) != 0) {
   1748 		tdb->ecode = TDB_ERR_IO;
   1749 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: fsync failed\n"));
   1750 		return -1;
   1751 	}
   1752 #ifdef MS_SYNC
   1753 	if (tdb->map_ptr) {
   1754 		tdb_off_t moffset = offset & ~(tdb->page_size-1);
   1755 		if (msync(moffset + (char *)tdb->map_ptr,
   1756 			  length + (offset - moffset), MS_SYNC) != 0) {
   1757 			tdb->ecode = TDB_ERR_IO;
   1758 			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: msync failed - %s\n",
   1759 				 strerror(errno)));
   1760 			return -1;
   1761 		}
   1762 	}
   1763 #endif
   1764 	return 0;
   1765 }
   1766 
   1767 
   1768 /*
   1769   work out how much space the linearised recovery data will consume
   1770 */
   1771 static tdb_len_t tdb_recovery_size(struct tdb_context *tdb)
   1772 {
   1773 	struct tdb_transaction_el *el;
   1774 	tdb_len_t recovery_size = 0;
   1775 
   1776 	recovery_size = sizeof(u32);
   1777 	for (el=tdb->transaction->elements;el;el=el->next) {
   1778 		if (el->offset >= tdb->transaction->old_map_size) {
   1779 			continue;
   1780 		}
   1781 		recovery_size += 2*sizeof(tdb_off_t) + el->length;
   1782 	}
   1783 
   1784 	return recovery_size;
   1785 }
   1786 
   1787 /*
   1788   allocate the recovery area, or use an existing recovery area if it is
   1789   large enough
   1790 */
   1791 static int tdb_recovery_allocate(struct tdb_context *tdb,
   1792 				 tdb_len_t *recovery_size,
   1793 				 tdb_off_t *recovery_offset,
   1794 				 tdb_len_t *recovery_max_size)
   1795 {
   1796 	struct list_struct rec;
   1797 	const struct tdb_methods *methods = tdb->transaction->io_methods;
   1798 	tdb_off_t recovery_head;
   1799 
   1800 	if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
   1801 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery head\n"));
   1802 		return -1;
   1803 	}
   1804 
   1805 	rec.rec_len = 0;
   1806 
   1807 	if (recovery_head != 0 &&
   1808 	    methods->tdb_read(tdb, recovery_head, &rec, sizeof(rec), DOCONV()) == -1) {
   1809 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery record\n"));
   1810 		return -1;
   1811 	}
   1812 
   1813 	*recovery_size = tdb_recovery_size(tdb);
   1814 
   1815 	if (recovery_head != 0 && *recovery_size <= rec.rec_len) {
   1816 		/* it fits in the existing area */
   1817 		*recovery_max_size = rec.rec_len;
   1818 		*recovery_offset = recovery_head;
   1819 		return 0;
   1820 	}
   1821 
   1822 	/* we need to free up the old recovery area, then allocate a
   1823 	   new one at the end of the file. Note that we cannot use
   1824 	   tdb_allocate() to allocate the new one as that might return
   1825 	   us an area that is being currently used (as of the start of
   1826 	   the transaction) */
   1827 	if (recovery_head != 0) {
   1828 		if (tdb_free(tdb, recovery_head, &rec) == -1) {
   1829 			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to free previous recovery area\n"));
   1830 			return -1;
   1831 		}
   1832 	}
   1833 
   1834 	/* the tdb_free() call might have increased the recovery size */
   1835 	*recovery_size = tdb_recovery_size(tdb);
   1836 
   1837 	/* round up to a multiple of page size */
   1838 	*recovery_max_size = TDB_ALIGN(sizeof(rec) + *recovery_size, tdb->page_size) - sizeof(rec);
   1839 	*recovery_offset = tdb->map_size;
   1840 	recovery_head = *recovery_offset;
   1841 
   1842 	if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
   1843 				     (tdb->map_size - tdb->transaction->old_map_size) +
   1844 				     sizeof(rec) + *recovery_max_size) == -1) {
   1845 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to create recovery area\n"));
   1846 		return -1;
   1847 	}
   1848 
   1849 	/* remap the file (if using mmap) */
   1850 	methods->tdb_oob(tdb, tdb->map_size + 1, 1);
   1851 
   1852 	/* we have to reset the old map size so that we don't try to expand the file
   1853 	   again in the transaction commit, which would destroy the recovery area */
   1854 	tdb->transaction->old_map_size = tdb->map_size;
   1855 
   1856 	/* write the recovery header offset and sync - we can sync without a race here
   1857 	   as the magic ptr in the recovery record has not been set */
   1858 	CONVERT(recovery_head);
   1859 	if (methods->tdb_write(tdb, TDB_RECOVERY_HEAD,
   1860 			       &recovery_head, sizeof(tdb_off_t)) == -1) {
   1861 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
   1862 		return -1;
   1863 	}
   1864 
   1865 	return 0;
   1866 }
   1867 
   1868 
   1869 /*
   1870   setup the recovery data that will be used on a crash during commit
   1871 */
   1872 static int transaction_setup_recovery(struct tdb_context *tdb,
   1873 				      tdb_off_t *magic_offset)
   1874 {
   1875 	struct tdb_transaction_el *el;
   1876 	tdb_len_t recovery_size;
   1877 	unsigned char *data, *p;
   1878 	const struct tdb_methods *methods = tdb->transaction->io_methods;
   1879 	struct list_struct *rec;
   1880 	tdb_off_t recovery_offset, recovery_max_size;
   1881 	tdb_off_t old_map_size = tdb->transaction->old_map_size;
   1882 	u32 magic, tailer;
   1883 
   1884 	/*
   1885 	  check that the recovery area has enough space
   1886 	*/
   1887 	if (tdb_recovery_allocate(tdb, &recovery_size,
   1888 				  &recovery_offset, &recovery_max_size) == -1) {
   1889 		return -1;
   1890 	}
   1891 
   1892 	data = (unsigned char *)malloc(recovery_size + sizeof(*rec));
   1893 	if (data == NULL) {
   1894 		tdb->ecode = TDB_ERR_OOM;
   1895 		return -1;
   1896 	}
   1897 
   1898 	rec = (struct list_struct *)data;
   1899 	memset(rec, 0, sizeof(*rec));
   1900 
   1901 	rec->magic    = 0;
   1902 	rec->data_len = recovery_size;
   1903 	rec->rec_len  = recovery_max_size;
   1904 	rec->key_len  = old_map_size;
   1905 	CONVERT(rec);
   1906 
   1907 	/* build the recovery data into a single blob to allow us to do a single
   1908 	   large write, which should be more efficient */
   1909 	p = data + sizeof(*rec);
   1910 	for (el=tdb->transaction->elements;el;el=el->next) {
   1911 		if (el->offset >= old_map_size) {
   1912 			continue;
   1913 		}
   1914 		if (el->offset + el->length > tdb->transaction->old_map_size) {
   1915 			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: transaction data over new region boundary\n"));
   1916 			free(data);
   1917 			tdb->ecode = TDB_ERR_CORRUPT;
   1918 			return -1;
   1919 		}
   1920 		memcpy(p, &el->offset, 4);
   1921 		memcpy(p+4, &el->length, 4);
   1922 		if (DOCONV()) {
   1923 			tdb_convert(p, 8);
   1924 		}
   1925 		/* the recovery area contains the old data, not the
   1926 		   new data, so we have to call the original tdb_read
   1927 		   method to get it */
   1928 		if (methods->tdb_read(tdb, el->offset, p + 8, el->length, 0) != 0) {
   1929 			free(data);
   1930 			tdb->ecode = TDB_ERR_IO;
   1931 			return -1;
   1932 		}
   1933 		p += 8 + el->length;
   1934 	}
   1935 
   1936 	/* and the tailer */
   1937 	tailer = sizeof(*rec) + recovery_max_size;
   1938 	memcpy(p, &tailer, 4);
   1939 	CONVERT(p);
   1940 
   1941 	/* write the recovery data to the recovery area */
   1942 	if (methods->tdb_write(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
   1943 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery data\n"));
   1944 		free(data);
   1945 		tdb->ecode = TDB_ERR_IO;
   1946 		return -1;
   1947 	}
   1948 
   1949 	/* as we don't have ordered writes, we have to sync the recovery
   1950 	   data before we update the magic to indicate that the recovery
   1951 	   data is present */
   1952 	if (transaction_sync(tdb, recovery_offset, sizeof(*rec) + recovery_size) == -1) {
   1953 		free(data);
   1954 		return -1;
   1955 	}
   1956 
   1957 	free(data);
   1958 
   1959 	magic = TDB_RECOVERY_MAGIC;
   1960 	CONVERT(magic);
   1961 
   1962 	*magic_offset = recovery_offset + offsetof(struct list_struct, magic);
   1963 
   1964 	if (methods->tdb_write(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
   1965 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery magic\n"));
   1966 		tdb->ecode = TDB_ERR_IO;
   1967 		return -1;
   1968 	}
   1969 
   1970 	/* ensure the recovery magic marker is on disk */
   1971 	if (transaction_sync(tdb, *magic_offset, sizeof(magic)) == -1) {
   1972 		return -1;
   1973 	}
   1974 
   1975 	return 0;
   1976 }
   1977 
   1978 /*
   1979   commit the current transaction
   1980 */
   1981 int tdb_transaction_commit(struct tdb_context *tdb)
   1982 {
   1983 	const struct tdb_methods *methods;
   1984 	tdb_off_t magic_offset = 0;
   1985 	u32 zero = 0;
   1986 
   1987 	if (tdb->transaction == NULL) {
   1988 		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: no transaction\n"));
   1989 		return -1;
   1990 	}
   1991 
   1992 	if (tdb->transaction->transaction_error) {
   1993 		tdb->ecode = TDB_ERR_IO;
   1994 		tdb_transaction_cancel(tdb);
   1995 		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: transaction error pending\n"));
   1996 		return -1;
   1997 	}
   1998 
   1999 	if (tdb->transaction->nesting != 0) {
   2000 		tdb->transaction->nesting--;
   2001 		return 0;
   2002 	}
   2003 
   2004 	/* check for a null transaction */
   2005 	if (tdb->transaction->elements == NULL) {
   2006 		tdb_transaction_cancel(tdb);
   2007 		return 0;
   2008 	}
   2009 
   2010 	methods = tdb->transaction->io_methods;
   2011 
   2012 	/* if there are any locks pending then the caller has not
   2013 	   nested their locks properly, so fail the transaction */
   2014 	if (tdb->num_locks || tdb->global_lock.count) {
   2015 		tdb->ecode = TDB_ERR_LOCK;
   2016 		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: locks pending on commit\n"));
   2017 		tdb_transaction_cancel(tdb);
   2018 		return -1;
   2019 	}
   2020 
   2021 	/* upgrade the main transaction lock region to a write lock */
   2022 	if (tdb_brlock_upgrade(tdb, FREELIST_TOP, 0) == -1) {
   2023 		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to upgrade hash locks\n"));
   2024 		tdb->ecode = TDB_ERR_LOCK;
   2025 		tdb_transaction_cancel(tdb);
   2026 		return -1;
   2027 	}
   2028 
   2029 	/* get the global lock - this prevents new users attaching to the database
   2030 	   during the commit */
   2031 	if (tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
   2032 		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: failed to get global lock\n"));
   2033 		tdb->ecode = TDB_ERR_LOCK;
   2034 		tdb_transaction_cancel(tdb);
   2035 		return -1;
   2036 	}
   2037 
   2038 	if (!(tdb->flags & TDB_NOSYNC)) {
   2039 		/* write the recovery data to the end of the file */
   2040 		if (transaction_setup_recovery(tdb, &magic_offset) == -1) {
   2041 			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to setup recovery data\n"));
   2042 			tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
   2043 			tdb_transaction_cancel(tdb);
   2044 			return -1;
   2045 		}
   2046 	}
   2047 
   2048 	/* expand the file to the new size if needed */
   2049 	if (tdb->map_size != tdb->transaction->old_map_size) {
   2050 		if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
   2051 					     tdb->map_size -
   2052 					     tdb->transaction->old_map_size) == -1) {
   2053 			tdb->ecode = TDB_ERR_IO;
   2054 			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: expansion failed\n"));
   2055 			tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
   2056 			tdb_transaction_cancel(tdb);
   2057 			return -1;
   2058 		}
   2059 		tdb->map_size = tdb->transaction->old_map_size;
   2060 		methods->tdb_oob(tdb, tdb->map_size + 1, 1);
   2061 	}
   2062 
   2063 	/* perform all the writes */
   2064 	while (tdb->transaction->elements) {
   2065 		struct tdb_transaction_el *el = tdb->transaction->elements;
   2066 
   2067 		if (methods->tdb_write(tdb, el->offset, el->data, el->length) == -1) {
   2068 			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed during commit\n"));
   2069 
   2070 			/* we've overwritten part of the data and
   2071 			   possibly expanded the file, so we need to
   2072 			   run the crash recovery code */
   2073 			tdb->methods = methods;
   2074 			tdb_transaction_recover(tdb);
   2075 
   2076 			tdb_transaction_cancel(tdb);
   2077 			tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
   2078 
   2079 			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed\n"));
   2080 			return -1;
   2081 		}
   2082 		tdb->transaction->elements = el->next;
   2083 		free(el->data);
   2084 		free(el);
   2085 	}
   2086 
   2087 	if (!(tdb->flags & TDB_NOSYNC)) {
   2088 		/* ensure the new data is on disk */
   2089 		if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
   2090 			return -1;
   2091 		}
   2092 
   2093 		/* remove the recovery marker */
   2094 		if (methods->tdb_write(tdb, magic_offset, &zero, 4) == -1) {
   2095 			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to remove recovery magic\n"));
   2096 			return -1;
   2097 		}
   2098 
   2099 		/* ensure the recovery marker has been removed on disk */
   2100 		if (transaction_sync(tdb, magic_offset, 4) == -1) {
   2101 			return -1;
   2102 		}
   2103 	}
   2104 
   2105 	tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
   2106 
   2107 	/*
   2108 	  TODO: maybe write to some dummy hdr field, or write to magic
   2109 	  offset without mmap, before the last sync, instead of the
   2110 	  utime() call
   2111 	*/
   2112 
   2113 	/* on some systems (like Linux 2.6.x) changes via mmap/msync
   2114 	   don't change the mtime of the file, this means the file may
   2115 	   not be backed up (as tdb rounding to block sizes means that
   2116 	   file size changes are quite rare too). The following forces
   2117 	   mtime changes when a transaction completes */
   2118 #ifdef HAVE_UTIME
   2119 	utime(tdb->name, NULL);
   2120 #endif
   2121 
   2122 	/* use a transaction cancel to free memory and remove the
   2123 	   transaction locks */
   2124 	tdb_transaction_cancel(tdb);
   2125 	return 0;
   2126 }
   2127 
   2128 
   2129 /*
   2130   recover from an aborted transaction. Must be called with exclusive
   2131   database write access already established (including the global
   2132   lock to prevent new processes attaching)
   2133 */
   2134 int tdb_transaction_recover(struct tdb_context *tdb)
   2135 {
   2136 	tdb_off_t recovery_head, recovery_eof;
   2137 	unsigned char *data, *p;
   2138 	u32 zero = 0;
   2139 	struct list_struct rec;
   2140 
   2141 	/* find the recovery area */
   2142 	if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
   2143 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery head\n"));
   2144 		tdb->ecode = TDB_ERR_IO;
   2145 		return -1;
   2146 	}
   2147 
   2148 	if (recovery_head == 0) {
   2149 		/* we have never allocated a recovery record */
   2150 		return 0;
   2151 	}
   2152 
   2153 	/* read the recovery record */
   2154 	if (tdb->methods->tdb_read(tdb, recovery_head, &rec,
   2155 				   sizeof(rec), DOCONV()) == -1) {
   2156 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery record\n"));
   2157 		tdb->ecode = TDB_ERR_IO;
   2158 		return -1;
   2159 	}
   2160 
   2161 	if (rec.magic != TDB_RECOVERY_MAGIC) {
   2162 		/* there is no valid recovery data */
   2163 		return 0;
   2164 	}
   2165 
   2166 	if (tdb->read_only) {
   2167 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: attempt to recover read only database\n"));
   2168 		tdb->ecode = TDB_ERR_CORRUPT;
   2169 		return -1;
   2170 	}
   2171 
   2172 	recovery_eof = rec.key_len;
   2173 
   2174 	data = (unsigned char *)malloc(rec.data_len);
   2175 	if (data == NULL) {
   2176 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to allocate recovery data\n"));
   2177 		tdb->ecode = TDB_ERR_OOM;
   2178 		return -1;
   2179 	}
   2180 
   2181 	/* read the full recovery data */
   2182 	if (tdb->methods->tdb_read(tdb, recovery_head + sizeof(rec), data,
   2183 				   rec.data_len, 0) == -1) {
   2184 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery data\n"));
   2185 		tdb->ecode = TDB_ERR_IO;
   2186 		return -1;
   2187 	}
   2188 
   2189 	/* recover the file data */
   2190 	p = data;
   2191 	while (p+8 < data + rec.data_len) {
   2192 		u32 ofs, len;
   2193 		if (DOCONV()) {
   2194 			tdb_convert(p, 8);
   2195 		}
   2196 		memcpy(&ofs, p, 4);
   2197 		memcpy(&len, p+4, 4);
   2198 
   2199 		if (tdb->methods->tdb_write(tdb, ofs, p+8, len) == -1) {
   2200 			free(data);
   2201 			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to recover %d bytes at offset %d\n", len, ofs));
   2202 			tdb->ecode = TDB_ERR_IO;
   2203 			return -1;
   2204 		}
   2205 		p += 8 + len;
   2206 	}
   2207 
   2208 	free(data);
   2209 
   2210 	if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
   2211 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync recovery\n"));
   2212 		tdb->ecode = TDB_ERR_IO;
   2213 		return -1;
   2214 	}
   2215 
   2216 	/* if the recovery area is after the recovered eof then remove it */
   2217 	if (recovery_eof <= recovery_head) {
   2218 		if (tdb_ofs_write(tdb, TDB_RECOVERY_HEAD, &zero) == -1) {
   2219 			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery head\n"));
   2220 			tdb->ecode = TDB_ERR_IO;
   2221 			return -1;
   2222 		}
   2223 	}
   2224 
   2225 	/* remove the recovery magic */
   2226 	if (tdb_ofs_write(tdb, recovery_head + offsetof(struct list_struct, magic),
   2227 			  &zero) == -1) {
   2228 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery magic\n"));
   2229 		tdb->ecode = TDB_ERR_IO;
   2230 		return -1;
   2231 	}
   2232 
   2233 	/* reduce the file size to the old size */
   2234 	tdb_munmap(tdb);
   2235 	if (ftruncate(tdb->fd, recovery_eof) != 0) {
   2236 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to reduce to recovery size\n"));
   2237 		tdb->ecode = TDB_ERR_IO;
   2238 		return -1;
   2239 	}
   2240 	tdb->map_size = recovery_eof;
   2241 	tdb_mmap(tdb);
   2242 
   2243 	if (transaction_sync(tdb, 0, recovery_eof) == -1) {
   2244 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync2 recovery\n"));
   2245 		tdb->ecode = TDB_ERR_IO;
   2246 		return -1;
   2247 	}
   2248 
   2249 	TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_recover: recovered %d byte database\n",
   2250 		 recovery_eof));
   2251 
   2252 	/* all done */
   2253 	return 0;
   2254 }
   2255 
   2256 /* file: freelist.c */
   2257 
   2258 /* read a freelist record and check for simple errors */
   2259 static int tdb_rec_free_read(struct tdb_context *tdb, tdb_off_t off, struct list_struct *rec)
   2260 {
   2261 	if (tdb->methods->tdb_read(tdb, off, rec, sizeof(*rec),DOCONV()) == -1)
   2262 		return -1;
   2263 
   2264 	if (rec->magic == TDB_MAGIC) {
   2265 		/* this happens when a app is showdown while deleting a record - we should
   2266 		   not completely fail when this happens */
   2267 		TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_rec_free_read non-free magic 0x%x at offset=%d - fixing\n",
   2268 			 rec->magic, off));
   2269 		rec->magic = TDB_FREE_MAGIC;
   2270 		if (tdb->methods->tdb_write(tdb, off, rec, sizeof(*rec)) == -1)
   2271 			return -1;
   2272 	}
   2273 
   2274 	if (rec->magic != TDB_FREE_MAGIC) {
   2275 		/* Ensure ecode is set for log fn. */
   2276 		tdb->ecode = TDB_ERR_CORRUPT;
   2277 		TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_rec_free_read bad magic 0x%x at offset=%d\n",
   2278 			   rec->magic, off));
   2279 		return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
   2280 	}
   2281 	if (tdb->methods->tdb_oob(tdb, rec->next+sizeof(*rec), 0) != 0)
   2282 		return -1;
   2283 	return 0;
   2284 }
   2285 
   2286 
   2287 
   2288 /* Remove an element from the freelist.  Must have alloc lock. */
   2289 static int remove_from_freelist(struct tdb_context *tdb, tdb_off_t off, tdb_off_t next)
   2290 {
   2291 	tdb_off_t last_ptr, i;
   2292 
   2293 	/* read in the freelist top */
   2294 	last_ptr = FREELIST_TOP;
   2295 	while (tdb_ofs_read(tdb, last_ptr, &i) != -1 && i != 0) {
   2296 		if (i == off) {
   2297 			/* We've found it! */
   2298 			return tdb_ofs_write(tdb, last_ptr, &next);
   2299 		}
   2300 		/* Follow chain (next offset is at start of record) */
   2301 		last_ptr = i;
   2302 	}
   2303 	TDB_LOG((tdb, TDB_DEBUG_FATAL,"remove_from_freelist: not on list at off=%d\n", off));
   2304 	return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
   2305 }
   2306 
   2307 
   2308 /* update a record tailer (must hold allocation lock) */
   2309 static int update_tailer(struct tdb_context *tdb, tdb_off_t offset,
   2310 			 const struct list_struct *rec)
   2311 {
   2312 	tdb_off_t totalsize;
   2313 
   2314 	/* Offset of tailer from record header */
   2315 	totalsize = sizeof(*rec) + rec->rec_len;
   2316 	return tdb_ofs_write(tdb, offset + totalsize - sizeof(tdb_off_t),
   2317 			 &totalsize);
   2318 }
   2319 
   2320 /* Add an element into the freelist. Merge adjacent records if
   2321    neccessary. */
   2322 int tdb_free(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec)
   2323 {
   2324 	tdb_off_t right, left;
   2325 
   2326 	/* Allocation and tailer lock */
   2327 	if (tdb_lock(tdb, -1, F_WRLCK) != 0)
   2328 		return -1;
   2329 
   2330 	/* set an initial tailer, so if we fail we don't leave a bogus record */
   2331 	if (update_tailer(tdb, offset, rec) != 0) {
   2332 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: update_tailer failed!\n"));
   2333 		goto fail;
   2334 	}
   2335 
   2336 	/* Look right first (I'm an Australian, dammit) */
   2337 	right = offset + sizeof(*rec) + rec->rec_len;
   2338 	if (right + sizeof(*rec) <= tdb->map_size) {
   2339 		struct list_struct r;
   2340 
   2341 		if (tdb->methods->tdb_read(tdb, right, &r, sizeof(r), DOCONV()) == -1) {
   2342 			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: right read failed at %u\n", right));
   2343 			goto left;
   2344 		}
   2345 
   2346 		/* If it's free, expand to include it. */
   2347 		if (r.magic == TDB_FREE_MAGIC) {
   2348 			if (remove_from_freelist(tdb, right, r.next) == -1) {
   2349 				TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: right free failed at %u\n", right));
   2350 				goto left;
   2351 			}
   2352 			rec->rec_len += sizeof(r) + r.rec_len;
   2353 		}
   2354 	}
   2355 
   2356 left:
   2357 	/* Look left */
   2358 	left = offset - sizeof(tdb_off_t);
   2359 	if (left > TDB_DATA_START(tdb->header.hash_size)) {
   2360 		struct list_struct l;
   2361 		tdb_off_t leftsize;
   2362 
   2363 		/* Read in tailer and jump back to header */
   2364 		if (tdb_ofs_read(tdb, left, &leftsize) == -1) {
   2365 			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: left offset read failed at %u\n", left));
   2366 			goto update;
   2367 		}
   2368 
   2369 		/* it could be uninitialised data */
   2370 		if (leftsize == 0 || leftsize == TDB_PAD_U32) {
   2371 			goto update;
   2372 		}
   2373 
   2374 		left = offset - leftsize;
   2375 
   2376 		/* Now read in record */
   2377 		if (tdb->methods->tdb_read(tdb, left, &l, sizeof(l), DOCONV()) == -1) {
   2378 			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: left read failed at %u (%u)\n", left, leftsize));
   2379 			goto update;
   2380 		}
   2381 
   2382 		/* If it's free, expand to include it. */
   2383 		if (l.magic == TDB_FREE_MAGIC) {
   2384 			if (remove_from_freelist(tdb, left, l.next) == -1) {
   2385 				TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: left free failed at %u\n", left));
   2386 				goto update;
   2387 			} else {
   2388 				offset = left;
   2389 				rec->rec_len += leftsize;
   2390 			}
   2391 		}
   2392 	}
   2393 
   2394 update:
   2395 	if (update_tailer(tdb, offset, rec) == -1) {
   2396 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: update_tailer failed at %u\n", offset));
   2397 		goto fail;
   2398 	}
   2399 
   2400 	/* Now, prepend to free list */
   2401 	rec->magic = TDB_FREE_MAGIC;
   2402 
   2403 	if (tdb_ofs_read(tdb, FREELIST_TOP, &rec->next) == -1 ||
   2404 	    tdb_rec_write(tdb, offset, rec) == -1 ||
   2405 	    tdb_ofs_write(tdb, FREELIST_TOP, &offset) == -1) {
   2406 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free record write failed at offset=%d\n", offset));
   2407 		goto fail;
   2408 	}
   2409 
   2410 	/* And we're done. */
   2411 	tdb_unlock(tdb, -1, F_WRLCK);
   2412 	return 0;
   2413 
   2414  fail:
   2415 	tdb_unlock(tdb, -1, F_WRLCK);
   2416 	return -1;
   2417 }
   2418 
   2419 
   2420 /*
   2421    the core of tdb_allocate - called when we have decided which
   2422    free list entry to use
   2423  */
   2424 static tdb_off_t tdb_allocate_ofs(struct tdb_context *tdb, tdb_len_t length, tdb_off_t rec_ptr,
   2425 				struct list_struct *rec, tdb_off_t last_ptr)
   2426 {
   2427 	struct list_struct newrec;
   2428 	tdb_off_t newrec_ptr;
   2429 
   2430 	memset(&newrec, '\0', sizeof(newrec));
   2431 
   2432 	/* found it - now possibly split it up  */
   2433 	if (rec->rec_len > length + MIN_REC_SIZE) {
   2434 		/* Length of left piece */
   2435 		length = TDB_ALIGN(length, TDB_ALIGNMENT);
   2436 
   2437 		/* Right piece to go on free list */
   2438 		newrec.rec_len = rec->rec_len - (sizeof(*rec) + length);
   2439 		newrec_ptr = rec_ptr + sizeof(*rec) + length;
   2440 
   2441 		/* And left record is shortened */
   2442 		rec->rec_len = length;
   2443 	} else {
   2444 		newrec_ptr = 0;
   2445 	}
   2446 
   2447 	/* Remove allocated record from the free list */
   2448 	if (tdb_ofs_write(tdb, last_ptr, &rec->next) == -1) {
   2449 		return 0;
   2450 	}
   2451 
   2452 	/* Update header: do this before we drop alloc
   2453 	   lock, otherwise tdb_free() might try to
   2454 	   merge with us, thinking we're free.
   2455 	   (Thanks Jeremy Allison). */
   2456 	rec->magic = TDB_MAGIC;
   2457 	if (tdb_rec_write(tdb, rec_ptr, rec) == -1) {
   2458 		return 0;
   2459 	}
   2460 
   2461 	/* Did we create new block? */
   2462 	if (newrec_ptr) {
   2463 		/* Update allocated record tailer (we
   2464 		   shortened it). */
   2465 		if (update_tailer(tdb, rec_ptr, rec) == -1) {
   2466 			return 0;
   2467 		}
   2468 
   2469 		/* Free new record */
   2470 		if (tdb_free(tdb, newrec_ptr, &newrec) == -1) {
   2471 			return 0;
   2472 		}
   2473 	}
   2474 
   2475 	/* all done - return the new record offset */
   2476 	return rec_ptr;
   2477 }
   2478 
   2479 /* allocate some space from the free list. The offset returned points
   2480    to a unconnected list_struct within the database with room for at
   2481    least length bytes of total data
   2482 
   2483    0 is returned if the space could not be allocated
   2484  */
   2485 tdb_off_t tdb_allocate(struct tdb_context *tdb, tdb_len_t length, struct list_struct *rec)
   2486 {
   2487 	tdb_off_t rec_ptr, last_ptr, newrec_ptr;
   2488 	struct {
   2489 		tdb_off_t rec_ptr, last_ptr;
   2490 		tdb_len_t rec_len;
   2491 	} bestfit;
   2492 
   2493 	if (tdb_lock(tdb, -1, F_WRLCK) == -1)
   2494 		return 0;
   2495 
   2496 	/* Extra bytes required for tailer */
   2497 	length += sizeof(tdb_off_t);
   2498 
   2499  again:
   2500 	last_ptr = FREELIST_TOP;
   2501 
   2502 	/* read in the freelist top */
   2503 	if (tdb_ofs_read(tdb, FREELIST_TOP, &rec_ptr) == -1)
   2504 		goto fail;
   2505 
   2506 	bestfit.rec_ptr = 0;
   2507 	bestfit.last_ptr = 0;
   2508 	bestfit.rec_len = 0;
   2509 
   2510 	/*
   2511 	   this is a best fit allocation strategy. Originally we used
   2512 	   a first fit strategy, but it suffered from massive fragmentation
   2513 	   issues when faced with a slowly increasing record size.
   2514 	 */
   2515 	while (rec_ptr) {
   2516 		if (tdb_rec_free_read(tdb, rec_ptr, rec) == -1) {
   2517 			goto fail;
   2518 		}
   2519 
   2520 		if (rec->rec_len >= length) {
   2521 			if (bestfit.rec_ptr == 0 ||
   2522 			    rec->rec_len < bestfit.rec_len) {
   2523 				bestfit.rec_len = rec->rec_len;
   2524 				bestfit.rec_ptr = rec_ptr;
   2525 				bestfit.last_ptr = last_ptr;
   2526 				/* consider a fit to be good enough if
   2527 				   we aren't wasting more than half
   2528 				   the space */
   2529 				if (bestfit.rec_len < 2*length) {
   2530 					break;
   2531 				}
   2532 			}
   2533 		}
   2534 
   2535 		/* move to the next record */
   2536 		last_ptr = rec_ptr;
   2537 		rec_ptr = rec->next;
   2538 	}
   2539 
   2540 	if (bestfit.rec_ptr != 0) {
   2541 		if (tdb_rec_free_read(tdb, bestfit.rec_ptr, rec) == -1) {
   2542 			goto fail;
   2543 		}
   2544 
   2545 		newrec_ptr = tdb_allocate_ofs(tdb, length, bestfit.rec_ptr, rec, bestfit.last_ptr);
   2546 		tdb_unlock(tdb, -1, F_WRLCK);
   2547 		return newrec_ptr;
   2548 	}
   2549 
   2550 	/* we didn't find enough space. See if we can expand the
   2551 	   database and if we can then try again */
   2552 	if (tdb_expand(tdb, length + sizeof(*rec)) == 0)
   2553 		goto again;
   2554  fail:
   2555 	tdb_unlock(tdb, -1, F_WRLCK);
   2556 	return 0;
   2557 }
   2558 
   2559 /* file: freelistcheck.c */
   2560 
   2561 /* Check the freelist is good and contains no loops.
   2562    Very memory intensive - only do this as a consistency
   2563    checker. Heh heh - uses an in memory tdb as the storage
   2564    for the "seen" record list. For some reason this strikes
   2565    me as extremely clever as I don't have to write another tree
   2566    data structure implementation :-).
   2567  */
   2568 
   2569 static int seen_insert(struct tdb_context *mem_tdb, tdb_off_t rec_ptr)
   2570 {
   2571 	TDB_DATA key, data;
   2572 
   2573 	memset(&data, '\0', sizeof(data));
   2574 	key.dptr = (unsigned char *)&rec_ptr;
   2575 	key.dsize = sizeof(rec_ptr);
   2576 	return tdb_store(mem_tdb, key, data, TDB_INSERT);
   2577 }
   2578 
   2579 int tdb_validate_freelist(struct tdb_context *tdb, int *pnum_entries)
   2580 {
   2581 	struct tdb_context *mem_tdb = NULL;
   2582 	struct list_struct rec;
   2583 	tdb_off_t rec_ptr, last_ptr;
   2584 	int ret = -1;
   2585 
   2586 	*pnum_entries = 0;
   2587 
   2588 	mem_tdb = tdb_open("flval", tdb->header.hash_size,
   2589 				TDB_INTERNAL, O_RDWR, 0600);
   2590 	if (!mem_tdb) {
   2591 		return -1;
   2592 	}
   2593 
   2594 	if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
   2595 		tdb_close(mem_tdb);
   2596 		return 0;
   2597 	}
   2598 
   2599 	last_ptr = FREELIST_TOP;
   2600 
   2601 	/* Store the FREELIST_TOP record. */
   2602 	if (seen_insert(mem_tdb, last_ptr) == -1) {
   2603 		ret = TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
   2604 		goto fail;
   2605 	}
   2606 
   2607 	/* read in the freelist top */
   2608 	if (tdb_ofs_read(tdb, FREELIST_TOP, &rec_ptr) == -1) {
   2609 		goto fail;
   2610 	}
   2611 
   2612 	while (rec_ptr) {
   2613 
   2614 		/* If we can't store this record (we've seen it
   2615 		   before) then the free list has a loop and must
   2616 		   be corrupt. */
   2617 
   2618 		if (seen_insert(mem_tdb, rec_ptr)) {
   2619 			ret = TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
   2620 			goto fail;
   2621 		}
   2622 
   2623 		if (tdb_rec_free_read(tdb, rec_ptr, &rec) == -1) {
   2624 			goto fail;
   2625 		}
   2626 
   2627 		/* move to the next record */
   2628 		last_ptr = rec_ptr;
   2629 		rec_ptr = rec.next;
   2630 		*pnum_entries += 1;
   2631 	}
   2632 
   2633 	ret = 0;
   2634 
   2635   fail:
   2636 
   2637 	tdb_close(mem_tdb);
   2638 	tdb_unlock(tdb, -1, F_WRLCK);
   2639 	return ret;
   2640 }
   2641 
   2642 /* file: traverse.c */
   2643 
   2644 /* Uses traverse lock: 0 = finish, -1 = error, other = record offset */
   2645 static int tdb_next_lock(struct tdb_context *tdb, struct tdb_traverse_lock *tlock,
   2646 			 struct list_struct *rec)
   2647 {
   2648 	int want_next = (tlock->off != 0);
   2649 
   2650 	/* Lock each chain from the start one. */
   2651 	for (; tlock->hash < tdb->header.hash_size; tlock->hash++) {
   2652 		if (!tlock->off && tlock->hash != 0) {
   2653 			/* this is an optimisation for the common case where
   2654 			   the hash chain is empty, which is particularly
   2655 			   common for the use of tdb with ldb, where large
   2656 			   hashes are used. In that case we spend most of our
   2657 			   time in tdb_brlock(), locking empty hash chains.
   2658 
   2659 			   To avoid this, we do an unlocked pre-check to see
   2660 			   if the hash chain is empty before starting to look
   2661 			   inside it. If it is empty then we can avoid that
   2662 			   hash chain. If it isn't empty then we can't believe
   2663 			   the value we get back, as we read it without a
   2664 			   lock, so instead we get the lock and re-fetch the
   2665 			   value below.
   2666 
   2667 			   Notice that not doing this optimisation on the
   2668 			   first hash chain is critical. We must guarantee
   2669 			   that we have done at least one fcntl lock at the
   2670 			   start of a search to guarantee that memory is
   2671 			   coherent on SMP systems. If records are added by
   2672 			   others during the search then thats OK, and we
   2673 			   could possibly miss those with this trick, but we
   2674 			   could miss them anyway without this trick, so the
   2675 			   semantics don't change.
   2676 
   2677 			   With a non-indexed ldb search this trick gains us a
   2678 			   factor of around 80 in speed on a linux 2.6.x
   2679 			   system (testing using ldbtest).
   2680 			*/
   2681 			tdb->methods->next_hash_chain(tdb, &tlock->hash);
   2682 			if (tlock->hash == tdb->header.hash_size) {
   2683 				continue;
   2684 			}
   2685 		}
   2686 
   2687 		if (tdb_lock(tdb, tlock->hash, tlock->lock_rw) == -1)
   2688 			return -1;
   2689 
   2690 		/* No previous record?  Start at top of chain. */
   2691 		if (!tlock->off) {
   2692 			if (tdb_ofs_read(tdb, TDB_HASH_TOP(tlock->hash),
   2693 				     &tlock->off) == -1)
   2694 				goto fail;
   2695 		} else {
   2696 			/* Otherwise unlock the previous record. */
   2697 			if (tdb_unlock_record(tdb, tlock->off) != 0)
   2698 				goto fail;
   2699 		}
   2700 
   2701 		if (want_next) {
   2702 			/* We have offset of old record: grab next */
   2703 			if (tdb_rec_read(tdb, tlock->off, rec) == -1)
   2704 				goto fail;
   2705 			tlock->off = rec->next;
   2706 		}
   2707 
   2708 		/* Iterate through chain */
   2709 		while( tlock->off) {
   2710 			tdb_off_t current;
   2711 			if (tdb_rec_read(tdb, tlock->off, rec) == -1)
   2712 				goto fail;
   2713 
   2714 			/* Detect infinite loops. From "Shlomi Yaakobovich" <Shlomi (at) exanet.com>. */
   2715 			if (tlock->off == rec->next) {
   2716 				TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_next_lock: loop detected.\n"));
   2717 				goto fail;
   2718 			}
   2719 
   2720 			if (!TDB_DEAD(rec)) {
   2721 				/* Woohoo: we found one! */
   2722 				if (tdb_lock_record(tdb, tlock->off) != 0)
   2723 					goto fail;
   2724 				return tlock->off;
   2725 			}
   2726 
   2727 			/* Try to clean dead ones from old traverses */
   2728 			current = tlock->off;
   2729 			tlock->off = rec->next;
   2730 			if (!(tdb->read_only || tdb->traverse_read) &&
   2731 			    tdb_do_delete(tdb, current, rec) != 0)
   2732 				goto fail;
   2733 		}
   2734 		tdb_unlock(tdb, tlock->hash, tlock->lock_rw);
   2735 		want_next = 0;
   2736 	}
   2737 	/* We finished iteration without finding anything */
   2738 	return TDB_ERRCODE(TDB_SUCCESS, 0);
   2739 
   2740  fail:
   2741 	tlock->off = 0;
   2742 	if (tdb_unlock(tdb, tlock->hash, tlock->lock_rw) != 0)
   2743 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_next_lock: On error unlock failed!\n"));
   2744 	return -1;
   2745 }
   2746 
   2747 /* traverse the entire database - calling fn(tdb, key, data) on each element.
   2748    return -1 on error or the record count traversed
   2749    if fn is NULL then it is not called
   2750    a non-zero return value from fn() indicates that the traversal should stop
   2751   */
   2752 static int tdb_traverse_internal(struct tdb_context *tdb,
   2753 				 tdb_traverse_func fn, void *private_data,
   2754 				 struct tdb_traverse_lock *tl)
   2755 {
   2756 	TDB_DATA key, dbuf;
   2757 	struct list_struct rec;
   2758 	int ret, count = 0;
   2759 
   2760 	/* This was in the initializaton, above, but the IRIX compiler
   2761 	 * did not like it.  crh
   2762 	 */
   2763 	tl->next = tdb->travlocks.next;
   2764 
   2765 	/* fcntl locks don't stack: beware traverse inside traverse */
   2766 	tdb->travlocks.next = tl;
   2767 
   2768 	/* tdb_next_lock places locks on the record returned, and its chain */
   2769 	while ((ret = tdb_next_lock(tdb, tl, &rec)) > 0) {
   2770 		count++;
   2771 		/* now read the full record */
   2772 		key.dptr = tdb_alloc_read(tdb, tl->off + sizeof(rec),
   2773 					  rec.key_len + rec.data_len);
   2774 		if (!key.dptr) {
   2775 			ret = -1;
   2776 			if (tdb_unlock(tdb, tl->hash, tl->lock_rw) != 0)
   2777 				goto out;
   2778 			if (tdb_unlock_record(tdb, tl->off) != 0)
   2779 				TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_traverse: key.dptr == NULL and unlock_record failed!\n"));
   2780 			goto out;
   2781 		}
   2782 		key.dsize = rec.key_len;
   2783 		dbuf.dptr = key.dptr + rec.key_len;
   2784 		dbuf.dsize = rec.data_len;
   2785 
   2786 		/* Drop chain lock, call out */
   2787 		if (tdb_unlock(tdb, tl->hash, tl->lock_rw) != 0) {
   2788 			ret = -1;
   2789 			SAFE_FREE(key.dptr);
   2790 			goto out;
   2791 		}
   2792 		if (fn && fn(tdb, key, dbuf, private_data)) {
   2793 			/* They want us to terminate traversal */
   2794 			ret = count;
   2795 			if (tdb_unlock_record(tdb, tl->off) != 0) {
   2796 				TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_traverse: unlock_record failed!\n"));;
   2797 				ret = -1;
   2798 			}
   2799 			SAFE_FREE(key.dptr);
   2800 			goto out;
   2801 		}
   2802 		SAFE_FREE(key.dptr);
   2803 	}
   2804 out:
   2805 	tdb->travlocks.next = tl->next;
   2806 	if (ret < 0)
   2807 		return -1;
   2808 	else
   2809 		return count;
   2810 }
   2811 
   2812 
   2813 /*
   2814   a write style traverse - temporarily marks the db read only
   2815 */
   2816 int tdb_traverse_read(struct tdb_context *tdb,
   2817 		      tdb_traverse_func fn, void *private_data)
   2818 {
   2819 	struct tdb_traverse_lock tl = { NULL, 0, 0, F_RDLCK };
   2820 	int ret;
   2821 
   2822 	/* we need to get a read lock on the transaction lock here to
   2823 	   cope with the lock ordering semantics of solaris10 */
   2824 	if (tdb_transaction_lock(tdb, F_RDLCK)) {
   2825 		return -1;
   2826 	}
   2827 
   2828 	tdb->traverse_read++;
   2829 	ret = tdb_traverse_internal(tdb, fn, private_data, &tl);
   2830 	tdb->traverse_read--;
   2831 
   2832 	tdb_transaction_unlock(tdb);
   2833 
   2834 	return ret;
   2835 }
   2836 
   2837 /*
   2838   a write style traverse - needs to get the transaction lock to
   2839   prevent deadlocks
   2840 */
   2841 int tdb_traverse(struct tdb_context *tdb,
   2842 		 tdb_traverse_func fn, void *private_data)
   2843 {
   2844 	struct tdb_traverse_lock tl = { NULL, 0, 0, F_WRLCK };
   2845 	int ret;
   2846 
   2847 	if (tdb->read_only || tdb->traverse_read) {
   2848 		return tdb_traverse_read(tdb, fn, private_data);
   2849 	}
   2850 
   2851 	if (tdb_transaction_lock(tdb, F_WRLCK)) {
   2852 		return -1;
   2853 	}
   2854 
   2855 	ret = tdb_traverse_internal(tdb, fn, private_data, &tl);
   2856 
   2857 	tdb_transaction_unlock(tdb);
   2858 
   2859 	return ret;
   2860 }
   2861 
   2862 
   2863 /* find the first entry in the database and return its key */
   2864 TDB_DATA tdb_firstkey(struct tdb_context *tdb)
   2865 {
   2866 	TDB_DATA key;
   2867 	struct list_struct rec;
   2868 
   2869 	/* release any old lock */
   2870 	if (tdb_unlock_record(tdb, tdb->travlocks.off) != 0)
   2871 		return tdb_null;
   2872 	tdb->travlocks.off = tdb->travlocks.hash = 0;
   2873 	tdb->travlocks.lock_rw = F_RDLCK;
   2874 
   2875 	/* Grab first record: locks chain and returned record. */
   2876 	if (tdb_next_lock(tdb, &tdb->travlocks, &rec) <= 0)
   2877 		return tdb_null;
   2878 	/* now read the key */
   2879 	key.dsize = rec.key_len;
   2880 	key.dptr =tdb_alloc_read(tdb,tdb->travlocks.off+sizeof(rec),key.dsize);
   2881 
   2882 	/* Unlock the hash chain of the record we just read. */
   2883 	if (tdb_unlock(tdb, tdb->travlocks.hash, tdb->travlocks.lock_rw) != 0)
   2884 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_firstkey: error occurred while tdb_unlocking!\n"));
   2885 	return key;
   2886 }
   2887 
   2888 /* find the next entry in the database, returning its key */
   2889 TDB_DATA tdb_nextkey(struct tdb_context *tdb, TDB_DATA oldkey)
   2890 {
   2891 	u32 oldhash;
   2892 	TDB_DATA key = tdb_null;
   2893 	struct list_struct rec;
   2894 	unsigned char *k = NULL;
   2895 
   2896 	/* Is locked key the old key?  If so, traverse will be reliable. */
   2897 	if (tdb->travlocks.off) {
   2898 		if (tdb_lock(tdb,tdb->travlocks.hash,tdb->travlocks.lock_rw))
   2899 			return tdb_null;
   2900 		if (tdb_rec_read(tdb, tdb->travlocks.off, &rec) == -1
   2901 		    || !(k = tdb_alloc_read(tdb,tdb->travlocks.off+sizeof(rec),
   2902 					    rec.key_len))
   2903 		    || memcmp(k, oldkey.dptr, oldkey.dsize) != 0) {
   2904 			/* No, it wasn't: unlock it and start from scratch */
   2905 			if (tdb_unlock_record(tdb, tdb->travlocks.off) != 0) {
   2906 				SAFE_FREE(k);
   2907 				return tdb_null;
   2908 			}
   2909 			if (tdb_unlock(tdb, tdb->travlocks.hash, tdb->travlocks.lock_rw) != 0) {
   2910 				SAFE_FREE(k);
   2911 				return tdb_null;
   2912 			}
   2913 			tdb->travlocks.off = 0;
   2914 		}
   2915 
   2916 		SAFE_FREE(k);
   2917 	}
   2918 
   2919 	if (!tdb->travlocks.off) {
   2920 		/* No previous element: do normal find, and lock record */
   2921 		tdb->travlocks.off = tdb_find_lock_hash(tdb, oldkey, tdb->hash_fn(&oldkey), tdb->travlocks.lock_rw, &rec);
   2922 		if (!tdb->travlocks.off)
   2923 			return tdb_null;
   2924 		tdb->travlocks.hash = BUCKET(rec.full_hash);
   2925 		if (tdb_lock_record(tdb, tdb->travlocks.off) != 0) {
   2926 			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_nextkey: lock_record failed (%s)!\n", strerror(errno)));
   2927 			return tdb_null;
   2928 		}
   2929 	}
   2930 	oldhash = tdb->travlocks.hash;
   2931 
   2932 	/* Grab next record: locks chain and returned record,
   2933 	   unlocks old record */
   2934 	if (tdb_next_lock(tdb, &tdb->travlocks, &rec) > 0) {
   2935 		key.dsize = rec.key_len;
   2936 		key.dptr = tdb_alloc_read(tdb, tdb->travlocks.off+sizeof(rec),
   2937 					  key.dsize);
   2938 		/* Unlock the chain of this new record */
   2939 		if (tdb_unlock(tdb, tdb->travlocks.hash, tdb->travlocks.lock_rw) != 0)
   2940 			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_nextkey: WARNING tdb_unlock failed!\n"));
   2941 	}
   2942 	/* Unlock the chain of old record */
   2943 	if (tdb_unlock(tdb, BUCKET(oldhash), tdb->travlocks.lock_rw) != 0)
   2944 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_nextkey: WARNING tdb_unlock failed!\n"));
   2945 	return key;
   2946 }
   2947 
   2948 /* file: dump.c */
   2949 
   2950 static tdb_off_t tdb_dump_record(struct tdb_context *tdb, int hash,
   2951 				 tdb_off_t offset)
   2952 {
   2953 	struct list_struct rec;
   2954 	tdb_off_t tailer_ofs, tailer;
   2955 
   2956 	if (tdb->methods->tdb_read(tdb, offset, (char *)&rec,
   2957 				   sizeof(rec), DOCONV()) == -1) {
   2958 		printf("ERROR: failed to read record at %u\n", offset);
   2959 		return 0;
   2960 	}
   2961 
   2962 	printf(" rec: hash=%d offset=0x%08x next=0x%08x rec_len=%d "
   2963 	       "key_len=%d data_len=%d full_hash=0x%x magic=0x%x\n",
   2964 	       hash, offset, rec.next, rec.rec_len, rec.key_len, rec.data_len,
   2965 	       rec.full_hash, rec.magic);
   2966 
   2967 	tailer_ofs = offset + sizeof(rec) + rec.rec_len - sizeof(tdb_off_t);
   2968 
   2969 	if (tdb_ofs_read(tdb, tailer_ofs, &tailer) == -1) {
   2970 		printf("ERROR: failed to read tailer at %u\n", tailer_ofs);
   2971 		return rec.next;
   2972 	}
   2973 
   2974 	if (tailer != rec.rec_len + sizeof(rec)) {
   2975 		printf("ERROR: tailer does not match record! tailer=%u totalsize=%u\n",
   2976 				(unsigned int)tailer, (unsigned int)(rec.rec_len + sizeof(rec)));
   2977 	}
   2978 	return rec.next;
   2979 }
   2980 
   2981 static int tdb_dump_chain(struct tdb_context *tdb, int i)
   2982 {
   2983 	tdb_off_t rec_ptr, top;
   2984 
   2985 	top = TDB_HASH_TOP(i);
   2986 
   2987 	if (tdb_lock(tdb, i, F_WRLCK) != 0)
   2988 		return -1;
   2989 
   2990 	if (tdb_ofs_read(tdb, top, &rec_ptr) == -1)
   2991 		return tdb_unlock(tdb, i, F_WRLCK);
   2992 
   2993 	if (rec_ptr)
   2994 		printf("hash=%d\n", i);
   2995 
   2996 	while (rec_ptr) {
   2997 		rec_ptr = tdb_dump_record(tdb, i, rec_ptr);
   2998 	}
   2999 
   3000 	return tdb_unlock(tdb, i, F_WRLCK);
   3001 }
   3002 
   3003 void tdb_dump_all(struct tdb_context *tdb)
   3004 {
   3005 	int i;
   3006 	for (i=0;i<tdb->header.hash_size;i++) {
   3007 		tdb_dump_chain(tdb, i);
   3008 	}
   3009 	printf("freelist:\n");
   3010 	tdb_dump_chain(tdb, -1);
   3011 }
   3012 
   3013 int tdb_printfreelist(struct tdb_context *tdb)
   3014 {
   3015 	int ret;
   3016 	long total_free = 0;
   3017 	tdb_off_t offset, rec_ptr;
   3018 	struct list_struct rec;
   3019 
   3020 	if ((ret = tdb_lock(tdb, -1, F_WRLCK)) != 0)
   3021 		return ret;
   3022 
   3023 	offset = FREELIST_TOP;
   3024 
   3025 	/* read in the freelist top */
   3026 	if (tdb_ofs_read(tdb, offset, &rec_ptr) == -1) {
   3027 		tdb_unlock(tdb, -1, F_WRLCK);
   3028 		return 0;
   3029 	}
   3030 
   3031 	printf("freelist top=[0x%08x]\n", rec_ptr );
   3032 	while (rec_ptr) {
   3033 		if (tdb->methods->tdb_read(tdb, rec_ptr, (char *)&rec,
   3034 					   sizeof(rec), DOCONV()) == -1) {
   3035 			tdb_unlock(tdb, -1, F_WRLCK);
   3036 			return -1;
   3037 		}
   3038 
   3039 		if (rec.magic != TDB_FREE_MAGIC) {
   3040 			printf("bad magic 0x%08x in free list\n", rec.magic);
   3041 			tdb_unlock(tdb, -1, F_WRLCK);
   3042 			return -1;
   3043 		}
   3044 
   3045 		printf("entry offset=[0x%08x], rec.rec_len = [0x%08x (%d)] (end = 0x%08x)\n",
   3046 		       rec_ptr, rec.rec_len, rec.rec_len, rec_ptr + rec.rec_len);
   3047 		total_free += rec.rec_len;
   3048 
   3049 		/* move to the next record */
   3050 		rec_ptr = rec.next;
   3051 	}
   3052 	printf("total rec_len = [0x%08x (%d)]\n", (int)total_free,
   3053                (int)total_free);
   3054 
   3055 	return tdb_unlock(tdb, -1, F_WRLCK);
   3056 }
   3057 
   3058 /* file: tdb.c */
   3059 
/* Shared TDB_DATA value that the lookup and traverse functions in this file
   return when they have no record to hand back. */
   3060 TDB_DATA tdb_null;
   3061 
   3062 /*
   3063   non-blocking increment of the tdb sequence number if the tdb has been opened using
   3064   the TDB_SEQNUM flag
   3065 */
   3066 void tdb_increment_seqnum_nonblock(struct tdb_context *tdb)
   3067 {
   3068 	tdb_off_t seqnum=0;
   3069 
   3070 	if (!(tdb->flags & TDB_SEQNUM)) {
   3071 		return;
   3072 	}
   3073 
   3074 	/* we ignore errors from this, as we have no sane way of
   3075 	   dealing with them.
   3076 	*/
   3077 	tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
   3078 	seqnum++;
   3079 	tdb_ofs_write(tdb, TDB_SEQNUM_OFS, &seqnum);
   3080 }
   3081 
   3082 /*
   3083   increment the tdb sequence number if the tdb has been opened using
   3084   the TDB_SEQNUM flag
   3085 */
   3086 static void tdb_increment_seqnum(struct tdb_context *tdb)
   3087 {
   3088 	if (!(tdb->flags & TDB_SEQNUM)) {
   3089 		return;
   3090 	}
   3091 
   3092 	if (tdb_brlock(tdb, TDB_SEQNUM_OFS, F_WRLCK, F_SETLKW, 1, 1) != 0) {
   3093 		return;
   3094 	}
   3095 
   3096 	tdb_increment_seqnum_nonblock(tdb);
   3097 
   3098 	tdb_brlock(tdb, TDB_SEQNUM_OFS, F_UNLCK, F_SETLKW, 1, 1);
   3099 }
   3100 
   3101 static int tdb_key_compare(TDB_DATA key, TDB_DATA data, void *private_data)
   3102 {
   3103 	return memcmp(data.dptr, key.dptr, data.dsize);
   3104 }
   3105 
   3106 /* Returns 0 on fail.  On success, return offset of record, and fills
   3107    in rec */
   3108 static tdb_off_t tdb_find(struct tdb_context *tdb, TDB_DATA key, u32 hash,
   3109 			struct list_struct *r)
   3110 {
   3111 	tdb_off_t rec_ptr;
   3112 
   3113 	/* read in the hash top */
   3114 	if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
   3115 		return 0;
   3116 
   3117 	/* keep looking until we find the right record */
   3118 	while (rec_ptr) {
   3119 		if (tdb_rec_read(tdb, rec_ptr, r) == -1)
   3120 			return 0;
   3121 
   3122 		if (!TDB_DEAD(r) && hash==r->full_hash
   3123 		    && key.dsize==r->key_len
   3124 		    && tdb_parse_data(tdb, key, rec_ptr + sizeof(*r),
   3125 				      r->key_len, tdb_key_compare,
   3126 				      NULL) == 0) {
   3127 			return rec_ptr;
   3128 		}
   3129 		rec_ptr = r->next;
   3130 	}
   3131 	return TDB_ERRCODE(TDB_ERR_NOEXIST, 0);
   3132 }
   3133 
   3134 /* As tdb_find, but if you succeed, keep the lock */
   3135 tdb_off_t tdb_find_lock_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash, int locktype,
   3136 			   struct list_struct *rec)
   3137 {
   3138 	u32 rec_ptr;
   3139 
   3140 	if (tdb_lock(tdb, BUCKET(hash), locktype) == -1)
   3141 		return 0;
   3142 	if (!(rec_ptr = tdb_find(tdb, key, hash, rec)))
   3143 		tdb_unlock(tdb, BUCKET(hash), locktype);
   3144 	return rec_ptr;
   3145 }
   3146 
   3147 
   3148 /* update an entry in place - this only works if the new data size
   3149    is <= the old data size and the key exists.
   3150    on failure return -1.
   3151 */
   3152 static int tdb_update_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash, TDB_DATA dbuf)
   3153 {
   3154 	struct list_struct rec;
   3155 	tdb_off_t rec_ptr;
   3156 
   3157 	/* find entry */
   3158 	if (!(rec_ptr = tdb_find(tdb, key, hash, &rec)))
   3159 		return -1;
   3160 
   3161 	/* must be long enough key, data and tailer */
   3162 	if (rec.rec_len < key.dsize + dbuf.dsize + sizeof(tdb_off_t)) {
   3163 		tdb->ecode = TDB_SUCCESS; /* Not really an error */
   3164 		return -1;
   3165 	}
   3166 
   3167 	if (tdb->methods->tdb_write(tdb, rec_ptr + sizeof(rec) + rec.key_len,
   3168 		      dbuf.dptr, dbuf.dsize) == -1)
   3169 		return -1;
   3170 
   3171 	if (dbuf.dsize != rec.data_len) {
   3172 		/* update size */
   3173 		rec.data_len = dbuf.dsize;
   3174 		return tdb_rec_write(tdb, rec_ptr, &rec);
   3175 	}
   3176 
   3177 	return 0;
   3178 }
   3179 
   3180 /* find an entry in the database given a key */
   3181 /* If an entry doesn't exist tdb_err will be set to
   3182  * TDB_ERR_NOEXIST. If a key has no data attached
   3183  * then the TDB_DATA will have zero length but
   3184  * a non-zero pointer
   3185  */
   3186 TDB_DATA tdb_fetch(struct tdb_context *tdb, TDB_DATA key)
   3187 {
   3188 	tdb_off_t rec_ptr;
   3189 	struct list_struct rec;
   3190 	TDB_DATA ret;
   3191 	u32 hash;
   3192 
   3193 	/* find which hash bucket it is in */
   3194 	hash = tdb->hash_fn(&key);
   3195 	if (!(rec_ptr = tdb_find_lock_hash(tdb,key,hash,F_RDLCK,&rec)))
   3196 		return tdb_null;
   3197 
   3198 	ret.dptr = tdb_alloc_read(tdb, rec_ptr + sizeof(rec) + rec.key_len,
   3199 				  rec.data_len);
   3200 	ret.dsize = rec.data_len;
   3201 	tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
   3202 	return ret;
   3203 }
   3204 
   3205 /*
   3206  * Find an entry in the database and hand the record's data to a parsing
   3207  * function. The parsing function is executed under the chain read lock, so it
   3208  * should be fast and should not block on other syscalls.
   3209  *
   3210  * DONT CALL OTHER TDB CALLS FROM THE PARSER, THIS MIGHT LEAD TO SEGFAULTS.
   3211  *
   3212  * For mmapped tdb's that do not have a transaction open it points the parsing
   3213  * function directly at the mmap area, it avoids the malloc/memcpy in this
   3214  * case. If a transaction is open or no mmap is available, it has to do
   3215  * malloc/read/parse/free.
   3216  *
   3217  * This is interesting for all readers of potentially large data structures in
   3218  * the tdb records, ldb indexes being one example.
   3219  */
   3220 
   3221 int tdb_parse_record(struct tdb_context *tdb, TDB_DATA key,
   3222 		     int (*parser)(TDB_DATA key, TDB_DATA data,
   3223 				   void *private_data),
   3224 		     void *private_data)
   3225 {
   3226 	tdb_off_t rec_ptr;
   3227 	struct list_struct rec;
   3228 	int ret;
   3229 	u32 hash;
   3230 
   3231 	/* find which hash bucket it is in */
   3232 	hash = tdb->hash_fn(&key);
   3233 
   3234 	if (!(rec_ptr = tdb_find_lock_hash(tdb,key,hash,F_RDLCK,&rec))) {
   3235 		return TDB_ERRCODE(TDB_ERR_NOEXIST, 0);
   3236 	}
   3237 
   3238 	ret = tdb_parse_data(tdb, key, rec_ptr + sizeof(rec) + rec.key_len,
   3239 			     rec.data_len, parser, private_data);
   3240 
   3241 	tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
   3242 
   3243 	return ret;
   3244 }
   3245 
   3246 /* check if an entry in the database exists
   3247 
   3248    note that 1 is returned if the key is found and 0 is returned if not found
   3249    this doesn't match the conventions in the rest of this module, but is
   3250    compatible with gdbm
   3251 */
   3252 static int tdb_exists_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash)
   3253 {
   3254 	struct list_struct rec;
   3255 
   3256 	if (tdb_find_lock_hash(tdb, key, hash, F_RDLCK, &rec) == 0)
   3257 		return 0;
   3258 	tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
   3259 	return 1;
   3260 }
   3261 
   3262 int tdb_exists(struct tdb_context *tdb, TDB_DATA key)
   3263 {
   3264 	u32 hash = tdb->hash_fn(&key);
   3265 	return tdb_exists_hash(tdb, key, hash);
   3266 }
   3267 
   3268 /* actually delete an entry in the database given the offset */
   3269 int tdb_do_delete(struct tdb_context *tdb, tdb_off_t rec_ptr, struct list_struct*rec)
   3270 {
   3271 	tdb_off_t last_ptr, i;
   3272 	struct list_struct lastrec;
   3273 
   3274 	if (tdb->read_only || tdb->traverse_read) return -1;
   3275 
   3276 	if (tdb_write_lock_record(tdb, rec_ptr) == -1) {
   3277 		/* Someone traversing here: mark it as dead */
   3278 		rec->magic = TDB_DEAD_MAGIC;
   3279 		return tdb_rec_write(tdb, rec_ptr, rec);
   3280 	}
   3281 	if (tdb_write_unlock_record(tdb, rec_ptr) != 0)
   3282 		return -1;
   3283 
   3284 	/* find previous record in hash chain */
   3285 	if (tdb_ofs_read(tdb, TDB_HASH_TOP(rec->full_hash), &i) == -1)
   3286 		return -1;
   3287 	for (last_ptr = 0; i != rec_ptr; last_ptr = i, i = lastrec.next)
   3288 		if (tdb_rec_read(tdb, i, &lastrec) == -1)
   3289 			return -1;
   3290 
   3291 	/* unlink it: next ptr is at start of record. */
   3292 	if (last_ptr == 0)
   3293 		last_ptr = TDB_HASH_TOP(rec->full_hash);
   3294 	if (tdb_ofs_write(tdb, last_ptr, &rec->next) == -1)
   3295 		return -1;
   3296 
   3297 	/* recover the space */
   3298 	if (tdb_free(tdb, rec_ptr, rec) == -1)
   3299 		return -1;
   3300 	return 0;
   3301 }
   3302 
   3303 static int tdb_count_dead(struct tdb_context *tdb, u32 hash)
   3304 {
   3305 	int res = 0;
   3306 	tdb_off_t rec_ptr;
   3307 	struct list_struct rec;
   3308 
   3309 	/* read in the hash top */
   3310 	if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
   3311 		return 0;
   3312 
   3313 	while (rec_ptr) {
   3314 		if (tdb_rec_read(tdb, rec_ptr, &rec) == -1)
   3315 			return 0;
   3316 
   3317 		if (rec.magic == TDB_DEAD_MAGIC) {
   3318 			res += 1;
   3319 		}
   3320 		rec_ptr = rec.next;
   3321 	}
   3322 	return res;
   3323 }
   3324 
   3325 /*
   3326  * Purge all DEAD records from a hash chain
   3327  */
   3328 static int tdb_purge_dead(struct tdb_context *tdb, u32 hash)
   3329 {
   3330 	int res = -1;
   3331 	struct list_struct rec;
   3332 	tdb_off_t rec_ptr;
   3333 
   3334 	if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
   3335 		return -1;
   3336 	}
   3337 
   3338 	/* read in the hash top */
   3339 	if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
   3340 		goto fail;
   3341 
   3342 	while (rec_ptr) {
   3343 		tdb_off_t next;
   3344 
   3345 		if (tdb_rec_read(tdb, rec_ptr, &rec) == -1) {
   3346 			goto fail;
   3347 		}
   3348 
   3349 		next = rec.next;
   3350 
   3351 		if (rec.magic == TDB_DEAD_MAGIC
   3352 		    && tdb_do_delete(tdb, rec_ptr, &rec) == -1) {
   3353 			goto fail;
   3354 		}
   3355 		rec_ptr = next;
   3356 	}
   3357 	res = 0;
   3358  fail:
   3359 	tdb_unlock(tdb, -1, F_WRLCK);
   3360 	return res;
   3361 }
   3362 
   3363 /* delete an entry in the database given a key */
   3364 static int tdb_delete_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash)
   3365 {
   3366 	tdb_off_t rec_ptr;
   3367 	struct list_struct rec;
   3368 	int ret;
   3369 
   3370 	if (tdb->max_dead_records != 0) {
   3371 
   3372 		/*
   3373 		 * Allow for some dead records per hash chain, mainly for
   3374 		 * tdb's with a very high create/delete rate like locking.tdb.
   3375 		 */
   3376 
   3377 		if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
   3378 			return -1;
   3379 
   3380 		if (tdb_count_dead(tdb, hash) >= tdb->max_dead_records) {
   3381 			/*
   3382 			 * Don't let the per-chain freelist grow too large,
   3383 			 * delete all existing dead records
   3384 			 */
   3385 			tdb_purge_dead(tdb, hash);
   3386 		}
   3387 
   3388 		if (!(rec_ptr = tdb_find(tdb, key, hash, &rec))) {
   3389 			tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
   3390 			return -1;
   3391 		}
   3392 
   3393 		/*
   3394 		 * Just mark the record as dead.
   3395 		 */
   3396 		rec.magic = TDB_DEAD_MAGIC;
   3397 		ret = tdb_rec_write(tdb, rec_ptr, &rec);
   3398 	}
   3399 	else {
   3400 		if (!(rec_ptr = tdb_find_lock_hash(tdb, key, hash, F_WRLCK,
   3401 						   &rec)))
   3402 			return -1;
   3403 
   3404 		ret = tdb_do_delete(tdb, rec_ptr, &rec);
   3405 	}
   3406 
   3407 	if (ret == 0) {
   3408 		tdb_increment_seqnum(tdb);
   3409 	}
   3410 
   3411 	if (tdb_unlock(tdb, BUCKET(rec.full_hash), F_WRLCK) != 0)
   3412 		TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_delete: WARNING tdb_unlock failed!\n"));
   3413 	return ret;
   3414 }
   3415 
   3416 int tdb_delete(struct tdb_context *tdb, TDB_DATA key)
   3417 {
   3418 	u32 hash = tdb->hash_fn(&key);
   3419 	return tdb_delete_hash(tdb, key, hash);
   3420 }
   3421 
   3422 /*
   3423  * See if we have a dead record around with enough space
   3424  */
   3425 static tdb_off_t tdb_find_dead(struct tdb_context *tdb, u32 hash,
   3426 			       struct list_struct *r, tdb_len_t length)
   3427 {
   3428 	tdb_off_t rec_ptr;
   3429 
   3430 	/* read in the hash top */
   3431 	if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
   3432 		return 0;
   3433 
   3434 	/* keep looking until we find the right record */
   3435 	while (rec_ptr) {
   3436 		if (tdb_rec_read(tdb, rec_ptr, r) == -1)
   3437 			return 0;
   3438 
   3439 		if (TDB_DEAD(r) && r->rec_len >= length) {
   3440 			/*
   3441 			 * First fit for simple coding, TODO: change to best
   3442 			 * fit
   3443 			 */
   3444 			return rec_ptr;
   3445 		}
   3446 		rec_ptr = r->next;
   3447 	}
   3448 	return 0;
   3449 }
   3450 
   3451 /* store an element in the database, replacing any existing element
   3452    with the same key
   3453 
   3454    return 0 on success, -1 on failure
   3455 */
   3456 int tdb_store(struct tdb_context *tdb, TDB_DATA key, TDB_DATA dbuf, int flag)
   3457 {
   3458 	struct list_struct rec;
   3459 	u32 hash;
   3460 	tdb_off_t rec_ptr;
   3461 	char *p = NULL;
   3462 	int ret = -1;
   3463 
   3464 	if (tdb->read_only || tdb->traverse_read) {
   3465 		tdb->ecode = TDB_ERR_RDONLY;
   3466 		return -1;
   3467 	}
   3468 
   3469 	/* find which hash bucket it is in */
   3470 	hash = tdb->hash_fn(&key);
   3471 	if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
   3472 		return -1;
   3473 
   3474 	/* check for it existing, on insert. */
   3475 	if (flag == TDB_INSERT) {
   3476 		if (tdb_exists_hash(tdb, key, hash)) {
   3477 			tdb->ecode = TDB_ERR_EXISTS;
   3478 			goto fail;
   3479 		}
   3480 	} else {
   3481 		/* first try in-place update, on modify or replace. */
   3482 		if (tdb_update_hash(tdb, key, hash, dbuf) == 0) {
   3483 			goto done;
   3484 		}
   3485 		if (tdb->ecode == TDB_ERR_NOEXIST &&
   3486 		    flag == TDB_MODIFY) {
   3487 			/* if the record doesn't exist and we are in TDB_MODIFY mode then
   3488 			 we should fail the store */
   3489 			goto fail;
   3490 		}
   3491 	}
   3492 	/* reset the error code potentially set by the tdb_update() */
   3493 	tdb->ecode = TDB_SUCCESS;
   3494 
   3495 	/* delete any existing record - if it doesn't exist we don't
   3496            care.  Doing this first reduces fragmentation, and avoids
   3497            coalescing with `allocated' block before it's updated. */
   3498 	if (flag != TDB_INSERT)
   3499 		tdb_delete_hash(tdb, key, hash);
   3500 
   3501 	/* Copy key+value *before* allocating free space in case malloc
   3502 	   fails and we are left with a dead spot in the tdb. */
   3503 
   3504 	if (!(p = (char *)malloc(key.dsize + dbuf.dsize))) {
   3505 		tdb->ecode = TDB_ERR_OOM;
   3506 		goto fail;
   3507 	}
   3508 
   3509 	memcpy(p, key.dptr, key.dsize);
   3510 	if (dbuf.dsize)
   3511 		memcpy(p+key.dsize, dbuf.dptr, dbuf.dsize);
   3512 
   3513 	if (tdb->max_dead_records != 0) {
   3514 		/*
   3515 		 * Allow for some dead records per hash chain, look if we can
   3516 		 * find one that can hold the new record. We need enough space
   3517 		 * for key, data and tailer. If we find one, we don't have to
   3518 		 * consult the central freelist.
   3519 		 */
   3520 		rec_ptr = tdb_find_dead(
   3521 			tdb, hash, &rec,
   3522 			key.dsize + dbuf.dsize + sizeof(tdb_off_t));
   3523 
   3524 		if (rec_ptr != 0) {
   3525 			rec.key_len = key.dsize;
   3526 			rec.data_len = dbuf.dsize;
   3527 			rec.full_hash = hash;
   3528 			rec.magic = TDB_MAGIC;
   3529 			if (tdb_rec_write(tdb, rec_ptr, &rec) == -1
   3530 			    || tdb->methods->tdb_write(
   3531 				    tdb, rec_ptr + sizeof(rec),
   3532 				    p, key.dsize + dbuf.dsize) == -1) {
   3533 				goto fail;
   3534 			}
   3535 			goto done;
   3536 		}
   3537 	}
   3538 
   3539 	/*
   3540 	 * We have to allocate some space from the freelist, so this means we
   3541 	 * have to lock it. Use the chance to purge all the DEAD records from
   3542 	 * the hash chain under the freelist lock.
   3543 	 */
   3544 
   3545 	if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
   3546 		goto fail;
   3547 	}
   3548 
   3549 	if ((tdb->max_dead_records != 0)
   3550 	    && (tdb_purge_dead(tdb, hash) == -1)) {
   3551 		tdb_unlock(tdb, -1, F_WRLCK);
   3552 		goto fail;
   3553 	}
   3554 
   3555 	/* we have to allocate some space */
   3556 	rec_ptr = tdb_allocate(tdb, key.dsize + dbuf.dsize, &rec);
   3557 
   3558 	tdb_unlock(tdb, -1, F_WRLCK);
   3559 
   3560 	if (rec_ptr == 0) {
   3561 		goto fail;
   3562 	}
   3563 
   3564 	/* Read hash top into next ptr */
   3565 	if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec.next) == -1)
   3566 		goto fail;
   3567 
   3568 	rec.key_len = key.dsize;
   3569 	rec.data_len = dbuf.dsize;
   3570 	rec.full_hash = hash;
   3571 	rec.magic = TDB_MAGIC;
   3572 
   3573 	/* write out and point the top of the hash chain at it */
   3574 	if (tdb_rec_write(tdb, rec_ptr, &rec) == -1
   3575 	    || tdb->methods->tdb_write(tdb, rec_ptr+sizeof(rec), p, key.dsize+dbuf.dsize)==-1
   3576 	    || tdb_ofs_write(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1) {
   3577 		/* Need to tdb_unallocate() here */
   3578 		goto fail;
   3579 	}
   3580 
   3581  done:
   3582 	ret = 0;
   3583  fail:
   3584 	if (ret == 0) {
   3585 		tdb_increment_seqnum(tdb);
   3586 	}
   3587 
   3588 	SAFE_FREE(p);
   3589 	tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
   3590 	return ret;
   3591 }
   3592 
   3593 
   3594 /* Append to an entry. Create if not exist. */
   3595 int tdb_append(struct tdb_context *tdb, TDB_DATA key, TDB_DATA new_dbuf)
   3596 {
   3597 	u32 hash;
   3598 	TDB_DATA dbuf;
   3599 	int ret = -1;
   3600 
   3601 	/* find which hash bucket it is in */
   3602 	hash = tdb->hash_fn(&key);
   3603 	if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
   3604 		return -1;
   3605 
   3606 	dbuf = tdb_fetch(tdb, key);
   3607 
   3608 	if (dbuf.dptr == NULL) {
   3609 		dbuf.dptr = (unsigned char *)malloc(new_dbuf.dsize);
   3610 	} else {
   3611 		unsigned char *new_dptr = (unsigned char *)realloc(dbuf.dptr,
   3612 						     dbuf.dsize + new_dbuf.dsize);
   3613 		if (new_dptr == NULL) {
   3614 			free(dbuf.dptr);
   3615 		}
   3616 		dbuf.dptr = new_dptr;
   3617 	}
   3618 
   3619 	if (dbuf.dptr == NULL) {
   3620 		tdb->ecode = TDB_ERR_OOM;
   3621 		goto failed;
   3622 	}
   3623 
   3624 	memcpy(dbuf.dptr + dbuf.dsize, new_dbuf.dptr, new_dbuf.dsize);
   3625 	dbuf.dsize += new_dbuf.dsize;
   3626 
   3627 	ret = tdb_store(tdb, key, dbuf, 0);
   3628 
   3629 failed:
   3630 	tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
   3631 	SAFE_FREE(dbuf.dptr);
   3632 	return ret;
   3633 }
   3634 
   3635 
   3636 /*
   3637   return the name of the current tdb file
   3638   useful for external logging functions
   3639 */
   3640 const char *tdb_name(struct tdb_context *tdb)
   3641 {
   3642 	return tdb->name;
   3643 }
   3644 
   3645 /*
   3646   return the underlying file descriptor being used by tdb, or -1
   3647   useful for external routines that want to check the device/inode
   3648   of the fd
   3649 */
   3650 int tdb_fd(struct tdb_context *tdb)
   3651 {
   3652 	return tdb->fd;
   3653 }
   3654 
   3655 /*
   3656   return the current logging function
   3657   useful for external tdb routines that wish to log tdb errors
   3658 */
   3659 tdb_log_func tdb_log_fn(struct tdb_context *tdb)
   3660 {
   3661 	return tdb->log.log_fn;
   3662 }
   3663 
   3664 
   3665 /*
   3666   get the tdb sequence number. Only makes sense if the writers opened
   3667   with TDB_SEQNUM set. Note that this sequence number will wrap quite
   3668   quickly, so it should only be used for a 'has something changed'
   3669   test, not for code that relies on the count of the number of changes
   3670   made. If you want a counter then use a tdb record.
   3671 
   3672   The aim of this sequence number is to allow for a very lightweight
   3673   test of a possible tdb change.
   3674 */
   3675 int tdb_get_seqnum(struct tdb_context *tdb)
   3676 {
   3677 	tdb_off_t seqnum=0;
   3678 
   3679 	tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
   3680 	return seqnum;
   3681 }
   3682 
   3683 int tdb_hash_size(struct tdb_context *tdb)
   3684 {
   3685 	return tdb->header.hash_size;
   3686 }
   3687 
   3688 size_t tdb_map_size(struct tdb_context *tdb)
   3689 {
   3690 	return tdb->map_size;
   3691 }
   3692 
   3693 int tdb_get_flags(struct tdb_context *tdb)
   3694 {
   3695 	return tdb->flags;
   3696 }
   3697 
   3698 
   3699 /*
   3700   enable sequence number handling on an open tdb
   3701 */
   3702 void tdb_enable_seqnum(struct tdb_context *tdb)
   3703 {
   3704 	tdb->flags |= TDB_SEQNUM;
   3705 }
   3706 
   3707 /* file: open.c */
   3708 
   3709 /* all contexts, to ensure no double-opens (fcntl locks don't nest!) */
   3710 static struct tdb_context *tdbs = NULL;
   3711 
   3712 
   3713 /* This is based on the hash algorithm from gdbm */
   3714 static unsigned int default_tdb_hash(TDB_DATA *key)
   3715 {
   3716 	u32 value;	/* Used to compute the hash value.  */
   3717 	u32   i;	/* Used to cycle through random values. */
   3718 
   3719 	/* Set the initial value from the key size. */
   3720 	for (value = 0x238F13AF * key->dsize, i=0; i < key->dsize; i++)
   3721 		value = (value + (key->dptr[i] << (i*5 % 24)));
   3722 
   3723 	return (1103515243 * value + 12345);
   3724 }
   3725 
   3726 
   3727 /* initialise a new database with a specified hash size */
   3728 static int tdb_new_database(struct tdb_context *tdb, int hash_size)
   3729 {
   3730 	struct tdb_header *newdb;
   3731 	int size, ret = -1;
   3732 
   3733 	/* We make it up in memory, then write it out if not internal */
   3734 	size = sizeof(struct tdb_header) + (hash_size+1)*sizeof(tdb_off_t);
   3735 	if (!(newdb = (struct tdb_header *)calloc(size, 1)))
   3736 		return TDB_ERRCODE(TDB_ERR_OOM, -1);
   3737 
   3738 	/* Fill in the header */
   3739 	newdb->version = TDB_VERSION;
   3740 	newdb->hash_size = hash_size;
   3741 	if (tdb->flags & TDB_INTERNAL) {
   3742 		tdb->map_size = size;
   3743 		tdb->map_ptr = (char *)newdb;
   3744 		memcpy(&tdb->header, newdb, sizeof(tdb->header));
   3745 		/* Convert the `ondisk' version if asked. */
   3746 		CONVERT(*newdb);
   3747 		return 0;
   3748 	}
   3749 	if (lseek(tdb->fd, 0, SEEK_SET) == -1)
   3750 		goto fail;
   3751 
   3752 	if (ftruncate(tdb->fd, 0) == -1)
   3753 		goto fail;
   3754 
   3755 	/* This creates an endian-converted header, as if read from disk */
   3756 	CONVERT(*newdb);
   3757 	memcpy(&tdb->header, newdb, sizeof(tdb->header));
   3758 	/* Don't endian-convert the magic food! */
   3759 	memcpy(newdb->magic_food, TDB_MAGIC_FOOD, strlen(TDB_MAGIC_FOOD)+1);
   3760 	if (write(tdb->fd, newdb, size) != size) {
   3761 		ret = -1;
   3762 	} else {
   3763 		ret = 0;
   3764 	}
   3765 
   3766   fail:
   3767 	SAFE_FREE(newdb);
   3768 	return ret;
   3769 }
   3770 
   3771 
   3772 
   3773 static int tdb_already_open(dev_t device,
   3774 			    ino_t ino)
   3775 {
   3776 	struct tdb_context *i;
   3777 
   3778 	for (i = tdbs; i; i = i->next) {
   3779 		if (i->device == device && i->inode == ino) {
   3780 			return 1;
   3781 		}
   3782 	}
   3783 
   3784 	return 0;
   3785 }
   3786 
   3787 /* open the database, creating it if necessary
   3788 
   3789    The open_flags and mode are passed straight to the open call on the
   3790    database file. A flags value of O_WRONLY is invalid. The hash size
   3791    is advisory, use zero for a default value.
   3792 
   3793    Return is NULL on error, in which case errno is also set.  Don't
   3794    try to call tdb_error or tdb_errname, just do strerror(errno).
   3795 
   3796    @param name may be NULL for internal databases. */
   3797 struct tdb_context *tdb_open(const char *name, int hash_size, int tdb_flags,
   3798 		      int open_flags, mode_t mode)
   3799 {
   3800 	return tdb_open_ex(name, hash_size, tdb_flags, open_flags, mode, NULL, NULL);
   3801 }
   3802 
   3803 /* a default logging function */
   3804 static void null_log_fn(struct tdb_context *tdb, enum tdb_debug_level level, const char *fmt, ...) PRINTF_ATTRIBUTE(3, 4);
   3805 static void null_log_fn(struct tdb_context *tdb, enum tdb_debug_level level, const char *fmt, ...)
   3806 {
   3807 }
   3808 
   3809 
   3810 struct tdb_context *tdb_open_ex(const char *name, int hash_size, int tdb_flags,
   3811 				int open_flags, mode_t mode,
   3812 				const struct tdb_logging_context *log_ctx,
   3813 				tdb_hash_func hash_fn)
   3814 {
   3815 	struct tdb_context *tdb;
   3816 	struct stat st;
   3817 	int rev = 0, locked = 0;
   3818 	unsigned char *vp;
   3819 	u32 vertest;
   3820 
   3821 	if (!(tdb = (struct tdb_context *)calloc(1, sizeof *tdb))) {
   3822 		/* Can't log this */
   3823 		errno = ENOMEM;
   3824 		goto fail;
   3825 	}
   3826 	tdb_io_init(tdb);
   3827 	tdb->fd = -1;
   3828 	tdb->name = NULL;
   3829 	tdb->map_ptr = NULL;
   3830 	tdb->flags = tdb_flags;
   3831 	tdb->open_flags = open_flags;
   3832 	if (log_ctx) {
   3833 		tdb->log = *log_ctx;
   3834 	} else {
   3835 		tdb->log.log_fn = null_log_fn;
   3836 		tdb->log.log_private = NULL;
   3837 	}
   3838 	tdb->hash_fn = hash_fn ? hash_fn : default_tdb_hash;
   3839 
   3840 	/* cache the page size */
   3841 	tdb->page_size = getpagesize();
   3842 	if (tdb->page_size <= 0) {
   3843 		tdb->page_size = 0x2000;
   3844 	}
   3845 
   3846 	if ((open_flags & O_ACCMODE) == O_WRONLY) {
   3847 		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: can't open tdb %s write-only\n",
   3848 			 name));
   3849 		errno = EINVAL;
   3850 		goto fail;
   3851 	}
   3852 
   3853 	if (hash_size == 0)
   3854 		hash_size = DEFAULT_HASH_SIZE;
   3855 	if ((open_flags & O_ACCMODE) == O_RDONLY) {
   3856 		tdb->read_only = 1;
   3857 		/* read only databases don't do locking or clear if first */
   3858 		tdb->flags |= TDB_NOLOCK;
   3859 		tdb->flags &= ~TDB_CLEAR_IF_FIRST;
   3860 	}
   3861 
   3862 	/* internal databases don't mmap or lock, and start off cleared */
   3863 	if (tdb->flags & TDB_INTERNAL) {
   3864 		tdb->flags |= (TDB_NOLOCK | TDB_NOMMAP);
   3865 		tdb->flags &= ~TDB_CLEAR_IF_FIRST;
   3866 		if (tdb_new_database(tdb, hash_size) != 0) {
   3867 			TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: tdb_new_database failed!"));
   3868 			goto fail;
   3869 		}
   3870 		goto internal;
   3871 	}
   3872 
   3873 	if ((tdb->fd = open(name, open_flags, mode)) == -1) {
   3874 		TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_open_ex: could not open file %s: %s\n",
   3875 			 name, strerror(errno)));
   3876 		goto fail;	/* errno set by open(2) */
   3877 	}
   3878 
   3879 	/* ensure there is only one process initialising at once */
   3880 	if (tdb->methods->tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
   3881 		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: failed to get global lock on %s: %s\n",
   3882 			 name, strerror(errno)));
   3883 		goto fail;	/* errno set by tdb_brlock */
   3884 	}
   3885 
   3886 	/* we need to zero database if we are the only one with it open */
   3887 	if ((tdb_flags & TDB_CLEAR_IF_FIRST) &&
   3888 	    (locked = (tdb->methods->tdb_brlock(tdb, ACTIVE_LOCK, F_WRLCK, F_SETLK, 0, 1) == 0))) {
   3889 		open_flags |= O_CREAT;
   3890 		if (ftruncate(tdb->fd, 0) == -1) {
   3891 			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_open_ex: "
   3892 				 "failed to truncate %s: %s\n",
   3893 				 name, strerror(errno)));
   3894 			goto fail; /* errno set by ftruncate */
   3895 		}
   3896 	}
   3897 
   3898 	if (read(tdb->fd, &tdb->header, sizeof(tdb->header)) != sizeof(tdb->header)
   3899 	    || strcmp(tdb->header.magic_food, TDB_MAGIC_FOOD) != 0
   3900 	    || (tdb->header.version != TDB_VERSION
   3901 		&& !(rev = (tdb->header.version==TDB_BYTEREV(TDB_VERSION))))) {
   3902 		/* its not a valid database - possibly initialise it */
   3903 		if (!(open_flags & O_CREAT) || tdb_new_database(tdb, hash_size) == -1) {
   3904 			errno = EIO; /* ie bad format or something */
   3905 			goto fail;
   3906 		}
   3907 		rev = (tdb->flags & TDB_CONVERT);
   3908 	}
   3909 	vp = (unsigned char *)&tdb->header.version;
   3910 	vertest = (((u32)vp[0]) << 24) | (((u32)vp[1]) << 16) |
   3911 		  (((u32)vp[2]) << 8) | (u32)vp[3];
   3912 	tdb->flags |= (vertest==TDB_VERSION) ? TDB_BIGENDIAN : 0;
   3913 	if (!rev)
   3914 		tdb->flags &= ~TDB_CONVERT;
   3915 	else {
   3916 		tdb->flags |= TDB_CONVERT;
   3917 		tdb_convert(&tdb->header, sizeof(tdb->header));
   3918 	}
   3919 	if (fstat(tdb->fd, &st) == -1)
   3920 		goto fail;
   3921 
   3922 	if (tdb->header.rwlocks != 0) {
   3923 		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: spinlocks no longer supported\n"));
   3924 		goto fail;
   3925 	}
   3926 
   3927 	/* Is it already in the open list?  If so, fail. */
   3928 	if (tdb_already_open(st.st_dev, st.st_ino)) {
   3929 		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: "
   3930 			 "%s (%d,%d) is already open in this process\n",
   3931 			 name, (int)st.st_dev, (int)st.st_ino));
   3932 		errno = EBUSY;
   3933 		goto fail;
   3934 	}
   3935 
   3936 	if (!(tdb->name = (char *)strdup(name))) {
   3937 		errno = ENOMEM;
   3938 		goto fail;
   3939 	}
   3940 
   3941 	tdb->map_size = st.st_size;
   3942 	tdb->device = st.st_dev;
   3943 	tdb->inode = st.st_ino;
   3944 	tdb->max_dead_records = 0;
   3945 	tdb_mmap(tdb);
   3946 	if (locked) {
   3947 		if (tdb->methods->tdb_brlock(tdb, ACTIVE_LOCK, F_UNLCK, F_SETLK, 0, 1) == -1) {
   3948 			TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: "
   3949 				 "failed to take ACTIVE_LOCK on %s: %s\n",
   3950 				 name, strerror(errno)));
   3951 			goto fail;
   3952 		}
   3953 
   3954 	}
   3955 
   3956 	/* We always need to do this if the CLEAR_IF_FIRST flag is set, even if
   3957 	   we didn't get the initial exclusive lock as we need to let all other
   3958 	   users know we're using it. */
   3959 
   3960 	if (tdb_flags & TDB_CLEAR_IF_FIRST) {
   3961 		/* leave this lock in place to indicate it's in use */
   3962 		if (tdb->methods->tdb_brlock(tdb, ACTIVE_LOCK, F_RDLCK, F_SETLKW, 0, 1) == -1)
   3963 			goto fail;
   3964 	}
   3965 
   3966 	/* if needed, run recovery */
   3967 	if (tdb_transaction_recover(tdb) == -1) {
   3968 		goto fail;
   3969 	}
   3970 
   3971  internal:
   3972 	/* Internal (memory-only) databases skip all the code above to
   3973 	 * do with disk files, and resume here by releasing their
   3974 	 * global lock and hooking into the active list. */
   3975 	if (tdb->methods->tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1) == -1)
   3976 		goto fail;
   3977 	tdb->next = tdbs;
   3978 	tdbs = tdb;
   3979 	return tdb;
   3980 
   3981  fail:
   3982 	{ int save_errno = errno;
   3983 
   3984 	if (!tdb)
   3985 		return NULL;
   3986 
   3987 	if (tdb->map_ptr) {
   3988 		if (tdb->flags & TDB_INTERNAL)
   3989 			SAFE_FREE(tdb->map_ptr);
   3990 		else
   3991 			tdb_munmap(tdb);
   3992 	}
   3993 	SAFE_FREE(tdb->name);
   3994 	if (tdb->fd != -1)
   3995 		if (close(tdb->fd) != 0)
   3996 			TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: failed to close tdb->fd on error!\n"));
   3997 	SAFE_FREE(tdb);
   3998 	errno = save_errno;
   3999 	return NULL;
   4000 	}
   4001 }
   4002 
   4003 /*
   4004  * Set the maximum number of dead records per hash chain
   4005  */
   4006 
   4007 void tdb_set_max_dead(struct tdb_context *tdb, int max_dead)
   4008 {
   4009 	tdb->max_dead_records = max_dead;
   4010 }
   4011 
   4012 /**
   4013  * Close a database.
   4014  *
   4015  * @returns -1 for error; 0 for success.
   4016  **/
   4017 int tdb_close(struct tdb_context *tdb)
   4018 {
   4019 	struct tdb_context **i;
   4020 	int ret = 0;
   4021 
   4022 	if (tdb->transaction) {
   4023 		tdb_transaction_cancel(tdb);
   4024 	}
   4025 
   4026 	if (tdb->map_ptr) {
   4027 		if (tdb->flags & TDB_INTERNAL)
   4028 			SAFE_FREE(tdb->map_ptr);
   4029 		else
   4030 			tdb_munmap(tdb);
   4031 	}
   4032 	SAFE_FREE(tdb->name);
   4033 	if (tdb->fd != -1)
   4034 		ret = close(tdb->fd);
   4035 	SAFE_FREE(tdb->lockrecs);
   4036 
   4037 	/* Remove from contexts list */
   4038 	for (i = &tdbs; *i; i = &(*i)->next) {
   4039 		if (*i == tdb) {
   4040 			*i = tdb->next;
   4041 			break;
   4042 		}
   4043 	}
   4044 
   4045 	memset(tdb, 0, sizeof(*tdb));
   4046 	SAFE_FREE(tdb);
   4047 
   4048 	return ret;
   4049 }
   4050 
   4051 /* register a loging function */
   4052 void tdb_set_logging_function(struct tdb_context *tdb,
   4053                               const struct tdb_logging_context *log_ctx)
   4054 {
   4055         tdb->log = *log_ctx;
   4056 }
   4057 
   4058 void *tdb_get_logging_private(struct tdb_context *tdb)
   4059 {
   4060 	return tdb->log.log_private;
   4061 }
   4062 
   4063 /* reopen a tdb - this can be used after a fork to ensure that we have an independent
   4064    seek pointer from our parent and to re-establish locks */
   4065 int tdb_reopen(struct tdb_context *tdb)
   4066 {
   4067 	struct stat st;
   4068 
   4069 	if (tdb->flags & TDB_INTERNAL) {
   4070 		return 0; /* Nothing to do. */
   4071 	}
   4072 
   4073 	if (tdb->num_locks != 0 || tdb->global_lock.count) {
   4074 		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_reopen: reopen not allowed with locks held\n"));
   4075 		goto fail;
   4076 	}
   4077 
   4078 	if (tdb->transaction != 0) {
   4079 		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_reopen: reopen not allowed inside a transaction\n"));
   4080 		goto fail;
   4081 	}
   4082 
   4083 	if (tdb_munmap(tdb) != 0) {
   4084 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: munmap failed (%s)\n", strerror(errno)));
   4085 		goto fail;
   4086 	}
   4087 	if (close(tdb->fd) != 0)
   4088 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: WARNING closing tdb->fd failed!\n"));
   4089 	tdb->fd = open(tdb->name, tdb->open_flags & ~(O_CREAT|O_TRUNC), 0);
   4090 	if (tdb->fd == -1) {
   4091 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: open failed (%s)\n", strerror(errno)));
   4092 		goto fail;
   4093 	}
   4094 	if ((tdb->flags & TDB_CLEAR_IF_FIRST) &&
   4095 	    (tdb->methods->tdb_brlock(tdb, ACTIVE_LOCK, F_RDLCK, F_SETLKW, 0, 1) == -1)) {
   4096 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: failed to obtain active lock\n"));
   4097 		goto fail;
   4098 	}
   4099 	if (fstat(tdb->fd, &st) != 0) {
   4100 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: fstat failed (%s)\n", strerror(errno)));
   4101 		goto fail;
   4102 	}
   4103 	if (st.st_ino != tdb->inode || st.st_dev != tdb->device) {
   4104 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: file dev/inode has changed!\n"));
   4105 		goto fail;
   4106 	}
   4107 	tdb_mmap(tdb);
   4108 
   4109 	return 0;
   4110 
   4111 fail:
   4112 	tdb_close(tdb);
   4113 	return -1;
   4114 }
   4115 
   4116 /* reopen all tdb's */
   4117 int tdb_reopen_all(int parent_longlived)
   4118 {
   4119 	struct tdb_context *tdb;
   4120 
   4121 	for (tdb=tdbs; tdb; tdb = tdb->next) {
   4122 		/*
   4123 		 * If the parent is longlived (ie. a
   4124 		 * parent daemon architecture), we know
   4125 		 * it will keep it's active lock on a
   4126 		 * tdb opened with CLEAR_IF_FIRST. Thus
   4127 		 * for child processes we don't have to
   4128 		 * add an active lock. This is essential
   4129 		 * to improve performance on systems that
   4130 		 * keep POSIX locks as a non-scalable data
   4131 		 * structure in the kernel.
   4132 		 */
   4133 		if (parent_longlived) {
   4134 			/* Ensure no clear-if-first. */
   4135 			tdb->flags &= ~TDB_CLEAR_IF_FIRST;
   4136 		}
   4137 
   4138 		if (tdb_reopen(tdb) != 0)
   4139 			return -1;
   4140 	}
   4141 
   4142 	return 0;
   4143 }
   4144