Home | History | Annotate | Download | only in fen
      1 /* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
      2 /* vim:set expandtab ts=4 shiftwidth=4: */
      3 /*
      4  * Copyright (C) 2008 Sun Microsystems, Inc. All rights reserved.
      5  * Use is subject to license terms.
      6  *
      7  * This library is free software; you can redistribute it and/or
      8  * modify it under the terms of the GNU Lesser General Public
      9  * License as published by the Free Software Foundation; either
     10  * version 2 of the License, or (at your option) any later version.
     11  *
     12  * This library is distributed in the hope that it will be useful,
     13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
     14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
     15  * Lesser General Public License for more details.
     16  *
     17  * You should have received a copy of the GNU Lesser General
     18  * Public License along with this library; if not, write to the
     19  * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
     20  * Boston, MA 02111-1307, USA.
     21  *
     22  * Authors: Lin Ma <lin.ma (at) sun.com>
     23  */
     24 
     25 #include "config.h"
     26 #include <port.h>
     27 #include <sys/types.h>
     28 #include <sys/time.h>
     29 #include <sys/stat.h>
     30 #include <errno.h>
     31 #include <glib.h>
     32 #include "fen-data.h"
     33 #include "fen-kernel.h"
     34 #include "fen-missing.h"
     35 #include "fen-dump.h"
     36 
/*
 * Timing and tuning constants used by the event queue and the
 * "still changing" rescans in this file.
 */
/* Interval handed to g_timeout_add() when process_events() is scheduled. */
#define	PROCESS_EVENTQ_TIME	10	/* in milliseconds */
/* Amount added to an event's timestamp right before it is queued. */
#define PAIR_EVENTS_TIMEVAL	00000	/* in microseconds */
/* Amount added to a new event's timestamp when it replaces an equal event. */
#define	PAIR_EVENTS_INC_TIMEVAL	0000	/* in microseconds */
/* Base rescan delay; multiplied by BASE_NUM^changed_event_num. */
#define SCAN_CHANGINGS_TIME	50	/* in milliseconds */
/*
 * Upper and lower clamps for the rescan delay.  Both are currently the
 * same value, so get_scalable_scan_time() always yields 400 ms.
 */
#define	SCAN_CHANGINGS_MAX_TIME	(4*100)	/* in milliseconds */
#define	SCAN_CHANGINGS_MIN_TIME	(4*100)	/* in milliseconds */
/* Value stored by get_scalable_scan_time() when the delay hits the maximum. */
#define	INIT_CHANGES_NUM	2
/* Base of the exponential back-off computed by get_scalable_scan_time(). */
#define	BASE_NUM	2
     45 
     46 #ifdef GIO_COMPILATION
     47 #define FD_W if (fd_debug_enabled) g_warning
     48 static gboolean fd_debug_enabled = FALSE;
     49 #else
     50 #include "gam_error.h"
     51 #define FD_W(...) GAM_DEBUG(DEBUG_INFO, __VA_ARGS__)
     52 #endif
     53 
/* Lock defined in another translation unit; the timer and idle callbacks
 * in this file take it before touching fdata state. */
G_LOCK_EXTERN (fen_lock);
/* fdata objects whose destruction fdata_delete() had to postpone. */
static GList *deleting_data = NULL;
/* Source id of the scan_deleting_data() idle handler; 0 when not installed. */
static guint deleting_data_id = 0;

/* Client callbacks and event translator, installed by fdata_class_init(). */
static void (*emit_once_cb) (fdata *f, int events, gpointer sub);
static void (*emit_cb) (fdata *f, int events);
static int (*_event_converter) (int event);

/* Forward declarations for helpers defined later in this file. */
static gboolean fdata_delete (fdata* f);
static gint fdata_sub_find (gpointer a, gpointer b);
static void scan_children (node_t *f);
static void scan_known_children (node_t* f);
     66 
     67 node_t*
     68 add_missing_cb (node_t* parent, gpointer user_data)
     69 {
     70     g_assert (parent);
     71     FD_W ("%s p:0x%p %s\n", __func__, parent, (gchar*)user_data);
     72     return add_node (parent, (gchar*)user_data);
     73 }
     74 
     75 gboolean
     76 pre_del_cb (node_t* node, gpointer user_data)
     77 {
     78     fdata* data;
     79 
     80     g_assert (node);
     81     data = node_get_data (node);
     82     FD_W ("%s node:0x%p %s\n", __func__, node, NODE_NAME(node));
     83     if (data != NULL) {
     84         if (!FN_IS_PASSIVE(data)) {
     85             return FALSE;
     86         }
     87         fdata_delete (data);
     88     }
     89     return TRUE;
     90 }
     91 
     92 static guint
     93 _pow (guint x, guint y)
     94 {
     95     guint z = 1;
     96     g_assert (x >= 0 && y >= 0);
     97     for (; y > 0; y--) {
     98         z *= x;
     99     }
    100     return z;
    101 }
    102 
    103 static guint
    104 get_scalable_scan_time (fdata* data)
    105 {
    106     guint sleep_time;
    107     /* Caculate from num = 0 */
    108     sleep_time = _pow (BASE_NUM, data->changed_event_num) * SCAN_CHANGINGS_TIME;
    109     if (sleep_time < SCAN_CHANGINGS_MIN_TIME) {
    110         sleep_time = SCAN_CHANGINGS_MIN_TIME;
    111     } else if (sleep_time > SCAN_CHANGINGS_MAX_TIME) {
    112         sleep_time = SCAN_CHANGINGS_MAX_TIME;
    113         data->change_update_id = INIT_CHANGES_NUM;
    114     }
    115     FD_W ("SCALABE SCAN num:time [ %4u : %4u ] %s\n", data->changed_event_num, sleep_time, FN_NAME(data));
    116     return sleep_time;
    117 }
    118 
    119 static gboolean
    120 g_timeval_lt (GTimeVal *val1, GTimeVal *val2)
    121 {
    122     if (val1->tv_sec < val2->tv_sec)
    123         return TRUE;
    124 
    125     if (val1->tv_sec > val2->tv_sec)
    126         return FALSE;
    127 
    128     /* val1->tv_sec == val2->tv_sec */
    129     if (val1->tv_usec < val2->tv_usec)
    130         return TRUE;
    131 
    132     return FALSE;
    133 }
    134 
    135 /**
    136  * If all active children nodes are ported, then cancel monitor the parent node
    137  *
    138  * Unsafe, need lock.
    139  */
    140 static void
    141 scan_known_children (node_t* f)
    142 {
    143 	GDir *dir;
    144 	GError *err = NULL;
    145     fdata* pdata;
    146 
    147     FD_W ("%s %s [0x%p]\n", __func__, NODE_NAME(f), f);
    148     pdata = node_get_data (f);
    149     /*
    150      * Currect fdata must is directly monitored. Be sure it is 1 level monitor.
    151      */
    152 	dir = g_dir_open (NODE_NAME(f), 0, &err);
    153 	if (dir) {
    154 		const char *basename;
    155 
    156 		while ((basename = g_dir_read_name (dir)))
    157 		{
    158             node_t* childf = NULL;
    159             fdata* data;
    160 			GList *idx;
    161 			/*
    162              * If the node is existed, and isn't ported, then emit created
    163              * event. Ignore others.
    164              */
    165             childf = children_find (f, basename);
    166             if (childf &&
    167               (data = node_get_data (childf)) != NULL &&
    168               !FN_IS_PASSIVE (data)) {
    169                 if (!is_monitoring (data) &&
    170                   port_add (&data->fobj, &data->len, data)) {
    171                     fdata_emit_events (data, FN_EVENT_CREATED);
    172                 }
    173             }
    174         }
    175 		g_dir_close (dir);
    176     } else {
    177         FD_W (err->message);
    178         g_error_free (err);
    179     }
    180 }
    181 
    182 static void
    183 scan_children (node_t *f)
    184 {
    185 	GDir *dir;
    186 	GError *err = NULL;
    187     fdata* pdata;
    188 
    189     FD_W ("%s %s [0x%p]\n", __func__, NODE_NAME(f), f);
    190     pdata = node_get_data (f);
    191     /*
    192      * Currect fdata must is directly monitored. Be sure it is 1 level monitor.
    193      */
    194 	dir = g_dir_open (NODE_NAME(f), 0, &err);
    195 	if (dir) {
    196 		const char *basename;
    197 
    198 		while ((basename = g_dir_read_name (dir)))
    199 		{
    200             node_t* childf = NULL;
    201             fdata* data;
    202 			GList *idx;
    203 
    204             childf = children_find (f, basename);
    205             if (childf == NULL) {
    206                 gchar *filename;
    207 
    208                 filename = g_build_filename (NODE_NAME(f), basename, NULL);
    209                 childf = add_node (f, filename);
    210                 g_assert (childf);
    211                 data = fdata_new (childf, FALSE);
    212                 g_free (filename);
    213             }
    214             if ((data = node_get_data (childf)) == NULL) {
    215                 data = fdata_new (childf, FALSE);
    216             }
    217             /* Be sure data isn't ported and add to port successfully */
    218             /* Don't need delete it, it will be deleted by the parent */
    219             if (is_monitoring (data)) {
    220                 /* Ignored */
    221             } else if (/* !is_ported (data) && */
    222                 port_add (&data->fobj, &data->len, data)) {
    223                 fdata_emit_events (data, FN_EVENT_CREATED);
    224             }
    225         }
    226 		g_dir_close (dir);
    227     } else {
    228         FD_W (err->message);
    229         g_error_free (err);
    230     }
    231 }
    232 
    233 static gboolean
    234 scan_deleting_data (gpointer data)
    235 {
    236     fdata *f;
    237     GList* i;
    238     GList* deleted_list = NULL;
    239     gboolean ret = TRUE;
    240 
    241     if (G_TRYLOCK (fen_lock)) {
    242         for (i = deleting_data; i; i = i->next) {
    243             f = (fdata*)i->data;
    244             if (fdata_delete (f)) {
    245                 deleted_list = g_list_prepend (deleted_list, i);
    246             }
    247         }
    248 
    249         for (i = deleted_list; i; i = i->next) {
    250             deleting_data = g_list_remove_link (deleting_data,
    251               (GList *)i->data);
    252             g_list_free_1 ((GList *)i->data);
    253         }
    254         g_list_free (deleted_list);
    255 
    256         if (deleting_data == NULL) {
    257             deleting_data_id = 0;
    258             ret = FALSE;
    259         }
    260         G_UNLOCK (fen_lock);
    261     }
    262     return ret;
    263 }
    264 
    265 gboolean
    266 is_monitoring (fdata* data)
    267 {
    268     return is_ported (data) || data->change_update_id > 0;
    269 }
    270 
    271 fdata*
    272 get_parent_data (fdata* data)
    273 {
    274     if (FN_NODE(data) && !IS_TOPNODE(FN_NODE(data))) {
    275         return node_get_data (FN_NODE(data)->parent);
    276     }
    277     return NULL;
    278 }
    279 
    280 node_t*
    281 get_parent_node (fdata* data)
    282 {
    283     if (FN_NODE(data)) {
    284         return (FN_NODE(data)->parent);
    285     }
    286     return NULL;
    287 }
    288 
    289 fdata *
    290 fdata_new (node_t* node, gboolean is_mondir)
    291 {
    292 	fdata *f = NULL;
    293 
    294     g_assert (node);
    295 	if ((f = g_new0 (fdata, 1)) != NULL) {
    296         FN_NODE(f) = node;
    297 		FN_NAME(f) = g_strdup (NODE_NAME(node));
    298         f->is_dir = is_mondir;
    299         f->eventq = g_queue_new ();
    300         FD_W ("[ %s ] 0x%p %s\n", __func__, f, FN_NAME(f));
    301         node_set_data (node, f);
    302 	}
    303 	return f;
    304 }
    305 
    306 static gboolean
    307 fdata_delete (fdata *f)
    308 {
    309     fnode_event_t *ev;
    310 
    311     FD_W ("[ TRY %s ] 0x%p id[%4d:%4d] %s\n", __func__, f, f->eventq_id, f->change_update_id, FN_NAME(f));
    312     g_assert (FN_IS_PASSIVE(f));
    313 
    314     port_remove (f);
    315     /* missing_remove (f); */
    316 
    317     if (f->node != NULL) {
    318         node_set_data (f->node, NULL);
    319         f->node = NULL;
    320     }
    321 
    322     if (f->change_update_id > 0 || f->eventq_id > 0) {
    323         if (FN_IS_LIVING(f)) {
    324             f->is_cancelled = TRUE;
    325             deleting_data = g_list_prepend (deleting_data, f);
    326             if (deleting_data_id == 0) {
    327                 deleting_data_id = g_idle_add (scan_deleting_data, NULL);
    328                 g_assert (deleting_data_id > 0);
    329             }
    330         }
    331         return FALSE;
    332     }
    333     FD_W ("[ %s ] 0x%p %s\n", __func__, f, FN_NAME(f));
    334 
    335     while ((ev = g_queue_pop_head (f->eventq)) != NULL) {
    336         fnode_event_delete (ev);
    337     }
    338 
    339     g_queue_free (f->eventq);
    340     g_free (FN_NAME(f));
    341     g_free (f);
    342     return TRUE;
    343 }
    344 
    345 void
    346 fdata_reset (fdata* data)
    347 {
    348     fnode_event_t *ev;
    349 
    350     g_assert (data);
    351 
    352     while ((ev = g_queue_pop_head (data->eventq)) != NULL) {
    353         fnode_event_delete (ev);
    354     }
    355 }
    356 
    357 static gint
    358 fdata_sub_find (gpointer a, gpointer b)
    359 {
    360     if (a != b) {
    361         return 1;
    362     } else {
    363         return 0;
    364     }
    365 }
    366 
    367 void
    368 fdata_sub_add (fdata *f, gpointer sub)
    369 {
    370     FD_W ("[%s] [data: 0x%p ] [s: 0x%p ] %s\n", __func__, f, sub, FN_NAME(f));
    371     g_assert (g_list_find_custom (f->subs, sub, (GCompareFunc)fdata_sub_find) == NULL);
    372     f->subs = g_list_prepend (f->subs, sub);
    373 }
    374 
    375 void
    376 fdata_sub_remove (fdata *f, gpointer sub)
    377 {
    378     GList *l;
    379     FD_W ("[%s] [data: 0x%p ] [s: 0x%p ] %s\n", __func__, f, sub, FN_NAME(f));
    380     g_assert (g_list_find_custom (f->subs, sub, (GCompareFunc)fdata_sub_find) != NULL);
    381     l = g_list_find_custom (f->subs, sub, (GCompareFunc)fdata_sub_find);
    382     g_assert (l);
    383     g_assert (sub == l->data);
    384     f->subs = g_list_delete_link (f->subs, l);
    385 }
    386 
    387 /**
    388  * Adjust self on failing to Port
    389  */
    390 void
    391 fdata_adjust_deleted (fdata* f)
    392 {
    393     node_t* parent;
    394     fdata* pdata;
    395     node_op_t op = {NULL, NULL, pre_del_cb, NULL};
    396 
    397     /*
    398      * It's a top node. We move it to missing list.
    399      */
    400     parent = get_parent_node (f);
    401     pdata = get_parent_data (f);
    402     if (!FN_IS_PASSIVE(f) ||
    403       children_num (FN_NODE(f)) > 0 ||
    404       (pdata && !FN_IS_PASSIVE(pdata))) {
    405         if (parent) {
    406             if (pdata == NULL) {
    407                 pdata = fdata_new (parent, FALSE);
    408             }
    409             g_assert (pdata);
    410             if (!port_add (&pdata->fobj, &pdata->len, pdata)) {
    411                 fdata_adjust_deleted (pdata);
    412             }
    413         } else {
    414             /* f is root */
    415             g_assert (IS_TOPNODE(FN_NODE(f)));
    416             missing_add (f);
    417         }
    418     } else {
    419 #ifdef GIO_COMPILATION
    420         pending_remove_node (FN_NODE(f), &op);
    421 #else
    422         remove_node (FN_NODE(f), &op);
    423 #endif
    424     }
    425 }
    426 
/*
 * fdata_adjust_changed:
 * @f: fdata whose file was reported as changed.
 *
 * Timeout callback (installed with g_timeout_add() by process_events()
 * and by this function itself).  Holds fen_lock for its whole body and
 * always returns FALSE, so the source that invoked it is removed; when
 * the file is still changing a new timeout is created instead.
 *
 * Steps:
 *  - If @f is no longer living, or nothing needs it any more (no child
 *    nodes, passive, parent data present and passive), clear
 *    change_update_id and stop.
 *  - Stat the file.  On failure queue a FILE_DELETE event.
 *  - If the size differs from the recorded one, record the new size,
 *    queue a pending FILE_MODIFIED event and schedule another check
 *    after get_scalable_scan_time().
 *  - Otherwise the file has settled: reset the change counter, record
 *    the timestamps, rescan directories, and re-add the file to the
 *    port; if that fails queue a FILE_DELETE event.
 *
 * Returns: FALSE.
 */
static gboolean
fdata_adjust_changed (fdata *f)
{
    fnode_event_t *ev;
    struct stat buf;
    node_t* parent;
    fdata* pdata;

    G_LOCK (fen_lock);
    /* NOTE(review): parent is assigned but not read in this function. */
    parent = get_parent_node (f);
    pdata = get_parent_data (f);

    /* Cancelled, or no longer needed by anyone: just drop the timer id. */
    if (!FN_IS_LIVING(f) ||
      (children_num (FN_NODE(f)) == 0 &&
        FN_IS_PASSIVE(f) &&
        pdata && FN_IS_PASSIVE(pdata))) {
        f->change_update_id = 0;
        G_UNLOCK (fen_lock);
        return FALSE;
    }

    FD_W ("[ %s ] %s\n", __func__, FN_NAME(f));
    if (FN_STAT (FN_NAME(f), &buf) != 0) {
        FD_W ("LSTAT [%-20s] %s\n", FN_NAME(f), g_strerror (errno));
        /* Jumps into the port_add_simple() failure branch below. */
        goto L_delete;
    }
    f->is_dir = S_ISDIR (buf.st_mode) ? TRUE : FALSE;
    if (f->len != buf.st_size) {
        /* FD_W ("LEN [%lld:%lld] %s\n", f->len, buf.st_size, FN_NAME(f)); */
        f->len = buf.st_size;
        ev = fnode_event_new (FILE_MODIFIED, TRUE, f);
        if (ev != NULL) {
            /* Pending events are not emitted by process_events(). */
            ev->is_pending = TRUE;
            fdata_add_event (f, ev);
        }
        /* Fdata is still changing, so scalable scan */
        /* The new source id replaces the one of the source now ending. */
        f->change_update_id = g_timeout_add (get_scalable_scan_time (f),
          (GSourceFunc)fdata_adjust_changed,
          (gpointer)f);
        G_UNLOCK (fen_lock);
        return FALSE;
    } else {
        /* Size is stable: reset the back-off and record the timestamps. */
        f->changed_event_num = 0;
        f->fobj.fo_atime = buf.st_atim;
        f->fobj.fo_mtime = buf.st_mtim;
        f->fobj.fo_ctime = buf.st_ctim;
        if (FN_IS_DIR(f)) {
            if (FN_IS_MONDIR(f)) {
                scan_children (FN_NODE(f));
            } else {
                scan_known_children (FN_NODE(f));
                /* Same "nobody needs it" test as at the top. */
                if ((children_num (FN_NODE(f)) == 0 &&
                      FN_IS_PASSIVE(f) &&
                      pdata && FN_IS_PASSIVE(pdata))) {
                    port_remove (f);
                    goto L_exit;
                }
            }
        }
        if (!port_add_simple (&f->fobj, f)) {
        L_delete:
            /* Reached on stat failure or when re-adding to the port fails. */
            ev = fnode_event_new (FILE_DELETE, FALSE, f);
            if (ev != NULL) {
                fdata_add_event (f, ev);
            }
        }
    }
L_exit:
    /* Cleared only after the event handling above has run. */
    f->change_update_id = 0;
    G_UNLOCK (fen_lock);
    return FALSE;
}
    499 
    500 void
    501 fdata_emit_events_once (fdata *f, int event, gpointer sub)
    502 {
    503     emit_once_cb (f, _event_converter (event), sub);
    504 }
    505 
    506 void
    507 fdata_emit_events (fdata *f, int event)
    508 {
    509     emit_cb (f, _event_converter (event));
    510 }
    511 
    512 static gboolean
    513 process_events (gpointer udata)
    514 {
    515     node_op_t op = {NULL, NULL, pre_del_cb, NULL};
    516     fdata* f;
    517     fnode_event_t* ev;
    518     int e;
    519 
    520     /* FD_W ("IN <======== %s\n", __func__); */
    521 
    522     f = (fdata*)udata;
    523     FD_W ("%s 0x%p id:%-4d %s\n", __func__, f, f->eventq_id, FN_NAME(f));
    524 
    525     G_LOCK (fen_lock);
    526 
    527     if (!FN_IS_LIVING(f)) {
    528         f->eventq_id = 0;
    529         G_UNLOCK (fen_lock);
    530         return FALSE;
    531     }
    532 
    533     if ((ev = (fnode_event_t*)g_queue_pop_head (f->eventq)) != NULL) {
    534         /* Send events to clients. */
    535         e = ev->e;
    536         if (!ev->is_pending) {
    537 #ifdef GIO_COMPILATION
    538             if (ev->has_twin) {
    539                 fdata_emit_events (f, FILE_ATTRIB);
    540             }
    541 #endif
    542             fdata_emit_events (f, ev->e);
    543         }
    544 
    545         fnode_event_delete (ev);
    546         ev = NULL;
    547 
    548         /* Adjust node state. */
    549         /*
    550          * Node the node has been created, so we can delete create event in
    551          * optimizing. To reduce the statings, we add it to Port on discoving
    552          * it then emit CREATED event. So we don't need to do anything here.
    553          */
    554         switch (e) {
    555         case FILE_MODIFIED:
    556         case MOUNTEDOVER:
    557         case UNMOUNTED:
    558             /* If the event is a changed event, then pending process it */
    559             if (f->change_update_id == 0) {
    560                 f->change_update_id = g_timeout_add (get_scalable_scan_time(f),
    561                   (GSourceFunc)fdata_adjust_changed,
    562                   (gpointer)f);
    563                 g_assert (f->change_update_id > 0);
    564             }
    565             break;
    566         case FILE_ATTRIB:
    567             g_assert (f->change_update_id == 0);
    568             if (!port_add (&f->fobj, &f->len, f)) {
    569                 ev = fnode_event_new (FILE_DELETE, FALSE, f);
    570                 if (ev != NULL) {
    571                     fdata_add_event (f, ev);
    572                 }
    573             }
    574             break;
    575         case FILE_DELETE: /* Ignored */
    576             break;
    577         default:
    578             g_assert_not_reached ();
    579             break;
    580         }
    581         /* Process one event a time */
    582         G_UNLOCK (fen_lock);
    583         return TRUE;
    584     }
    585     f->eventq_id = 0;
    586     G_UNLOCK (fen_lock);
    587     /* FD_W ("OUT ========> %s\n", __func__); */
    588     return FALSE;
    589 }
    590 
    591 /**
    592  * fdata_add_event:
    593  *
    594  */
    595 void
    596 fdata_add_event (fdata *f, fnode_event_t *ev)
    597 {
    598     node_op_t op = {NULL, NULL, pre_del_cb, NULL};
    599     fnode_event_t *tail;
    600 
    601     if (!FN_IS_LIVING(f)) {
    602         fnode_event_delete (ev);
    603         return;
    604     }
    605 
    606     FD_W ("%s %d\n", __func__, ev->e);
    607     g_get_current_time (&ev->t);
    608     /*
    609      * If created/deleted events of child node happened, then we use parent
    610      * event queue to handle.
    611      * If child node emits deleted event, it seems no changes for the parent
    612      * node, but the attr is changed. So we may try to cancel processing the
    613      * coming changed events of the parent node.
    614      */
    615     tail = (fnode_event_t*)g_queue_peek_tail (f->eventq);
    616     switch (ev->e) {
    617     case FILE_RENAME_FROM:
    618     case FILE_RENAME_TO:
    619     case FILE_ACCESS:
    620         fnode_event_delete (ev);
    621         g_assert_not_reached ();
    622         return;
    623     case FILE_DELETE:
    624         /* clear changed event number */
    625         f->changed_event_num = 0;
    626         /*
    627          * We will cancel all previous events.
    628          */
    629         if (tail) {
    630             g_queue_pop_tail (f->eventq);
    631             do {
    632                 fnode_event_delete (tail);
    633             } while ((tail = (fnode_event_t*)g_queue_pop_tail (f->eventq)) != NULL);
    634         }
    635         /*
    636          * Given a node "f" is deleted, process it ASAP.
    637          */
    638         fdata_emit_events (f, ev->e);
    639         fnode_event_delete (ev);
    640         fdata_adjust_deleted (f);
    641         return;
    642     case FILE_MODIFIED:
    643     case UNMOUNTED:
    644     case MOUNTEDOVER:
    645         /* clear changed event number */
    646         f->changed_event_num ++;
    647     case FILE_ATTRIB:
    648     default:
    649         /*
    650          * If in the time range, we will try optimizing
    651          * (changed+) to (changed)
    652          * (attrchanged changed) to ([changed, attrchanged])
    653          * (event attrchanged) to ([event, attrchanged])
    654          */
    655         if (tail) {
    656             do {
    657                 if (tail->e == ev->e) {
    658                     if (g_timeval_lt (&ev->t, &tail->t)) {
    659                         g_queue_peek_tail (f->eventq);
    660                         /* Add the increment */
    661                         g_time_val_add (&ev->t, PAIR_EVENTS_INC_TIMEVAL);
    662                         /* skip the previous event */
    663                         FD_W ("SKIPPED -- %s\n", _event_string (tail->e));
    664                         fnode_event_delete (tail);
    665                     } else {
    666                         break;
    667                     }
    668                 } else if (ev->e == FILE_MODIFIED && tail->e == FILE_ATTRIB) {
    669                     ev->has_twin = TRUE;
    670                     fnode_event_delete (tail);
    671                 } else if (ev->e == FILE_ATTRIB && f->change_update_id > 0) {
    672                     tail->has_twin = TRUE;
    673                     /* skip the current event */
    674                     fnode_event_delete (ev);
    675                     return;
    676                 } else {
    677                     break;
    678                 }
    679             } while ((tail = (fnode_event_t*)g_queue_peek_tail (f->eventq)) != NULL);
    680         }
    681     }
    682 
    683     /* must add the threshold time */
    684     g_time_val_add (&ev->t, PAIR_EVENTS_TIMEVAL);
    685 
    686     g_queue_push_tail (f->eventq, ev);
    687 
    688     /* starting process_events */
    689     if (f->eventq_id == 0) {
    690         f->eventq_id = g_timeout_add (PROCESS_EVENTQ_TIME,
    691           process_events,
    692           (gpointer)f);
    693         g_assert (f->eventq_id > 0);
    694     }
    695     FD_W ("%s 0x%p id:%-4d %s\n", __func__, f, f->eventq_id, FN_NAME(f));
    696 }
    697 
    698 gboolean
    699 fdata_class_init (void (*user_emit_cb) (fdata*, int),
    700   void (*user_emit_once_cb) (fdata*, int,  gpointer),
    701   int (*user_event_converter) (int event))
    702 {
    703     FD_W ("%s\n", __func__);
    704     if (user_emit_cb == NULL) {
    705         return FALSE;
    706     }
    707     if (user_emit_once_cb == NULL) {
    708         return FALSE;
    709     }
    710     if (user_event_converter == NULL) {
    711         return FALSE;
    712     }
    713     emit_cb = user_emit_cb;
    714     emit_once_cb = user_emit_once_cb;
    715     _event_converter = user_event_converter;
    716 
    717     if (!port_class_init (fdata_add_event)) {
    718         FD_W ("port_class_init failed.");
    719         return FALSE;
    720     }
    721     return TRUE;
    722 }
    723