1 /*****************************************************************************/ 2 /* "NetPIPE" -- Network Protocol Independent Performance Evaluator. */ 3 /* Copyright 1997, 1998 Iowa State University Research Foundation, Inc. */ 4 /* */ 5 /* This program is free software; you can redistribute it and/or modify */ 6 /* it under the terms of the GNU General Public License as published by */ 7 /* the Free Software Foundation. You should have received a copy of the */ 8 /* GNU General Public License along with this program; if not, write to the */ 9 /* Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ 10 /* */ 11 /* Files needed for use: */ 12 /* * netpipe.c ---- Driver source */ 13 /* * netpipe.h ---- General include file */ 14 /* * TCP.c ---- TCP calls source */ 15 /* * TCP.h ---- Include file for TCP calls and data structs */ 16 /* * MPI.c ---- MPI calls source */ 17 /* * MPI.h ---- Include file for MPI calls and data structs */ 18 /* * PVM.c ---- PVM calls source */ 19 /* * PVM.h ---- Include file for PVM calls and data structs */ 20 /*****************************************************************************/ 21 #include "netpipe.h" 22 23 extern char *optarg; 24 25 int main(int argc, char *argv[]) 26 { 27 FILE *out; /* Output data file */ 28 char s[255]; /* Generic string */ 29 char *memtmp; 30 char *memtmp1; 31 32 int c, /* option index */ 33 i, j, n, nq, /* Loop indices */ 34 asyncReceive = 0, /* Pre-post a receive buffer? */ 35 bufoffset = 0, /* Align buffer to this */ 36 bufalign = 16 * 1024, /* Boundary to align buffer to */ 37 errFlag, /* Error occurred in inner testing loop */ 38 nrepeat, /* Number of time to do the transmission */ 39 len, /* Number of bytes to be transmitted */ 40 inc = 0, /* Increment value */ 41 trans = -1, /* Transmitter flag. 1 if transmitting. */ 42 detailflag = 0, /* Set to examine the signature curve detail */ 43 bufszflag = 0, /* Set to change the TCP socket buffer size */ 44 pert, /* Perturbation value */ 45 start = 1, /* Starting value for signature curve */ 46 end = MAXINT, /* Ending value for signature curve */ 47 streamopt = 0, /* Streaming mode flag */ 48 printopt = 0; /* Debug print statements flag */ 49 50 ArgStruct args; /* Argumentsfor all the calls */ 51 52 double t, t0, t1, t2, /* Time variables */ 53 tlast, /* Time for the last transmission */ 54 latency; /* Network message latency */ 55 56 Data bwdata[NSAMP]; /* Bandwidth curve data */ 57 58 short port = DEFPORT; /* Port number for connection */ 59 #ifdef HAVE_GETRUSAGE 60 struct rusage prev_rusage, curr_rusage; /* Resource usage */ 61 double user_time, sys_time; /* User & system time used */ 62 double best_user_time, best_sys_time; /* Total user & system time used */ 63 double ut1, ut2, st1, st2; /* User & system time ctrs for variance */ 64 double ut_var, st_var; /* Variance in user & system time */ 65 #endif 66 67 #ifdef MPI 68 MPI_Init(&argc, &argv); 69 #endif 70 71 strcpy(s, "NetPIPE.out"); 72 #ifndef MPI 73 if (argc < 2) 74 PrintUsage(); 75 #endif 76 77 /* Parse the arguments. See Usage for description */ 78 while ((c = getopt(argc, argv, "Pstrh:p:o:A:O:l:u:i:b:a")) != -1) { 79 switch (c) { 80 case 'o': 81 strcpy(s, optarg); 82 break; 83 84 case 't': 85 trans = 1; 86 break; 87 88 case 'r': 89 trans = 0; 90 break; 91 92 case 's': 93 streamopt = 1; 94 break; 95 96 case 'l': /*detailflag = 1; */ 97 start = atoi(optarg); 98 if (start < 1) { 99 fprintf(stderr, "Need a starting value >= 1\n"); 100 exit(743); 101 } 102 break; 103 104 case 'u': /*detailflag = 1; */ 105 end = atoi(optarg); 106 break; 107 108 case 'i': 109 detailflag = 1; 110 inc = atoi(optarg); 111 break; 112 113 case 'b': 114 bufszflag = 1; 115 #ifdef TCP 116 args.prot.rcvbufsz = atoi(optarg); 117 args.prot.sndbufsz = args.prot.rcvbufsz; 118 #endif 119 break; 120 121 case 'P': 122 printopt = 1; 123 break; 124 125 case 'A': 126 bufalign = atoi(optarg); 127 break; 128 129 case 'O': 130 bufoffset = atoi(optarg); 131 break; 132 133 case 'p': 134 port = atoi(optarg); 135 break; 136 137 case 'h': 138 if (trans == 1) { 139 args.host = (char *)malloc(strlen(optarg) + 1); 140 strcpy(args.host, optarg); 141 } else { 142 fprintf(stderr, 143 "Error: -t must be specified before -h\n"); 144 exit(-11); 145 } 146 break; 147 148 case 'a': 149 asyncReceive = 1; 150 break; 151 152 default: 153 PrintUsage(); 154 exit(-12); 155 } 156 } 157 if (start > end) { 158 fprintf(stderr, "Start MUST be LESS than end\n"); 159 exit(420132); 160 } 161 #if defined(TCP) || defined(PVM) 162 /* 163 It should be explicitly specified whether this is the transmitter 164 or the receiver. 165 */ 166 if (trans < 0) { 167 fprintf(stderr, "Error: either -t or -r must be specified\n"); 168 exit(-11); 169 } 170 #endif 171 172 args.nbuff = TRIALS; 173 args.tr = trans; 174 args.port = port; 175 176 #if defined(TCP) 177 if (!bufszflag) { 178 args.prot.sndbufsz = 0; 179 args.prot.rcvbufsz = 0; 180 } else 181 fprintf(stderr, "Send and Recv Buffers are %d bytes\n", 182 args.prot.sndbufsz); 183 #endif 184 185 Setup(&args); 186 Establish(&args); 187 188 if (args.tr) { 189 if ((out = fopen(s, "w")) == NULL) { 190 fprintf(stderr, "Can't open %s for output\n", s); 191 exit(1); 192 } 193 } else 194 out = stdout; 195 196 args.bufflen = 1; 197 args.buff = (char *)malloc(args.bufflen); 198 args.buff1 = (char *)malloc(args.bufflen); 199 if (asyncReceive) 200 PrepareToReceive(&args); 201 Sync(&args); 202 t0 = When(); 203 t0 = When(); 204 t0 = When(); 205 #ifdef HAVE_GETRUSAGE 206 getrusage(RUSAGE_SELF, &prev_rusage); 207 #endif 208 t0 = When(); 209 for (i = 0; i < LATENCYREPS; i++) { 210 if (args.tr) { 211 SendData(&args); 212 RecvData(&args); 213 if (asyncReceive && (i < LATENCYREPS - 1)) { 214 PrepareToReceive(&args); 215 } 216 } else { 217 RecvData(&args); 218 if (asyncReceive && (i < LATENCYREPS - 1)) { 219 PrepareToReceive(&args); 220 } 221 SendData(&args); 222 } 223 } 224 latency = (When() - t0) / (2 * LATENCYREPS); 225 #ifdef HAVE_GETRUSAGE 226 getrusage(RUSAGE_SELF, &curr_rusage); 227 #endif 228 free(args.buff); 229 free(args.buff1); 230 231 if (args.tr) { 232 SendTime(&args, &latency); 233 } else { 234 RecvTime(&args, &latency); 235 } 236 if (args.tr && printopt) { 237 fprintf(stderr, "Latency: %.7f\n", latency); 238 fprintf(stderr, "Now starting main loop\n"); 239 } 240 tlast = latency; 241 if (inc == 0) { 242 /* Set a starting value for the message size increment. */ 243 inc = (start > 1) ? start / 2 : 1; 244 } 245 246 /* Main loop of benchmark */ 247 for (nq = n = 0, len = start, errFlag = 0; 248 n < NSAMP - 3 && tlast < STOPTM && len <= end && !errFlag; 249 len = len + inc, nq++) { 250 if (nq > 2 && !detailflag) { 251 /* 252 This has the effect of exponentially increasing the block 253 size. If detailflag is false, then the block size is 254 linearly increased (the increment is not adjusted). 255 */ 256 inc = ((nq % 2)) ? inc + inc : inc; 257 } 258 259 /* This is a perturbation loop to test nearby values */ 260 for (pert = (!detailflag && inc > PERT + 1) ? -PERT : 0; 261 pert <= PERT; 262 n++, pert += (!detailflag 263 && inc > PERT + 1) ? PERT : PERT + 1) { 264 265 /* Calculate how many times to repeat the experiment. */ 266 if (args.tr) { 267 nrepeat = MAX((RUNTM / ((double)args.bufflen / 268 (args.bufflen - inc + 269 1.0) * tlast)), 270 TRIALS); 271 SendRepeat(&args, nrepeat); 272 } else { 273 RecvRepeat(&args, &nrepeat); 274 } 275 276 /* Allocate the buffer */ 277 args.bufflen = len + pert; 278 if ((args.buff = 279 (char *)malloc(args.bufflen + bufalign)) == 280 (char *)NULL) { 281 fprintf(stderr, "Couldn't allocate memory\n"); 282 errFlag = -1; 283 break; 284 } 285 if ((args.buff1 = 286 (char *)malloc(args.bufflen + bufalign)) == 287 (char *)NULL) { 288 fprintf(stderr, "Couldn't allocate memory\n"); 289 errFlag = -1; 290 break; 291 } 292 /* 293 Possibly align the data buffer: make memtmp and memtmp1 294 point to the original blocks (so they can be freed later), 295 then adjust args.buff and args.buff1 if the user requested it. 296 */ 297 memtmp = args.buff; 298 memtmp1 = args.buff1; 299 if (bufalign != 0) 300 args.buff += (bufalign - 301 ((int)(*args.buff) % bufalign) + 302 bufoffset) % bufalign; 303 304 if (bufalign != 0) 305 args.buff1 += (bufalign - 306 ((int)(*args.buff1) % bufalign) + 307 bufoffset) % bufalign; 308 309 if (args.tr && printopt) 310 fprintf(stderr, "%3d: %9d bytes %4d times --> ", 311 n, args.bufflen, nrepeat); 312 313 /* Finally, we get to transmit or receive and time */ 314 if (args.tr) { 315 /* 316 This is the transmitter: send the block TRIALS times, and 317 if we are not streaming, expect the receiver to return each 318 block. 319 */ 320 bwdata[n].t = LONGTIME; 321 t2 = t1 = 0; 322 #ifdef HAVE_GETRUSAGE 323 ut1 = ut2 = st1 = st2 = 0.0; 324 best_user_time = best_sys_time = LONGTIME; 325 #endif 326 for (i = 0; i < TRIALS; i++) { 327 Sync(&args); 328 #ifdef HAVE_GETRUSAGE 329 getrusage(RUSAGE_SELF, &prev_rusage); 330 #endif 331 t0 = When(); 332 for (j = 0; j < nrepeat; j++) { 333 if (asyncReceive && !streamopt) { 334 PrepareToReceive(&args); 335 } 336 SendData(&args); 337 if (!streamopt) { 338 RecvData(&args); 339 } 340 } 341 t = (When() - 342 t0) / ((1 + !streamopt) * nrepeat); 343 #ifdef HAVE_GETRUSAGE 344 getrusage(RUSAGE_SELF, &curr_rusage); 345 user_time = 346 ((curr_rusage.ru_utime.tv_sec - 347 prev_rusage.ru_utime.tv_sec) + 348 (double) 349 (curr_rusage.ru_utime.tv_usec - 350 prev_rusage.ru_utime.tv_usec) * 351 1.0E-6) / ((1 + 352 !streamopt) * nrepeat); 353 sys_time = 354 ((curr_rusage.ru_stime.tv_sec - 355 prev_rusage.ru_stime.tv_sec) + 356 (double) 357 (curr_rusage.ru_stime.tv_usec - 358 prev_rusage.ru_stime.tv_usec) * 359 1.0E-6) / ((1 + 360 !streamopt) * nrepeat); 361 ut2 += user_time * user_time; 362 st2 += sys_time * sys_time; 363 ut1 += user_time; 364 st1 += sys_time; 365 if ((user_time + sys_time) < 366 (best_user_time + best_sys_time)) { 367 best_user_time = user_time; 368 best_sys_time = sys_time; 369 } 370 #endif 371 372 if (!streamopt) { 373 t2 += t * t; 374 t1 += t; 375 bwdata[n].t = 376 MIN(bwdata[n].t, t); 377 } 378 } 379 if (!streamopt) 380 SendTime(&args, &bwdata[n].t); 381 else 382 RecvTime(&args, &bwdata[n].t); 383 384 if (!streamopt) 385 bwdata[n].variance = 386 t2 / TRIALS - 387 t1 / TRIALS * t1 / TRIALS; 388 389 #ifdef HAVE_GETRUSAGE 390 ut_var = 391 ut2 / TRIALS - 392 (ut1 / TRIALS) * (ut1 / TRIALS); 393 st_var = 394 st2 / TRIALS - 395 (st1 / TRIALS) * (st1 / TRIALS); 396 #endif 397 398 } else { 399 /* 400 This is the receiver: receive the block TRIALS times, and 401 if we are not streaming, send the block back to the 402 sender. 403 */ 404 bwdata[n].t = LONGTIME; 405 t2 = t1 = 0; 406 for (i = 0; i < TRIALS; i++) { 407 if (asyncReceive) { 408 PrepareToReceive(&args); 409 } 410 Sync(&args); 411 t0 = When(); 412 for (j = 0; j < nrepeat; j++) { 413 RecvData(&args); 414 if (asyncReceive 415 && (j < nrepeat - 1)) { 416 PrepareToReceive(&args); 417 } 418 if (!streamopt) 419 SendData(&args); 420 } 421 t = (When() - 422 t0) / ((1 + !streamopt) * nrepeat); 423 424 if (streamopt) { 425 t2 += t * t; 426 t1 += t; 427 bwdata[n].t = 428 MIN(bwdata[n].t, t); 429 } 430 } 431 if (streamopt) 432 SendTime(&args, &bwdata[n].t); 433 else 434 RecvTime(&args, &bwdata[n].t); 435 436 if (streamopt) 437 bwdata[n].variance = 438 t2 / TRIALS - 439 t1 / TRIALS * t1 / TRIALS; 440 441 } 442 tlast = bwdata[n].t; 443 bwdata[n].bits = args.bufflen * CHARSIZE; 444 bwdata[n].bps = 445 bwdata[n].bits / (bwdata[n].t * 1024 * 1024); 446 bwdata[n].repeat = nrepeat; 447 448 if (args.tr) { 449 fprintf(out, "%.7f %.7f %d %d %.7f", 450 bwdata[n].t, bwdata[n].bps, 451 bwdata[n].bits, bwdata[n].bits / 8, 452 bwdata[n].variance); 453 #ifdef HAVE_GETRUSAGE 454 fprintf(out, " %.7f %.7f %.7f %.7f", 455 ut1 / (double)TRIALS, 456 st1 / (double)TRIALS, ut_var, st_var); 457 #endif 458 fprintf(out, "\n"); 459 } 460 fflush(out); 461 462 free(memtmp); 463 free(memtmp1); 464 465 if (args.tr && printopt) { 466 fprintf(stderr, " %6.2f Mbps in %.7f sec", 467 bwdata[n].bps, tlast); 468 #ifdef HAVE_GETRUSAGE 469 fprintf(stderr, 470 ", avg utime=%.7f avg stime=%.7f, ", 471 ut1 / (double)TRIALS, 472 st1 / (double)TRIALS); 473 fprintf(stderr, "min utime=%.7f stime=%.7f, ", 474 best_user_time, best_sys_time); 475 fprintf(stderr, "utime var=%.7f stime var=%.7f", 476 ut_var, st_var); 477 #endif 478 fprintf(stderr, "\n"); 479 } 480 } /* End of perturbation loop */ 481 482 } /* End of main loop */ 483 484 if (args.tr) 485 fclose(out); 486 487 CleanUp(&args); 488 return (0); 489 } 490 491 /* Return the current time in seconds, using a double precision number. */ 492 double When() 493 { 494 struct timeval tp; 495 gettimeofday(&tp, NULL); 496 return ((double)tp.tv_sec + (double)tp.tv_usec * 1e-6); 497 } 498 499 void PrintUsage(void) 500 { 501 printf("\n NETPIPE USAGE \n\n"); 502 printf("A: specify buffers alignment e.g.: <-A 1024>\n"); 503 printf("a: asynchronous receive (a.k.a. preposted receive)\n"); 504 #if defined(TCP) 505 printf("b: specify send and receive buffer sizes e.g. <-b 32768>\n"); 506 printf("h: specify hostname <-h host>\n"); 507 #endif 508 printf("i: specify increment step size e.g. <-i 64>\n"); 509 printf("l: lower bound start value e.g. <-i 1>\n"); 510 printf("O: specify buffer offset e.g. <-O 127>\n"); 511 printf("o: specify output filename <-o fn>\n"); 512 printf("P: print on screen\n"); 513 #if defined(TCP) 514 printf("p: specify port e.g. <-p 5150>\n"); 515 #endif 516 printf("r: receiver\n"); 517 printf("s: stream option\n"); 518 printf("t: transmitter\n"); 519 printf("u: upper bound stop value e.g. <-u 1048576>\n"); 520 printf("\n"); 521 exit(-12); 522 } 523