Home | History | Annotate | Download | only in util
      1 /*
      2  * Copyright 2001-2004 Brandon Long
      3  * All Rights Reserved.
      4  *
      5  * ClearSilver Templating System
      6  *
      7  * This code is made available under the terms of the ClearSilver License.
      8  * http://www.clearsilver.net/license.hdf
      9  *
     10  */
     11 /*
 * The wdb is a wrapper around the Sleepycat DB library which adds
 * a relatively simple data/column definition.  In many respects, this
 * code is way more complicated than it ever needed to be, but it works,
 * so I'm loath to "fix" it.
 *
 * One of the key features of this is the ability to update the
 * "schema" of the wdb without changing all of the existing rows of
 * data.
     20  */
     21 
     22 #include "cs_config.h"
     23 
     24 #include <unistd.h>
     25 #include <stdlib.h>
     26 #include <stdarg.h>
     27 #include <errno.h>
     28 #include <string.h>
     29 #include <limits.h>
     30 #include <db.h>
     31 #include <ctype.h>
     32 
     33 #include "neo_misc.h"
     34 #include "neo_err.h"
     35 #include "dict.h"
     36 #include "ulist.h"
     37 #include "skiplist.h"
     38 #include "wdb.h"
     39 
     40 
/* Version tag written as (and expected on) the first line of a .wdf
 * definition file. */
#define DEFN_VERSION_1 "WDB-VERSION-200006301"
/* Version number stored in the leading UB4 of every packed row. */
#define PACK_VERSION_1 1
     43 
     44 static void string_rstrip (char *s)
     45 {
     46   size_t len;
     47 
     48   len = strlen(s);
     49   len--;
     50   while (len > 0 && isspace(s[len]))
     51   {
     52     s[len] = '\0';
     53     len--;
     54   }
     55   return;
     56 }
     57 
     58 static int to_hex (unsigned char ch, unsigned char *s)
     59 {
     60   unsigned int uvalue = ch;
     61 
     62   s[1] = "0123456789ABCDEF"[uvalue % 16];
     63   uvalue = (uvalue / 16);  s[0] = "0123456789ABCDEF"[uvalue % 16];
     64   return 0;
     65 }
     66 
     67 int Index_hex[128] = {
     68   -1,-1,-1,-1, -1,-1,-1,-1, -1,-1,-1,-1, -1,-1,-1,-1,
     69   -1,-1,-1,-1, -1,-1,-1,-1, -1,-1,-1,-1, -1,-1,-1,-1,
     70   -1,-1,-1,-1, -1,-1,-1,-1, -1,-1,-1,-1, -1,-1,-1,-1,
     71   0, 1, 2, 3,  4, 5, 6, 7,  8, 9,-1,-1, -1,-1,-1,-1,
     72   -1,10,11,12, 13,14,15,-1, -1,-1,-1,-1, -1,-1,-1,-1,
     73   -1,-1,-1,-1, -1,-1,-1,-1, -1,-1,-1,-1, -1,-1,-1,-1,  -1,10,11,12, 13,14,15,-1, -1,-1,-1,-1, -1,-1,-1,-1,
     74   -1,-1,-1,-1, -1,-1,-1,-1, -1,-1,-1,-1, -1,-1,-1,-1
     75 };
     76 
     77 #define hexval(c) Index_hex[(unsigned int)(c)]
     78 
     79 /* Encoding means any non-printable characters and : and % */
     80 static NEOERR *wdb_encode_str_alloc (const char *s, char **o)
     81 {
     82   int x = 0;
     83   int c = 0;
     84   char *out;
     85   unsigned char ch;
     86 
     87   while (s[x])
     88   {
     89     ch = (unsigned char) s[x];
     90     if ((ch < 32) || (ch > 127) || (ch == ':') || (ch == '%') || isspace(ch))
     91       c++;
     92     x++;
     93   }
     94 
     95   out = (char *) malloc (sizeof (char) * (x + c * 3) + 1);
     96   if (out == NULL)
     97     return nerr_raise (NERR_NOMEM,
     98 	"Unable to allocate memory for encoding %s", s);
     99 
    100   x = 0;
    101   c = 0;
    102 
    103   *o = out;
    104 
    105   while (s[x])
    106   {
    107     ch = (unsigned char) s[x];
    108     if ((ch < 32) || (ch > 127) || (ch == ':') || (ch == '%') || isspace(ch))
    109     {
    110       out[c++] = '%';
    111       to_hex (s[x], &out[c]);
    112       c+=2;
    113     }
    114     else
    115     {
    116       out[c++] = s[x];
    117     }
    118     x++;
    119   }
    120   out[c] = '\0';
    121 
    122   return STATUS_OK;
    123 }
    124 
    125 static NEOERR *wdb_decode_str_alloc (const char *s, char **o)
    126 {
    127   int x = 0;
    128   int c = 0;
    129   char *out;
    130   unsigned char ch;
    131 
    132 
    133   x = strlen(s);
    134   /* Overkill, the decoded string will be smaller */
    135   out = (char *) malloc (sizeof (char) * (x + 1));
    136   if (out == NULL)
    137     return nerr_raise (NERR_NOMEM,
    138 	"Unable to allocate memory for decoding %s", s);
    139 
    140   x = 0;
    141   c = 0;
    142 
    143   while (s[x])
    144   {
    145     if (s[x] == '%')
    146     {
    147       x++;
    148       ch = hexval(s[x]) << 4;
    149       x++;
    150       ch |= hexval(s[x]);
    151       out[c++] = ch;
    152     }
    153     else
    154     {
    155       out[c++] = s[x];
    156     }
    157     x++;
    158   }
    159   out[c] = '\0';
    160   *o = out;
    161 
    162   return STATUS_OK;
    163 }
    164 
    165 static void free_cb (void *value, void *rock)
    166 {
    167   free (value);
    168 }
    169 
    170 static void free_col_cb (void *value, void *rock)
    171 {
    172   WDBColumn *col;
    173 
    174   col = (WDBColumn *)value;
    175 
    176   free (col->name);
    177   free (col);
    178 }
    179 
    180 static NEOERR *wdb_alloc (WDB **wdb, int flags)
    181 {
    182   WDB *my_wdb;
    183   NEOERR *err = STATUS_OK;
    184 
    185   my_wdb = (WDB *) calloc (1, sizeof (WDB));
    186 
    187   if (my_wdb == NULL)
    188     return nerr_raise (NERR_NOMEM, "Unable to allocate memory for WDB");
    189 
    190   do
    191   {
    192     err = dictCreate (&(my_wdb->attrs), 0, 2, 5, 0, 0, free_cb, NULL);
    193     if (err != STATUS_OK) break;
    194     err = dictCreate (&(my_wdb->cols), 0, 2, 5, 0, 0, free_col_cb, NULL);
    195     if (err != STATUS_OK) break;
    196     err = uListInit (&(my_wdb->cols_l), 0, 0);
    197     if (err != STATUS_OK) break;
    198     err = skipNewList(&(my_wdb->ondisk), 0, 4, 2, 0, NULL, NULL);
    199     if (err != STATUS_OK) break;
    200 
    201     *wdb = my_wdb;
    202 
    203     return STATUS_OK;
    204   } while (0);
    205 
    206   wdb_destroy(&my_wdb);
    207   return nerr_pass (err);
    208 }
    209 
    210 
    211 #define STATE_REQUIRED 1
    212 #define STATE_ATTRIBUTES 2
    213 #define STATE_COLUMN_DEF 3
    214 
    215 static NEOERR *wdb_load_defn_v1 (WDB *wdb, FILE *fp)
    216 {
    217   char line[1024];
    218   int state = 1;
    219   char *k, *v;
    220   NEOERR *err = STATUS_OK;
    221   int colindex = 1;
    222   WDBColumn *col;
    223 
    224   while (fgets(line, sizeof(line), fp) != NULL)
    225   {
    226     string_rstrip(line);
    227     switch (state)
    228     {
    229       case STATE_REQUIRED:
    230 	if (!strcmp(line, "attributes"))
    231 	  state = STATE_ATTRIBUTES;
    232 	else if (!strcmp(line, "columns"))
    233 	  state = STATE_COLUMN_DEF;
    234 	else
    235 	{
    236 	  k = line;
    237 	  v = strchr(line, ':');
    238 	  /* HACK */
    239 	  if (!strcmp(k, "name") && ((v == NULL) || (v[1] == '\0')))
    240 	  {
    241 	    v = "dNone";
    242 	  }
    243 	  else
    244 	  {
    245 	    if (v == NULL)
    246 	      return nerr_raise (NERR_PARSE, "Error parsing %s", line);
    247 	    if (v[1] == '\0')
    248 	      return nerr_raise (NERR_PARSE, "Error parsing %s", line);
    249 	  }
    250 	  v[0] = '\0';
    251 	  v++;
    252 	  if (!strcmp(k, "key"))
    253 	  {
    254 	    err = wdb_decode_str_alloc (v, &(wdb->key));
    255 	    if (err) return nerr_pass(err);
    256 	  }
    257 	  else if (!strcmp(k, "name"))
    258 	  {
    259 	    err = wdb_decode_str_alloc (v, &(wdb->name));
    260 	    if (err) return nerr_pass(err);
    261 	  }
    262 	  else if (!strcmp(k, "ondisk"))
    263 	  {
    264 	    wdb->last_ondisk = atoi (v);
    265 	  }
    266 	}
    267 	break;
    268       case STATE_ATTRIBUTES:
    269 	if (!strcmp(line, "columns"))
    270 	  state = STATE_COLUMN_DEF;
    271 	else
    272 	{
    273 	  k = line;
    274 	  v = strchr(line, ':');
    275 	  if (v == NULL)
    276 	    return nerr_raise (NERR_PARSE, "Error parsing %s", line);
    277 	  v[0] = '\0';
    278 	  v++;
    279 	  err = wdb_decode_str_alloc (k, &k);
    280 	  if (err) return nerr_pass(err);
    281 	  err = wdb_decode_str_alloc (v, &v);
    282 	  if (err) return nerr_pass(err);
    283 	  err = dictSetValue(wdb->attrs, k, v);
    284 	  free(k);
    285 	  if (err)
    286 	    return nerr_pass_ctx(err, "Error parsing %s", line);
    287 	}
    288 	break;
    289       case STATE_COLUMN_DEF:
    290 	k = line;
    291 	v = strchr(line, ':');
    292 	if (v == NULL)
    293 	  return nerr_raise (NERR_PARSE, "Error parsing %s", line);
    294 	if (v[1] == '\0')
    295 	  return nerr_raise (NERR_PARSE, "Error parsing %s", line);
    296 	v[0] = '\0';
    297 	v++;
    298 	err = wdb_decode_str_alloc (k, &k);
    299 	if (err) return nerr_pass(err);
    300 	col = (WDBColumn *) calloc (1, sizeof (WDBColumn));
    301 	col->name = k;
    302 	col->inmem_index = colindex++;
    303 	col->type = *v;
    304 	v+=2;
    305 	col->ondisk_index = atoi(v);
    306 	err = dictSetValue(wdb->cols, k, col);
    307 	if (err)
    308 	  return nerr_raise (NERR_PARSE, "Error parsing %s", line);
    309 	err = uListAppend(wdb->cols_l, col);
    310 	if (err) return nerr_pass(err);
    311 	/* stupid skiplist will assert */
    312 	if (col->ondisk_index == 0)
    313 	{
    314 	  return nerr_raise (NERR_ASSERT, "Invalid ondisk mapping for %s", k);
    315 	}
    316 	err = skipInsert (wdb->ondisk, col->ondisk_index,
    317 	    (void *)(col->inmem_index), 0);
    318 	if (err)
    319 	  return nerr_pass_ctx(err, "Unable to update ondisk mapping for %s", k);
    320 	break;
    321       default:
    322 	return nerr_raise (NERR_ASSERT, "Invalid state %d", state);
    323     }
    324   }
    325   return STATUS_OK;
    326 }
    327 
    328 static NEOERR *wdb_save_defn_v1 (WDB *wdb, FILE *fp)
    329 {
    330   NEOERR *err = STATUS_OK;
    331   WDBColumn *col;
    332   char *s = NULL;
    333   char *key = NULL;
    334   int r, x, len;
    335   char *k = NULL;
    336   char *v = NULL;
    337 
    338   /* Write version string */
    339   r = fprintf (fp, "%s\n", DEFN_VERSION_1);
    340   if (!r) goto save_err;
    341 
    342   err = wdb_encode_str_alloc (wdb->name, &s);
    343   if (err) goto save_err;
    344   r = fprintf (fp, "name:%s\n", s);
    345   if (!r) goto save_err;
    346   free (s);
    347 
    348   err = wdb_encode_str_alloc (wdb->key, &s);
    349   if (err != STATUS_OK) goto save_err;
    350   r = fprintf (fp, "key:%s\n", s);
    351   if (!r) goto save_err;
    352   free (s);
    353   s = NULL;
    354 
    355   r = fprintf (fp, "ondisk:%d\n", wdb->last_ondisk);
    356   if (!r) goto save_err;
    357 
    358   r = fprintf (fp, "attributes\n");
    359   if (!r) goto save_err;
    360   key = NULL;
    361   s = (char *) dictNext (wdb->attrs, &key, NULL);
    362   while (s)
    363   {
    364     err = wdb_encode_str_alloc (key, &k);
    365     if (err != STATUS_OK) goto save_err;
    366     err = wdb_encode_str_alloc (s, &v);
    367     if (err != STATUS_OK) goto save_err;
    368     r = fprintf (fp, "%s:%s\n", k, v);
    369     if (!r) goto save_err;
    370     free (k);
    371     free (v);
    372     k = NULL;
    373     v = NULL;
    374     s = (char *) dictNext (wdb->attrs, &key, NULL);
    375   }
    376   s = NULL;
    377 
    378   r = fprintf (fp, "columns\n");
    379   if (!r) goto save_err;
    380 
    381   len = uListLength(wdb->cols_l);
    382 
    383   for (x = 0; x < len; x++)
    384   {
    385     err = uListGet (wdb->cols_l, x, (void *)&col);
    386     if (err) goto save_err;
    387     err = wdb_encode_str_alloc (col->name, &s);
    388     if (err != STATUS_OK) goto save_err;
    389     r = fprintf (fp, "%s:%c:%d\n", s, col->type, col->ondisk_index);
    390     if (!r) goto save_err;
    391     free(s);
    392     s = NULL;
    393   }
    394 
    395   return STATUS_OK;
    396 
    397 save_err:
    398   if (s != NULL) free (s);
    399   if (k != NULL) free (k);
    400   if (v != NULL) free (v);
    401   if (err == STATUS_OK) return nerr_pass(err);
    402   return nerr_raise (r, "Unable to save defn");
    403 }
    404 
    405 static NEOERR *wdb_load_defn (WDB *wdb, const char *name)
    406 {
    407   char path[_POSIX_PATH_MAX];
    408   char line[1024];
    409   FILE *fp;
    410   NEOERR *err = STATUS_OK;
    411 
    412   snprintf (path, sizeof(path), "%s.wdf", name);
    413   fp = fopen (path, "r");
    414   if (fp == NULL)
    415   {
    416     if (errno == ENOENT)
    417       return nerr_raise (NERR_NOT_FOUND, "Unable to open defn %s", name);
    418     return nerr_raise_errno (NERR_IO, "Unable to open defn %s", name);
    419   }
    420 
    421   /* Read Version string */
    422   if (fgets (line, sizeof(line), fp) == NULL)
    423   {
    424     fclose(fp);
    425     return nerr_raise_errno (NERR_IO, "Unable to read defn %s", name);
    426   }
    427   string_rstrip(line);
    428 
    429   if (!strcmp(line, DEFN_VERSION_1))
    430   {
    431     err = wdb_load_defn_v1(wdb, fp);
    432     fclose(fp);
    433     if (err) return nerr_pass(err);
    434   }
    435   else
    436   {
    437     fclose(fp);
    438     return nerr_raise (NERR_ASSERT, "Unknown defn version %s: %s", line, name);
    439   }
    440 
    441   wdb->table_version = rand();
    442 
    443   return STATUS_OK;
    444 }
    445 
    446 static NEOERR *wdb_save_defn (WDB *wdb, const char *name)
    447 {
    448   char path[_POSIX_PATH_MAX];
    449   char path2[_POSIX_PATH_MAX];
    450   FILE *fp;
    451   NEOERR *err = STATUS_OK;
    452   int r;
    453 
    454   snprintf (path, sizeof(path), "%s.wdf.new", name);
    455   snprintf (path2, sizeof(path2), "%s.wdf", name);
    456   fp = fopen (path, "w");
    457   if (fp == NULL)
    458     return nerr_raise_errno (NERR_IO, "Unable to open defn %s", name);
    459 
    460   err = wdb_save_defn_v1 (wdb, fp);
    461   fclose (fp);
    462   if (err != STATUS_OK)
    463   {
    464     unlink (path);
    465     return nerr_pass (err);
    466   }
    467 
    468   r = unlink (path2);
    469   if (r == -1 && errno != ENOENT)
    470     return nerr_raise_errno (NERR_IO, "Unable to unlink %s", path2);
    471   r = link (path, path2);
    472   if (r == -1)
    473     return nerr_raise_errno (NERR_IO, "Unable to link %s to %s", path, path2);
    474   r = unlink (path);
    475 
    476   wdb->defn_dirty = 0;
    477   wdb->table_version = rand();
    478 
    479   return STATUS_OK;
    480 }
    481 
    482 NEOERR *wdb_open (WDB **wdb, const char *name, int flags)
    483 {
    484   WDB *my_wdb;
    485   char path[_POSIX_PATH_MAX];
    486   NEOERR *err = STATUS_OK;
    487   int r;
    488 
    489   *wdb = NULL;
    490 
    491   err = wdb_alloc (&my_wdb, flags);
    492   if (err) return nerr_pass(err);
    493 
    494   my_wdb->path = strdup (name);
    495   if (err)
    496   {
    497     wdb_destroy (&my_wdb);
    498     return nerr_pass(err);
    499   }
    500 
    501   err = wdb_load_defn (my_wdb, name);
    502   if (err)
    503   {
    504     wdb_destroy (&my_wdb);
    505     return nerr_pass(err);
    506   }
    507 
    508   snprintf (path, sizeof(path), "%s.wdb", name);
    509   r = db_open(path, DB_BTREE, 0, 0, NULL, NULL, &(my_wdb->db));
    510   if (r)
    511   {
    512     wdb_destroy (&my_wdb);
    513     return nerr_raise (NERR_DB, "Unable to open database %s: %d", name, r);
    514   }
    515 
    516   *wdb = my_wdb;
    517 
    518   return STATUS_OK;
    519 }
    520 
    521 NEOERR *wdb_save (WDB *wdb)
    522 {
    523   if (wdb->defn_dirty)
    524   {
    525     wdb_save_defn (wdb, wdb->path);
    526   }
    527   return STATUS_OK;
    528 }
    529 
    530 void wdb_destroy (WDB **wdb)
    531 {
    532   WDB *my_wdb;
    533 
    534   my_wdb = *wdb;
    535 
    536   if (my_wdb == NULL) return;
    537 
    538   if (my_wdb->defn_dirty)
    539   {
    540     wdb_save_defn (my_wdb, my_wdb->path);
    541   }
    542 
    543   if (my_wdb->attrs != NULL)
    544   {
    545     dictDestroy (my_wdb->attrs);
    546   }
    547 
    548   if (my_wdb->cols != NULL)
    549   {
    550     dictDestroy (my_wdb->cols);
    551   }
    552 
    553   if (my_wdb->cols_l != NULL)
    554   {
    555     uListDestroy(&(my_wdb->cols_l), 0);
    556   }
    557 
    558   if (my_wdb->ondisk != NULL)
    559   {
    560     skipFreeList(my_wdb->ondisk);
    561   }
    562 
    563   if (my_wdb->db != NULL)
    564   {
    565     my_wdb->db->close (my_wdb->db, 0);
    566     my_wdb->db = NULL;
    567   }
    568 
    569   if (my_wdb->path != NULL)
    570   {
    571     free(my_wdb->path);
    572     my_wdb->path = NULL;
    573   }
    574   if (my_wdb->name != NULL)
    575   {
    576     free(my_wdb->name);
    577     my_wdb->name = NULL;
    578   }
    579   if (my_wdb->key != NULL)
    580   {
    581     free(my_wdb->key);
    582     my_wdb->key = NULL;
    583   }
    584 
    585   free (my_wdb);
    586   *wdb = NULL;
    587 
    588   return;
    589 }
    590 
    591 #define PACK_UB4(pdata, plen, pmax, pn) \
    592 { \
    593   if (plen + 4 > pmax) \
    594   { \
    595     pmax *= 2; \
    596     pdata = realloc ((void *)pdata, pmax); \
    597     if (pdata == NULL) goto pack_err; \
    598   } \
    599   pdata[plen++] = (0x0ff & (pn >> 0)); \
    600   pdata[plen++] = (0x0ff & (pn >> 8)); \
    601   pdata[plen++] = (0x0ff & (pn >> 16)); \
    602   pdata[plen++] = (0x0ff & (pn >> 24)); \
    603 }
    604 
    605 #define UNPACK_UB4(pdata, plen, pn, pd) \
    606 { \
    607   if (pn + 4 > plen) \
    608     goto pack_err; \
    609   pd = ((0x0ff & pdata[pn+0])<<0) | ((0x0ff & pdata[pn+1])<<8) | \
    610        ((0x0ff & pdata[pn+2])<<16) | ((0x0ff & pdata[pn+3])<<24); \
    611   pn+=4; \
    612 }
    613 
    614 #define PACK_BYTE(pdata, plen, pmax, pn) \
    615 { \
    616   if (plen + 1 > pmax) \
    617   { \
    618     pmax *= 2; \
    619     pdata = realloc ((void *)pdata, pmax); \
    620     if (pdata == NULL) goto pack_err; \
    621   } \
    622   pdata[plen++] = (0x0ff & (pn >> 0)); \
    623 }
    624 
    625 #define UNPACK_BYTE(pdata, plen, pn, pd) \
    626 { \
    627   if (pn + 1 > plen) \
    628     goto pack_err; \
    629   pd = pdata[pn++]; \
    630 }
    631 
    632 #define PACK_STRING(pdata, plen, pmax, dl, ds) \
    633 { \
    634   if (plen + 4 + dl > pmax) \
    635   { \
    636     while (plen + 4 + dl > pmax) \
    637       pmax *= 2; \
    638     pdata = realloc ((void *)pdata, pmax); \
    639     if (pdata == NULL) goto pack_err; \
    640   } \
    641   pdata[plen++] = (0x0ff & (dl >> 0)); \
    642   pdata[plen++] = (0x0ff & (dl >> 8)); \
    643   pdata[plen++] = (0x0ff & (dl >> 16)); \
    644   pdata[plen++] = (0x0ff & (dl >> 24)); \
    645   memcpy (&pdata[plen], ds, dl); \
    646   plen+=dl;\
    647 }
    648 
    649 #define UNPACK_STRING(pdata, plen, pn, ps) \
    650 { \
    651   int pl; \
    652   if (pn + 4 > plen) \
    653     goto pack_err; \
    654   pl = ((0x0ff & pdata[pn+0])<<0) | ((0x0ff & pdata[pn+1])<<8) | \
    655        ((0x0ff & pdata[pn+2])<<16) | ((0x0ff & pdata[pn+3])<<24); \
    656   pn+=4; \
    657   if (pl) \
    658   { \
    659     ps = (char *)malloc(sizeof(char)*(pl+1)); \
    660     if (ps == NULL) \
    661       goto pack_err; \
    662     memcpy (ps, &pdata[pn], pl); \
    663     ps[pl] = '\0'; \
    664     pn += pl; \
    665   } else { \
    666     ps = NULL; \
    667   } \
    668 }
    669 
    670 /* A VERSION_1 Row consists of the following data:
    671  * UB4 VERSION
    672  * UB4 DATA COUNT
    673  * DATA where
    674  *   UB4 ONDISK INDEX
    675  *   UB1 TYPE
    676  *   if INT, then UB4
    677  *   if STR, then UB4 length and length UB1s
    678  */
    679 
    680 static NEOERR *pack_row (WDB *wdb, WDBRow *row, void **rdata, int *rdlen)
    681 {
    682   char *data;
    683   int x, len, dlen, dmax;
    684   char *s;
    685   int n;
    686   WDBColumn *col;
    687   NEOERR *err;
    688 
    689   *rdata = NULL;
    690   *rdlen = 0;
    691   /* allocate */
    692   data = (char *)malloc(sizeof (char) * 1024);
    693   if (data == NULL)
    694     return nerr_raise (NERR_NOMEM, "Unable to allocate memory to pack row");
    695 
    696   dmax = 1024;
    697   dlen = 0;
    698 
    699   PACK_UB4 (data, dlen, dmax, PACK_VERSION_1);
    700 /*  PACK_UB4 (data, dlen, dmax, time(NULL)); */
    701 
    702   len = uListLength(wdb->cols_l);
    703   if (len > row->data_count)
    704     len = row->data_count;
    705   PACK_UB4 (data, dlen, dmax, len);
    706 
    707   for (x = 0; x < len; x++)
    708   {
    709     err = uListGet (wdb->cols_l, x, (void *)&col);
    710     if (err) goto pack_err;
    711     PACK_UB4 (data, dlen, dmax, col->ondisk_index);
    712     PACK_BYTE (data, dlen, dmax, col->type);
    713     switch (col->type)
    714     {
    715       case WDB_TYPE_INT:
    716 	n = (int)(row->data[x]);
    717 	PACK_UB4 (data, dlen, dmax, n);
    718 	break;
    719       case WDB_TYPE_STR:
    720 	s = (char *)(row->data[x]);
    721 	if (s == NULL)
    722 	{
    723 	  s = "";
    724 	}
    725 	n = strlen(s);
    726 	PACK_STRING (data, dlen, dmax, n, s);
    727 	break;
    728       default:
    729 	free (data);
    730 	return nerr_raise (NERR_ASSERT, "Unknown type %d", col->type);
    731     }
    732   }
    733 
    734   *rdata = data;
    735   *rdlen = dlen;
    736   return STATUS_OK;
    737 
    738 pack_err:
    739   if (data != NULL)
    740     free (data);
    741   if (err == STATUS_OK)
    742     return nerr_raise(NERR_NOMEM, "Unable to allocate memory for pack_row");
    743   return nerr_pass(err);
    744 }
    745 
    746 static NEOERR *unpack_row (WDB *wdb, void *rdata, int dlen, WDBRow *row)
    747 {
    748   unsigned char *data = rdata;
    749   int version, n;
    750   int count, x, ondisk_index, type, d_int, inmem_index;
    751   char *s;
    752 
    753   n = 0;
    754 
    755   UNPACK_UB4(data, dlen, n, version);
    756 
    757   switch (version)
    758   {
    759     case PACK_VERSION_1:
    760       UNPACK_UB4(data, dlen, n, count);
    761       for (x = 0; x<count; x++)
    762       {
    763 	UNPACK_UB4 (data, dlen, n, ondisk_index);
    764 	UNPACK_BYTE (data, dlen, n, type);
    765 	inmem_index = (int) skipSearch (wdb->ondisk, ondisk_index, NULL);
    766 
    767 	switch (type)
    768 	{
    769 	  case WDB_TYPE_INT:
    770 	    UNPACK_UB4 (data, dlen, n, d_int);
    771 	    if (inmem_index != 0)
    772 	      row->data[inmem_index-1] = (void *) d_int;
    773 	    break;
    774 	  case WDB_TYPE_STR:
    775 	    UNPACK_STRING (data, dlen, n, s);
    776 	    if (inmem_index != 0)
    777 	      row->data[inmem_index-1] = s;
    778 	    break;
    779 	  default:
    780 	    return nerr_raise (NERR_ASSERT, "Unknown type %d for col %d", type, ondisk_index);
    781 	}
    782       }
    783       break;
    784     default:
    785       return nerr_raise (NERR_ASSERT, "Unknown version %d", version);
    786   }
    787 
    788   return STATUS_OK;
    789 pack_err:
    790   return nerr_raise(NERR_PARSE, "Unable to unpack row %s", row->key_value);
    791 }
    792 
    793 NEOERR *wdb_column_insert (WDB *wdb, int loc, const char *key, char type)
    794 {
    795   NEOERR *err;
    796   WDBColumn *col, *ocol;
    797   int x, len;
    798 
    799   col = (WDBColumn *) dictSearch (wdb->cols, key, NULL);
    800 
    801   if (col != NULL)
    802     return nerr_raise (NERR_DUPLICATE,
    803 	"Duplicate key %s:%d", key, col->inmem_index);
    804 
    805   col = (WDBColumn *) calloc (1, sizeof (WDBColumn));
    806   if (col == NULL)
    807   {
    808     return nerr_raise (NERR_NOMEM,
    809 	"Unable to allocate memory for creation of col %s:%d", key, loc);
    810   }
    811 
    812   col->name = strdup(key);
    813   if (col->name == NULL)
    814   {
    815     free(col);
    816     return nerr_raise (NERR_NOMEM,
    817 	"Unable to allocate memory for creation of col %s:%d", key, loc);
    818   }
    819   col->type = type;
    820   col->ondisk_index = wdb->last_ondisk++;
    821   /* -1 == append */
    822   if (loc == -1)
    823   {
    824     err = dictSetValue(wdb->cols, key, col);
    825     if (err)
    826     {
    827       free (col->name);
    828       free (col);
    829       return nerr_pass_ctx (err,
    830 	  "Unable to insert for creation of col %s:%d", key, loc);
    831     }
    832     err = uListAppend (wdb->cols_l, (void *)col);
    833     if (err) return nerr_pass(err);
    834     x = uListLength (wdb->cols_l);
    835     col->inmem_index = x;
    836     err = skipInsert (wdb->ondisk, col->ondisk_index,
    837 	            (void *)(col->inmem_index), 0);
    838     if (err)
    839       return nerr_pass_ctx (err, "Unable to update ondisk mapping for %s", key);
    840   }
    841   else
    842   {
    843     /* We are inserting this in middle, so the skipList ondisk is now
    844      * invalid, as is the inmem_index for all cols */
    845     err = dictSetValue(wdb->cols, key, col);
    846     if (err)
    847     {
    848       free (col->name);
    849       free (col);
    850       return nerr_pass_ctx (err,
    851 	  "Unable to insert for creation of col %s:%d", key, loc);
    852     }
    853     err = uListInsert (wdb->cols_l, loc, (void *)col);
    854     if (err) return nerr_pass(err);
    855     len = uListLength (wdb->cols_l);
    856     /* Fix up inmem_index and ondisk skipList */
    857     for (x = 0; x < len; x++)
    858     {
    859       err = uListGet (wdb->cols_l, x, (void *)&ocol);
    860       if (err) return nerr_pass(err);
    861       ocol->inmem_index = x + 1;
    862       err = skipInsert (wdb->ondisk, ocol->ondisk_index,
    863 	              (void *)(ocol->inmem_index), TRUE);
    864       if (err)
    865 	return nerr_pass_ctx (err, "Unable to update ondisk mapping for %s", key);
    866     }
    867   }
    868 
    869   wdb->defn_dirty = 1;
    870   wdb->table_version = rand();
    871 
    872   return STATUS_OK;
    873 }
    874 
    875 NEOERR *wdb_column_update (WDB *wdb, const char *oldkey, const char *newkey)
    876 {
    877   WDBColumn *ocol, *col;
    878   WDBColumn *vcol;
    879   NEOERR *err = STATUS_OK;
    880   int x, len, r;
    881 
    882   ocol = (WDBColumn *) dictSearch (wdb->cols, oldkey, NULL);
    883 
    884   if (ocol == NULL)
    885     return nerr_raise (NERR_NOT_FOUND,
    886 	"Unable to find column for key %s", oldkey);
    887 
    888   col = (WDBColumn *) calloc (1, sizeof (WDBColumn));
    889   if (col == NULL)
    890   {
    891     return nerr_raise (NERR_NOMEM,
    892 	"Unable to allocate memory for column update %s", newkey);
    893   }
    894 
    895   *col = *ocol;
    896   col->name = strdup(newkey);
    897   if (col->name == NULL)
    898   {
    899     free(col);
    900     return nerr_raise (NERR_NOMEM,
    901 	"Unable to allocate memory for column update %s", oldkey);
    902   }
    903   len = uListLength(wdb->cols_l);
    904   for (x = 0; x < len; x++)
    905   {
    906     err = uListGet (wdb->cols_l, x, (void *)&vcol);
    907     if (err) return nerr_pass(err);
    908     if (!strcmp(vcol->name, oldkey))
    909     {
    910       err = uListSet (wdb->cols_l, x, (void *)col);
    911       if (err) return nerr_pass(err);
    912       break;
    913     }
    914   }
    915   if (x>len)
    916   {
    917     return nerr_raise (NERR_ASSERT, "Unable to find cols_l for key %s", oldkey);
    918   }
    919 
    920   r = dictRemove (wdb->cols, oldkey); /* Only failure is key not found */
    921   err = dictSetValue(wdb->cols, newkey, col);
    922   if (err)
    923   {
    924     free (col->name);
    925     free (col);
    926     return nerr_pass_ctx (err,
    927 	"Unable to insert for update of col %s->%s", oldkey, newkey);
    928   }
    929 
    930   wdb->defn_dirty = 1;
    931   wdb->table_version = rand();
    932 
    933   return STATUS_OK;
    934 }
    935 
    936 NEOERR *wdb_column_delete (WDB *wdb, const char *name)
    937 {
    938   WDBColumn *col;
    939   NEOERR *err = STATUS_OK;
    940   int len, x, r;
    941 
    942   len = uListLength(wdb->cols_l);
    943   for (x = 0; x < len; x++)
    944   {
    945     err = uListGet (wdb->cols_l, x, (void *)&col);
    946     if (err) return nerr_pass(err);
    947     if (!strcmp(col->name, name))
    948     {
    949       err = uListDelete (wdb->cols_l, x, NULL);
    950       if (err) return nerr_pass(err);
    951       break;
    952     }
    953   }
    954 
    955   r = dictRemove (wdb->cols, name); /* Only failure is key not found */
    956   if (!r)
    957   {
    958     return nerr_raise (NERR_NOT_FOUND,
    959 	"Unable to find column for key %s", name);
    960   }
    961   wdb->defn_dirty = 1;
    962   wdb->table_version = rand();
    963 
    964   return STATUS_OK;
    965 }
    966 
    967 NEOERR *wdb_column_exchange (WDB *wdb, const char *key1, const char *key2)
    968 {
    969   return nerr_raise (NERR_ASSERT,
    970       "wdb_column_exchange: Not Implemented");
    971 }
    972 
    973 /* Not that there's that much point in changing the key name ... */
    974 NEOERR *wdb_update (WDB *wdb, const char *name, const char *key)
    975 {
    976   if (name != NULL && strcmp(wdb->name, name))
    977   {
    978     if (wdb->name != NULL)
    979       free(wdb->name);
    980     wdb->name = strdup(name);
    981     if (wdb->name == NULL)
    982       return nerr_raise (NERR_NOMEM,
    983 	  "Unable to allocate memory to update name to %s", name);
    984     wdb->defn_dirty = 1;
    985     wdb->table_version = rand();
    986   }
    987   if (key != NULL && strcmp(wdb->key, key))
    988   {
    989     if (wdb->key != NULL)
    990       free(wdb->key);
    991     wdb->key = strdup(key);
    992     if (wdb->key == NULL)
    993     {
    994       wdb->defn_dirty = 0;
    995       return nerr_raise (NERR_NOMEM,
    996 	  "Unable to allocate memory to update key to %s", key);
    997     }
    998     wdb->defn_dirty = 1;
    999     wdb->table_version = rand();
   1000   }
   1001   return STATUS_OK;
   1002 }
   1003 
   1004 NEOERR *wdb_create (WDB **wdb, const char *path, const char *name,
   1005                     const char *key, ULIST *col_def, int flags)
   1006 {
   1007   WDB *my_wdb;
   1008   char d_path[_POSIX_PATH_MAX];
   1009   NEOERR *err = STATUS_OK;
   1010   int x, len, r;
   1011   char *s;
   1012 
   1013   *wdb = NULL;
   1014 
   1015   err = wdb_alloc (&my_wdb, flags);
   1016   if (err) return nerr_pass(err);
   1017 
   1018   my_wdb->name = strdup (name);
   1019   my_wdb->key = strdup (key);
   1020   my_wdb->path = strdup(path);
   1021   if (my_wdb->name == NULL || my_wdb->key == NULL || my_wdb->path == NULL)
   1022   {
   1023     wdb_destroy (&my_wdb);
   1024     return nerr_raise (NERR_NOMEM,
   1025 	"Unable to allocate memory for creation of %s", name);
   1026   }
   1027 
   1028   /* ondisk must start at one because of skipList */
   1029   my_wdb->last_ondisk = 1;
   1030   len = uListLength(col_def);
   1031   for (x = 0; x < len; x++)
   1032   {
   1033     err = uListGet (col_def, x, (void *)&s);
   1034     if (err)
   1035     {
   1036       wdb_destroy (&my_wdb);
   1037       return nerr_pass(err);
   1038     }
   1039     err = wdb_column_insert (my_wdb, -1, s, WDB_TYPE_STR);
   1040     my_wdb->defn_dirty = 0; /* So we don't save on error destroy */
   1041     if (err)
   1042     {
   1043       wdb_destroy (&my_wdb);
   1044       return nerr_pass(err);
   1045     }
   1046   }
   1047 
   1048   err = wdb_save_defn (my_wdb, path);
   1049   if (err)
   1050   {
   1051     wdb_destroy (&my_wdb);
   1052     return nerr_pass(err);
   1053   }
   1054 
   1055   snprintf (d_path, sizeof(d_path), "%s.wdb", path);
   1056   r = db_open(d_path, DB_BTREE, DB_CREATE | DB_TRUNCATE, 0, NULL, NULL, &(my_wdb->db));
   1057   if (r)
   1058   {
   1059     wdb_destroy (&my_wdb);
   1060     return nerr_raise (NERR_DB, "Unable to create db file %s: %d", d_path, r);
   1061   }
   1062 
   1063   *wdb = my_wdb;
   1064 
   1065   return STATUS_OK;
   1066 }
   1067 
   1068 NEOERR *wdb_attr_next (WDB *wdb, char **key, char **value)
   1069 {
   1070   *value = (char *) dictNext (wdb->attrs, key, NULL);
   1071 
   1072   return STATUS_OK;
   1073 }
   1074 
   1075 NEOERR *wdb_attr_get (WDB *wdb, const char *key, char **value)
   1076 {
   1077   void *v;
   1078 
   1079   v = dictSearch (wdb->attrs, key, NULL);
   1080 
   1081   if (v == NULL)
   1082     return nerr_raise (NERR_NOT_FOUND, "Unable to find attr %s", key);
   1083 
   1084   *value = (char *)v;
   1085 
   1086   return STATUS_OK;
   1087 }
   1088 
   1089 NEOERR *wdb_attr_set (WDB *wdb, const char *key, const char *value)
   1090 {
   1091   NEOERR *err = STATUS_OK;
   1092   char *v;
   1093 
   1094   v = strdup(value);
   1095   if (v == NULL)
   1096     return nerr_raise (NERR_NOMEM, "No memory for new attr");
   1097 
   1098   err = dictSetValue(wdb->attrs, key, v);
   1099   if (err)
   1100     return nerr_pass_ctx (err, "Unable to set attr %s", key);
   1101 
   1102   wdb->defn_dirty = 1;
   1103 
   1104   return STATUS_OK;
   1105 }
   1106 
   1107 NEOERR *wdbr_get (WDB *wdb, WDBRow *row, const char *key, void **value)
   1108 {
   1109   WDBColumn *col;
   1110   void *v;
   1111 
   1112   col = (WDBColumn *) dictSearch (wdb->cols, key, NULL);
   1113 
   1114   if (col == NULL)
   1115     return nerr_raise (NERR_NOT_FOUND, "Unable to find key %s", key);
   1116 
   1117   if (col->inmem_index-1 > row->data_count)
   1118     return nerr_raise (NERR_ASSERT, "Index for key %s is greater than row data, was table altered?", key);
   1119 
   1120   v = row->data[col->inmem_index-1];
   1121 
   1122   *value = v;
   1123 
   1124   return STATUS_OK;
   1125 }
   1126 
   1127 NEOERR *wdbr_set (WDB *wdb, WDBRow *row, const char *key, void *value)
   1128 {
   1129   WDBColumn *col;
   1130 
   1131   col = (WDBColumn *) dictSearch (wdb->cols, key, NULL);
   1132 
   1133   if (col == NULL)
   1134     return nerr_raise (NERR_NOT_FOUND, "Unable to find key %s", key);
   1135 
   1136   if (col->inmem_index-1 > row->data_count)
   1137     return nerr_raise (NERR_ASSERT, "Index for key %s is greater than row data, was table altered?", key);
   1138 
   1139   if (col->type == WDB_TYPE_STR && row->data[col->inmem_index-1] != NULL)
   1140   {
   1141     free (row->data[col->inmem_index-1]);
   1142   }
   1143   row->data[col->inmem_index-1] = value;
   1144 
   1145   return STATUS_OK;
   1146 }
   1147 
   1148 static NEOERR *alloc_row (WDB *wdb, WDBRow **row)
   1149 {
   1150   WDBRow *my_row;
   1151   int len;
   1152 
   1153   *row = NULL;
   1154 
   1155   len = uListLength (wdb->cols_l);
   1156 
   1157   my_row = (WDBRow *) calloc (1, sizeof (WDBRow) + len * (sizeof (void *)));
   1158   if (my_row == NULL)
   1159     return nerr_raise (NERR_NOMEM, "No memory for new row");
   1160 
   1161   my_row->data_count = len;
   1162   my_row->table_version = wdb->table_version;
   1163 
   1164   *row = my_row;
   1165 
   1166   return STATUS_OK;
   1167 }
   1168 
   1169 NEOERR *wdbr_destroy (WDB *wdb, WDBRow **row)
   1170 {
   1171   WDBColumn *col;
   1172   WDBRow *my_row;
   1173   int len, x;
   1174   NEOERR *err;
   1175 
   1176   err = STATUS_OK;
   1177   if (*row == NULL)
   1178     return err;
   1179 
   1180   my_row = *row;
   1181 
   1182   /* Verify this row maps to this table, or else we could do something
   1183    * bad */
   1184 
   1185   if (wdb->table_version != my_row->table_version)
   1186     return nerr_raise (NERR_ASSERT, "Row %s doesn't match current table", my_row->key_value);
   1187 
   1188   if (my_row->key_value != NULL)
   1189     free (my_row->key_value);
   1190 
   1191   len = uListLength(wdb->cols_l);
   1192 
   1193   for (x = 0; x < len; x++)
   1194   {
   1195     if (my_row->data[x] != NULL)
   1196     {
   1197       err = uListGet (wdb->cols_l, x, (void *)&col);
   1198       if (err) break;
   1199       switch (col->type)
   1200       {
   1201 	case WDB_TYPE_INT:
   1202 	  break;
   1203 	case WDB_TYPE_STR:
   1204 	  free (my_row->data[x]);
   1205 	  break;
   1206 	default:
   1207 	  return nerr_raise (NERR_ASSERT, "Unknown type %d", col->type);
   1208       }
   1209     }
   1210   }
   1211 
   1212   free (my_row);
   1213   *row = NULL;
   1214 
   1215   return nerr_pass(err);
   1216 }
   1217 
   1218 NEOERR *wdbr_lookup (WDB *wdb, const char *key, WDBRow **row)
   1219 {
   1220   DBT dkey, data;
   1221   NEOERR *err = STATUS_OK;
   1222   WDBRow *my_row;
   1223   int r;
   1224 
   1225   *row = NULL;
   1226 
   1227   memset(&dkey, 0, sizeof(dkey));
   1228   memset(&data, 0, sizeof(data));
   1229 
   1230   dkey.flags = DB_DBT_USERMEM;
   1231   data.flags = DB_DBT_MALLOC;
   1232 
   1233   dkey.data = (void *)key;
   1234   dkey.size = strlen(key);
   1235 
   1236   r = wdb->db->get (wdb->db, NULL, &dkey, &data, 0);
   1237   if (r == DB_NOTFOUND)
   1238     return nerr_raise (NERR_NOT_FOUND, "Unable to find key %s", key);
   1239   else if (r)
   1240     return nerr_raise (NERR_DB, "Error retrieving key %s: %d", key, r);
   1241 
   1242   /* allocate row */
   1243   err = alloc_row (wdb, &my_row);
   1244   if (err != STATUS_OK)
   1245   {
   1246     free (data.data);
   1247     return nerr_pass(err);
   1248   }
   1249 
   1250   my_row->key_value = strdup(key);
   1251   if (my_row->key_value == NULL)
   1252   {
   1253     free (data.data);
   1254     free (my_row);
   1255     return nerr_raise (NERR_NOMEM, "No memory for new row");
   1256   }
   1257 
   1258   /* unpack row */
   1259   err = unpack_row (wdb, data.data, data.size, my_row);
   1260   free (data.data);
   1261   if (err)
   1262   {
   1263     free (my_row);
   1264     return nerr_pass(err);
   1265   }
   1266 
   1267   *row = my_row;
   1268 
   1269   return STATUS_OK;
   1270 }
   1271 
   1272 NEOERR *wdbr_create (WDB *wdb, const char *key, WDBRow **row)
   1273 {
   1274   WDBRow *my_row;
   1275   NEOERR *err = STATUS_OK;
   1276 
   1277   *row = NULL;
   1278 
   1279   /* allocate row */
   1280   err = alloc_row (wdb, &my_row);
   1281   if (err) return nerr_pass(err);
   1282 
   1283   my_row->key_value = strdup(key);
   1284   if (my_row->key_value == NULL)
   1285   {
   1286     wdbr_destroy (wdb, &my_row);
   1287     return nerr_raise (NERR_NOMEM, "No memory for new row");
   1288   }
   1289 
   1290   *row = my_row;
   1291 
   1292   return STATUS_OK;
   1293 }
   1294 
   1295 NEOERR *wdbr_save (WDB *wdb, WDBRow *row, int flags)
   1296 {
   1297   DBT dkey, data;
   1298   int dflags = 0;
   1299   NEOERR *err = STATUS_OK;
   1300   int r;
   1301 
   1302   memset(&dkey, 0, sizeof(dkey));
   1303   memset(&data, 0, sizeof(data));
   1304 
   1305   dkey.data = row->key_value;
   1306   dkey.size = strlen(row->key_value);
   1307 
   1308   err = pack_row (wdb, row, &(data.data), &data.size);
   1309   if (err != STATUS_OK) return nerr_pass(err);
   1310 
   1311   if (flags & WDBR_INSERT)
   1312   {
   1313     dflags = DB_NOOVERWRITE;
   1314   }
   1315 
   1316   r = wdb->db->put (wdb->db, NULL, &dkey, &data, dflags);
   1317   free (data.data);
   1318   if (r == DB_KEYEXIST)
   1319     return nerr_raise (NERR_DUPLICATE, "Key %s already exists", row->key_value);
   1320   if (r)
   1321     return nerr_raise (NERR_DB, "Error saving key %s: %d",
   1322 	row->key_value, r);
   1323 
   1324   return STATUS_OK;
   1325 }
   1326 
   1327 NEOERR *wdbr_delete (WDB *wdb, const char *key)
   1328 {
   1329   DBT dkey;
   1330   int r;
   1331 
   1332   memset(&dkey, 0, sizeof(dkey));
   1333 
   1334   dkey.flags = DB_DBT_USERMEM;
   1335 
   1336   dkey.data = (void *)key;
   1337   dkey.size = strlen(key);
   1338 
   1339   r = wdb->db->del (wdb->db, NULL, &dkey, 0);
   1340   if (r == DB_NOTFOUND)
   1341     return nerr_raise (NERR_NOT_FOUND, "Key %s not found", key);
   1342   else if (r)
   1343     return nerr_raise (NERR_DB, "Error deleting key %s: %d", key, r);
   1344 
   1345 
   1346   return STATUS_OK;
   1347 }
   1348 
   1349 NEOERR *wdbr_dump (WDB *wdb, WDBRow *row)
   1350 {
   1351   int x;
   1352 
   1353   ne_warn ("version: %d", row->table_version);
   1354   ne_warn ("key: %s", row->key_value);
   1355   ne_warn ("count: %d", row->data_count);
   1356   for (x=0; x < row->data_count; x++)
   1357     ne_warn ("data[%d]: %s", x, row->data[x]);
   1358 
   1359   return STATUS_OK;
   1360 }
   1361 
   1362 NEOERR *wdbc_create (WDB *wdb, WDBCursor **cursor)
   1363 {
   1364   DBC *db_cursor;
   1365   WDBCursor *new_cursor;
   1366   int r;
   1367 
   1368   *cursor = NULL;
   1369 
   1370 #if (DB_VERSION_MINOR==4)
   1371   r = (wdb->db)->cursor (wdb->db, NULL, &db_cursor);
   1372 #else
   1373   r = (wdb->db)->cursor (wdb->db, NULL, &db_cursor, 0);
   1374 #endif
   1375   if (r)
   1376     return nerr_raise (NERR_DB, "Unable to create cursor: %d", r);
   1377 
   1378   new_cursor = (WDBCursor *) calloc (1, sizeof (WDBCursor));
   1379   if (new_cursor == NULL)
   1380   {
   1381     db_cursor->c_close (db_cursor);
   1382     return nerr_raise (NERR_NOMEM, "Unable to create cursor");
   1383   }
   1384 
   1385   new_cursor->table_version = wdb->table_version;
   1386   new_cursor->db_cursor = db_cursor;
   1387 
   1388   *cursor = new_cursor;
   1389 
   1390   return STATUS_OK;
   1391 }
   1392 
   1393 NEOERR *wdbc_destroy (WDB *wdb, WDBCursor **cursor)
   1394 {
   1395   if (*cursor != NULL)
   1396   {
   1397     (*cursor)->db_cursor->c_close ((*cursor)->db_cursor);
   1398     free (*cursor);
   1399     *cursor = NULL;
   1400   }
   1401   return STATUS_OK;
   1402 }
   1403 
   1404 NEOERR *wdbr_next (WDB *wdb, WDBCursor *cursor, WDBRow **row, int flags)
   1405 {
   1406   DBT dkey, data;
   1407   WDBRow *my_row;
   1408   NEOERR *err = STATUS_OK;
   1409   int r;
   1410 
   1411   *row = NULL;
   1412 
   1413   if (wdb->table_version != cursor->table_version)
   1414   {
   1415     return nerr_raise (NERR_ASSERT, "Cursor doesn't match database");
   1416   }
   1417 
   1418   memset(&dkey, 0, sizeof(dkey));
   1419   memset(&data, 0, sizeof(data));
   1420   dkey.flags = DB_DBT_MALLOC;
   1421   data.flags = DB_DBT_MALLOC;
   1422 
   1423   /* First call */
   1424   if (flags & WDBC_FIRST)
   1425   {
   1426     r = cursor->db_cursor->c_get (cursor->db_cursor, &dkey, &data, DB_FIRST);
   1427     if (r == DB_NOTFOUND)
   1428       return nerr_raise (NERR_NOT_FOUND, "Cursor empty");
   1429     else if (r)
   1430       return nerr_raise (NERR_DB, "Unable to get first item from cursor: %d",
   1431 	  r);
   1432   }
   1433   else
   1434   {
   1435     r = cursor->db_cursor->c_get (cursor->db_cursor, &dkey, &data, DB_NEXT);
   1436     if (r == DB_NOTFOUND)
   1437       return STATUS_OK;
   1438     else if (r)
   1439       return nerr_raise (NERR_DB, "Unable to get next item from cursor: %d", r);
   1440   }
   1441 
   1442   /* allocate row */
   1443   err = alloc_row (wdb, &my_row);
   1444   if (err)
   1445   {
   1446     free (data.data);
   1447     return nerr_pass(err);
   1448   }
   1449 
   1450   my_row->key_value = (char *) malloc (dkey.size + 1);
   1451   if (my_row->key_value == NULL)
   1452   {
   1453     free (data.data);
   1454     free (my_row);
   1455     return nerr_raise (NERR_NOMEM, "No memory for new row");
   1456   }
   1457 
   1458   memcpy (my_row->key_value, dkey.data, dkey.size);
   1459   my_row->key_value[dkey.size] = '\0';
   1460 
   1461   /* unpack row */
   1462   err = unpack_row (wdb, data.data, data.size, my_row);
   1463   free (data.data);
   1464   free (dkey.data);
   1465   if (err)
   1466   {
   1467     free (my_row);
   1468     return nerr_pass(err);
   1469   }
   1470 
   1471   *row = my_row;
   1472 
   1473   return STATUS_OK;
   1474 }
   1475 
   1476 NEOERR *wdbr_find (WDB *wdb, WDBCursor *cursor, const char *key, WDBRow **row)
   1477 {
   1478   DBT dkey, data;
   1479   WDBRow *my_row;
   1480   NEOERR *err = STATUS_OK;
   1481   int r;
   1482 
   1483   *row = NULL;
   1484 
   1485   if (wdb->table_version != cursor->table_version)
   1486   {
   1487     return nerr_raise (NERR_ASSERT, "Cursor doesn't match database");
   1488   }
   1489 
   1490   memset(&dkey, 0, sizeof(dkey));
   1491   memset(&data, 0, sizeof(data));
   1492   dkey.flags = DB_DBT_USERMEM;
   1493   data.flags = DB_DBT_MALLOC;
   1494 
   1495   dkey.data = (void *)key;
   1496   dkey.size = strlen(key);
   1497 
   1498   r = cursor->db_cursor->c_get (cursor->db_cursor, &dkey, &data, DB_SET_RANGE);
   1499   if (r == DB_NOTFOUND)
   1500     return STATUS_OK;
   1501   else if (r)
   1502     return nerr_raise (r, "Unable to get find item for key %s", key);
   1503 
   1504   /* allocate row */
   1505   err = alloc_row (wdb, &my_row);
   1506   if (err)
   1507   {
   1508     free (data.data);
   1509     return nerr_pass(err);
   1510   }
   1511 
   1512   my_row->key_value = (char *) malloc (dkey.size + 1);
   1513   if (my_row->key_value == NULL)
   1514   {
   1515     free (data.data);
   1516     free (my_row);
   1517     return nerr_raise (NERR_NOMEM, "No memory for new row");
   1518   }
   1519 
   1520   memcpy (my_row->key_value, dkey.data, dkey.size);
   1521   my_row->key_value[dkey.size] = '\0';
   1522 
   1523   /* unpack row */
   1524   err = unpack_row (wdb, data.data, data.size, my_row);
   1525   free (data.data);
   1526   if (err)
   1527   {
   1528     free (my_row);
   1529     return nerr_pass(err);
   1530   }
   1531 
   1532   *row = my_row;
   1533 
   1534   return STATUS_OK;
   1535 }
   1536 
   1537 NEOERR *wdb_keys (WDB *wdb, char **primary_key, ULIST **data)
   1538 {
   1539   NEOERR *err;
   1540   int x, len;
   1541   WDBColumn *col;
   1542   ULIST *my_data;
   1543   char *my_key = NULL;
   1544   char *my_col = NULL;
   1545 
   1546   *data = NULL;
   1547   *primary_key = NULL;
   1548   my_key = strdup(wdb->key);
   1549   if (my_key == NULL)
   1550     return nerr_raise (NERR_NOMEM, "Unable to allocate memory for keys");
   1551 
   1552   len = uListLength(wdb->cols_l);
   1553   err = uListInit (&my_data, len, 0);
   1554   if (err != STATUS_OK)
   1555   {
   1556     free(my_key);
   1557     return nerr_pass(err);
   1558   }
   1559 
   1560   for (x = 0; x < len; x++)
   1561   {
   1562     err = uListGet (wdb->cols_l, x, (void *)&col);
   1563     if (err) goto key_err;
   1564     my_col = strdup(col->name);
   1565     if (my_col == NULL)
   1566     {
   1567       err = nerr_raise (NERR_NOMEM, "Unable to allocate memory for keys");
   1568       goto key_err;
   1569     }
   1570     err = uListAppend (my_data, my_col);
   1571     my_col = NULL;
   1572     if (err) goto key_err;
   1573   }
   1574 
   1575   *data = my_data;
   1576   *primary_key = my_key;
   1577   return STATUS_OK;
   1578 
   1579 key_err:
   1580   if (my_key != NULL) free (my_key);
   1581   if (my_col != NULL) free (my_col);
   1582   *primary_key = NULL;
   1583   uListDestroy (&my_data, 0);
   1584   return nerr_pass(err);
   1585 }
   1586 
/*
 * Known Issues:
 *  - We probably need to store the actual key value in the packed row.
 *    Then again, maybe not, because any cursor you use on a sleepycat
 *    db will return the key along with the data.
 *  - Memory ownership.  Especially when dealing with rows, we need to
 *    keep track of when we allocate, when we deallocate, and who owns
 *    that memory and is responsible for freeing it.
 *  - function to delete entry from wdb
 */
   1597