Bug 1025799 - Progress events for app install. r=ochameau
authorJ. Ryan Stinnett <jryans@gmail.com>
Thu, 26 Jun 2014 14:19:00 +0200
changeset 191166 8fb18794fe60317d2f79b6726a331baa2da668d9
parent 191165 f6cee8c74faf8736354367bc6337eccca3ea66a9
child 191167 328c548bb1167fb221cd2fb063ea95d278030d65
push id8436
push usercbook@mozilla.com
push dateFri, 27 Jun 2014 13:56:57 +0000
treeherderb2g-inbound@22ea396750e8 [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewersochameau
bugs1025799
milestone33.0a1
Bug 1025799 - Progress events for app install. r=ochameau
toolkit/devtools/apps/app-actor-front.js
toolkit/devtools/apps/tests/unit/test_webappsActor.js
toolkit/devtools/client/dbg-client.jsm
toolkit/devtools/server/main.js
toolkit/devtools/transport/packets.js
toolkit/devtools/transport/stream-utils.js
toolkit/devtools/transport/transport.js
--- a/toolkit/devtools/apps/app-actor-front.js
+++ b/toolkit/devtools/apps/app-actor-front.js
@@ -1,28 +1,32 @@
 const {Ci, Cc, Cu, Cr} = require("chrome");
 Cu.import("resource://gre/modules/osfile.jsm");
 const {Services} = Cu.import("resource://gre/modules/Services.jsm");
 const {FileUtils} = Cu.import("resource://gre/modules/FileUtils.jsm");
 const {NetUtil} = Cu.import("resource://gre/modules/NetUtil.jsm");
 const {devtools} = Cu.import("resource://gre/modules/devtools/Loader.jsm", {});
 const {Promise: promise} = Cu.import("resource://gre/modules/Promise.jsm", {});
+const EventEmitter = require("devtools/toolkit/event-emitter");
 
 // XXX: bug 912476 make this module a real protocol.js front
 // by converting webapps actor to protocol.js
 
 const PR_USEC_PER_MSEC = 1000;
 const PR_RDWR = 0x04;
 const PR_CREATE_FILE = 0x08;
 const PR_TRUNCATE = 0x20;
 
 const CHUNK_SIZE = 10000;
 
 const appTargets = new Map();
 
+const AppActorFront = exports;
+EventEmitter.decorate(AppActorFront);
+
 function addDirToZip(writer, dir, basePath) {
   let files = dir.directoryEntries;
 
   while (files.hasMoreElements()) {
     let file = files.getNext().QueryInterface(Ci.nsIFile);
 
     if (file.isHidden() ||
         file.isSpecial() ||
@@ -97,25 +101,44 @@ function uploadPackageJSON(client, webap
   let request = {
     to: webappsActor,
     type: "uploadPackage"
   };
   client.request(request, (res) => {
     openFile(res.actor);
   });
 
+  let fileSize;
+  let bytesRead = 0;
+
+  function emitProgress() {
+    emitInstallProgress({
+      bytesSent: bytesRead,
+      totalBytes: fileSize
+    });
+  }
+
   function openFile(actor) {
+    let openedFile;
     OS.File.open(packageFile.path)
-      .then(function (file) {
-        uploadChunk(actor, file);
+      .then(file => {
+        openedFile = file;
+        return openedFile.stat();
+      })
+      .then(fileInfo => {
+        fileSize = fileInfo.size;
+        emitProgress();
+        uploadChunk(actor, openedFile);
       });
   }
   function uploadChunk(actor, file) {
     file.read(CHUNK_SIZE)
         .then(function (bytes) {
+          bytesRead += bytes.length;
+          emitProgress();
           // To work around the fact that JSON.stringify translates the typed
           // array to object, we are encoding the typed array here into a string
           let chunk = String.fromCharCode.apply(null, bytes);
 
           let request = {
             to: actor,
             type: "chunk",
             chunk: chunk
@@ -163,17 +186,21 @@ function uploadPackageBulk(client, webap
     let request = client.startBulkRequest({
       actor: actor,
       type: "stream",
       length: fileSize
     });
 
     request.on("bulk-send-ready", ({copyFrom}) => {
       NetUtil.asyncFetch(packageFile, function(inputStream) {
-        copyFrom(inputStream).then(() => {
+        let copying = copyFrom(inputStream);
+        copying.on("progress", (e, progress) => {
+          emitInstallProgress(progress);
+        });
+        copying.then(() => {
           console.log("Bulk upload done");
           inputStream.close();
           deferred.resolve(actor);
         });
       });
     });
   }
 
@@ -231,16 +258,26 @@ function installPackaged(client, webapps
             () => removeServerTemporaryFile(client, fileActor),
             () => removeServerTemporaryFile(client, fileActor));
         });
   });
   return deferred.promise;
 }
 exports.installPackaged = installPackaged;
 
+/**
+ * Emits numerous events as packaged app installation proceeds.
+ * The progress object contains:
+ *  * bytesSent:  The number of bytes sent so far
+ *  * totalBytes: The total number of bytes to send
+ */
+function emitInstallProgress(progress) {
+  AppActorFront.emit("install-progress", progress);
+}
+
 function installHosted(client, webappsActor, appId, metadata, manifest) {
   let deferred = promise.defer();
   let request = {
     to: webappsActor,
     type: "install",
     appId: appId,
     metadata: metadata,
     manifest: manifest
--- a/toolkit/devtools/apps/tests/unit/test_webappsActor.js
+++ b/toolkit/devtools/apps/tests/unit/test_webappsActor.js
@@ -1,14 +1,16 @@
 /* Any copyright is dedicated to the Public Domain.
    http://creativecommons.org/publicdomain/zero/1.0/ */
 
 const {devtools} = Cu.import("resource://gre/modules/devtools/Loader.jsm", {});
 const {require} = devtools;
-const {installHosted, installPackaged} = require("devtools/app-actor-front");
+const AppActorFront = require("devtools/app-actor-front");
+const {installHosted, installPackaged} = AppActorFront;
+const {Promise: promise} = Cu.import("resource://gre/modules/Promise.jsm", {});
 
 let gAppId = "actor-test";
 const APP_ORIGIN = "app://" + gAppId;
 
 add_test(function testLaunchInexistantApp() {
   let request = {type: "launch", manifestURL: "http://foo.com"};
   webappActorRequest(request, function (aResponse) {
     do_check_eq(aResponse.error, "NO_SUCH_APP");
@@ -173,39 +175,64 @@ add_test(function testUninstall() {
 });
 
 add_test(function testFileUploadInstall() {
   let packageFile = do_get_file("data/app.zip");
 
   // Disable the bulk trait temporarily to test the JSON upload path
   gClient.traits.bulk = false;
 
-  installPackaged(gClient, gActor, packageFile.path, gAppId)
+  let progressDeferred = promise.defer();
+  // Ensure we get at least one progress event at the end
+  AppActorFront.on("install-progress", function onProgress(e, progress) {
+    if (progress.bytesSent == progress.totalBytes) {
+      AppActorFront.off("install-progress", onProgress);
+      progressDeferred.resolve();
+    }
+  });
+
+  let installed =
+    installPackaged(gClient, gActor, packageFile.path, gAppId)
     .then(function ({ appId }) {
       do_check_eq(appId, gAppId);
+    }, function (e) {
+      do_throw("Failed install uploaded packaged app: " + e.error + ": " + e.message);
+    });
 
+  promise.all([progressDeferred.promise, installed])
+    .then(() => {
       // Restore default bulk trait value
       gClient.traits.bulk = true;
-
       run_next_test();
-    }, function (e) {
-      do_throw("Failed install uploaded packaged app: " + e.error + ": " + e.message);
     });
 });
 
 add_test(function testBulkUploadInstall() {
   let packageFile = do_get_file("data/app.zip");
   do_check_true(gClient.traits.bulk);
-  installPackaged(gClient, gActor, packageFile.path, gAppId)
+
+  let progressDeferred = promise.defer();
+  // Ensure we get at least one progress event at the end
+  AppActorFront.on("install-progress", function onProgress(e, progress) {
+    if (progress.bytesSent == progress.totalBytes) {
+      AppActorFront.off("install-progress", onProgress);
+      progressDeferred.resolve();
+    }
+  });
+
+  let installed =
+    installPackaged(gClient, gActor, packageFile.path, gAppId)
     .then(function ({ appId }) {
       do_check_eq(appId, gAppId);
-      run_next_test();
     }, function (e) {
       do_throw("Failed bulk install uploaded packaged app: " + e.error + ": " + e.message);
     });
+
+  promise.all([progressDeferred.promise, installed])
+    .then(run_next_test);
 });
 
 add_test(function testInstallHosted() {
   gAppId = "hosted-app";
   let metadata = {
     origin: "http://foo.com",
     installOrigin: "http://metadata.foo.com",
     manifestURL: "http://foo.com/metadata/manifest.webapp"
@@ -245,9 +272,8 @@ add_test(function testCheckHostedApp() {
   });
 });
 
 function run_test() {
   setup();
 
   run_next_test();
 }
-
--- a/toolkit/devtools/client/dbg-client.jsm
+++ b/toolkit/devtools/client/dbg-client.jsm
@@ -658,16 +658,18 @@ DebuggerClient.prototype = {
    *           * copyTo: A helper function for getting your data out of the
    *                     stream that meets the stream handling requirements
    *                     above, and has the following signature:
    *             @param  output nsIAsyncOutputStream
    *                     The stream to copy to.
    *             @return Promise
    *                     The promise is resolved when copying completes or
    *                     rejected if any (unexpected) errors occur.
+   *                     This object also emits "progress" events for each chunk
+   *                     that is copied.  See stream-utils.js.
    */
   request: function (aRequest, aOnResponse) {
     if (!this.mainRoot) {
       throw Error("Have not yet received a hello packet from the server.");
     }
     if (!aRequest.to) {
       let type = aRequest.type || "";
       throw Error("'" + type + "' request packet has no destination.");
@@ -723,16 +725,18 @@ DebuggerClient.prototype = {
    *           * copyFrom: A helper function for getting your data onto the
    *                       stream that meets the stream handling requirements
    *                       above, and has the following signature:
    *             @param  input nsIAsyncInputStream
    *                     The stream to copy from.
    *             @return Promise
    *                     The promise is resolved when copying completes or
    *                     rejected if any (unexpected) errors occur.
+   *                     This object also emits "progress" events for each chunk
+   *                     that is copied.  See stream-utils.js.
    *         * json-reply: The server replied with a JSON packet, which is
    *           passed as event data.
    *         * bulk-reply: The server replied with bulk data, which you can read
    *           using the event data object containing:
    *           * actor:  Name of actor that received the packet
    *           * type:   Name of actor's method that was called on receipt
    *           * length: Size of the data to be read
    *           * stream: This input stream should only be used directly if you
@@ -748,16 +752,18 @@ DebuggerClient.prototype = {
    *           * copyTo: A helper function for getting your data out of the
    *                     stream that meets the stream handling requirements
    *                     above, and has the following signature:
    *             @param  output nsIAsyncOutputStream
    *                     The stream to copy to.
    *             @return Promise
    *                     The promise is resolved when copying completes or
    *                     rejected if any (unexpected) errors occur.
+   *                     This object also emits "progress" events for each chunk
+   *                     that is copied.  See stream-utils.js.
    */
   startBulkRequest: function(request) {
     if (!this.traits.bulk) {
       throw Error("Server doesn't support bulk transfers");
     }
     if (!this.mainRoot) {
       throw Error("Have not yet received a hello packet from the server.");
     }
@@ -927,16 +933,18 @@ DebuggerClient.prototype = {
    *        * copyTo: A helper function for getting your data out of the stream
    *                  that meets the stream handling requirements above, and has
    *                  the following signature:
    *          @param  output nsIAsyncOutputStream
    *                  The stream to copy to.
    *          @return Promise
    *                  The promise is resolved when copying completes or rejected
    *                  if any (unexpected) errors occur.
+   *                  This object also emits "progress" events for each chunk
+   *                  that is copied.  See stream-utils.js.
    */
   onBulkPacket: function(packet) {
     let { actor, type, length } = packet;
 
     if (!actor) {
       DevToolsUtils.reportException(
         "onBulkPacket",
         new Error("Server did not specify an actor, dropping bulk packet: " +
--- a/toolkit/devtools/server/main.js
+++ b/toolkit/devtools/server/main.js
@@ -1194,16 +1194,18 @@ DebuggerServerConnection.prototype = {
    *        * copyTo: A helper function for getting your data out of the stream
    *                  that meets the stream handling requirements above, and has
    *                  the following signature:
    *          @param  output nsIAsyncOutputStream
    *                  The stream to copy to.
    *          @return Promise
    *                  The promise is resolved when copying completes or rejected
    *                  if any (unexpected) errors occur.
+   *                  This object also emits "progress" events for each chunk
+   *                  that is copied.  See stream-utils.js.
    */
   onBulkPacket: function(packet) {
     let { actor: actorKey, type, length } = packet;
 
     let actor = this._getOrCreateActor(actorKey);
     if (!actor) {
       return;
     }
--- a/toolkit/devtools/transport/packets.js
+++ b/toolkit/devtools/transport/packets.js
@@ -23,16 +23,17 @@
  *   * destroy()
  *     Called to clean up at the end of use
  */
 
 const { Cc, Ci, Cu } = require("chrome");
 const DevToolsUtils = require("devtools/toolkit/DevToolsUtils");
 const { dumpn, dumpv } = DevToolsUtils;
 const StreamUtils = require("devtools/toolkit/transport/stream-utils");
+const EventEmitter = require("devtools/toolkit/event-emitter");
 
 DevToolsUtils.defineLazyGetter(this, "unicodeConverter", () => {
   const unicodeConverter = Cc["@mozilla.org/intl/scriptableunicodeconverter"]
                            .createInstance(Ci.nsIScriptableUnicodeConverter);
   unicodeConverter.charset = "UTF-8";
   return unicodeConverter;
 });
 
@@ -269,18 +270,19 @@ BulkPacket.prototype.read = function(str
   let deferred = promise.defer();
 
   this._transport._onBulkReadReady({
     actor: this.actor,
     type: this.type,
     length: this.length,
     copyTo: (output) => {
       dumpv("CT length: " + this.length);
-      deferred.resolve(StreamUtils.copyStream(stream, output, this.length));
-      return deferred.promise;
+      let copying = StreamUtils.copyStream(stream, output, this.length);
+      deferred.resolve(copying);
+      return copying;
     },
     stream: stream,
     done: deferred
   });
 
   // Await the result of reading from the stream
   deferred.promise.then(() => {
     dumpv("onReadDone called, ending bulk mode");
@@ -318,18 +320,19 @@ BulkPacket.prototype.write = function(st
   // Temporarily pause the monitoring of the output stream
   this._transport.pauseOutgoing();
 
   let deferred = promise.defer();
 
   this._readyForWriting.resolve({
     copyFrom: (input) => {
       dumpv("CF length: " + this.length);
-      deferred.resolve(StreamUtils.copyStream(input, stream, this.length));
-      return deferred.promise;
+      let copying = StreamUtils.copyStream(input, stream, this.length);
+      deferred.resolve(copying);
+      return copying;
     },
     stream: stream,
     done: deferred
   });
 
   // Await the result of writing to the stream
   deferred.promise.then(() => {
     dumpv("onWriteDone called, ending bulk mode");
--- a/toolkit/devtools/transport/stream-utils.js
+++ b/toolkit/devtools/transport/stream-utils.js
@@ -3,16 +3,17 @@
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 
 "use strict";
 
 const { Ci, Cc, Cu, Cr, CC } = require("chrome");
 const Services = require("Services");
 const DevToolsUtils = require("devtools/toolkit/DevToolsUtils");
 const { dumpv } = DevToolsUtils;
+const EventEmitter = require("devtools/toolkit/event-emitter");
 
 DevToolsUtils.defineLazyGetter(this, "IOUtil", () => {
   return Cc["@mozilla.org/io-util;1"].getService(Ci.nsIIOUtil);
 });
 
 DevToolsUtils.defineLazyGetter(this, "ScriptableInputStream", () => {
   return CC("@mozilla.org/scriptableinputstream;1",
             "nsIScriptableInputStream", "init");
@@ -54,52 +55,62 @@ const BUFFER_SIZE = 0x8000;
  *         (unexpected) errors occur.
  */
 function copyStream(input, output, length) {
   let copier = new StreamCopier(input, output, length);
   return copier.copy();
 }
 
 function StreamCopier(input, output, length) {
+  EventEmitter.decorate(this);
   this._id = StreamCopier._nextId++;
   this.input = input;
   // Save off the base output stream, since we know it's async as we've required
   this.baseAsyncOutput = output;
   if (IOUtil.outputStreamIsBuffered(output)) {
     this.output = output;
   } else {
     this.output = Cc["@mozilla.org/network/buffered-output-stream;1"].
                   createInstance(Ci.nsIBufferedOutputStream);
     this.output.init(output, BUFFER_SIZE);
   }
+  this._length = length;
   this._amountLeft = length;
   this._deferred = promise.defer();
 
   this._copy = this._copy.bind(this);
   this._flush = this._flush.bind(this);
   this._destroy = this._destroy.bind(this);
-  this._deferred.promise.then(this._destroy, this._destroy);
+
+  // Copy promise's then method up to this object.
+  // Allows the copier to offer a promise interface for the simple succeed or
+  // fail scenarios, but also emit events (due to the EventEmitter) for other
+  // states, like progress.
+  this.then = this._deferred.promise.then.bind(this._deferred.promise);
+  this.then(this._destroy, this._destroy);
 
   // Stream ready callback starts as |_copy|, but may switch to |_flush| at end
   // if flushing would block the output stream.
   this._streamReadyCallback = this._copy;
 }
 StreamCopier._nextId = 0;
 
 StreamCopier.prototype = {
 
-  get copied() { return this._deferred.promise; },
-
   copy: function() {
-    try {
-      this._copy();
-    } catch(e) {
-      this._deferred.reject(e);
-    }
-    return this.copied;
+    // Dispatch to the next tick so that it's possible to attach a progress
+    // event listener, even for extremely fast copies (like when testing).
+    Services.tm.currentThread.dispatch(() => {
+      try {
+        this._copy();
+      } catch(e) {
+        this._deferred.reject(e);
+      }
+    }, 0);
+    return this;
   },
 
   _copy: function() {
     let bytesAvailable = this.input.available();
     let amountToCopy = Math.min(bytesAvailable, this._amountLeft);
     this._debug("Trying to copy: " + amountToCopy);
 
     let bytesCopied;
@@ -110,27 +121,35 @@ StreamCopier.prototype = {
       this._debug("Waiting for output stream");
       this.baseAsyncOutput.asyncWait(this, 0, 0, Services.tm.currentThread);
       return;
     }
 
     this._amountLeft -= bytesCopied;
     this._debug("Copied: " + bytesCopied +
                 ", Left: " + this._amountLeft);
+    this._emitProgress();
 
     if (this._amountLeft === 0) {
       this._debug("Copy done!");
       this._flush();
       return;
     }
 
     this._debug("Waiting for input stream");
     this.input.asyncWait(this, 0, 0, Services.tm.currentThread);
   },
 
+  _emitProgress: function() {
+    this.emit("progress", {
+      bytesSent: this._length - this._amountLeft,
+      totalBytes: this._length
+    });
+  },
+
   _flush: function() {
     try {
       this.output.flush();
     } catch(e if e.result == Cr.NS_BASE_STREAM_WOULD_BLOCK ||
                  e.result == Cr.NS_ERROR_FAILURE) {
       this._debug("Flush would block, will retry");
       this._streamReadyCallback = this._flush;
       this._debug("Waiting for output stream");
--- a/toolkit/devtools/transport/transport.js
+++ b/toolkit/devtools/transport/transport.js
@@ -83,16 +83,18 @@ const PACKET_HEADER_MAX = 200;
  *   * copyTo: A helper function for getting your data out of the stream that
  *             meets the stream handling requirements above, and has the
  *             following signature:
  *     @param  output nsIAsyncOutputStream
  *             The stream to copy to.
  *     @return Promise
  *             The promise is resolved when copying completes or rejected if any
  *             (unexpected) errors occur.
+ *             This object also emits "progress" events for each chunk that is
+ *             copied.  See stream-utils.js.
  *
  * - onClosed(reason) - called when the connection is closed. |reason| is
  *   an optional nsresult or object, typically passed when the transport is
  *   closed due to some error in a underlying stream.
  *
  * See ./packets.js and the Remote Debugging Protocol specification for more
  * details on the format of these packets.
  */
@@ -167,16 +169,18 @@ DebuggerTransport.prototype = {
    *           * copyFrom: A helper function for getting your data onto the
    *                       stream that meets the stream handling requirements
    *                       above, and has the following signature:
    *             @param  input nsIAsyncInputStream
    *                     The stream to copy from.
    *             @return Promise
    *                     The promise is resolved when copying completes or
    *                     rejected if any (unexpected) errors occur.
+   *                     This object also emits "progress" events for each chunk
+   *                     that is copied.  See stream-utils.js.
    */
   startBulkSend: function(header) {
     let packet = new BulkPacket(this);
     packet.header = header;
     this._outgoing.push(packet);
     this._flushOutgoing();
     return packet.streamReadyForWriting;
   },
@@ -571,19 +575,20 @@ LocalDebuggerTransport.prototype = {
       // Receiver
       let deferred = promise.defer();
 
       this.other.hooks.onBulkPacket({
         actor: actor,
         type: type,
         length: length,
         copyTo: (output) => {
-          deferred.resolve(
-            StreamUtils.copyStream(pipe.inputStream, output, length));
-          return deferred.promise;
+          let copying =
+            StreamUtils.copyStream(pipe.inputStream, output, length);
+          deferred.resolve(copying);
+          return copying;
         },
         stream: pipe.inputStream,
         done: deferred
       });
 
       // Await the result of reading from the stream
       deferred.promise.then(() => pipe.inputStream.close(), this.close);
     }, "LocalDebuggerTransport instance's this.other.hooks.onBulkPacket"));
@@ -593,19 +598,20 @@ LocalDebuggerTransport.prototype = {
 
     // The remote transport is not capable of resolving immediately here, so we
     // shouldn't be able to either.
     DevToolsUtils.executeSoon(() => {
       let copyDeferred = promise.defer();
 
       sendDeferred.resolve({
         copyFrom: (input) => {
-          copyDeferred.resolve(
-            StreamUtils.copyStream(input, pipe.outputStream, length));
-          return copyDeferred.promise;
+          let copying =
+            StreamUtils.copyStream(input, pipe.outputStream, length);
+          copyDeferred.resolve(copying);
+          return copying;
         },
         stream: pipe.outputStream,
         done: copyDeferred
       });
 
       // Await the result of writing to the stream
       copyDeferred.promise.then(() => pipe.outputStream.close(), this.close);
     });