1 /*************************************************************************** 2 * _ _ ____ _ 3 * Project ___| | | | _ \| | 4 * / __| | | | |_) | | 5 * | (__| |_| | _ <| |___ 6 * \___|\___/|_| \_\_____| 7 * 8 * Copyright (C) 1998 - 2016, Daniel Stenberg, <daniel (at) haxx.se>, et al. 9 * 10 * This software is licensed as described in the file COPYING, which 11 * you should have received as part of this distribution. The terms 12 * are also available at https://curl.haxx.se/docs/copyright.html. 13 * 14 * You may opt to use, copy, modify, merge, publish, distribute and/or sell 15 * copies of the Software, and permit persons to whom the Software is 16 * furnished to do so, under the terms of the COPYING file. 17 * 18 * This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY 19 * KIND, either express or implied. 20 * 21 ***************************************************************************/ 22 /* <DESC> 23 * multi socket API usage together with with glib2 24 * </DESC> 25 */ 26 /* Example application source code using the multi socket interface to 27 * download many files at once. 28 * 29 * Written by Jeff Pohlmeyer 30 31 Requires glib-2.x and a (POSIX?) system that has mkfifo(). 32 33 This is an adaptation of libcurl's "hipev.c" and libevent's "event-test.c" 34 sample programs, adapted to use glib's g_io_channel in place of libevent. 35 36 When running, the program creates the named pipe "hiper.fifo" 37 38 Whenever there is input into the fifo, the program reads the input as a list 39 of URL's and creates some new easy handles to fetch each URL via the 40 curl_multi "hiper" API. 41 42 43 Thus, you can try a single URL: 44 % echo http://www.yahoo.com > hiper.fifo 45 46 Or a whole bunch of them: 47 % cat my-url-list > hiper.fifo 48 49 The fifo buffer is handled almost instantly, so you can even add more URL's 50 while the previous requests are still being downloaded. 51 52 This is purely a demo app, all retrieved data is simply discarded by the write 53 callback. 54 55 */ 56 57 #include <glib.h> 58 #include <sys/stat.h> 59 #include <unistd.h> 60 #include <fcntl.h> 61 #include <stdlib.h> 62 #include <stdio.h> 63 #include <errno.h> 64 #include <curl/curl.h> 65 66 #define MSG_OUT g_print /* Change to "g_error" to write to stderr */ 67 #define SHOW_VERBOSE 0 /* Set to non-zero for libcurl messages */ 68 #define SHOW_PROGRESS 0 /* Set to non-zero to enable progress callback */ 69 70 /* Global information, common to all connections */ 71 typedef struct _GlobalInfo { 72 CURLM *multi; 73 guint timer_event; 74 int still_running; 75 } GlobalInfo; 76 77 /* Information associated with a specific easy handle */ 78 typedef struct _ConnInfo { 79 CURL *easy; 80 char *url; 81 GlobalInfo *global; 82 char error[CURL_ERROR_SIZE]; 83 } ConnInfo; 84 85 /* Information associated with a specific socket */ 86 typedef struct _SockInfo { 87 curl_socket_t sockfd; 88 CURL *easy; 89 int action; 90 long timeout; 91 GIOChannel *ch; 92 guint ev; 93 GlobalInfo *global; 94 } SockInfo; 95 96 /* Die if we get a bad CURLMcode somewhere */ 97 static void mcode_or_die(const char *where, CURLMcode code) 98 { 99 if(CURLM_OK != code) { 100 const char *s; 101 switch (code) { 102 case CURLM_BAD_HANDLE: s="CURLM_BAD_HANDLE"; break; 103 case CURLM_BAD_EASY_HANDLE: s="CURLM_BAD_EASY_HANDLE"; break; 104 case CURLM_OUT_OF_MEMORY: s="CURLM_OUT_OF_MEMORY"; break; 105 case CURLM_INTERNAL_ERROR: s="CURLM_INTERNAL_ERROR"; break; 106 case CURLM_BAD_SOCKET: s="CURLM_BAD_SOCKET"; break; 107 case CURLM_UNKNOWN_OPTION: s="CURLM_UNKNOWN_OPTION"; break; 108 case CURLM_LAST: s="CURLM_LAST"; break; 109 default: s="CURLM_unknown"; 110 } 111 MSG_OUT("ERROR: %s returns %s\n", where, s); 112 exit(code); 113 } 114 } 115 116 /* Check for completed transfers, and remove their easy handles */ 117 static void check_multi_info(GlobalInfo *g) 118 { 119 char *eff_url; 120 CURLMsg *msg; 121 int msgs_left; 122 ConnInfo *conn; 123 CURL *easy; 124 CURLcode res; 125 126 MSG_OUT("REMAINING: %d\n", g->still_running); 127 while((msg = curl_multi_info_read(g->multi, &msgs_left))) { 128 if(msg->msg == CURLMSG_DONE) { 129 easy = msg->easy_handle; 130 res = msg->data.result; 131 curl_easy_getinfo(easy, CURLINFO_PRIVATE, &conn); 132 curl_easy_getinfo(easy, CURLINFO_EFFECTIVE_URL, &eff_url); 133 MSG_OUT("DONE: %s => (%d) %s\n", eff_url, res, conn->error); 134 curl_multi_remove_handle(g->multi, easy); 135 free(conn->url); 136 curl_easy_cleanup(easy); 137 free(conn); 138 } 139 } 140 } 141 142 /* Called by glib when our timeout expires */ 143 static gboolean timer_cb(gpointer data) 144 { 145 GlobalInfo *g = (GlobalInfo *)data; 146 CURLMcode rc; 147 148 rc = curl_multi_socket_action(g->multi, 149 CURL_SOCKET_TIMEOUT, 0, &g->still_running); 150 mcode_or_die("timer_cb: curl_multi_socket_action", rc); 151 check_multi_info(g); 152 return FALSE; 153 } 154 155 /* Update the event timer after curl_multi library calls */ 156 static int update_timeout_cb(CURLM *multi, long timeout_ms, void *userp) 157 { 158 struct timeval timeout; 159 GlobalInfo *g=(GlobalInfo *)userp; 160 timeout.tv_sec = timeout_ms/1000; 161 timeout.tv_usec = (timeout_ms%1000)*1000; 162 163 MSG_OUT("*** update_timeout_cb %ld => %ld:%ld ***\n", 164 timeout_ms, timeout.tv_sec, timeout.tv_usec); 165 166 g->timer_event = g_timeout_add(timeout_ms, timer_cb, g); 167 return 0; 168 } 169 170 /* Called by glib when we get action on a multi socket */ 171 static gboolean event_cb(GIOChannel *ch, GIOCondition condition, gpointer data) 172 { 173 GlobalInfo *g = (GlobalInfo*) data; 174 CURLMcode rc; 175 int fd=g_io_channel_unix_get_fd(ch); 176 177 int action = 178 (condition & G_IO_IN ? CURL_CSELECT_IN : 0) | 179 (condition & G_IO_OUT ? CURL_CSELECT_OUT : 0); 180 181 rc = curl_multi_socket_action(g->multi, fd, action, &g->still_running); 182 mcode_or_die("event_cb: curl_multi_socket_action", rc); 183 184 check_multi_info(g); 185 if(g->still_running) { 186 return TRUE; 187 } 188 else { 189 MSG_OUT("last transfer done, kill timeout\n"); 190 if(g->timer_event) { 191 g_source_remove(g->timer_event); 192 } 193 return FALSE; 194 } 195 } 196 197 /* Clean up the SockInfo structure */ 198 static void remsock(SockInfo *f) 199 { 200 if(!f) { 201 return; 202 } 203 if(f->ev) { 204 g_source_remove(f->ev); 205 } 206 g_free(f); 207 } 208 209 /* Assign information to a SockInfo structure */ 210 static void setsock(SockInfo *f, curl_socket_t s, CURL *e, int act, 211 GlobalInfo *g) 212 { 213 GIOCondition kind = 214 (act&CURL_POLL_IN?G_IO_IN:0)|(act&CURL_POLL_OUT?G_IO_OUT:0); 215 216 f->sockfd = s; 217 f->action = act; 218 f->easy = e; 219 if(f->ev) { 220 g_source_remove(f->ev); 221 } 222 f->ev=g_io_add_watch(f->ch, kind, event_cb, g); 223 } 224 225 /* Initialize a new SockInfo structure */ 226 static void addsock(curl_socket_t s, CURL *easy, int action, GlobalInfo *g) 227 { 228 SockInfo *fdp = g_malloc0(sizeof(SockInfo)); 229 230 fdp->global = g; 231 fdp->ch=g_io_channel_unix_new(s); 232 setsock(fdp, s, easy, action, g); 233 curl_multi_assign(g->multi, s, fdp); 234 } 235 236 /* CURLMOPT_SOCKETFUNCTION */ 237 static int sock_cb(CURL *e, curl_socket_t s, int what, void *cbp, void *sockp) 238 { 239 GlobalInfo *g = (GlobalInfo*) cbp; 240 SockInfo *fdp = (SockInfo*) sockp; 241 static const char *whatstr[]={ "none", "IN", "OUT", "INOUT", "REMOVE" }; 242 243 MSG_OUT("socket callback: s=%d e=%p what=%s ", s, e, whatstr[what]); 244 if(what == CURL_POLL_REMOVE) { 245 MSG_OUT("\n"); 246 remsock(fdp); 247 } 248 else { 249 if(!fdp) { 250 MSG_OUT("Adding data: %s%s\n", 251 what&CURL_POLL_IN?"READ":"", 252 what&CURL_POLL_OUT?"WRITE":""); 253 addsock(s, e, what, g); 254 } 255 else { 256 MSG_OUT( 257 "Changing action from %d to %d\n", fdp->action, what); 258 setsock(fdp, s, e, what, g); 259 } 260 } 261 return 0; 262 } 263 264 /* CURLOPT_WRITEFUNCTION */ 265 static size_t write_cb(void *ptr, size_t size, size_t nmemb, void *data) 266 { 267 size_t realsize = size * nmemb; 268 ConnInfo *conn = (ConnInfo*) data; 269 (void)ptr; 270 (void)conn; 271 return realsize; 272 } 273 274 /* CURLOPT_PROGRESSFUNCTION */ 275 static int prog_cb (void *p, double dltotal, double dlnow, double ult, 276 double uln) 277 { 278 ConnInfo *conn = (ConnInfo *)p; 279 MSG_OUT("Progress: %s (%g/%g)\n", conn->url, dlnow, dltotal); 280 return 0; 281 } 282 283 /* Create a new easy handle, and add it to the global curl_multi */ 284 static void new_conn(char *url, GlobalInfo *g) 285 { 286 ConnInfo *conn; 287 CURLMcode rc; 288 289 conn = g_malloc0(sizeof(ConnInfo)); 290 conn->error[0]='\0'; 291 conn->easy = curl_easy_init(); 292 if(!conn->easy) { 293 MSG_OUT("curl_easy_init() failed, exiting!\n"); 294 exit(2); 295 } 296 conn->global = g; 297 conn->url = g_strdup(url); 298 curl_easy_setopt(conn->easy, CURLOPT_URL, conn->url); 299 curl_easy_setopt(conn->easy, CURLOPT_WRITEFUNCTION, write_cb); 300 curl_easy_setopt(conn->easy, CURLOPT_WRITEDATA, &conn); 301 curl_easy_setopt(conn->easy, CURLOPT_VERBOSE, (long)SHOW_VERBOSE); 302 curl_easy_setopt(conn->easy, CURLOPT_ERRORBUFFER, conn->error); 303 curl_easy_setopt(conn->easy, CURLOPT_PRIVATE, conn); 304 curl_easy_setopt(conn->easy, CURLOPT_NOPROGRESS, SHOW_PROGRESS?0L:1L); 305 curl_easy_setopt(conn->easy, CURLOPT_PROGRESSFUNCTION, prog_cb); 306 curl_easy_setopt(conn->easy, CURLOPT_PROGRESSDATA, conn); 307 curl_easy_setopt(conn->easy, CURLOPT_FOLLOWLOCATION, 1L); 308 curl_easy_setopt(conn->easy, CURLOPT_CONNECTTIMEOUT, 30L); 309 curl_easy_setopt(conn->easy, CURLOPT_LOW_SPEED_LIMIT, 1L); 310 curl_easy_setopt(conn->easy, CURLOPT_LOW_SPEED_TIME, 30L); 311 312 MSG_OUT("Adding easy %p to multi %p (%s)\n", conn->easy, g->multi, url); 313 rc =curl_multi_add_handle(g->multi, conn->easy); 314 mcode_or_die("new_conn: curl_multi_add_handle", rc); 315 316 /* note that the add_handle() will set a time-out to trigger very soon so 317 that the necessary socket_action() call will be called by this app */ 318 } 319 320 /* This gets called by glib whenever data is received from the fifo */ 321 static gboolean fifo_cb (GIOChannel *ch, GIOCondition condition, gpointer data) 322 { 323 #define BUF_SIZE 1024 324 gsize len, tp; 325 gchar *buf, *tmp, *all=NULL; 326 GIOStatus rv; 327 328 do { 329 GError *err=NULL; 330 rv = g_io_channel_read_line(ch, &buf, &len, &tp, &err); 331 if(buf) { 332 if(tp) { 333 buf[tp]='\0'; 334 } 335 new_conn(buf, (GlobalInfo*)data); 336 g_free(buf); 337 } 338 else { 339 buf = g_malloc(BUF_SIZE+1); 340 while(TRUE) { 341 buf[BUF_SIZE]='\0'; 342 g_io_channel_read_chars(ch, buf, BUF_SIZE, &len, &err); 343 if(len) { 344 buf[len]='\0'; 345 if(all) { 346 tmp=all; 347 all=g_strdup_printf("%s%s", tmp, buf); 348 g_free(tmp); 349 } 350 else { 351 all = g_strdup(buf); 352 } 353 } 354 else { 355 break; 356 } 357 } 358 if(all) { 359 new_conn(all, (GlobalInfo*)data); 360 g_free(all); 361 } 362 g_free(buf); 363 } 364 if(err) { 365 g_error("fifo_cb: %s", err->message); 366 g_free(err); 367 break; 368 } 369 } while((len) && (rv == G_IO_STATUS_NORMAL)); 370 return TRUE; 371 } 372 373 int init_fifo(void) 374 { 375 struct stat st; 376 const char *fifo = "hiper.fifo"; 377 int socket; 378 379 if(lstat (fifo, &st) == 0) { 380 if((st.st_mode & S_IFMT) == S_IFREG) { 381 errno = EEXIST; 382 perror("lstat"); 383 exit (1); 384 } 385 } 386 387 unlink (fifo); 388 if(mkfifo (fifo, 0600) == -1) { 389 perror("mkfifo"); 390 exit (1); 391 } 392 393 socket = open (fifo, O_RDWR | O_NONBLOCK, 0); 394 395 if(socket == -1) { 396 perror("open"); 397 exit (1); 398 } 399 MSG_OUT("Now, pipe some URL's into > %s\n", fifo); 400 401 return socket; 402 } 403 404 int main(int argc, char **argv) 405 { 406 GlobalInfo *g; 407 CURLMcode rc; 408 GMainLoop*gmain; 409 int fd; 410 GIOChannel* ch; 411 g=g_malloc0(sizeof(GlobalInfo)); 412 413 fd=init_fifo(); 414 ch=g_io_channel_unix_new(fd); 415 g_io_add_watch(ch, G_IO_IN, fifo_cb, g); 416 gmain=g_main_loop_new(NULL, FALSE); 417 g->multi = curl_multi_init(); 418 curl_multi_setopt(g->multi, CURLMOPT_SOCKETFUNCTION, sock_cb); 419 curl_multi_setopt(g->multi, CURLMOPT_SOCKETDATA, g); 420 curl_multi_setopt(g->multi, CURLMOPT_TIMERFUNCTION, update_timeout_cb); 421 curl_multi_setopt(g->multi, CURLMOPT_TIMERDATA, g); 422 423 /* we don't call any curl_multi_socket*() function yet as we have no handles 424 added! */ 425 426 g_main_loop_run(gmain); 427 curl_multi_cleanup(g->multi); 428 return 0; 429 } 430