Home | History | Annotate | Download | only in importer
      1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 'use strict';
      6 
      7 /**
      8  * @fileoverview Implements a WebSocket client that receives
      9  * a stream of slices from a server.
     10  *
     11  */
     12 
     13 base.require('base.events');
     14 base.require('tracing.trace_model');
     15 base.require('tracing.trace_model.slice');
     16 
     17 base.exportTo('tracing.importer', function() {
     18 
     19   var STATE_PAUSED = 0x1;
     20   var STATE_CAPTURING = 0x2;
     21 
     22   /**
     23    * Converts a stream of trace data from a websocket into a model.
     24    *
     25    * Events consumed by this importer have the following JSON structure:
     26    *
     27    * {
     28    *   'cmd': 'commandName',
     29    *   ... command specific data
     30    * }
     31    *
     32    * The importer understands 2 commands:
     33    *      'ptd' (Process Thread Data)
     34    *      'pcd' (Process Counter Data)
     35    *
     36    * The command specific data is as follows:
     37    *
     38    * {
     39    *   'pid': 'Remote Process Id',
     40    *   'td': {
     41    *                  'n': 'Thread Name Here',
     42    *                  's: [ {
     43    *                              'l': 'Slice Label',
     44    *                              's': startTime,
     45    *                              'e': endTime
     46    *                              }, ... ]
     47    *         }
     48    * }
     49    *
     50    * {
     51    *  'pid' 'Remote Process Id',
     52    *  'cd': {
     53    *      'n': 'Counter Name',
     54    *      'sn': ['Series Name',...]
     55    *      'sc': [seriesColor, ...]
     56    *      'c': [
     57    *            {
     58    *              't': timestamp,
     59    *              'v': [value0, value1, ...]
     60    *            },
     61    *            ....
     62    *           ]
     63    *       }
     64    * }
     65    * @param {Model} model that will be updated
     66    * when events are received.
     67    * @constructor
     68    */
     69   function TimelineStreamImporter(model) {
     70     var self = this;
     71     this.model_ = model;
     72     this.connection_ = undefined;
     73     this.state_ = STATE_CAPTURING;
     74     this.connectionOpenHandler_ =
     75         this.connectionOpenHandler_.bind(this);
     76     this.connectionCloseHandler_ =
     77         this.connectionCloseHandler_.bind(this);
     78     this.connectionErrorHandler_ =
     79         this.connectionErrorHandler_.bind(this);
     80     this.connectionMessageHandler_ =
     81         this.connectionMessageHandler_.bind(this);
     82   }
     83 
     84   TimelineStreamImporter.prototype = {
     85     __proto__: base.EventTarget.prototype,
     86 
     87     cleanup_: function() {
     88       if (!this.connection_)
     89         return;
     90       this.connection_.removeEventListener('open',
     91           this.connectionOpenHandler_);
     92       this.connection_.removeEventListener('close',
     93           this.connectionCloseHandler_);
     94       this.connection_.removeEventListener('error',
     95           this.connectionErrorHandler_);
     96       this.connection_.removeEventListener('message',
     97           this.connectionMessageHandler_);
     98     },
     99 
    100     connectionOpenHandler_: function() {
    101       this.dispatchEvent({'type': 'connect'});
    102     },
    103 
    104     connectionCloseHandler_: function() {
    105       this.dispatchEvent({'type': 'disconnect'});
    106       this.cleanup_();
    107     },
    108 
    109     connectionErrorHandler_: function() {
    110       this.dispatchEvent({'type': 'connectionerror'});
    111       this.cleanup_();
    112     },
    113 
    114     connectionMessageHandler_: function(event) {
    115       var packet = JSON.parse(event.data);
    116       var command = packet['cmd'];
    117       var pid = packet['pid'];
    118       var modelDirty = false;
    119       if (command == 'ptd') {
    120         var process = this.model_.getOrCreateProcess(pid);
    121         var threadData = packet['td'];
    122         var threadName = threadData['n'];
    123         var threadSlices = threadData['s'];
    124         var thread = process.getOrCreateThread(threadName);
    125         for (var s = 0; s < threadSlices.length; s++) {
    126           var slice = threadSlices[s];
    127           thread.sliceGroup.pushSlice(new tracing.trace_model.ThreadSlice(
    128               'streamed',
    129               slice['l'],
    130               0,
    131               slice['s'],
    132               {},
    133               slice['e'] - slice['s']));
    134         }
    135         modelDirty = true;
    136       } else if (command == 'pcd') {
    137         var process = this.model_.getOrCreateProcess(pid);
    138         var counterData = packet['cd'];
    139         var counterName = counterData['n'];
    140         var counterSeriesNames = counterData['sn'];
    141         var counterSeriesColors = counterData['sc'];
    142         var counterValues = counterData['c'];
    143         var counter = process.getOrCreateCounter('streamed', counterName);
    144         if (counterSeriesNames.length != counterSeriesColors.length) {
    145           var importError = 'Streamed counter name length does not match' +
    146                             'counter color length' + counterSeriesNames.length +
    147                             ' vs ' + counterSeriesColors.length;
    148           this.model_.importErrors.push(importError);
    149           return;
    150         }
    151         if (counter.series.length === 0) {
    152           for (var i = 0; i < counterSeriesNames.length; ++i) {
    153             counter.addSeries(new tracing.trace_model.CounterSeries(
    154                 counterSeriesNames[i], counterSeriesColors[i]));
    155           }
    156         } else {
    157           if (counter.series.length != counterSeriesNames.length) {
    158             var importError = 'Streamed counter ' + counterName +
    159                 'changed number of seriesNames';
    160             this.model_.importErrors.push(importError);
    161             return;
    162           } else {
    163             for (var i = 0; i < counter.series.length; i++) {
    164               var oldSeriesName = counter.series[i].name;
    165               var newSeriesName = counterSeriesNames[i];
    166 
    167               if (oldSeriesName != newSeriesName) {
    168                 var importError = 'Streamed counter ' + counterName +
    169                     'series name changed from ' +
    170                     oldSeriesName + ' to ' +
    171                     newSeriesName;
    172                 this.model_.importErrors.push(importError);
    173                 return;
    174               }
    175             }
    176           }
    177         }
    178         for (var c = 0; c < counterValues.length; c++) {
    179           var count = counterValues[c];
    180           var ts = count['t'];
    181           var values = count['v'];
    182           for (var i = 0; i < values.length; ++i) {
    183             counter.series[i].addSample(ts, values[i]);
    184           }
    185         }
    186         modelDirty = true;
    187       }
    188       if (modelDirty == true) {
    189         this.model_.updateBounds();
    190         this.dispatchEvent({'type': 'modelchange',
    191           'model': this.model_});
    192       }
    193     },
    194 
    195     get connected() {
    196       if (this.connection_ !== undefined &&
    197           this.connection_.readyState == WebSocket.OPEN) {
    198         return true;
    199       }
    200       return false;
    201     },
    202 
    203     get paused() {
    204       return this.state_ == STATE_PAUSED;
    205     },
    206 
    207     /**
    208      * Connects the stream to a websocket.
    209      * @param {WebSocket} wsConnection The websocket to use for the stream.
    210      */
    211     connect: function(wsConnection) {
    212       this.connection_ = wsConnection;
    213       this.connection_.addEventListener('open',
    214           this.connectionOpenHandler_);
    215       this.connection_.addEventListener('close',
    216           this.connectionCloseHandler_);
    217       this.connection_.addEventListener('error',
    218           this.connectionErrorHandler_);
    219       this.connection_.addEventListener('message',
    220           this.connectionMessageHandler_);
    221     },
    222 
    223     pause: function() {
    224       if (this.state_ == STATE_PAUSED)
    225         throw new Error('Already paused.');
    226       if (!this.connection_)
    227         throw new Error('Not connected.');
    228       this.connection_.send(JSON.stringify({'cmd': 'pause'}));
    229       this.state_ = STATE_PAUSED;
    230     },
    231 
    232     resume: function() {
    233       if (this.state_ == STATE_CAPTURING)
    234         throw new Error('Already capturing.');
    235       if (!this.connection_)
    236         throw new Error('Not connected.');
    237       this.connection_.send(JSON.stringify({'cmd': 'resume'}));
    238       this.state_ = STATE_CAPTURING;
    239     }
    240   };
    241 
    242   return {
    243     TimelineStreamImporter: TimelineStreamImporter
    244   };
    245 });
    246