browser/components/readinglist/Sync.jsm
author Mark Hammond <mhammond@skippinet.com.au>
Thu, 16 Apr 2015 16:16:10 +1000
changeset 258318 36b7439df64e235156adf136845df6526df17530
parent 258317 cf459cec404b72140102a05d567a3b76a52fa5f8
permissions -rw-r--r--
Bug 1153121 - have the readinglist sync engine log existing local items if trace logging is enabled. r=adw

/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this file,
 * You can obtain one at http://mozilla.org/MPL/2.0/. */

"use strict";

// Names this module exposes to its importers.  `Sync` itself is defined as a
// getter on the module scope at the bottom of this file.
this.EXPORTED_SYMBOLS = [
  "Sync",
];

const { classes: Cc, interfaces: Ci, utils: Cu } = Components;

Cu.import("resource://gre/modules/Log.jsm");
Cu.import("resource://gre/modules/Task.jsm");
Cu.import("resource://gre/modules/XPCOMUtils.jsm");

XPCOMUtils.defineLazyModuleGetter(this, "Preferences",
  "resource://gre/modules/Preferences.jsm");
XPCOMUtils.defineLazyModuleGetter(this, "ReadingList",
  "resource:///modules/readinglist/ReadingList.jsm");
XPCOMUtils.defineLazyModuleGetter(this, "ServerClient",
  "resource:///modules/readinglist/ServerClient.jsm");

// The maximum number of sub-requests per POST /batch supported by the server.
// See http://readinglist.readthedocs.org/en/latest/api/batch.html.
// _postBatch() breaks larger batches into chunks of at most this many
// sub-requests.
const BATCH_REQUEST_LIMIT = 25;

// Name of the preference in which the Last-Modified header of server responses
// is stored.  It is read and written through SyncImpl's
// _serverLastModifiedHeader accessor.
const SERVER_LAST_MODIFIED_HEADER_PREF = "readinglist.sync.serverLastModified";

// Maps local record properties to server record properties.  Keys are the
// local property names, values are the corresponding server property names.
// The table is used in both directions: serverRecordFromLocalItem() looks up
// values by key, and localRecordFromServerRecord() walks every key/value pair.
const SERVER_PROPERTIES_BY_LOCAL_PROPERTIES = {
  guid: "id",
  serverLastModified: "last_modified",
  url: "url",
  preview: "preview",
  title: "title",
  resolvedURL: "resolved_url",
  resolvedTitle: "resolved_title",
  excerpt: "excerpt",
  archived: "archived",
  deleted: "deleted",
  favorite: "favorite",
  isArticle: "is_article",
  wordCount: "word_count",
  unread: "unread",
  addedBy: "added_by",
  addedOn: "added_on",
  storedOn: "stored_on",
  markedReadBy: "marked_read_by",
  markedReadOn: "marked_read_on",
  readPosition: "read_position",
};

// Local record properties that can be uploaded in new items.  These are the
// names passed to serverRecordFromLocalItem() when uploading new items.
const NEW_RECORD_PROPERTIES = [
  "url",
  "title",
  "resolvedURL",
  "resolvedTitle",
  "excerpt",
  "favorite",
  "isArticle",
  "wordCount",
  "unread",
  "addedBy",
  "addedOn",
  "markedReadBy",
  "markedReadOn",
  "readPosition",
  "preview",
];

// Local record properties that can be uploaded in changed items.  These are
// the names passed to _uploadChanges() by the material-changes phase.
const MUTABLE_RECORD_PROPERTIES = [
  "title",
  "resolvedURL",
  "resolvedTitle",
  "excerpt",
  "favorite",
  "isArticle",
  "wordCount",
  "unread",
  "markedReadBy",
  "markedReadOn",
  "readPosition",
  "preview",
];

// The "readinglist.sync" logger, shared by everything in this module.
let log = Log.repository.getLogger("readinglist.sync");


/**
 * Syncs the state of a reading list with a server.  To run a sync, construct
 * a SyncImpl and call start() on it.
 *
 * @param readingList The ReadingList to sync.
 */
function SyncImpl(readingList) {
  Object.assign(this, {
    // The list being synced.
    list: readingList,
    // The client through which every server request is sent.
    _client: new ServerClient(),
  });
}

/**
 * This implementation uses the sync algorithm described here:
 * https://github.com/mozilla-services/readinglist/wiki/Client-phases
 * The "phases" mentioned in the methods below refer to the phases in that
 * document.
 */
SyncImpl.prototype = {

  /**
   * Starts sync, if it's not already started.
   *
   * @return Promise<null> this.promise, i.e., a promise that will be resolved
   *         when sync completes, rejected on error.
   */
  start() {
    if (!this.promise) {
      this.promise = Task.spawn(function* () {
        try {
          yield this._start();
        } finally {
          delete this.promise;
        }
      }.bind(this));
    }
    return this.promise;
  },

  /**
   * A Promise<null> that will be non-null when sync is ongoing.  Resolved when
   * sync completes, rejected on error.  start() sets it on the instance and
   * deletes it again once the sync has finished, which exposes this null
   * default once more.
   */
  promise: null,

  /**
   * See the document linked above that describes the sync algorithm.
   */
  _start: Task.async(function* () {
    log.info("Starting sync");
    yield this._logDiagnostics();
    yield this._uploadStatusChanges();
    yield this._uploadNewItems();
    yield this._uploadDeletedItems();
    yield this._downloadModifiedItems();

    // TODO: "Repeat [this phase] until no conflicts occur," says the doc.
    yield this._uploadMaterialChanges();

    log.info("Sync done");
  }),

  /**
   * Phase 0 - for debugging we log some stuff about the local store before
   * we start syncing.
   * We only do this when the log level is "Trace" or lower as the info (a)
   * may be expensive to generate, (b) generate alot of output and (c) may
   * contain private information.
   */
  _logDiagnostics: Task.async(function* () {
    // Sadly our log is likely to have Log.Level.All, so loop over our
    // appenders looking for the effective level.
    let smallestLevel = log.appenders.reduce(
      (prev, appender) => Math.min(prev, appender.level),
      Log.Level.Error);

    if (smallestLevel > Log.Level.Trace) {
      return;
    }

    let localItems = [];
    yield this.list.forEachItem(localItem => localItems.push(localItem));
    log.trace("Have " + localItems.length + " local item(s)");
    for (let localItem of localItems) {
      // We need to use .record so we get access to a couple of the "internal" fields.
      let record = localItem._record;
      let redacted = {};
      for (let attr of ["guid", "url", "resolvedURL", "serverLastModified", "syncStatus"]) {
        redacted[attr] = record[attr];
      }
      log.trace(JSON.stringify(redacted));
    }
    // and the GUIDs of deleted items.
    let deletedGuids = []
    yield this.list.forEachSyncedDeletedGUID(guid => deletedGuids.push(guid));
    // This might be a huge line, but that's OK.
    log.trace("Have ${num} deleted item(s): ${deletedGuids}", {num: deletedGuids.length, deletedGuids});
  }),

  /**
   * Phase 1 part 1
   *
   * Uploads not-new items with status-only changes.  By design, status-only
   * changes will never conflict with what's on the server.
   */
  _uploadStatusChanges: Task.async(function* () {
    log.debug("Phase 1 part 1: Uploading status changes");
    yield this._uploadChanges(ReadingList.SyncStatus.CHANGED_STATUS,
                              ReadingList.SyncStatusProperties.STATUS);
  }),

  /**
   * There are two phases for uploading changed not-new items: one for items
   * with status-only changes, one for items with material changes.  The two
   * work similarly mechanically, and this method is a helper for both.
   *
   * @param syncStatus Local items matching this sync status will be uploaded.
   * @param localProperties An array of local record property names.  The
   *        uploaded item records will include only these properties.
   */
  _uploadChanges: Task.async(function* (syncStatus, localProperties) {
    // Get local items that match the given syncStatus.
    let requests = [];
    yield this.list.forEachItem(localItem => {
      requests.push({
        path: "/articles/" + localItem.guid,
        body: serverRecordFromLocalItem(localItem, localProperties),
      });
    }, { syncStatus: syncStatus });
    if (!requests.length) {
      log.debug("No local changes to upload");
      return;
    }

    // Send the request.
    let request = {
      body: {
        defaults: {
          method: "PATCH",
        },
        requests: requests,
      },
    };
    let batchResponse = yield this._postBatch(request);
    if (batchResponse.status != 200) {
      this._handleUnexpectedResponse(true, "uploading changes", batchResponse);
      return;
    }

    // Update local items based on the response.
    for (let response of batchResponse.body.responses) {
      if (response.status == 404) {
        // item deleted
        yield this._deleteItemForGUID(response.body.id);
        continue;
      }
      if (response.status == 409) {
        // "Conflict": A change violated a uniqueness constraint.  Mark the item
        // as having material changes, and reconcile and upload it in the
        // material-changes phase.
        // TODO
        continue;
      }
      if (response.status != 200) {
        this._handleUnexpectedResponse(false, "uploading a change", response);
        continue;
      }
      // Don't assume the local record and the server record aren't materially
      // different.  Reconcile the differences.
      // TODO

      let item = yield this._itemForGUID(response.body.id);
      yield this._updateItemWithServerRecord(item, response.body);
    }
  }),

  /**
   * Phase 1 part 2
   *
   * Uploads new items.
   */
  _uploadNewItems: Task.async(function* () {
    log.debug("Phase 1 part 2: Uploading new items");

    // Get new local items.
    let requests = [];
    yield this.list.forEachItem(localItem => {
      requests.push({
        body: serverRecordFromLocalItem(localItem, NEW_RECORD_PROPERTIES),
      });
    }, { syncStatus: ReadingList.SyncStatus.NEW });
    if (!requests.length) {
      log.debug("No new local items to upload");
      return;
    }

    // Send the request.
    let request = {
      body: {
        defaults: {
          method: "POST",
          path: "/articles",
        },
        requests: requests,
      },
    };
    let batchResponse = yield this._postBatch(request);
    if (batchResponse.status != 200) {
      this._handleUnexpectedResponse(true, "uploading new items", batchResponse);
      return;
    }

    // Update local items based on the response.
    for (let response of batchResponse.body.responses) {
      if (response.status == 303) {
        // "See Other": An item with the URL already exists.  Mark the item as
        // having material changes, and reconcile and upload it in the
        // material-changes phase.
        // TODO
        continue;
      }
      // Note that the server seems to return a 200 if an identical item already
      // exists, but we shouldn't be uploading identical items in this phase in
      // normal usage. But if something goes wrong locally (eg, we upload but
      // get some error even though the upload worked) we will see this.
      // So allow 200 but log a warning.
      if (response.status == 200) {
        log.debug("Attempting to upload a new item found the server already had it", response);
        // but we still process it.
      } else if (response.status != 201) {
        this._handleUnexpectedResponse(false, "uploading a new item", response);
        continue;
      }
      let item = yield this.list.itemForURL(response.body.url);
      yield this._updateItemWithServerRecord(item, response.body);
    }
  }),

  /**
   * Phase 1 part 3
   *
   * Uploads deleted synced items.
   */
  _uploadDeletedItems: Task.async(function* () {
    log.debug("Phase 1 part 3: Uploading deleted items");

    // Get deleted synced local items.
    let requests = [];
    yield this.list.forEachSyncedDeletedGUID(guid => {
      requests.push({
        path: "/articles/" + guid,
      });
    });
    if (!requests.length) {
      log.debug("No local deleted synced items to upload");
      return;
    }

    // Send the request.
    let request = {
      body: {
        defaults: {
          method: "DELETE",
        },
        requests: requests,
      },
    };
    let batchResponse = yield this._postBatch(request);
    if (batchResponse.status != 200) {
      this._handleUnexpectedResponse(true, "uploading deleted items", batchResponse);
      return;
    }

    // Delete local items based on the response.
    for (let response of batchResponse.body.responses) {
      // A 404 means the item was already deleted on the server, which is OK.
      // We still need to make sure it's deleted locally, though.
      if (response.status != 200 && response.status != 404) {
        this._handleUnexpectedResponse(false, "uploading a deleted item", response);
        continue;
      }
      yield this._deleteItemForGUID(response.body.id);
    }
  }),

  /**
   * Phase 2
   *
   * Downloads items that were modified since the last sync.
   */
  _downloadModifiedItems: Task.async(function* () {
    log.debug("Phase 2: Downloading modified items");

    // Get modified items from the server.
    let path = "/articles";
    if (this._serverLastModifiedHeader) {
      path += "?_since=" + this._serverLastModifiedHeader;
    }
    let request = {
      method: "GET",
      path: path,
    };
    let response = yield this._sendRequest(request);
    if (response.status != 200) {
      this._handleUnexpectedResponse(true, "downloading modified items", response);
      return;
    }

    // Update local items based on the response.
    for (let serverRecord of response.body.items) {
      if (serverRecord.deleted) {
        // _deleteItemForGUID is a no-op if no item exists with the GUID.
        yield this._deleteItemForGUID(serverRecord.id);
        continue;
      }
      let localItem = yield this._itemForGUID(serverRecord.id);
      if (localItem) {
        if (localItem.serverLastModified == serverRecord.last_modified) {
          // We just uploaded this item in the new-items phase.
          continue;
        }
        // The local item may have materially changed.  In that case, don't
        // overwrite the local changes with the server record.  Instead, mark
        // the item as having material changes and reconcile and upload it in
        // the material-changes phase.
        // TODO

        yield this._updateItemWithServerRecord(localItem, serverRecord);
        continue;
      }
      // A potentially new item.  addItem() will fail here when an item was
      // added to the local list between the time we uploaded new items and
      // now.
      let localRecord = localRecordFromServerRecord(serverRecord);
      try {
        yield this.list.addItem(localRecord);
      } catch (ex) {
        if (ex instanceof ReadingList.Error.Exists) {
          log.debug("Tried to add an item that already exists.");
        } else {
          log.error("Error adding an item from server record ${serverRecord} ${ex}",
                    { serverRecord, ex });
        }
      }
    }

    // Now that changes have been successfully applied, advance the server
    // last-modified timestamp so that next time we fetch items starting from
    // the current point.  Response header names are lowercase.
    if (response.headers && "last-modified" in response.headers) {
      this._serverLastModifiedHeader = response.headers["last-modified"];
    }
  }),

  /**
   * Phase 3 (material changes)
   *
   * Uploads not-new items with material changes.
   */
  _uploadMaterialChanges: Task.async(function* () {
    log.debug("Phase 3: Uploading material changes");
    yield this._uploadChanges(ReadingList.SyncStatus.CHANGED_MATERIAL,
                              MUTABLE_RECORD_PROPERTIES);
  }),

  /**
   * Gets the local ReadingListItem with the given GUID.
   *
   * @param guid The item's GUID.
   * @return The matching ReadingListItem.
   */
  _itemForGUID: Task.async(function* (guid) {
    return (yield this.list.item({ guid: guid }));
  }),

  /**
   * Updates the given local ReadingListItem with the given server record.  The
   * local item's sync status is updated to reflect the fact that the item has
   * been synced and is up to date.
   *
   * @param item A local ReadingListItem.
   * @param serverRecord A server record representing the item.
   */
  _updateItemWithServerRecord: Task.async(function* (localItem, serverRecord) {
    if (!localItem) {
      // The item may have been deleted from the local list between the time we
      // saw that it needed updating and now.
      log.debug("Tried to update a null local item from server record",
                serverRecord);
      return;
    }
    localItem._record = localRecordFromServerRecord(serverRecord);
    try {
      yield this.list.updateItem(localItem);
    } catch (ex) {
      // The item may have been deleted from the local list after we fetched it.
      if (ex instanceof ReadingList.Error.Deleted) {
        log.debug("Tried to update an item that was deleted from server record",
                  serverRecord);
      } else {
        log.error("Error updating an item from server record ${serverRecord} ${ex}",
                  { serverRecord, ex });
      }
    }
  }),

  /**
   * Truly deletes the local ReadingListItem with the given GUID.
   *
   * @param guid The item's GUID.
   */
  _deleteItemForGUID: Task.async(function* (guid) {
    let item = yield this._itemForGUID(guid);
    if (item) {
      // If item is non-null, then it hasn't been deleted locally.  Therefore
      // it's important to delete it through its list so that the list and its
      // consumers are notified properly.  Set the syncStatus to NEW so that the
      // list truly deletes the item.
      item._record.syncStatus = ReadingList.SyncStatus.NEW;
      try {
        yield this.list.deleteItem(item);
      } catch (ex) {
        log.error("Failed delete local item with id ${guid} ${ex}",
                  { guid, ex });
      }
      return;
    }
    // If item is null, then it may not actually exist locally, or it may have
    // been synced and then deleted so that it's marked as being deleted.  In
    // that case, try to delete it directly from the store.  As far as the list
    // is concerned, the item has already been deleted.
    log.debug("Item not present in list, deleting it by GUID instead");
    try {
      this.list._store.deleteItemByGUID(guid);
    } catch (ex) {
      log.error("Failed to delete local item with id ${guid} ${ex}",
                { guid, ex });
    }
  }),

  /**
   * Sends a request to the server.
   *
   * @param req The request object: { method, path, body, headers }.
   * @return Promise<response> Resolved with the server's response object:
   *         { status, body, headers }.
   */
  _sendRequest: Task.async(function* (req) {
    log.debug("Sending request", req);
    let response = yield this._client.request(req);
    log.debug("Received response", response);
    return response;
  }),

  /**
   * The server limits the number of sub-requests in POST /batch'es to
   * BATCH_REQUEST_LIMIT.  This method takes an arbitrarily big batch request
   * and breaks it apart into many individual batch requests in order to stay
   * within the limit.
   *
   * @param bigRequest The same type of request object that _sendRequest takes.
   *        Since it's a POST /batch request, its `body` should have a
   *        `requests` property whose value is an array of sub-requests.
   *        `method` and `path` are automatically filled.
   * @return Promise<response> Resolved when all requests complete with 200s, or
   *         when the first response that is not a 200 is received.  In the
   *         first case, the resolved response is a combination of all the
   *         server responses, and response.body.responses contains the sub-
   *         responses for all the sub-requests in bigRequest.  In the second
   *         case, the resolved response is the non-200 response straight from
   *         the server.
   */
  _postBatch: Task.async(function* (bigRequest) {
    log.debug("Sending batch requests");
    let allSubResponses = [];
    let remainingSubRequests = bigRequest.body.requests;
    while (remainingSubRequests.length) {
      let request = Object.assign({}, bigRequest);
      request.method = "POST";
      request.path = "/batch";
      request.body.requests =
        remainingSubRequests.splice(0, BATCH_REQUEST_LIMIT);
      let response = yield this._sendRequest(request);
      if (response.status != 200) {
        return response;
      }
      allSubResponses = allSubResponses.concat(response.body.responses);
    }
    let bigResponse = {
      status: 200,
      body: {
        responses: allSubResponses,
      },
    };
    log.debug("All batch requests successfully sent");
    return bigResponse;
  }),

  _handleUnexpectedResponse(isTopLevel, contextMsgFragment, response) {
    log.error(`Unexpected response ${contextMsgFragment}`, response);
    // We want to throw in some cases so the sync engine knows there was an
    // error and retries using the error schedule. 401 implies an auth issue
    // (possibly transient, possibly not) - but things like 404 might just
    // relate to a single item and need not throw.  Any 5XX implies a
    // (hopefully transient) server error.
    if (isTopLevel && (response.status == 401 || response.status >= 500)) {
      throw new Error("Sync aborted due to " + response.status + " server response.");
    }
  },

  // TODO: Wipe this pref when user logs out.
  get _serverLastModifiedHeader() {
    if (!("__serverLastModifiedHeader" in this)) {
      this.__serverLastModifiedHeader =
        Preferences.get(SERVER_LAST_MODIFIED_HEADER_PREF, undefined);
    }
    return this.__serverLastModifiedHeader;
  },
  set _serverLastModifiedHeader(val) {
    this.__serverLastModifiedHeader = val;
    Preferences.set(SERVER_LAST_MODIFIED_HEADER_PREF, val);
  },
};


/**
 * Translates a local ReadingListItem into a server record.
 *
 * @param localItem The local ReadingListItem.  Its properties are read from
 *        its `_record` object.
 * @param localProperties An array of local item property names.  Only these
 *        properties will be included in the server record.  Names without an
 *        entry in SERVER_PROPERTIES_BY_LOCAL_PROPERTIES, and names that are
 *        absent from the item's record, are skipped.
 * @return The server record.
 */
function serverRecordFromLocalItem(localItem, localProperties) {
  let localRecord = localItem._record;
  let serverRecord = {};
  for (let localProp of localProperties) {
    // A property without a server-side name can't be uploaded.  Skip it
    // instead of sending it to the server under the key "undefined".
    let isMapped = Object.prototype.hasOwnProperty.call(
      SERVER_PROPERTIES_BY_LOCAL_PROPERTIES, localProp);
    if (!isMapped) {
      continue;
    }
    if (localProp in localRecord) {
      let serverProp = SERVER_PROPERTIES_BY_LOCAL_PROPERTIES[localProp];
      serverRecord[serverProp] = localRecord[localProp];
    }
  }
  return serverRecord;
}

/**
 * Builds a local record out of a server record.  Every server property that
 * has a local counterpart in SERVER_PROPERTIES_BY_LOCAL_PROPERTIES and that is
 * present in the server record is copied over under its local name.  The
 * resulting record's syncStatus marks it as being up-to-date synced.
 *
 * @param serverRecord The server record.
 * @return The local record.
 */
function localRecordFromServerRecord(serverRecord) {
  let result = {
    // What came from the server is, by definition, up-to-date synced.
    syncStatus: ReadingList.SyncStatus.SYNCED,
  };
  Object.keys(SERVER_PROPERTIES_BY_LOCAL_PROPERTIES).forEach(localName => {
    let serverName = SERVER_PROPERTIES_BY_LOCAL_PROPERTIES[localName];
    if (serverName in serverRecord) {
      result[localName] = serverRecord[serverName];
    }
  });
  return result;
}

// `Sync` is exposed through a getter so that the one shared SyncImpl (and with
// it the ReadingList it wraps) is only created the first time `Sync` is
// actually accessed.
Object.defineProperty(this, "Sync", {
  get() {
    let singleton = this._singleton;
    if (!singleton) {
      singleton = this._singleton = new SyncImpl(ReadingList);
    }
    return singleton;
  },
});