00001
00002
00003
00004
00005
00006
00007 #include "q.h"
00008
00009
00010
00011
00012
00013
00014
00015 static int initScaleFactor (double **scaleFactorArrayPtr,
00016 util_timing_t * resetThresholdPtr,
00017 double subSampleFraction);
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031 Q_Cos_t *
00032 Q_CosAlloc (int N)
00033 {
00034 int i;
00035 Q_Cos_t *result;
00036
00037 result = (Q_Cos_t *) malloc (sizeof (Q_Cos_t));
00038 result->numClasses = N;
00039 result->packetQueueArray = (lsList *) malloc (sizeof (lsList) * N);
00040 result->numPktsInQueue = 0;
00041
00042 for (i = 0; i < N; i++)
00043 {
00044 result->packetQueueArray[i] = lsCreate ();
00045 }
00046
00047 return result;
00048 }
00049
00050
00051
00052
00053
00054
00055
00056 int
00057 Q_CosFree (Q_Cos_t * aCosStructure)
00058 {
00059 int i, N;
00060
00061 for (i = 0; i < N; i++)
00062 {
00063 lsList packetQueue = aCosStructure->packetQueueArray[i];
00064 lsDestroy (packetQueue, 0);
00065 }
00066 free (aCosStructure);
00067 return 0;
00068 }
00069
00070
00071
00072
00073
00074
00075
00076
00077
00078
00079
00080
00081
00082
00083 Q_Drr_t *
00084 Q_DrrAlloc (int (*flowCmp) (Pkt_EthernetHdr_t *, Pkt_EthernetHdr_t *),
00085 int (*flowHash) (Pkt_EthernetHdr_t *, int))
00086 {
00087 Q_Drr_t *result;
00088
00089 result = (Q_Drr_t *) malloc (sizeof (Q_Drr_t));
00090
00091 result->flowToQueueTable = Hash_InitTable ( ( int(*)() ) flowCmp, ( int(*)() ) flowHash);
00092
00093 result->qOfFlows = lsCreate ();
00094 result->numPktsInQueue = 0;
00095 result->numBytesInQueue = 0;
00096
00097 return result;
00098 }
00099
00100
00101
00102
00103
00104
00105
00106 int
00107 Q_DrrFree (Q_Drr_t * aDrrStruct)
00108 {
00109 st_generator *stGen;
00110 int count;
00111 Q_Flow_t *flow;
00112
00113 st_foreach_item (aDrrStruct->flowToQueueTable, stGen, (char **) &count,
00114 (char **) &flow)
00115 {
00116 lsDestroy (flow->queue, 0);
00117 free (flow);
00118 }
00119 Hash_FreeTable (aDrrStruct->flowToQueueTable);
00120 lsDestroy (aDrrStruct->qOfFlows, 0);
00121
00122 free (aDrrStruct);
00123 return 0;
00124 }
00125
00126
00127
00128
00129
00130
00131
00132 int
00133 Q_CosInsertPpBegin (Q_Cos_t * cosStruct,
00134 Pkt_ProcessPkt_t * anEthFrame, int classIndex)
00135 {
00136
00137 return Q_CosInsertPpBeginEnd (cosStruct, anEthFrame, classIndex, 0);
00138 }
00139
00140
00141
00142
00143
00144
00145 int
00146 Q_CosInsertPpEnd (Q_Cos_t * cosStruct,
00147 Pkt_ProcessPkt_t * anEthFrame, int classIndex)
00148 {
00149
00150 return Q_CosInsertPpBeginEnd (cosStruct, anEthFrame, classIndex, 1);
00151 }
00152
00153
00154
00155
00156
00157
00158
00159
00160
00161
00162
00163
00164
00165
00166
00167
00168
00169
00170 int
00171 Q_CosInsertPpBeginEnd (Q_Cos_t * cosStruct,
00172 Pkt_ProcessPkt_t * anEthFrame,
00173 int classIndex, int insertAtEnd)
00174 {
00175 int numClasses;
00176
00177 numClasses = Q_CosNumClasses (cosStruct);
00178 if (classIndex < 0 || classIndex >= numClasses)
00179 {
00180 printf ("Panic: class passed in is not valid\n");
00181 assert (0);
00182 }
00183
00184 lsList packetQueue = cosStruct->packetQueueArray[classIndex];
00185
00186 if (insertAtEnd)
00187 {
00188 lsNewEnd (packetQueue, (lsGeneric) anEthFrame, 0);
00189 }
00190 else
00191 {
00192 lsNewBegin (packetQueue, (lsGeneric) anEthFrame, 0);
00193 }
00194 cosStruct->numPktsInQueue++;
00195
00196 return 1;
00197 }
00198
00199
00200
00201
00202
00203
00204
00205
00206
00207
00208 Pkt_ProcessPkt_t *
00209 Q_CosReadPp (Q_Cos_t * cosStruct)
00210 {
00211 Pkt_ProcessPkt_t *result = NIL (Pkt_ProcessPkt_t);
00212
00213 int i;
00214 for (i = 0; i < cosStruct->numClasses; i++)
00215 {
00216 if (lsDelBegin (cosStruct->packetQueueArray[i], (lsGeneric *) & result)
00217 == LS_NOMORE)
00218 {
00219 continue;
00220 }
00221 else
00222 {
00223 break;
00224 }
00225 }
00226 if (result != NIL (Pkt_ProcessPkt_t))
00227 {
00228 cosStruct->numPktsInQueue--;
00229 }
00230 return result;
00231 }
00232
00233
00234
00235
00236
00237
00238
00239 Pkt_ProcessPkt_t *
00240 Q_CosReadHead (Q_Cos_t * cosStruct, int *classPtr)
00241 {
00242 Pkt_ProcessPkt_t *result = NIL (Pkt_ProcessPkt_t);
00243
00244 int i;
00245 for (i = 0; i < cosStruct->numClasses; i++)
00246 {
00247 if (lsFirstItem
00248 (cosStruct->packetQueueArray[i], (lsGeneric *) & result,
00249 0) == LS_NOMORE)
00250 {
00251 continue;
00252 }
00253 else
00254 {
00255 if (classPtr != NIL (int))
00256 {
00257 *classPtr = i;
00258 }
00259 return result;
00260 }
00261 }
00262 return NIL (Pkt_ProcessPkt_t);
00263 }
00264
00265
00266
00267
00268
00269
00270
00271
00272 Pkt_ProcessPkt_t *
00273 Q_CosHead (Q_Cos_t * cosStruct, int *classPtr)
00274 {
00275 Pkt_ProcessPkt_t *result = NIL (Pkt_ProcessPkt_t);
00276
00277 int i;
00278 for (i = 0; i < cosStruct->numClasses; i++)
00279 {
00280 if (lsDelBegin (cosStruct->packetQueueArray[i], (lsGeneric *) & result)
00281 == LS_NOMORE)
00282 {
00283 continue;
00284 }
00285 else
00286 {
00287 cosStruct->numPktsInQueue--;
00288 if (classPtr != NIL (int))
00289 {
00290 *classPtr = i;
00291 }
00292 return result;
00293 }
00294 }
00295 return NIL (Pkt_ProcessPkt_t);
00296 }
00297
00298
00299
00300
00301
00302
00303
00304
00305 Pkt_ProcessPkt_t *
00306 Q_CosHeadClass (Q_Cos_t * cosStruct, int classIndex )
00307 {
00308 Pkt_ProcessPkt_t *result = NIL (Pkt_ProcessPkt_t);
00309 assert ((0 <= classIndex ) && (classIndex < cosStruct->numClasses));
00310
00311 if (lsDelBegin (cosStruct->packetQueueArray[classIndex], (lsGeneric *) & result)
00312 == LS_NOMORE)
00313 {
00314 return NIL (Pkt_ProcessPkt_t);
00315 }
00316 return result;
00317 }
00318
00319
00320
00321
00322 int
00323 Q_CosNumClasses (Q_Cos_t * cosStruct)
00324 {
00325 return cosStruct->numClasses;
00326 }
00327
00328
00329
00330
00331 int
00332 Q_DrrTestIsEmpty (Q_Drr_t * flowQueue)
00333 {
00334 return (flowQueue->numPktsInQueue > 0) ? 0 : 1;
00335 }
00336
00337
00338
00339
00340 int
00341 Q_CosTestIsEmpty (Q_Cos_t * cosStruct)
00342 {
00343 return (cosStruct->numPktsInQueue > 0) ? 0 : 1;
00344 }
00345
00346
00347
00348
00349 int
00350 Q_DrrInsertPpBegin (Q_Drr_t * drrStruct, Pkt_ProcessPkt_t * pp)
00351 {
00352 return Q_DrrInsertPpBeginEnd (drrStruct, pp, 1);
00353 }
00354
00355
00356
00357
00358 int
00359 Q_DrrInsertPpEnd (Q_Drr_t * drrStruct, Pkt_ProcessPkt_t * pp)
00360 {
00361 return Q_DrrInsertPpBeginEnd (drrStruct, pp, 0);
00362 }
00363
00364
00365
00366
00367 int
00368 Q_DrrInsertPpBeginEnd (Q_Drr_t * drrStruct,
00369 Pkt_ProcessPkt_t * pp, int insertAtEnd)
00370 {
00371 Q_Flow_t *aQ;
00372
00373 st_table *flowToQueueTable = Q_DrrReadFlowToQueueTable (drrStruct);
00374
00375 if (!Hash_Lookup (flowToQueueTable, (char *) pp->pkt, (char **) &aQ))
00376 {
00377
00378 aQ = Q_AllocFlow ();
00379
00380
00381
00382
00383 Pkt_IpHdr_t *tmpPkt = Pkt_EthernetExtractIp (pp->pkt);
00384 int ipHdrLength = tmpPkt->ihl * 4;
00385
00386 int length =
00387 sizeof (Pkt_EthernetHdr_t) + ipHdrLength + sizeof (Pkt_TcpHdr_t);
00388 char *flowId = (char *) malloc (length);
00389 memcpy (flowId, pp->pkt, length);
00390
00391 Hash_Insert (flowToQueueTable, (char *) flowId, (char *) aQ);
00392
00393 lsNewEnd (drrStruct->qOfFlows, (lsGeneric) aQ->queue, 0);
00394 }
00395 if (insertAtEnd)
00396 {
00397 lsNewEnd (aQ->queue, (lsGeneric) pp, 0);
00398 }
00399 else
00400 {
00401 lsNewBegin (aQ->queue, (lsGeneric) pp, 0);
00402 }
00403
00404 aQ->numPkts++;
00405 aQ->numBytes += Pkt_EthernetPktHdrReadLength (pp->pkt);
00406
00407 drrStruct->numPktsInQueue++;
00408
00409 return 1;
00410 }
00411
00412
00413
00414
00415 Q_Flow_t *
00416 Q_DrrReadFlow (Q_Drr_t * drrStruct, Pkt_EthernetHdr_t * pkt)
00417 {
00418 Q_Flow_t *aQ;
00419 if (!Hash_Lookup (drrStruct->flowToQueueTable, (char *) pkt, (char **) &aQ))
00420 {
00421 return NIL (Q_Flow_t);
00422 }
00423 else
00424 {
00425 return aQ;
00426 }
00427 }
00428
00429
00430
00431
00432
00433
00434
00435
00436
00437
00438
00439
00440
00441
00442
00443
00444
00445
00446 Pkt_ProcessPkt_t *
00447 Q_DrrHead (Q_Drr_t * drrStruct)
00448 {
00449 Pkt_ProcessPkt_t *result;
00450 lsList aPktQueue;
00451
00452 lsList qOfFlows = drrStruct->qOfFlows;
00453
00454 if ((lsLength (qOfFlows) == 0))
00455 {
00456 return NIL (Pkt_ProcessPkt_t);
00457 }
00458
00459 int status;
00460 status = lsDelBegin (qOfFlows, (lsGeneric *) & aPktQueue);
00461 assert (status == LS_OK);
00462
00463 status = lsDelBegin (aPktQueue, (lsGeneric *) & result);
00464 assert (status == LS_OK);
00465
00466 drrStruct->numPktsInQueue--;
00467
00468 Q_Flow_t *aFlow;
00469 int succ =
00470 Hash_Lookup (drrStruct->flowToQueueTable, (char *) result->pkt,
00471 (char **) &aFlow);
00472 assert (succ);
00473
00474 if (lsLength (aPktQueue) == 0)
00475 {
00476
00477 assert (aFlow->numPkts == 1);
00478 Pkt_EthernetHdr_t *tmp = result->pkt;
00479 Hash_Delete (drrStruct->flowToQueueTable, (char **) &tmp, 0);
00480
00481 lsDestroy (aPktQueue, NULL);
00482 }
00483 else
00484 {
00485
00486 lsNewEnd (qOfFlows, (lsGeneric) aPktQueue, 0);
00487
00488 aFlow->numPkts--;
00489 aFlow->numBytes -= Pkt_EthernetPktHdrReadLength (result->pkt);
00490 }
00491
00492 return result;
00493 }
00494
00495
00496
00497
00498
00499
00500
00501 Pkt_ProcessPkt_t *
00502 Q_DrrReadHead (Q_Drr_t * drrStruct)
00503 {
00504 Pkt_ProcessPkt_t *result;
00505 lsList aPktQueue;
00506
00507 lsList qOfFlows = drrStruct->qOfFlows;
00508
00509 if ((lsLength (qOfFlows) == 0))
00510 {
00511 return NIL (Pkt_ProcessPkt_t);
00512 }
00513
00514 int status;
00515 status = lsFirstItem (qOfFlows, (lsGeneric *) & aPktQueue, 0);
00516 assert (status == LS_OK);
00517
00518 status = lsFirstItem (aPktQueue, (lsGeneric *) & result, 0);
00519 assert (status == LS_OK);
00520
00521 return result;
00522 }
00523
00524
00525
00526
00527
00528
00529
00530 st_table *
00531 Q_DrrReadFlowToQueueTable (Q_Drr_t * drrStruct)
00532 {
00533 return drrStruct->flowToQueueTable;
00534 }
00535
00536
00537
00538
00539
00540
00541
00542 int
00543 Q_DrrNumFlows (Q_Drr_t * drrStruct)
00544 {
00545 int count = st_count (drrStruct->flowToQueueTable);
00546 return count;
00547 }
00548
00549
00550
00551
00552
00553
00554
00555
00556
00557
00558 void
00559 Q_CosPrint (Q_Cos_t * cosData)
00560 {
00561 printf (">>> Printing COS data structure...\n");
00562 printf ("Num of classes = %d\n", cosData->numClasses);
00563
00564 int i;
00565 for (i = 0; i < cosData->numClasses; i++)
00566 {
00567 printf ("COS %d has %d classes\n", i,
00568 lsLength (cosData->packetQueueArray[i]));
00569 }
00570
00571 return;
00572 }
00573
00574
00575
00576
00577
00578
00579
00580 void
00581 Q_DrrPrint (Q_Drr_t * drrData)
00582 {
00583 st_generator *stGen;
00584 int flowId;
00585 lsList aQ;
00586
00587 printf ("Printing DRR structure:\n");
00588
00589 if (st_count (drrData->flowToQueueTable) > 100)
00590 {
00591 printf
00592 ("This DRR struct has more than 100 indivudual queues, skipping!\n");
00593 }
00594 else
00595 {
00596 st_foreach_item (drrData->flowToQueueTable, stGen, (char **) &flowId,
00597 (char **) &aQ)
00598 {
00599 printf ("Flow id = %u and queue for this flow is has %d elements\n",
00600 flowId, lsLength (aQ));
00601 }
00602 }
00603 return;
00604 }
00605
00606
00607
00608
00609
00610
00611 Q_Flow_t *
00612 Q_AllocFlow ()
00613 {
00614 Q_Flow_t *flow = (Q_Flow_t *) malloc (sizeof (Q_Flow_t));
00615 flow->queue = lsCreate ();
00616 flow->numBytes = 0;
00617 flow->numPkts = 0;
00618 flow->rate = 0.0;
00619 util_timing_set (&flow->startOfCurrentWindow);
00620 flow->numBytes = 0.0;
00621
00622 return flow;
00623 }
00624
00625
00626
00627
00628
00629
00630 double
00631 Q_ComputeRate (util_timing_t startOfWindow,
00632 double timeWindowDuration,
00633 int numBytesExitedInWindow, int pktLength)
00634 {
00635 double newRate;
00636
00637 util_timing_t currentTime;
00638 util_timing_set (¤tTime);
00639 double delta =
00640 util_timing_secs (currentTime) - util_timing_secs (startOfWindow);
00641
00642 if (delta > timeWindowDuration)
00643 {
00644 newRate = 0.0;
00645 }
00646 else
00647 {
00648 newRate = (8.0 * (double) (numBytesExitedInWindow + pktLength)) / delta;
00649 }
00650 return newRate;
00651 }
00652
00653
00654
00655
00656
00657
00658 double
00659 Q_ComputeRateOld (util_timing_t lastTime, Pkt_EthernetHdr_t * pkt)
00660 {
00661 static int count = 0;
00662 util_timing_t currentTime;
00663 util_timing_set (¤tTime);
00664 util_timing_t delta = util_timing_diff (currentTime, lastTime);
00665 double deltaTime = util_timing_secs (delta);
00666 double result;
00667
00668 const double tick = 0.00000001;
00669 static double *scaleFactor = NIL (double);
00670 static util_timing_t resetThreshold;
00671
00672 if (scaleFactor == NIL (double))
00673 {
00674 initScaleFactor (&scaleFactor, &resetThreshold, tick);
00675 }
00676
00677 if (util_timing_compare (resetThreshold, delta))
00678 {
00679
00680 result =
00681 8.0 * ((double) Pkt_EthernetPktHdrReadLength (pkt)) / deltaTime;
00682 }
00683 else
00684 {
00685
00686
00687 double a = 0.999999931;
00688 result = pow (a, (deltaTime / tick)) +
00689 (1.0 -
00690 a) * (8.0 * (double) Pkt_EthernetPktHdrReadLength (pkt)) / deltaTime;
00691
00692
00693
00694
00695
00696 }
00697 if (0 == (count % 1000))
00698 {
00699 printf ("count %d, rate %.8f\n", count, result);
00700 }
00701 count++;
00702 return result;
00703 }
00704
00705
00706
00707
00708
00709
00710 static int
00711 initScaleFactor (double **scaleFactorArrayPtr,
00712 util_timing_t * resetThresholdPtr, double subSampleFraction)
00713 {
00714
00715 const double resetThresholdTime = 10.0;
00716 util_timing_t resetThreshold = util_timing_pair (resetThresholdTime);
00717 *resetThresholdPtr = resetThreshold;
00718
00719
00720 const double sampleTime = 0.512 * 0.000001;
00721
00722
00723 const double alpha = 0.9999996451;
00724
00725 const int subSampleRate = (int) (1.0 / subSampleFraction);
00726 int length = (int) ((resetThresholdTime / sampleTime) * subSampleFraction);
00727 static double *scaleFactor = NIL (double);
00728 scaleFactor = (double *) malloc (sizeof (double) * length);
00729
00730 double currProd = alpha;
00731 int loopBound = (int) (resetThresholdTime / sampleTime);
00732
00733 int i;
00734 for (i = 0; i < loopBound; i++)
00735 {
00736 if ((i % subSampleRate) == 0)
00737 {
00738 scaleFactor[i % subSampleRate] = currProd;
00739 }
00740 currProd *= alpha;
00741 }
00742 *scaleFactorArrayPtr = scaleFactor;
00743
00744 return length;
00745 }
00746
00747
00748
00749
00750
00751
00752
00753 Q_Q_t *
00754 Q_AllocQ (char *name)
00755 {
00756 Q_Q_t *result = (Q_Q_t *) malloc (sizeof (Q_Q_t));
00757
00758
00759 result->flowType = Pkt_SrcIpFlow_c;
00760 result->flowMaxRate = 0;
00761 result->maxFlows = -1;
00762 (result->lastEntryTime).high = 0;
00763 (result->lastEntryTime).low = 0;
00764
00765
00766 result->flowQ = NIL (Q_Drr_t);
00767
00768 result->name = name;
00769 result->type = Q_QueueTypeUndef_c;
00770
00771 result->maxRate = INFINITY;
00772 result->currentRate = 0;
00773 result->flowMaxRate = INFINITY;
00774 result->maxBytesPerFlow = INFINITY;
00775 result->flowDropMethod = Q_DropTail_c;
00776
00777 result->currentPriority = 0;
00778 result->assignedPriority = 0;
00779
00780 result->numPktsInQueue = 0;
00781 result->numPktsProcessed = 0.0;
00782
00783 result->maxAllowedBytes = 0;
00784 result->numBytesInQueue = 0;
00785 result->numBytesEntered = 0.0;
00786 result->numBytesExited = 0.0;
00787
00788 result->timeWindowDuration = 1.0;
00789 result->numBytesExitedInWindow = 0.0;
00790 (result->startOfCurrentWindow).high = 0;
00791 (result->startOfCurrentWindow).low = 0;
00792
00793 result->dropMethod = Q_DropTail_c;
00794
00795 result->numClasses = 0;
00796 result->cosQ = NIL (Q_Cos_t);
00797
00798 return result;
00799 }