1 /*************************************************************************** 2 * _ _ ____ _ 3 * Project ___| | | | _ \| | 4 * / __| | | | |_) | | 5 * | (__| |_| | _ <| |___ 6 * \___|\___/|_| \_\_____| 7 * 8 * Copyright (C) 1998 - 2017, Daniel Stenberg, <daniel (at) haxx.se>, et al. 9 * 10 * This software is licensed as described in the file COPYING, which 11 * you should have received as part of this distribution. The terms 12 * are also available at https://curl.haxx.se/docs/copyright.html. 13 * 14 * You may opt to use, copy, modify, merge, publish, distribute and/or sell 15 * copies of the Software, and permit persons to whom the Software is 16 * furnished to do so, under the terms of the COPYING file. 17 * 18 * This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY 19 * KIND, either express or implied. 20 * 21 ***************************************************************************/ 22 /* <DESC> 23 * multi socket API usage together with with glib2 24 * </DESC> 25 */ 26 /* Example application source code using the multi socket interface to 27 * download many files at once. 28 * 29 * Written by Jeff Pohlmeyer 30 31 Requires glib-2.x and a (POSIX?) system that has mkfifo(). 32 33 This is an adaptation of libcurl's "hipev.c" and libevent's "event-test.c" 34 sample programs, adapted to use glib's g_io_channel in place of libevent. 35 36 When running, the program creates the named pipe "hiper.fifo" 37 38 Whenever there is input into the fifo, the program reads the input as a list 39 of URL's and creates some new easy handles to fetch each URL via the 40 curl_multi "hiper" API. 41 42 43 Thus, you can try a single URL: 44 % echo http://www.yahoo.com > hiper.fifo 45 46 Or a whole bunch of them: 47 % cat my-url-list > hiper.fifo 48 49 The fifo buffer is handled almost instantly, so you can even add more URL's 50 while the previous requests are still being downloaded. 51 52 This is purely a demo app, all retrieved data is simply discarded by the write 53 callback. 54 55 */ 56 57 #include <glib.h> 58 #include <sys/stat.h> 59 #include <unistd.h> 60 #include <fcntl.h> 61 #include <stdlib.h> 62 #include <stdio.h> 63 #include <errno.h> 64 #include <curl/curl.h> 65 66 #define MSG_OUT g_print /* Change to "g_error" to write to stderr */ 67 #define SHOW_VERBOSE 0 /* Set to non-zero for libcurl messages */ 68 #define SHOW_PROGRESS 0 /* Set to non-zero to enable progress callback */ 69 70 /* Global information, common to all connections */ 71 typedef struct _GlobalInfo { 72 CURLM *multi; 73 guint timer_event; 74 int still_running; 75 } GlobalInfo; 76 77 /* Information associated with a specific easy handle */ 78 typedef struct _ConnInfo { 79 CURL *easy; 80 char *url; 81 GlobalInfo *global; 82 char error[CURL_ERROR_SIZE]; 83 } ConnInfo; 84 85 /* Information associated with a specific socket */ 86 typedef struct _SockInfo { 87 curl_socket_t sockfd; 88 CURL *easy; 89 int action; 90 long timeout; 91 GIOChannel *ch; 92 guint ev; 93 GlobalInfo *global; 94 } SockInfo; 95 96 /* Die if we get a bad CURLMcode somewhere */ 97 static void mcode_or_die(const char *where, CURLMcode code) 98 { 99 if(CURLM_OK != code) { 100 const char *s; 101 switch(code) { 102 case CURLM_BAD_HANDLE: s = "CURLM_BAD_HANDLE"; break; 103 case CURLM_BAD_EASY_HANDLE: s = "CURLM_BAD_EASY_HANDLE"; break; 104 case CURLM_OUT_OF_MEMORY: s = "CURLM_OUT_OF_MEMORY"; break; 105 case CURLM_INTERNAL_ERROR: s = "CURLM_INTERNAL_ERROR"; break; 106 case CURLM_BAD_SOCKET: s = "CURLM_BAD_SOCKET"; break; 107 case CURLM_UNKNOWN_OPTION: s = "CURLM_UNKNOWN_OPTION"; break; 108 case CURLM_LAST: s = "CURLM_LAST"; break; 109 default: s = "CURLM_unknown"; 110 } 111 MSG_OUT("ERROR: %s returns %s\n", where, s); 112 exit(code); 113 } 114 } 115 116 /* Check for completed transfers, and remove their easy handles */ 117 static void check_multi_info(GlobalInfo *g) 118 { 119 char *eff_url; 120 CURLMsg *msg; 121 int msgs_left; 122 ConnInfo *conn; 123 CURL *easy; 124 CURLcode res; 125 126 MSG_OUT("REMAINING: %d\n", g->still_running); 127 while((msg = curl_multi_info_read(g->multi, &msgs_left))) { 128 if(msg->msg == CURLMSG_DONE) { 129 easy = msg->easy_handle; 130 res = msg->data.result; 131 curl_easy_getinfo(easy, CURLINFO_PRIVATE, &conn); 132 curl_easy_getinfo(easy, CURLINFO_EFFECTIVE_URL, &eff_url); 133 MSG_OUT("DONE: %s => (%d) %s\n", eff_url, res, conn->error); 134 curl_multi_remove_handle(g->multi, easy); 135 free(conn->url); 136 curl_easy_cleanup(easy); 137 free(conn); 138 } 139 } 140 } 141 142 /* Called by glib when our timeout expires */ 143 static gboolean timer_cb(gpointer data) 144 { 145 GlobalInfo *g = (GlobalInfo *)data; 146 CURLMcode rc; 147 148 rc = curl_multi_socket_action(g->multi, 149 CURL_SOCKET_TIMEOUT, 0, &g->still_running); 150 mcode_or_die("timer_cb: curl_multi_socket_action", rc); 151 check_multi_info(g); 152 return FALSE; 153 } 154 155 /* Update the event timer after curl_multi library calls */ 156 static int update_timeout_cb(CURLM *multi, long timeout_ms, void *userp) 157 { 158 struct timeval timeout; 159 GlobalInfo *g = (GlobalInfo *)userp; 160 timeout.tv_sec = timeout_ms/1000; 161 timeout.tv_usec = (timeout_ms%1000)*1000; 162 163 MSG_OUT("*** update_timeout_cb %ld => %ld:%ld ***\n", 164 timeout_ms, timeout.tv_sec, timeout.tv_usec); 165 166 /* TODO 167 * 168 * if timeout_ms is 0, call curl_multi_socket_action() at once! 169 * 170 * if timeout_ms is -1, just delete the timer 171 * 172 * for all other values of timeout_ms, this should set or *update* 173 * the timer to the new value 174 */ 175 g->timer_event = g_timeout_add(timeout_ms, timer_cb, g); 176 return 0; 177 } 178 179 /* Called by glib when we get action on a multi socket */ 180 static gboolean event_cb(GIOChannel *ch, GIOCondition condition, gpointer data) 181 { 182 GlobalInfo *g = (GlobalInfo*) data; 183 CURLMcode rc; 184 int fd = g_io_channel_unix_get_fd(ch); 185 186 int action = 187 (condition & G_IO_IN ? CURL_CSELECT_IN : 0) | 188 (condition & G_IO_OUT ? CURL_CSELECT_OUT : 0); 189 190 rc = curl_multi_socket_action(g->multi, fd, action, &g->still_running); 191 mcode_or_die("event_cb: curl_multi_socket_action", rc); 192 193 check_multi_info(g); 194 if(g->still_running) { 195 return TRUE; 196 } 197 else { 198 MSG_OUT("last transfer done, kill timeout\n"); 199 if(g->timer_event) { 200 g_source_remove(g->timer_event); 201 } 202 return FALSE; 203 } 204 } 205 206 /* Clean up the SockInfo structure */ 207 static void remsock(SockInfo *f) 208 { 209 if(!f) { 210 return; 211 } 212 if(f->ev) { 213 g_source_remove(f->ev); 214 } 215 g_free(f); 216 } 217 218 /* Assign information to a SockInfo structure */ 219 static void setsock(SockInfo *f, curl_socket_t s, CURL *e, int act, 220 GlobalInfo *g) 221 { 222 GIOCondition kind = 223 (act&CURL_POLL_IN?G_IO_IN:0)|(act&CURL_POLL_OUT?G_IO_OUT:0); 224 225 f->sockfd = s; 226 f->action = act; 227 f->easy = e; 228 if(f->ev) { 229 g_source_remove(f->ev); 230 } 231 f->ev = g_io_add_watch(f->ch, kind, event_cb, g); 232 } 233 234 /* Initialize a new SockInfo structure */ 235 static void addsock(curl_socket_t s, CURL *easy, int action, GlobalInfo *g) 236 { 237 SockInfo *fdp = g_malloc0(sizeof(SockInfo)); 238 239 fdp->global = g; 240 fdp->ch = g_io_channel_unix_new(s); 241 setsock(fdp, s, easy, action, g); 242 curl_multi_assign(g->multi, s, fdp); 243 } 244 245 /* CURLMOPT_SOCKETFUNCTION */ 246 static int sock_cb(CURL *e, curl_socket_t s, int what, void *cbp, void *sockp) 247 { 248 GlobalInfo *g = (GlobalInfo*) cbp; 249 SockInfo *fdp = (SockInfo*) sockp; 250 static const char *whatstr[]={ "none", "IN", "OUT", "INOUT", "REMOVE" }; 251 252 MSG_OUT("socket callback: s=%d e=%p what=%s ", s, e, whatstr[what]); 253 if(what == CURL_POLL_REMOVE) { 254 MSG_OUT("\n"); 255 remsock(fdp); 256 } 257 else { 258 if(!fdp) { 259 MSG_OUT("Adding data: %s%s\n", 260 what&CURL_POLL_IN?"READ":"", 261 what&CURL_POLL_OUT?"WRITE":""); 262 addsock(s, e, what, g); 263 } 264 else { 265 MSG_OUT( 266 "Changing action from %d to %d\n", fdp->action, what); 267 setsock(fdp, s, e, what, g); 268 } 269 } 270 return 0; 271 } 272 273 /* CURLOPT_WRITEFUNCTION */ 274 static size_t write_cb(void *ptr, size_t size, size_t nmemb, void *data) 275 { 276 size_t realsize = size * nmemb; 277 ConnInfo *conn = (ConnInfo*) data; 278 (void)ptr; 279 (void)conn; 280 return realsize; 281 } 282 283 /* CURLOPT_PROGRESSFUNCTION */ 284 static int prog_cb(void *p, double dltotal, double dlnow, double ult, 285 double uln) 286 { 287 ConnInfo *conn = (ConnInfo *)p; 288 MSG_OUT("Progress: %s (%g/%g)\n", conn->url, dlnow, dltotal); 289 return 0; 290 } 291 292 /* Create a new easy handle, and add it to the global curl_multi */ 293 static void new_conn(char *url, GlobalInfo *g) 294 { 295 ConnInfo *conn; 296 CURLMcode rc; 297 298 conn = g_malloc0(sizeof(ConnInfo)); 299 conn->error[0]='\0'; 300 conn->easy = curl_easy_init(); 301 if(!conn->easy) { 302 MSG_OUT("curl_easy_init() failed, exiting!\n"); 303 exit(2); 304 } 305 conn->global = g; 306 conn->url = g_strdup(url); 307 curl_easy_setopt(conn->easy, CURLOPT_URL, conn->url); 308 curl_easy_setopt(conn->easy, CURLOPT_WRITEFUNCTION, write_cb); 309 curl_easy_setopt(conn->easy, CURLOPT_WRITEDATA, &conn); 310 curl_easy_setopt(conn->easy, CURLOPT_VERBOSE, (long)SHOW_VERBOSE); 311 curl_easy_setopt(conn->easy, CURLOPT_ERRORBUFFER, conn->error); 312 curl_easy_setopt(conn->easy, CURLOPT_PRIVATE, conn); 313 curl_easy_setopt(conn->easy, CURLOPT_NOPROGRESS, SHOW_PROGRESS?0L:1L); 314 curl_easy_setopt(conn->easy, CURLOPT_PROGRESSFUNCTION, prog_cb); 315 curl_easy_setopt(conn->easy, CURLOPT_PROGRESSDATA, conn); 316 curl_easy_setopt(conn->easy, CURLOPT_FOLLOWLOCATION, 1L); 317 curl_easy_setopt(conn->easy, CURLOPT_CONNECTTIMEOUT, 30L); 318 curl_easy_setopt(conn->easy, CURLOPT_LOW_SPEED_LIMIT, 1L); 319 curl_easy_setopt(conn->easy, CURLOPT_LOW_SPEED_TIME, 30L); 320 321 MSG_OUT("Adding easy %p to multi %p (%s)\n", conn->easy, g->multi, url); 322 rc = curl_multi_add_handle(g->multi, conn->easy); 323 mcode_or_die("new_conn: curl_multi_add_handle", rc); 324 325 /* note that the add_handle() will set a time-out to trigger very soon so 326 that the necessary socket_action() call will be called by this app */ 327 } 328 329 /* This gets called by glib whenever data is received from the fifo */ 330 static gboolean fifo_cb(GIOChannel *ch, GIOCondition condition, gpointer data) 331 { 332 #define BUF_SIZE 1024 333 gsize len, tp; 334 gchar *buf, *tmp, *all = NULL; 335 GIOStatus rv; 336 337 do { 338 GError *err = NULL; 339 rv = g_io_channel_read_line(ch, &buf, &len, &tp, &err); 340 if(buf) { 341 if(tp) { 342 buf[tp]='\0'; 343 } 344 new_conn(buf, (GlobalInfo*)data); 345 g_free(buf); 346 } 347 else { 348 buf = g_malloc(BUF_SIZE + 1); 349 while(TRUE) { 350 buf[BUF_SIZE]='\0'; 351 g_io_channel_read_chars(ch, buf, BUF_SIZE, &len, &err); 352 if(len) { 353 buf[len]='\0'; 354 if(all) { 355 tmp = all; 356 all = g_strdup_printf("%s%s", tmp, buf); 357 g_free(tmp); 358 } 359 else { 360 all = g_strdup(buf); 361 } 362 } 363 else { 364 break; 365 } 366 } 367 if(all) { 368 new_conn(all, (GlobalInfo*)data); 369 g_free(all); 370 } 371 g_free(buf); 372 } 373 if(err) { 374 g_error("fifo_cb: %s", err->message); 375 g_free(err); 376 break; 377 } 378 } while((len) && (rv == G_IO_STATUS_NORMAL)); 379 return TRUE; 380 } 381 382 int init_fifo(void) 383 { 384 struct stat st; 385 const char *fifo = "hiper.fifo"; 386 int socket; 387 388 if(lstat (fifo, &st) == 0) { 389 if((st.st_mode & S_IFMT) == S_IFREG) { 390 errno = EEXIST; 391 perror("lstat"); 392 exit(1); 393 } 394 } 395 396 unlink(fifo); 397 if(mkfifo (fifo, 0600) == -1) { 398 perror("mkfifo"); 399 exit(1); 400 } 401 402 socket = open(fifo, O_RDWR | O_NONBLOCK, 0); 403 404 if(socket == -1) { 405 perror("open"); 406 exit(1); 407 } 408 MSG_OUT("Now, pipe some URL's into > %s\n", fifo); 409 410 return socket; 411 } 412 413 int main(int argc, char **argv) 414 { 415 GlobalInfo *g; 416 CURLMcode rc; 417 GMainLoop*gmain; 418 int fd; 419 GIOChannel* ch; 420 g = g_malloc0(sizeof(GlobalInfo)); 421 422 fd = init_fifo(); 423 ch = g_io_channel_unix_new(fd); 424 g_io_add_watch(ch, G_IO_IN, fifo_cb, g); 425 gmain = g_main_loop_new(NULL, FALSE); 426 g->multi = curl_multi_init(); 427 curl_multi_setopt(g->multi, CURLMOPT_SOCKETFUNCTION, sock_cb); 428 curl_multi_setopt(g->multi, CURLMOPT_SOCKETDATA, g); 429 curl_multi_setopt(g->multi, CURLMOPT_TIMERFUNCTION, update_timeout_cb); 430 curl_multi_setopt(g->multi, CURLMOPT_TIMERDATA, g); 431 432 /* we don't call any curl_multi_socket*() function yet as we have no handles 433 added! */ 434 435 g_main_loop_run(gmain); 436 curl_multi_cleanup(g->multi); 437 return 0; 438 } 439