Bug 1317223 (part 1) - a collection_repair module (without any repairers) and integration with the clients engine. r=rnewman
authorMark Hammond <mhammond@skippinet.com.au>
Thu, 02 Mar 2017 16:14:51 +1100
changeset 345673 6ca43c00d5093b9196acea71b52a33bbdf625107
parent 345672 555710da7b9a13e20c927c8bb6adad118c79b932
child 345674 80ecb5941325877ed04a07ec1e3a276f70f0c5fd
push id87648
push usermhammond@skippinet.com.au
push dateFri, 03 Mar 2017 03:27:31 +0000
treeherdermozilla-inbound@e62d1a669bba [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewersrnewman
bugs1317223
milestone54.0a1
first release with
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
last release without
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
Bug 1317223 (part 1) - a collection_repair module (without any repairers) and integration with the clients engine. r=rnewman This creates a collection_repair module, somewhat analogous to the existing collection_validator module. This defines the public interface to request a new repair and respond to a remote repair request, and also includes changes to clients.js to call this public interface. MozReview-Commit-ID: 9JPpRrLgFoR
services/sync/modules/collection_repair.js
services/sync/modules/engines/clients.js
services/sync/moz.build
tools/lint/eslint/modules.json
new file mode 100644
--- /dev/null
+++ b/services/sync/modules/collection_repair.js
@@ -0,0 +1,123 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+"use strict";
+
+const Cu = Components.utils;
+
+Cu.import("resource://gre/modules/XPCOMUtils.jsm");
+Cu.import("resource://services-sync/main.js");
+
+this.EXPORTED_SYMBOLS = ["getRepairRequestor", "getAllRepairRequestors",
+                         "CollectionRepairRequestor",
+                         "getRepairResponder",
+                         "CollectionRepairResponder"];
+
+// The individual requestors/responders, lazily loaded.
+const REQUESTORS = {
+}
+
+const RESPONDERS = {
+}
+
+// Should we maybe enforce the requestors being a singleton?
+function _getRepairConstructor(which, collection) {
+  if (!(collection in which)) {
+    return null;
+  }
+  let [modname, symbolname] = which[collection];
+  let ns = {};
+  Cu.import("resource://services-sync/" + modname, ns);
+  return ns[symbolname];
+}
+
+function getRepairRequestor(collection) {
+  let ctor = _getRepairConstructor(REQUESTORS, collection);
+  if (!ctor) {
+    return null;
+  }
+  return new ctor();
+}
+
+function getAllRepairRequestors() {
+  let result = {};
+  for (let collection of Object.keys(REQUESTORS)) {
+    let ctor = _getRepairConstructor(REQUESTORS, collection);
+    result[collection] = new ctor();
+  }
+  return result;
+}
+
+function getRepairResponder(collection) {
+  let ctor = _getRepairConstructor(RESPONDERS, collection);
+  if (!ctor) {
+    return null;
+  }
+  return new ctor();
+}
+
+// The abstract classes.
+class CollectionRepairRequestor {
+  constructor(service = null) {
+    // allow service to be mocked in tests.
+    this.service = service || Weave.Service;
+  }
+
+  /* See if the repairer is willing and able to begin a repair process given
+     the specified validation information.
+     Returns true if a repair was started and false otherwise.
+
+     @param   validationInfo       {Object}
+              The validation info as returned by the collection's validator.
+
+     @param   flowID               {String}
+              A guid that uniquely identifies this repair process for this
+              collection, and which should be sent to any requestors and
+              reported in telemetry.
+
+  */
+  startRepairs(validationInfo, flowID) {
+    throw new Error("not implemented");
+  }
+
+  /* Work out what state our current repair request is in, and whether it can
+     proceed to a new state.
+     Returns true if we could continue the repair - even if the state didn't
+     actually move. Returns false if we aren't actually repairing.
+
+     @param   responseInfo       {Object}
+              An optional response to a previous repair request, as returned
+              by a remote repair responder.
+
+  */
+  continueRepairs(responseInfo = null) {
+    throw new Error("not implemented");
+  }
+}
+
+class CollectionRepairResponder {
+  constructor(service = null) {
+    // allow service to be mocked in tests.
+    this.service = service || Weave.Service;
+  }
+
+  /* Take some action in response to a repair request. Returns a promise that
+     resolves once the repair process has started, or rejects if there
+     was an error starting the repair.
+
+     Note that when the promise resolves the repair is not yet complete - at
+     some point in the future the repair will auto-complete, at which time
+     |rawCommand| will be removed from the list of client commands for this
+     client.
+
+     @param   request       {Object}
+              The repair request as sent by another client.
+
+     @param   rawCommand    {Object}
+              The command object as stored in the clients engine, and which
+              will be automatically removed once a repair completes.
+  */
+  async repair(request, rawCommand) {
+    throw new Error("not implemented");
+  }
+}
--- a/services/sync/modules/engines/clients.js
+++ b/services/sync/modules/engines/clients.js
@@ -34,16 +34,22 @@ Cu.import("resource://services-sync/engi
 Cu.import("resource://services-sync/record.js");
 Cu.import("resource://services-sync/resource.js");
 Cu.import("resource://services-sync/util.js");
 Cu.import("resource://gre/modules/Services.jsm");
 
 XPCOMUtils.defineLazyModuleGetter(this, "fxAccounts",
   "resource://gre/modules/FxAccounts.jsm");
 
+XPCOMUtils.defineLazyModuleGetter(this, "getRepairRequestor",
+  "resource://services-sync/collection_repair.js");
+
+XPCOMUtils.defineLazyModuleGetter(this, "getRepairResponder",
+  "resource://services-sync/collection_repair.js");
+
 const CLIENTS_TTL = 1814400; // 21 days
 const CLIENTS_TTL_REFRESH = 604800; // 7 days
 const STALE_CLIENT_REMOTE_AGE = 604800; // 7 days
 
 const SUPPORTED_PROTOCOL_VERSIONS = ["1.1", "1.5"];
 
 function hasDupeCommand(commands, action) {
   if (!commands) {
@@ -95,19 +101,23 @@ ClientEngine.prototype = {
     Svc.Prefs.set(this.name + ".lastRecordUpload", Math.floor(value));
   },
 
   get remoteClients() {
     // return all non-stale clients for external consumption.
     return Object.values(this._store._remoteClients).filter(v => !v.stale);
   },
 
-  remoteClientExists(id) {
+  remoteClient(id) {
     let client = this._store._remoteClients[id];
-    return !!(client && !client.stale);
+    return client && !client.stale ? client : null;
+  },
+
+  remoteClientExists(id) {
+    return !!this.remoteClient(id);
   },
 
   // Aggregate some stats on the composition of clients on this account
   get stats() {
     let stats = {
       hasMobile: this.localType == DEVICE_TYPE_MOBILE,
       names: [this.localName],
       numClients: 1,
@@ -245,16 +255,30 @@ ClientEngine.prototype = {
     delete this._currentlySyncingCommands;
     Async.promiseSpinningly(
       Utils.jsonRemove("commands-syncing", this).catch(err => {
         this._log.error("Failed to delete syncing-commands file", err);
       })
     );
   },
 
+  // Gets commands for a client we are yet to write to the server. Doesn't
+  // include commands for that client which are already on the server.
+  // We should rename this!
+  getClientCommands(clientId) {
+    const allCommands = this._readCommands();
+    return allCommands[clientId] || [];
+  },
+
+  removeLocalCommand(command) {
+    // the implementation of this engine is such that adding a command to
+    // the local client is how commands are deleted! ¯\_(ツ)_/¯
+    this._addClientCommand(this.localID, command);
+  },
+
   _addClientCommand(clientId, command) {
     const allCommands = this._readCommands();
     const clientCommands = allCommands[clientId] || [];
     if (hasDupeCommand(clientCommands, command)) {
       return false;
     }
     allCommands[clientId] = clientCommands.concat(command);
     this._saveCommands(allCommands);
@@ -484,16 +508,18 @@ ClientEngine.prototype = {
    */
   _commands: {
     resetAll:    { args: 0, desc: "Clear temporary local data for all engines" },
     resetEngine: { args: 1, desc: "Clear temporary local data for engine" },
     wipeAll:     { args: 0, desc: "Delete all client data for all engines" },
     wipeEngine:  { args: 1, desc: "Delete all client data for engine" },
     logout:      { args: 0, desc: "Log out client" },
     displayURI:  { args: 3, desc: "Instruct a client to display a URI" },
+    repairRequest:  {args: 1, desc: "Instruct a client to initiate a repair"},
+    repairResponse: {args: 1, desc: "Instruct a client a repair request is complete"},
   },
 
   /**
    * Sends a command+args pair to a specific client.
    *
    * @param command Command string
    * @param args Array of arguments/data for command
    * @param clientId Client to send command to
@@ -538,22 +564,23 @@ ClientEngine.prototype = {
   processIncomingCommands: function processIncomingCommands() {
     return this._notify("clients:process-commands", "", function() {
       if (!this.localCommands) {
         return true;
       }
 
       const clearedCommands = this._readCommands()[this.localID];
       const commands = this.localCommands.filter(command => !hasDupeCommand(clearedCommands, command));
-
+      let didRemoveCommand = false;
       let URIsToDisplay = [];
       // Process each command in order.
       for (let rawCommand of commands) {
+        let shouldRemoveCommand = true; // most commands are auto-removed.
         let {command, args, flowID} = rawCommand;
-        this._log.debug("Processing command: " + command + "(" + args + ")");
+        this._log.debug("Processing command " + command, args);
 
         this.service.recordTelemetryEvent("processcommand", command, undefined,
                                           { flowID });
 
         let engines = [args[0]];
         switch (command) {
           case "resetAll":
             engines = null;
@@ -569,24 +596,75 @@ ClientEngine.prototype = {
             break;
           case "logout":
             this.service.logout();
             return false;
           case "displayURI":
             let [uri, clientId, title] = args;
             URIsToDisplay.push({ uri, clientId, title });
             break;
+          case "repairResponse": {
+            // When we send a repair request to another device that understands
+            // it, that device will send a response indicating what it did.
+            let response = args[0];
+            let requestor = getRepairRequestor(response.collection);
+            if (!requestor) {
+              this._log.warn("repairResponse for unknown collection", response);
+              break;
+            }
+            if (!requestor.continueRepairs(response)) {
+              this._log.warn("repairResponse couldn't continue the repair", response);
+            }
+            break;
+          }
+          case "repairRequest": {
+            // Another device has sent us a request to make some repair.
+            let request = args[0];
+            let responder = getRepairResponder(request.collection);
+            if (!responder) {
+              this._log.warn("repairRequest for unknown collection", request);
+              break;
+            }
+            try {
+              if (Async.promiseSpinningly(responder.repair(request, rawCommand))) {
+                // We've started a repair - once that collection has synced it
+                // will write a "response" command and arrange for this repair
+                // request to be removed from the local command list - if we
+                // removed it now we might fail to write a response in cases of
+                // premature shutdown etc.
+                shouldRemoveCommand = false;
+              }
+            } catch (ex) {
+              if (Async.isShutdownException(ex)) {
+                // Let's assume this error was caused by the shutdown, so let
+                // it try again next time.
+                throw ex;
+              }
+              // otherwise there are no second chances - the command is removed
+              // and will not be tried again.
+              // (Note that this shouldn't be hit in the normal case - it's
+              // expected the responder will handle all reasonable failures and
+              // write a response indicating that it couldn't do what was asked.)
+              this._log.error("Failed to handle a repair request", ex);
+            }
+            break;
+          }
           default:
-            this._log.debug("Received an unknown command: " + command);
+            this._log.warn("Received an unknown command: " + command);
             break;
         }
         // Add the command to the "cleared" commands list
-        this._addClientCommand(this.localID, rawCommand)
+        if (shouldRemoveCommand) {
+          this.removeLocalCommand(rawCommand);
+          didRemoveCommand = true;
+        }
       }
-      this._tracker.addChangedID(this.localID);
+      if (didRemoveCommand) {
+        this._tracker.addChangedID(this.localID);
+      }
 
       if (URIsToDisplay.length) {
         this._handleDisplayURIs(URIsToDisplay);
       }
 
       return true;
     })();
   },
--- a/services/sync/moz.build
+++ b/services/sync/moz.build
@@ -16,16 +16,17 @@ EXTRA_COMPONENTS += [
     'Weave.js',
 ]
 
 EXTRA_JS_MODULES['services-sync'] += [
     'modules/addonsreconciler.js',
     'modules/addonutils.js',
     'modules/bookmark_validator.js',
     'modules/browserid_identity.js',
+    'modules/collection_repair.js',
     'modules/collection_validator.js',
     'modules/engines.js',
     'modules/keys.js',
     'modules/main.js',
     'modules/policies.js',
     'modules/record.js',
     'modules/resource.js',
     'modules/rest.js',
--- a/tools/lint/eslint/modules.json
+++ b/tools/lint/eslint/modules.json
@@ -30,16 +30,17 @@
   "clients.js": ["ClientEngine", "ClientsRec"],
   "CloudSyncAdapters.jsm": ["Adapters"],
   "CloudSyncBookmarks.jsm": ["Bookmarks"],
   "CloudSyncBookmarksFolderCache.jsm": ["FolderCache"],
   "CloudSyncEventSource.jsm": ["EventSource"],
   "CloudSyncLocal.jsm": ["Local"],
   "CloudSyncPlacesWrapper.jsm": ["PlacesWrapper"],
   "CloudSyncTabs.jsm": ["Tabs"],
+  "collection_repair.js": ["getRepairRequestor", "getAllRepairRequestors", "CollectionRepairRequestor", "getRepairResponder", "CollectionRepairResponder"],
   "collection_validator.js": ["CollectionValidator", "CollectionProblemData"],
   "Console.jsm": ["console", "ConsoleAPI"],
   "constants.js": ["WEAVE_VERSION", "SYNC_API_VERSION", "USER_API_VERSION", "MISC_API_VERSION", "STORAGE_VERSION", "PREFS_BRANCH", "PWDMGR_HOST", "PWDMGR_PASSWORD_REALM", "PWDMGR_PASSPHRASE_REALM", "PWDMGR_KEYBUNDLE_REALM", "DEFAULT_KEYBUNDLE_NAME", "HMAC_INPUT", "SYNC_KEY_ENCODED_LENGTH", "SYNC_KEY_DECODED_LENGTH", "SYNC_KEY_HYPHENATED_LENGTH", "NO_SYNC_NODE_INTERVAL", "MAX_ERROR_COUNT_BEFORE_BACKOFF", "MAX_IGNORE_ERROR_COUNT", "MINIMUM_BACKOFF_INTERVAL", "MAXIMUM_BACKOFF_INTERVAL", "HMAC_EVENT_INTERVAL", "MASTER_PASSWORD_LOCKED_RETRY_INTERVAL", "DEFAULT_BLOCK_PERIOD", "MOBILE_BATCH_SIZE", "DEFAULT_GUID_FETCH_BATCH_SIZE", "DEFAULT_MOBILE_GUID_FETCH_BATCH_SIZE", "DEFAULT_STORE_BATCH_SIZE", "HISTORY_STORE_BATCH_SIZE", "FORMS_STORE_BATCH_SIZE", "PASSWORDS_STORE_BATCH_SIZE", "ADDONS_STORE_BATCH_SIZE", "APPS_STORE_BATCH_SIZE", "DEFAULT_DOWNLOAD_BATCH_SIZE", "SINGLE_USER_THRESHOLD", "MULTI_DEVICE_THRESHOLD", "SCORE_INCREMENT_SMALL", "SCORE_INCREMENT_MEDIUM", "SCORE_INCREMENT_XLARGE", "SCORE_UPDATE_DELAY", "IDLE_OBSERVER_BACK_DELAY", "MAX_UPLOAD_RECORDS", "MAX_UPLOAD_BYTES", "MAX_HISTORY_UPLOAD", "MAX_HISTORY_DOWNLOAD", "NOTIFY_TAB_SENT_TTL_SECS", "STATUS_OK", "SYNC_FAILED", "LOGIN_FAILED", "SYNC_FAILED_PARTIAL", "CLIENT_NOT_CONFIGURED", "STATUS_DISABLED", "MASTER_PASSWORD_LOCKED", "LOGIN_SUCCEEDED", "SYNC_SUCCEEDED", "ENGINE_SUCCEEDED", "LOGIN_FAILED_NO_USERNAME", "LOGIN_FAILED_NO_PASSWORD", "LOGIN_FAILED_NO_PASSPHRASE", "LOGIN_FAILED_NETWORK_ERROR", "LOGIN_FAILED_SERVER_ERROR", "LOGIN_FAILED_INVALID_PASSPHRASE", "LOGIN_FAILED_LOGIN_REJECTED", "METARECORD_DOWNLOAD_FAIL", "VERSION_OUT_OF_DATE", "DESKTOP_VERSION_OUT_OF_DATE", "SETUP_FAILED_NO_PASSPHRASE", "CREDENTIALS_CHANGED", "ABORT_SYNC_COMMAND", "NO_SYNC_NODE_FOUND", "OVER_QUOTA", "PROLONGED_SYNC_FAILURE", "SERVER_MAINTENANCE", "RESPONSE_OVER_QUOTA", "ENGINE_UPLOAD_FAIL", "ENGINE_DOWNLOAD_FAIL", "ENGINE_UNKNOWN_FAIL", "ENGINE_APPLY_FAIL", "ENGINE_METARECORD_DOWNLOAD_FAIL", "ENGINE_METARECORD_UPLOAD_FAIL", "ENGINE_BATCH_INTERRUPTED", "JPAKE_ERROR_CHANNEL", "JPAKE_ERROR_NETWORK", "JPAKE_ERROR_SERVER", "JPAKE_ERROR_TIMEOUT", "JPAKE_ERROR_INTERNAL", "JPAKE_ERROR_INVALID", "JPAKE_ERROR_NODATA", "JPAKE_ERROR_KEYMISMATCH", "JPAKE_ERROR_WRONGMESSAGE", "JPAKE_ERROR_USERABORT", "JPAKE_ERROR_DELAYUNSUPPORTED", "INFO_COLLECTIONS", "INFO_COLLECTION_USAGE", "INFO_COLLECTION_COUNTS", "INFO_QUOTA", "kSyncMasterPasswordLocked", "kSyncWeaveDisabled", "kSyncNetworkOffline", "kSyncBackoffNotMet", "kFirstSyncChoiceNotMade", "FIREFOX_ID", "FENNEC_ID", "SEAMONKEY_ID", "TEST_HARNESS_ID", "MIN_PP_LENGTH", "MIN_PASS_LENGTH", "DEVICE_TYPE_DESKTOP", "DEVICE_TYPE_MOBILE", "SQLITE_MAX_VARIABLE_NUMBER"],
   "Constants.jsm": ["Roles", "Events", "Relations", "Filters", "States", "Prefilters"],
   "ContactDB.jsm": ["ContactDB", "DB_NAME", "STORE_NAME", "SAVED_GETALL_STORE_NAME", "REVISION_STORE", "DB_VERSION"],
   "content-server.jsm": ["init"],
   "content.jsm": ["registerContentFrame"],
   "ContentCrashHandlers.jsm": ["TabCrashHandler", "PluginCrashReporter", "UnsubmittedCrashHandler"],