1 /*** 2 This file is part of avahi. 3 4 avahi is free software; you can redistribute it and/or modify it 5 under the terms of the GNU Lesser General Public License as 6 published by the Free Software Foundation; either version 2.1 of the 7 License, or (at your option) any later version. 8 9 avahi is distributed in the hope that it will be useful, but WITHOUT 10 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY 11 or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General 12 Public License for more details. 13 14 You should have received a copy of the GNU Lesser General Public 15 License along with avahi; if not, write to the Free Software 16 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 17 USA. 18 ***/ 19 20 #ifdef HAVE_CONFIG_H 21 #include <config.h> 22 #endif 23 24 #include <stdlib.h> 25 26 #include <avahi-common/timeval.h> 27 #include "avahi-common/avahi-malloc.h" 28 29 #include "response-sched.h" 30 #include "log.h" 31 #include "rr-util.h" 32 33 /* Local packets are supressed this long after sending them */ 34 #define AVAHI_RESPONSE_HISTORY_MSEC 500 35 36 /* Local packets are deferred this long before sending them */ 37 #define AVAHI_RESPONSE_DEFER_MSEC 20 38 39 /* Additional jitter for deferred packets */ 40 #define AVAHI_RESPONSE_JITTER_MSEC 100 41 42 /* Remote packets can suppress local traffic as long as this value */ 43 #define AVAHI_RESPONSE_SUPPRESS_MSEC 700 44 45 typedef struct AvahiResponseJob AvahiResponseJob; 46 47 typedef enum { 48 AVAHI_SCHEDULED, 49 AVAHI_DONE, 50 AVAHI_SUPPRESSED 51 } AvahiResponseJobState; 52 53 struct AvahiResponseJob { 54 AvahiResponseScheduler *scheduler; 55 AvahiTimeEvent *time_event; 56 57 AvahiResponseJobState state; 58 struct timeval delivery; 59 60 AvahiRecord *record; 61 int flush_cache; 62 AvahiAddress querier; 63 int querier_valid; 64 65 AVAHI_LLIST_FIELDS(AvahiResponseJob, jobs); 66 }; 67 68 struct AvahiResponseScheduler { 69 AvahiInterface *interface; 70 AvahiTimeEventQueue *time_event_queue; 71 72 AVAHI_LLIST_HEAD(AvahiResponseJob, jobs); 73 AVAHI_LLIST_HEAD(AvahiResponseJob, history); 74 AVAHI_LLIST_HEAD(AvahiResponseJob, suppressed); 75 }; 76 77 static AvahiResponseJob* job_new(AvahiResponseScheduler *s, AvahiRecord *record, AvahiResponseJobState state) { 78 AvahiResponseJob *rj; 79 80 assert(s); 81 assert(record); 82 83 if (!(rj = avahi_new(AvahiResponseJob, 1))) { 84 avahi_log_error(__FILE__": Out of memory"); 85 return NULL; 86 } 87 88 rj->scheduler = s; 89 rj->record = avahi_record_ref(record); 90 rj->time_event = NULL; 91 rj->flush_cache = 0; 92 rj->querier_valid = 0; 93 94 if ((rj->state = state) == AVAHI_SCHEDULED) 95 AVAHI_LLIST_PREPEND(AvahiResponseJob, jobs, s->jobs, rj); 96 else if (rj->state == AVAHI_DONE) 97 AVAHI_LLIST_PREPEND(AvahiResponseJob, jobs, s->history, rj); 98 else /* rj->state == AVAHI_SUPPRESSED */ 99 AVAHI_LLIST_PREPEND(AvahiResponseJob, jobs, s->suppressed, rj); 100 101 return rj; 102 } 103 104 static void job_free(AvahiResponseScheduler *s, AvahiResponseJob *rj) { 105 assert(s); 106 assert(rj); 107 108 if (rj->time_event) 109 avahi_time_event_free(rj->time_event); 110 111 if (rj->state == AVAHI_SCHEDULED) 112 AVAHI_LLIST_REMOVE(AvahiResponseJob, jobs, s->jobs, rj); 113 else if (rj->state == AVAHI_DONE) 114 AVAHI_LLIST_REMOVE(AvahiResponseJob, jobs, s->history, rj); 115 else /* rj->state == AVAHI_SUPPRESSED */ 116 AVAHI_LLIST_REMOVE(AvahiResponseJob, jobs, s->suppressed, rj); 117 118 avahi_record_unref(rj->record); 119 avahi_free(rj); 120 } 121 122 static void elapse_callback(AvahiTimeEvent *e, void* data); 123 124 static void job_set_elapse_time(AvahiResponseScheduler *s, AvahiResponseJob *rj, unsigned msec, unsigned jitter) { 125 struct timeval tv; 126 127 assert(s); 128 assert(rj); 129 130 avahi_elapse_time(&tv, msec, jitter); 131 132 if (rj->time_event) 133 avahi_time_event_update(rj->time_event, &tv); 134 else 135 rj->time_event = avahi_time_event_new(s->time_event_queue, &tv, elapse_callback, rj); 136 } 137 138 static void job_mark_done(AvahiResponseScheduler *s, AvahiResponseJob *rj) { 139 assert(s); 140 assert(rj); 141 142 assert(rj->state == AVAHI_SCHEDULED); 143 144 AVAHI_LLIST_REMOVE(AvahiResponseJob, jobs, s->jobs, rj); 145 AVAHI_LLIST_PREPEND(AvahiResponseJob, jobs, s->history, rj); 146 147 rj->state = AVAHI_DONE; 148 149 job_set_elapse_time(s, rj, AVAHI_RESPONSE_HISTORY_MSEC, 0); 150 151 gettimeofday(&rj->delivery, NULL); 152 } 153 154 AvahiResponseScheduler *avahi_response_scheduler_new(AvahiInterface *i) { 155 AvahiResponseScheduler *s; 156 assert(i); 157 158 if (!(s = avahi_new(AvahiResponseScheduler, 1))) { 159 avahi_log_error(__FILE__": Out of memory"); 160 return NULL; 161 } 162 163 s->interface = i; 164 s->time_event_queue = i->monitor->server->time_event_queue; 165 166 AVAHI_LLIST_HEAD_INIT(AvahiResponseJob, s->jobs); 167 AVAHI_LLIST_HEAD_INIT(AvahiResponseJob, s->history); 168 AVAHI_LLIST_HEAD_INIT(AvahiResponseJob, s->suppressed); 169 170 return s; 171 } 172 173 void avahi_response_scheduler_free(AvahiResponseScheduler *s) { 174 assert(s); 175 176 avahi_response_scheduler_clear(s); 177 avahi_free(s); 178 } 179 180 void avahi_response_scheduler_clear(AvahiResponseScheduler *s) { 181 assert(s); 182 183 while (s->jobs) 184 job_free(s, s->jobs); 185 while (s->history) 186 job_free(s, s->history); 187 while (s->suppressed) 188 job_free(s, s->suppressed); 189 } 190 191 static void enumerate_aux_records_callback(AVAHI_GCC_UNUSED AvahiServer *s, AvahiRecord *r, int flush_cache, void* userdata) { 192 AvahiResponseJob *rj = userdata; 193 194 assert(r); 195 assert(rj); 196 197 avahi_response_scheduler_post(rj->scheduler, r, flush_cache, rj->querier_valid ? &rj->querier : NULL, 0); 198 } 199 200 static int packet_add_response_job(AvahiResponseScheduler *s, AvahiDnsPacket *p, AvahiResponseJob *rj) { 201 assert(s); 202 assert(p); 203 assert(rj); 204 205 /* Try to add this record to the packet */ 206 if (!avahi_dns_packet_append_record(p, rj->record, rj->flush_cache, 0)) 207 return 0; 208 209 /* Ok, this record will definitely be sent, so schedule the 210 * auxilliary packets, too */ 211 avahi_server_enumerate_aux_records(s->interface->monitor->server, s->interface, rj->record, enumerate_aux_records_callback, rj); 212 job_mark_done(s, rj); 213 214 return 1; 215 } 216 217 static void send_response_packet(AvahiResponseScheduler *s, AvahiResponseJob *rj) { 218 AvahiDnsPacket *p; 219 unsigned n; 220 221 assert(s); 222 assert(rj); 223 224 if (!(p = avahi_dns_packet_new_response(s->interface->hardware->mtu, 1))) 225 return; /* OOM */ 226 n = 1; 227 228 /* Put it in the packet. */ 229 if (packet_add_response_job(s, p, rj)) { 230 231 /* Try to fill up packet with more responses, if available */ 232 while (s->jobs) { 233 234 if (!packet_add_response_job(s, p, s->jobs)) 235 break; 236 237 n++; 238 } 239 240 } else { 241 size_t size; 242 243 avahi_dns_packet_free(p); 244 245 /* OK, the packet was too small, so create one that fits */ 246 size = avahi_record_get_estimate_size(rj->record) + AVAHI_DNS_PACKET_HEADER_SIZE; 247 248 if (!(p = avahi_dns_packet_new_response(size + AVAHI_DNS_PACKET_EXTRA_SIZE, 1))) 249 return; /* OOM */ 250 251 if (!packet_add_response_job(s, p, rj)) { 252 avahi_dns_packet_free(p); 253 254 avahi_log_warn("Record too large, cannot send"); 255 job_mark_done(s, rj); 256 return; 257 } 258 } 259 260 avahi_dns_packet_set_field(p, AVAHI_DNS_FIELD_ANCOUNT, n); 261 avahi_interface_send_packet(s->interface, p); 262 avahi_dns_packet_free(p); 263 } 264 265 static void elapse_callback(AVAHI_GCC_UNUSED AvahiTimeEvent *e, void* data) { 266 AvahiResponseJob *rj = data; 267 268 assert(rj); 269 270 if (rj->state == AVAHI_DONE || rj->state == AVAHI_SUPPRESSED) 271 job_free(rj->scheduler, rj); /* Lets drop this entry */ 272 else 273 send_response_packet(rj->scheduler, rj); 274 } 275 276 static AvahiResponseJob* find_scheduled_job(AvahiResponseScheduler *s, AvahiRecord *record) { 277 AvahiResponseJob *rj; 278 279 assert(s); 280 assert(record); 281 282 for (rj = s->jobs; rj; rj = rj->jobs_next) { 283 assert(rj->state == AVAHI_SCHEDULED); 284 285 if (avahi_record_equal_no_ttl(rj->record, record)) 286 return rj; 287 } 288 289 return NULL; 290 } 291 292 static AvahiResponseJob* find_history_job(AvahiResponseScheduler *s, AvahiRecord *record) { 293 AvahiResponseJob *rj; 294 295 assert(s); 296 assert(record); 297 298 for (rj = s->history; rj; rj = rj->jobs_next) { 299 assert(rj->state == AVAHI_DONE); 300 301 if (avahi_record_equal_no_ttl(rj->record, record)) { 302 /* Check whether this entry is outdated */ 303 304 /* avahi_log_debug("history age: %u", (unsigned) (avahi_age(&rj->delivery)/1000)); */ 305 306 if (avahi_age(&rj->delivery)/1000 > AVAHI_RESPONSE_HISTORY_MSEC) { 307 /* it is outdated, so let's remove it */ 308 job_free(s, rj); 309 return NULL; 310 } 311 312 return rj; 313 } 314 } 315 316 return NULL; 317 } 318 319 static AvahiResponseJob* find_suppressed_job(AvahiResponseScheduler *s, AvahiRecord *record, const AvahiAddress *querier) { 320 AvahiResponseJob *rj; 321 322 assert(s); 323 assert(record); 324 assert(querier); 325 326 for (rj = s->suppressed; rj; rj = rj->jobs_next) { 327 assert(rj->state == AVAHI_SUPPRESSED); 328 assert(rj->querier_valid); 329 330 if (avahi_record_equal_no_ttl(rj->record, record) && 331 avahi_address_cmp(&rj->querier, querier) == 0) { 332 /* Check whether this entry is outdated */ 333 334 if (avahi_age(&rj->delivery) > AVAHI_RESPONSE_SUPPRESS_MSEC*1000) { 335 /* it is outdated, so let's remove it */ 336 job_free(s, rj); 337 return NULL; 338 } 339 340 return rj; 341 } 342 } 343 344 return NULL; 345 } 346 347 int avahi_response_scheduler_post(AvahiResponseScheduler *s, AvahiRecord *record, int flush_cache, const AvahiAddress *querier, int immediately) { 348 AvahiResponseJob *rj; 349 struct timeval tv; 350 /* char *t; */ 351 352 assert(s); 353 assert(record); 354 355 assert(!avahi_key_is_pattern(record->key)); 356 357 /* t = avahi_record_to_string(record); */ 358 /* avahi_log_debug("post %i %s", immediately, t); */ 359 /* avahi_free(t); */ 360 361 /* Check whether this response is suppressed */ 362 if (querier && 363 (rj = find_suppressed_job(s, record, querier)) && 364 avahi_record_is_goodbye(record) == avahi_record_is_goodbye(rj->record) && 365 rj->record->ttl >= record->ttl/2) { 366 367 /* avahi_log_debug("Response suppressed by known answer suppression."); */ 368 return 0; 369 } 370 371 /* Check if we already sent this response recently */ 372 if ((rj = find_history_job(s, record))) { 373 374 if (avahi_record_is_goodbye(record) == avahi_record_is_goodbye(rj->record) && 375 rj->record->ttl >= record->ttl/2 && 376 (rj->flush_cache || !flush_cache)) { 377 /* avahi_log_debug("Response suppressed by local duplicate suppression (history)"); */ 378 return 0; 379 } 380 381 /* Outdated ... */ 382 job_free(s, rj); 383 } 384 385 avahi_elapse_time(&tv, immediately ? 0 : AVAHI_RESPONSE_DEFER_MSEC, immediately ? 0 : AVAHI_RESPONSE_JITTER_MSEC); 386 387 if ((rj = find_scheduled_job(s, record))) { 388 /* avahi_log_debug("Response suppressed by local duplicate suppression (scheduled)"); */ 389 390 /* Update a little ... */ 391 392 /* Update the time if the new is prior to the old */ 393 if (avahi_timeval_compare(&tv, &rj->delivery) < 0) { 394 rj->delivery = tv; 395 avahi_time_event_update(rj->time_event, &rj->delivery); 396 } 397 398 /* Update the flush cache bit */ 399 if (flush_cache) 400 rj->flush_cache = 1; 401 402 /* Update the querier field */ 403 if (!querier || (rj->querier_valid && avahi_address_cmp(querier, &rj->querier) != 0)) 404 rj->querier_valid = 0; 405 406 /* Update record data (just for the TTL) */ 407 avahi_record_unref(rj->record); 408 rj->record = avahi_record_ref(record); 409 410 return 1; 411 } else { 412 /* avahi_log_debug("Accepted new response job."); */ 413 414 /* Create a new job and schedule it */ 415 if (!(rj = job_new(s, record, AVAHI_SCHEDULED))) 416 return 0; /* OOM */ 417 418 rj->delivery = tv; 419 rj->time_event = avahi_time_event_new(s->time_event_queue, &rj->delivery, elapse_callback, rj); 420 rj->flush_cache = flush_cache; 421 422 if ((rj->querier_valid = !!querier)) 423 rj->querier = *querier; 424 425 return 1; 426 } 427 } 428 429 void avahi_response_scheduler_incoming(AvahiResponseScheduler *s, AvahiRecord *record, int flush_cache) { 430 AvahiResponseJob *rj; 431 assert(s); 432 433 /* This function is called whenever an incoming response was 434 * receieved. We drop scheduled responses which match here. The 435 * keyword is "DUPLICATE ANSWER SUPPRESION". */ 436 437 if ((rj = find_scheduled_job(s, record))) { 438 439 if ((!rj->flush_cache || flush_cache) && /* flush cache bit was set correctly */ 440 avahi_record_is_goodbye(record) == avahi_record_is_goodbye(rj->record) && /* both goodbye packets, or both not */ 441 record->ttl >= rj->record->ttl/2) { /* sensible TTL */ 442 443 /* A matching entry was found, so let's mark it done */ 444 /* avahi_log_debug("Response suppressed by distributed duplicate suppression"); */ 445 job_mark_done(s, rj); 446 } 447 448 return; 449 } 450 451 if ((rj = find_history_job(s, record))) { 452 /* Found a history job, let's update it */ 453 avahi_record_unref(rj->record); 454 rj->record = avahi_record_ref(record); 455 } else 456 /* Found no existing history job, so let's create a new one */ 457 if (!(rj = job_new(s, record, AVAHI_DONE))) 458 return; /* OOM */ 459 460 rj->flush_cache = flush_cache; 461 rj->querier_valid = 0; 462 463 gettimeofday(&rj->delivery, NULL); 464 job_set_elapse_time(s, rj, AVAHI_RESPONSE_HISTORY_MSEC, 0); 465 } 466 467 void avahi_response_scheduler_suppress(AvahiResponseScheduler *s, AvahiRecord *record, const AvahiAddress *querier) { 468 AvahiResponseJob *rj; 469 470 assert(s); 471 assert(record); 472 assert(querier); 473 474 if ((rj = find_scheduled_job(s, record))) { 475 476 if (rj->querier_valid && avahi_address_cmp(querier, &rj->querier) == 0 && /* same originator */ 477 avahi_record_is_goodbye(record) == avahi_record_is_goodbye(rj->record) && /* both goodbye packets, or both not */ 478 record->ttl >= rj->record->ttl/2) { /* sensible TTL */ 479 480 /* A matching entry was found, so let's drop it */ 481 /* avahi_log_debug("Known answer suppression active!"); */ 482 job_free(s, rj); 483 } 484 } 485 486 if ((rj = find_suppressed_job(s, record, querier))) { 487 488 /* Let's update the old entry */ 489 avahi_record_unref(rj->record); 490 rj->record = avahi_record_ref(record); 491 492 } else { 493 494 /* Create a new entry */ 495 if (!(rj = job_new(s, record, AVAHI_SUPPRESSED))) 496 return; /* OOM */ 497 rj->querier_valid = 1; 498 rj->querier = *querier; 499 } 500 501 gettimeofday(&rj->delivery, NULL); 502 job_set_elapse_time(s, rj, AVAHI_RESPONSE_SUPPRESS_MSEC, 0); 503 } 504 505 void avahi_response_scheduler_force(AvahiResponseScheduler *s) { 506 assert(s); 507 508 /* Send all scheduled responses immediately */ 509 while (s->jobs) 510 send_response_packet(s, s->jobs); 511 } 512