1 /*****************************************************************************/ 2 /* "NetPIPE" -- Network Protocol Independent Performance Evaluator. */ 3 /* Copyright 1997, 1998 Iowa State University Research Foundation, Inc. */ 4 /* */ 5 /* This program is free software; you can redistribute it and/or modify */ 6 /* it under the terms of the GNU General Public License as published by */ 7 /* the Free Software Foundation. You should have received a copy of the */ 8 /* GNU General Public License along with this program; if not, write to the */ 9 /* Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ 10 /* */ 11 /* Files needed for use: */ 12 /* * netpipe.c ---- Driver source */ 13 /* * netpipe.h ---- General include file */ 14 /* * TCP.c ---- TCP calls source */ 15 /* * TCP.h ---- Include file for TCP calls and data structs */ 16 /* * MPI.c ---- MPI calls source */ 17 /* * MPI.h ---- Include file for MPI calls and data structs */ 18 /* * PVM.c ---- PVM calls source */ 19 /* * PVM.h ---- Include file for PVM calls and data structs */ 20 /* 2002/03/18 --- Modified to specify server interfaces - Robbie Williamson */ 21 /* (robbiew (at) us.ibm.com) */ 22 /*****************************************************************************/ 23 #include "netpipe.h" 24 25 extern char *optarg; 26 27 /* Return the current time in seconds, using a double precision number. */ 28 double When() 29 { 30 struct timeval tp; 31 gettimeofday(&tp, NULL); 32 return ((double)tp.tv_sec + (double)tp.tv_usec * 1e-6); 33 } 34 35 int PrintUsage() 36 { 37 printf("\n NETPIPE USAGE \n\n"); 38 printf("A: specify buffers alignment e.g.: <-A 1024>\n"); 39 printf("a: asynchronous receive (a.k.a. preposted receive)\n"); 40 #if defined(TCP) 41 printf("b: specify send and receive buffer sizes e.g. <-b 32768>\n"); 42 printf("h: specify hostname <-h host>\n"); 43 printf("H: specify server hostname <-H HOST> e.g. Multiple NICs\n"); 44 #endif 45 printf("i: specify increment step size e.g. <-i 64>\n"); 46 printf("l: lower bound start value e.g. <-i 1>\n"); 47 printf("O: specify buffer offset e.g. <-O 127>\n"); 48 printf("o: specify output filename <-o fn>\n"); 49 printf("P: print on screen\n"); 50 #if defined(TCP) 51 printf("p: specify port e.g. <-p 5150>\n"); 52 #endif 53 printf("r: receiver\n"); 54 printf("s: stream option\n"); 55 printf("t: transmitter\n"); 56 printf("u: upper bound stop value e.g. <-u 1048576>\n"); 57 printf("\n"); 58 exit(-12); 59 } 60 61 int main(int argc, char *argv[]) 62 { 63 FILE *out; /* Output data file */ 64 char s[255]; /* Generic string */ 65 char *memtmp; 66 char *memtmp1; 67 68 int c, /* option index */ 69 i, j, n, nq, /* Loop indices */ 70 asyncReceive = 0, /* Pre-post a receive buffer? */ 71 bufoffset = 0, /* Align buffer to this */ 72 bufalign = 16 * 1024, /* Boundary to align buffer to */ 73 errFlag, /* Error occurred in inner testing loop */ 74 nrepeat, /* Number of time to do the transmission */ 75 len, /* Number of bytes to be transmitted */ 76 inc = 0, /* Increment value */ 77 trans = -1, /* Transmitter flag. 1 if transmitting. */ 78 server = 0, /* Server flag. 1 if specifying server. */ 79 detailflag = 0, /* Set to examine the signature curve detail */ 80 bufszflag = 0, /* Set to change the TCP socket buffer size */ 81 pert, /* Perturbation value */ 82 start = 1, /* Starting value for signature curve */ 83 end = MAXINT, /* Ending value for signature curve */ 84 streamopt = 0, /* Streaming mode flag */ 85 printopt = 0; /* Debug print statements flag */ 86 87 ArgStruct args; /* Argumentsfor all the calls */ 88 89 double t, t0, t1, t2, /* Time variables */ 90 tlast, /* Time for the last transmission */ 91 latency; /* Network message latency */ 92 93 Data bwdata[NSAMP]; /* Bandwidth curve data */ 94 95 short port = DEFPORT; /* Port number for connection */ 96 #ifdef HAVE_GETRUSAGE 97 struct rusage prev_rusage, curr_rusage; /* Resource usage */ 98 double user_time, sys_time; /* User & system time used */ 99 double best_user_time, best_sys_time; /* Total user & system time used */ 100 double ut1, ut2, st1, st2; /* User & system time ctrs for variance */ 101 double ut_var, st_var; /* Variance in user & system time */ 102 #endif 103 104 #ifdef MPI 105 MPI_Init(&argc, &argv); 106 #endif 107 108 strcpy(s, "NetPIPE.out"); 109 #ifndef MPI 110 if (argc < 2) 111 PrintUsage(); 112 #endif 113 /* Parse the arguments. See Usage for description */ 114 while ((c = getopt(argc, argv, "PstrhH:p:o:A:O:l:u:i:b:a")) != -1) { 115 switch (c) { 116 case 'o': 117 strcpy(s, optarg); 118 break; 119 120 case 't': 121 trans = 1; 122 break; 123 124 case 'r': 125 trans = 0; 126 break; 127 128 case 's': 129 streamopt = 1; 130 break; 131 132 case 'l': /*detailflag = 1; */ 133 start = atoi(optarg); 134 if (start < 1) { 135 fprintf(stderr, "Need a starting value >= 1\n"); 136 exit(743); 137 } 138 break; 139 140 case 'u': /*detailflag = 1; */ 141 end = atoi(optarg); 142 break; 143 144 case 'i': 145 detailflag = 1; 146 inc = atoi(optarg); 147 break; 148 149 case 'b': 150 bufszflag = 1; 151 #ifdef TCP 152 args.prot.rcvbufsz = atoi(optarg); 153 args.prot.sndbufsz = args.prot.rcvbufsz; 154 #endif 155 break; 156 157 case 'P': 158 printopt = 1; 159 break; 160 161 case 'A': 162 bufalign = atoi(optarg); 163 break; 164 165 case 'O': 166 bufoffset = atoi(optarg); 167 break; 168 169 case 'p': 170 port = atoi(optarg); 171 break; 172 173 case 'h': 174 if (trans == 1) { 175 args.host = (char *)malloc(strlen(optarg) + 1); 176 strcpy(args.host, optarg); 177 printf("host is %s\n", args.host); 178 } else { 179 fprintf(stderr, 180 "Error: -t must be specified before -h\n"); 181 exit(-11); 182 } 183 break; 184 185 case 'H': 186 if (trans == 0) { 187 args.server_host = 188 (char *)malloc(strlen(optarg) + 1); 189 strcpy(args.server_host, optarg); 190 printf("server is %s\n", args.server_host); 191 server = 1; 192 } else { 193 fprintf(stderr, 194 "Error: -r must be specified before -H\n"); 195 exit(-11); 196 } 197 break; 198 199 case 'a': 200 asyncReceive = 1; 201 break; 202 203 default: 204 PrintUsage(); 205 exit(-12); 206 } 207 } 208 if (start > end) { 209 fprintf(stderr, "Start MUST be LESS than end\n"); 210 exit(420132); 211 } 212 #if defined(TCP) || defined(PVM) 213 /* 214 It should be explicitly specified whether this is the transmitter 215 or the receiver. 216 */ 217 if (trans < 0) { 218 fprintf(stderr, "Error: either -t or -r must be specified\n"); 219 exit(-11); 220 } 221 #endif 222 223 args.nbuff = TRIALS; 224 args.tr = trans; 225 args.sr = server; 226 args.port = port; 227 228 #if defined(TCP) 229 if (!bufszflag) { 230 args.prot.sndbufsz = 0; 231 args.prot.rcvbufsz = 0; 232 } else 233 fprintf(stderr, "Send and Recv Buffers are %d bytes\n", 234 args.prot.sndbufsz); 235 #endif 236 237 Setup(&args); 238 Establish(&args); 239 240 if (args.tr) { 241 if ((out = fopen(s, "w")) == NULL) { 242 fprintf(stderr, "Can't open %s for output\n", s); 243 exit(1); 244 } 245 } else 246 out = stdout; 247 248 args.bufflen = 1; 249 args.buff = (char *)malloc(args.bufflen); 250 args.buff1 = (char *)malloc(args.bufflen); 251 if (asyncReceive) 252 PrepareToReceive(&args); 253 Sync(&args); 254 t0 = When(); 255 t0 = When(); 256 t0 = When(); 257 #ifdef HAVE_GETRUSAGE 258 getrusage(RUSAGE_SELF, &prev_rusage); 259 #endif 260 t0 = When(); 261 for (i = 0; i < LATENCYREPS; i++) { 262 if (args.tr) { 263 SendData(&args); 264 RecvData(&args); 265 if (asyncReceive && (i < LATENCYREPS - 1)) { 266 PrepareToReceive(&args); 267 } 268 } else { 269 RecvData(&args); 270 if (asyncReceive && (i < LATENCYREPS - 1)) { 271 PrepareToReceive(&args); 272 } 273 SendData(&args); 274 } 275 } 276 latency = (When() - t0) / (2 * LATENCYREPS); 277 #ifdef HAVE_GETRUSAGE 278 getrusage(RUSAGE_SELF, &curr_rusage); 279 #endif 280 free(args.buff); 281 free(args.buff1); 282 283 if (args.tr) { 284 SendTime(&args, &latency); 285 } else { 286 RecvTime(&args, &latency); 287 } 288 if (args.tr && printopt) { 289 fprintf(stderr, "Latency: %.7f\n", latency); 290 fprintf(stderr, "Now starting main loop\n"); 291 } 292 tlast = latency; 293 if (inc == 0) { 294 /* Set a starting value for the message size increment. */ 295 inc = (start > 1) ? start / 2 : 1; 296 } 297 298 /* Main loop of benchmark */ 299 for (nq = n = 0, len = start, errFlag = 0; 300 n < NSAMP - 3 && tlast < STOPTM && len <= end && !errFlag; 301 len = len + inc, nq++) { 302 if (nq > 2 && !detailflag) { 303 /* 304 This has the effect of exponentially increasing the block 305 size. If detailflag is false, then the block size is 306 linearly increased (the increment is not adjusted). 307 */ 308 inc = ((nq % 2)) ? inc + inc : inc; 309 } 310 311 /* This is a perturbation loop to test nearby values */ 312 for (pert = (!detailflag && inc > PERT + 1) ? -PERT : 0; 313 pert <= PERT; 314 n++, pert += (!detailflag 315 && inc > PERT + 1) ? PERT : PERT + 1) { 316 317 /* Calculate how many times to repeat the experiment. */ 318 if (args.tr) { 319 nrepeat = MAX((RUNTM / ((double)args.bufflen / 320 (args.bufflen - inc + 321 1.0) * tlast)), 322 TRIALS); 323 SendRepeat(&args, nrepeat); 324 } else { 325 RecvRepeat(&args, &nrepeat); 326 } 327 328 /* Allocate the buffer */ 329 args.bufflen = len + pert; 330 if ((args.buff = 331 (char *)malloc(args.bufflen + bufalign)) == 332 (char *)NULL) { 333 fprintf(stderr, "Couldn't allocate memory\n"); 334 errFlag = -1; 335 break; 336 } 337 if ((args.buff1 = 338 (char *)malloc(args.bufflen + bufalign)) == 339 (char *)NULL) { 340 fprintf(stderr, "Couldn't allocate memory\n"); 341 errFlag = -1; 342 break; 343 } 344 /* 345 Possibly align the data buffer: make memtmp and memtmp1 346 point to the original blocks (so they can be freed later), 347 then adjust args.buff and args.buff1 if the user requested it. 348 */ 349 memtmp = args.buff; 350 memtmp1 = args.buff1; 351 if (bufalign != 0) 352 args.buff += (bufalign - 353 ((intptr_t) args.buff % 354 bufalign) + 355 bufoffset) % bufalign; 356 357 if (bufalign != 0) 358 args.buff1 += (bufalign - 359 ((intptr_t) args.buff1 % 360 bufalign) + 361 bufoffset) % bufalign; 362 363 if (args.tr && printopt) 364 fprintf(stderr, "%3d: %9d bytes %4d times --> ", 365 n, args.bufflen, nrepeat); 366 367 /* Finally, we get to transmit or receive and time */ 368 if (args.tr) { 369 /* 370 This is the transmitter: send the block TRIALS times, and 371 if we are not streaming, expect the receiver to return each 372 block. 373 */ 374 bwdata[n].t = LONGTIME; 375 t2 = t1 = 0; 376 #ifdef HAVE_GETRUSAGE 377 ut1 = ut2 = st1 = st2 = 0.0; 378 best_user_time = best_sys_time = LONGTIME; 379 #endif 380 for (i = 0; i < TRIALS; i++) { 381 Sync(&args); 382 #ifdef HAVE_GETRUSAGE 383 getrusage(RUSAGE_SELF, &prev_rusage); 384 #endif 385 t0 = When(); 386 for (j = 0; j < nrepeat; j++) { 387 if (asyncReceive && !streamopt) { 388 PrepareToReceive(&args); 389 } 390 SendData(&args); 391 if (!streamopt) { 392 RecvData(&args); 393 } 394 } 395 t = (When() - 396 t0) / ((1 + !streamopt) * nrepeat); 397 #ifdef HAVE_GETRUSAGE 398 getrusage(RUSAGE_SELF, &curr_rusage); 399 user_time = 400 ((curr_rusage.ru_utime.tv_sec - 401 prev_rusage.ru_utime.tv_sec) + 402 (double) 403 (curr_rusage.ru_utime.tv_usec - 404 prev_rusage.ru_utime.tv_usec) * 405 1.0E-6) / ((1 + 406 !streamopt) * nrepeat); 407 sys_time = 408 ((curr_rusage.ru_stime.tv_sec - 409 prev_rusage.ru_stime.tv_sec) + 410 (double) 411 (curr_rusage.ru_stime.tv_usec - 412 prev_rusage.ru_stime.tv_usec) * 413 1.0E-6) / ((1 + 414 !streamopt) * nrepeat); 415 ut2 += user_time * user_time; 416 st2 += sys_time * sys_time; 417 ut1 += user_time; 418 st1 += sys_time; 419 if ((user_time + sys_time) < 420 (best_user_time + best_sys_time)) { 421 best_user_time = user_time; 422 best_sys_time = sys_time; 423 } 424 #endif 425 426 if (!streamopt) { 427 t2 += t * t; 428 t1 += t; 429 bwdata[n].t = 430 MIN(bwdata[n].t, t); 431 } 432 } 433 if (!streamopt) 434 SendTime(&args, &bwdata[n].t); 435 else 436 RecvTime(&args, &bwdata[n].t); 437 438 if (!streamopt) 439 bwdata[n].variance = 440 t2 / TRIALS - 441 t1 / TRIALS * t1 / TRIALS; 442 443 #ifdef HAVE_GETRUSAGE 444 ut_var = 445 ut2 / TRIALS - 446 (ut1 / TRIALS) * (ut1 / TRIALS); 447 st_var = 448 st2 / TRIALS - 449 (st1 / TRIALS) * (st1 / TRIALS); 450 #endif 451 452 } else { 453 /* 454 This is the receiver: receive the block TRIALS times, and 455 if we are not streaming, send the block back to the 456 sender. 457 */ 458 bwdata[n].t = LONGTIME; 459 t2 = t1 = 0; 460 for (i = 0; i < TRIALS; i++) { 461 if (asyncReceive) { 462 PrepareToReceive(&args); 463 } 464 Sync(&args); 465 t0 = When(); 466 for (j = 0; j < nrepeat; j++) { 467 RecvData(&args); 468 if (asyncReceive 469 && (j < nrepeat - 1)) { 470 PrepareToReceive(&args); 471 } 472 if (!streamopt) 473 SendData(&args); 474 } 475 t = (When() - 476 t0) / ((1 + !streamopt) * nrepeat); 477 478 if (streamopt) { 479 t2 += t * t; 480 t1 += t; 481 bwdata[n].t = 482 MIN(bwdata[n].t, t); 483 } 484 } 485 if (streamopt) 486 SendTime(&args, &bwdata[n].t); 487 else 488 RecvTime(&args, &bwdata[n].t); 489 490 if (streamopt) 491 bwdata[n].variance = 492 t2 / TRIALS - 493 t1 / TRIALS * t1 / TRIALS; 494 495 } 496 tlast = bwdata[n].t; 497 bwdata[n].bits = args.bufflen * CHARSIZE; 498 bwdata[n].bps = 499 bwdata[n].bits / (bwdata[n].t * 1024 * 1024); 500 bwdata[n].repeat = nrepeat; 501 502 if (args.tr) { 503 fprintf(out, "%.7f %.7f %d %d %.7f", 504 bwdata[n].t, bwdata[n].bps, 505 bwdata[n].bits, bwdata[n].bits / 8, 506 bwdata[n].variance); 507 #ifdef HAVE_GETRUSAGE 508 fprintf(out, " %.7f %.7f %.7f %.7f", 509 ut1 / (double)TRIALS, 510 st1 / (double)TRIALS, ut_var, st_var); 511 #endif 512 fprintf(out, "\n"); 513 } 514 fflush(out); 515 516 free(memtmp); 517 free(memtmp1); 518 519 if (args.tr && printopt) { 520 fprintf(stderr, " %6.3f Mbps in %.7f sec", 521 bwdata[n].bps, tlast); 522 #ifdef HAVE_GETRUSAGE 523 fprintf(stderr, 524 ", avg utime=%.7f avg stime=%.7f, ", 525 ut1 / (double)TRIALS, 526 st1 / (double)TRIALS); 527 fprintf(stderr, "min utime=%.7f stime=%.7f, ", 528 best_user_time, best_sys_time); 529 fprintf(stderr, "utime var=%.7f stime var=%.7f", 530 ut_var, st_var); 531 #endif 532 fprintf(stderr, "\n"); 533 } 534 } /* End of perturbation loop */ 535 536 } /* End of main loop */ 537 538 if (args.tr) 539 fclose(out); 540 541 CleanUp(&args); 542 return (0); 543 } 544