Bug 739416 - Send commands. r=rnewman
authorRichard Newman <rnewman@mozilla.com>
Fri, 08 Jun 2012 17:09:54 -0700
changeset 96200 b0d55b8d2f5d36859aa91806f87b041ea8360850
parent 96199 8a73ed10f75edf68a0f1303f8615044975c9f278
child 96201 7e3e46c65fee733169018ece4202a9d02c8aaa66
push id10452
push userrnewman@mozilla.com
push dateSat, 09 Jun 2012 00:11:50 +0000
treeherdermozilla-inbound@7e3e46c65fee [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewersrnewman
bugs739416
milestone16.0a1
first release with
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
last release without
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
Bug 739416 - Send commands. r=rnewman
mobile/android/base/sync/CommandProcessor.java
mobile/android/base/sync/CommandRunner.java
mobile/android/base/sync/GlobalSession.java
mobile/android/base/sync/net/SyncStorageRecordRequest.java
mobile/android/base/sync/repositories/android/ClientsDatabase.java
mobile/android/base/sync/repositories/android/ClientsDatabaseAccessor.java
mobile/android/base/sync/repositories/domain/ClientRecord.java
mobile/android/base/sync/stage/SyncClientsEngineStage.java
--- a/mobile/android/base/sync/CommandProcessor.java
+++ b/mobile/android/base/sync/CommandProcessor.java
@@ -1,128 +1,216 @@
 /* This Source Code Form is subject to the terms of the Mozilla Public
  * License, v. 2.0. If a copy of the MPL was not distributed with this
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 
 package org.mozilla.gecko.sync;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
 import org.mozilla.gecko.R;
+import org.mozilla.gecko.sync.repositories.NullCursorException;
+import org.mozilla.gecko.sync.repositories.android.ClientsDatabaseAccessor;
+import org.mozilla.gecko.sync.repositories.domain.ClientRecord;
 
 import android.app.Notification;
 import android.app.NotificationManager;
 import android.app.PendingIntent;
 import android.content.Context;
 import android.content.Intent;
 import android.net.Uri;
 
 public class CommandProcessor {
   private static final String LOG_TAG = "Command";
   private static AtomicInteger currentId = new AtomicInteger();
+  private GlobalSession session;
   protected ConcurrentHashMap<String, CommandRunner> commands = new ConcurrentHashMap<String, CommandRunner>();
 
   private final static CommandProcessor processor = new CommandProcessor();
 
   public static CommandProcessor getProcessor() {
     return processor;
   }
 
   public static class Command {
     public final String commandType;
-    public final List<String> args;
+    public final JSONArray args;
+    private List<String> argsList;
 
-    public Command(String commandType, List<String> args) {
+    public Command(String commandType, JSONArray args) {
       this.commandType = commandType;
       this.args = args;
     }
+
+    public synchronized List<String> getArgsList() {
+      if (argsList == null) {
+        ArrayList<String> argsList = new ArrayList<String>(args.size());
+
+        for (int i = 0; i < args.size(); i++) {
+          argsList.add(args.get(i).toString());
+        }
+        this.argsList = argsList;
+      }
+      return this.argsList;
+    }
+
+    @SuppressWarnings("unchecked")
+    public JSONObject asJSONObject() {
+      JSONObject out = new JSONObject();
+      out.put("command", this.commandType);
+      out.put("args", this.args);
+      return out;
+    }
   }
 
   public void registerCommand(String commandType, CommandRunner command) {
     commands.put(commandType, command);
   }
 
+  public void registerSession(GlobalSession session) {
+    this.session = session;
+  }
+
   public void processCommand(ExtendedJSONObject unparsedCommand) {
     Command command = parseCommand(unparsedCommand);
     if (command == null) {
       Logger.debug(LOG_TAG, "Invalid command: " + unparsedCommand + " will not be processed.");
       return;
     }
 
     CommandRunner executableCommand = commands.get(command.commandType);
     if (executableCommand == null) {
       Logger.debug(LOG_TAG, "Command \"" + command.commandType + "\" not registered and will not be processed.");
       return;
     }
 
-    executableCommand.executeCommand(command.args);
+    executableCommand.executeCommand(command.getArgsList());
   }
 
   /**
    * Parse a JSON command into a ParsedCommand object for easier handling.
    *
    * @param unparsedCommand - command as ExtendedJSONObject
    * @return - null if command is invalid, else return ParsedCommand with
    *           no null attributes.
    */
-  protected Command parseCommand(ExtendedJSONObject unparsedCommand) {
+  protected static Command parseCommand(ExtendedJSONObject unparsedCommand) {
     String type = (String) unparsedCommand.get("command");
     if (type == null) {
       return null;
     }
 
     try {
       JSONArray unparsedArgs = unparsedCommand.getArray("args");
       if (unparsedArgs == null) {
         return null;
       }
-      ArrayList<String> args = new ArrayList<String>(unparsedArgs.size());
 
-      for (int i = 0; i < unparsedArgs.size(); i++) {
-        args.add(unparsedArgs.get(i).toString());
-      }
-
-      return new Command(type, args);
+      return new Command(type, unparsedArgs);
     } catch (NonArrayJSONException e) {
       Logger.debug(LOG_TAG, "Unable to parse args array. Invalid command");
       return null;
     }
   }
 
-  public void displayURI(List<String> args, Context context) {
-    // These two args are guaranteed to exist by trusting the client sender.
-    String uri = args.get(0);
-    String clientId = args.get(1);
+  /**
+   * Validates and sends a command to a client or all clients.
+   *
+   * Calling this does not actually sync the command data to the server. If the
+   * client already has the command/args pair, it won't receive a duplicate
+   * command.
+   *
+   * @param clientID
+   *        Client ID to send command to. If null, send to all remote
+   *        clients.
+   * @param command
+   *        Command to invoke on remote clients
+   */
+  public void sendCommand(String clientID, Command command) {
+    Logger.debug(LOG_TAG, "In sendCommand.");
+
+    CommandRunner commandData = commands.get(command.commandType);
+
+    // Don't send commands that we don't know about.
+    if (commandData == null) {
+      Logger.error(LOG_TAG, "Unknown command to send: " + command);
+      return;
+    }
+
+    // Don't send a command with the wrong number of arguments.
+    if (!commandData.argumentsAreValid(command.getArgsList())) {
+      Logger.error(LOG_TAG, "Expected " + commandData.argCount + " args for '" +
+                   command + "', but got " + command.args);
+      return;
+    }
 
-    Logger.info(LOG_TAG, "Received a URI for display: " + uri + " from " + clientId);
+    if (clientID != null) {
+      this.sendCommandToClient(clientID, command);
+      return;
+    }
+
+    ClientsDatabaseAccessor db = new ClientsDatabaseAccessor(session.getContext());
+    try {
+      Map<String, ClientRecord> clientMap = db.fetchAllClients();
+      for (ClientRecord client : clientMap.values()) {
+        this.sendCommandToClient(client.guid, command);
+      }
+    } catch (NullCursorException e) {
+      Logger.error(LOG_TAG, "NullCursorException when fetching all GUIDs");
+    } finally {
+      db.close();
+    }
+  }
+
+  protected void sendCommandToClient(String clientID, Command command) {
+    Logger.info(LOG_TAG, "Sending " + command.commandType + " to " + clientID);
+
+    ClientsDatabaseAccessor db = new ClientsDatabaseAccessor(session.getContext());
+    try {
+      db.store(clientID, command);
+    } catch (NullCursorException e) {
+      Logger.error(LOG_TAG, "NullCursorException: Unable to send command.");
+    } finally {
+      db.close();
+    }
+  }
+
+  public static void displayURI(final List<String> args, final Context context) {
+    // We trust the client sender that these exist.
+    final String uri = args.get(0);
+    final String clientId = args.get(1);
+
+    Logger.pii(LOG_TAG, "Received a URI for display: " + uri + " from " + clientId);
 
     String title = null;
     if (args.size() == 3) {
       title = args.get(2);
     }
 
-    // Get NotificationManager.
-    String ns = Context.NOTIFICATION_SERVICE;
-    NotificationManager mNotificationManager = (NotificationManager)context.getSystemService(ns);
+    final String ns = Context.NOTIFICATION_SERVICE;
+    final NotificationManager notificationManager = (NotificationManager) context.getSystemService(ns);
 
-    // Create a Notficiation.
-    int icon = R.drawable.sync_ic_launcher;
+    // Create a Notificiation.
+    final int icon = R.drawable.sync_ic_launcher;
     String notificationTitle = context.getString(R.string.sync_new_tab);
     if (title != null) {
       notificationTitle = notificationTitle.concat(": " + title);
     }
-    long when = System.currentTimeMillis();
+
+    final long when = System.currentTimeMillis();
     Notification notification = new Notification(icon, notificationTitle, when);
     notification.flags = Notification.FLAG_AUTO_CANCEL;
 
     // Set pending intent associated with the notification.
     Intent notificationIntent = new Intent(Intent.ACTION_VIEW, Uri.parse(uri));
     PendingIntent contentIntent = PendingIntent.getActivity(context, 0, notificationIntent, 0);
     notification.setLatestEventInfo(context, notificationTitle, uri, contentIntent);
 
     // Send notification.
-    mNotificationManager.notify(currentId.getAndIncrement(), notification);
+    notificationManager.notify(currentId.getAndIncrement(), notification);
   }
 }
--- a/mobile/android/base/sync/CommandRunner.java
+++ b/mobile/android/base/sync/CommandRunner.java
@@ -1,11 +1,22 @@
 /* This Source Code Form is subject to the terms of the Mozilla Public
  * License, v. 2.0. If a copy of the MPL was not distributed with this
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 
 package org.mozilla.gecko.sync;
 
 import java.util.List;
 
-public interface CommandRunner {
-  public void executeCommand(List<String> args);
+public abstract class CommandRunner {
+  public final int argCount;
+
+  public CommandRunner(int argCount) {
+    this.argCount = argCount;
+  }
+
+  public abstract void executeCommand(List<String> args);
+
+  public boolean argumentsAreValid(List<String> args) {
+    return args != null &&
+           args.size() == argCount;
+  }
 }
--- a/mobile/android/base/sync/GlobalSession.java
+++ b/mobile/android/base/sync/GlobalSession.java
@@ -162,54 +162,55 @@ public class GlobalSession implements Cr
 
     registerCommands();
     prepareStages();
 
     // TODO: data-driven plan for the sync, referring to prepareStages.
   }
 
   protected void registerCommands() {
-    CommandProcessor processor = CommandProcessor.getProcessor();
+    final CommandProcessor processor = CommandProcessor.getProcessor();
+    processor.registerSession(this);
 
-    processor.registerCommand("resetEngine", new CommandRunner() {
+    processor.registerCommand("resetEngine", new CommandRunner(1) {
       @Override
       public void executeCommand(List<String> args) {
         HashSet<String> names = new HashSet<String>();
         names.add(args.get(0));
         resetStagesByName(names);
       }
     });
 
-    processor.registerCommand("resetAll", new CommandRunner() {
+    processor.registerCommand("resetAll", new CommandRunner(0) {
       @Override
       public void executeCommand(List<String> args) {
         resetAllStages();
       }
     });
 
-    processor.registerCommand("wipeEngine", new CommandRunner() {
+    processor.registerCommand("wipeEngine", new CommandRunner(1) {
       @Override
       public void executeCommand(List<String> args) {
         HashSet<String> names = new HashSet<String>();
         names.add(args.get(0));
         wipeStagesByName(names);
       }
     });
 
-    processor.registerCommand("wipeAll", new CommandRunner() {
+    processor.registerCommand("wipeAll", new CommandRunner(0) {
       @Override
       public void executeCommand(List<String> args) {
         wipeAllStages();
       }
     });
 
-    processor.registerCommand("displayURI", new CommandRunner() {
+    processor.registerCommand("displayURI", new CommandRunner(3) {
       @Override
       public void executeCommand(List<String> args) {
-        CommandProcessor.getProcessor().displayURI(args, getContext());
+        CommandProcessor.displayURI(args, context);
       }
     });
   }
 
   protected void prepareStages() {
     HashMap<Stage, GlobalSyncStage> stages = new HashMap<Stage, GlobalSyncStage>();
 
     stages.put(Stage.checkPreconditions,      new CheckPreconditionsStage(this));
--- a/mobile/android/base/sync/net/SyncStorageRecordRequest.java
+++ b/mobile/android/base/sync/net/SyncStorageRecordRequest.java
@@ -87,16 +87,25 @@ public class SyncStorageRecordRequest ex
     toPOST.add(body);
     try {
       this.resource.post(jsonEntity(toPOST));
     } catch (UnsupportedEncodingException e) {
       this.delegate.handleRequestError(e);
     }
   }
 
+  public void post(JSONArray body) {
+    // Let's do this the trivial way for now.
+    try {
+      this.resource.post(jsonEntity(body));
+    } catch (UnsupportedEncodingException e) {
+      this.delegate.handleRequestError(e);
+    }
+  }
+
   public void put(JSONObject body) {
     // Let's do this the trivial way for now.
     try {
       this.resource.put(jsonEntity(body));
     } catch (UnsupportedEncodingException e) {
       this.delegate.handleRequestError(e);
     }
   }
--- a/mobile/android/base/sync/repositories/android/ClientsDatabase.java
+++ b/mobile/android/base/sync/repositories/android/ClientsDatabase.java
@@ -23,84 +23,176 @@ public class ClientsDatabase extends Cac
 
   // Clients Table.
   public static final String TBL_CLIENTS      = "clients";
   public static final String COL_ACCOUNT_GUID = "guid";
   public static final String COL_PROFILE      = "profile";
   public static final String COL_NAME         = "name";
   public static final String COL_TYPE         = "device_type";
 
-  public static final String[] TBL_COLUMNS = new String[] { COL_ACCOUNT_GUID, COL_PROFILE, COL_NAME, COL_TYPE };
-  public static final String TBL_KEY = COL_ACCOUNT_GUID + " = ? AND " +
-                                       COL_PROFILE + " = ?";
+  public static final String[] TBL_CLIENTS_COLUMNS = new String[] { COL_ACCOUNT_GUID, COL_PROFILE, COL_NAME, COL_TYPE };
+  public static final String TBL_CLIENTS_KEY = COL_ACCOUNT_GUID + " = ? AND " +
+                                               COL_PROFILE + " = ?";
+
+  // Commands Table.
+  public static final String TBL_COMMANDS = "commands";
+  public static final String COL_COMMAND  = "command";
+  public static final String COL_ARGS     = "args";
+
+  public static final String[] TBL_COMMANDS_COLUMNS    = new String[] { COL_ACCOUNT_GUID, COL_COMMAND, COL_ARGS };
+  public static final String   TBL_COMMANDS_KEY        = COL_ACCOUNT_GUID + " = ? AND " +
+                                                         COL_COMMAND + " = ? AND " +
+                                                         COL_ARGS + " = ?";
+  public static final String   TBL_COMMANDS_GUID_QUERY = COL_ACCOUNT_GUID + " = ? ";
 
   private final RepoUtils.QueryHelper queryHelper;
 
   public ClientsDatabase(Context context) {
     super(context, DB_NAME, null, SCHEMA_VERSION);
     this.queryHelper = new RepoUtils.QueryHelper(context, null, LOG_TAG);
   }
 
   @Override
   public void onCreate(SQLiteDatabase db) {
-    String createTableSql = "CREATE TABLE " + TBL_CLIENTS + " ("
+    createClientsTable(db);
+    createCommandsTable(db);
+  }
+
+  public static void createClientsTable(SQLiteDatabase db) {
+    String createClientsTableSql = "CREATE TABLE " + TBL_CLIENTS + " ("
         + COL_ACCOUNT_GUID + " TEXT, "
         + COL_PROFILE + " TEXT, "
         + COL_NAME + " TEXT, "
         + COL_TYPE + " TEXT, "
         + "PRIMARY KEY (" + COL_ACCOUNT_GUID + ", " + COL_PROFILE + "))";
-    db.execSQL(createTableSql);
+    db.execSQL(createClientsTableSql);
   }
 
-  public void wipe() {
-    SQLiteDatabase db = this.getCachedWritableDatabase();
-    onUpgrade(db, SCHEMA_VERSION, SCHEMA_VERSION);
+  public static void createCommandsTable(SQLiteDatabase db) {
+    String createCommandsTableSql = "CREATE TABLE " + TBL_COMMANDS + " ("
+        + COL_ACCOUNT_GUID + " TEXT, "
+        + COL_COMMAND + " TEXT, "
+        + COL_ARGS + " TEXT, "
+        + "PRIMARY KEY (" + COL_ACCOUNT_GUID + ", " + COL_COMMAND + ", " + COL_ARGS + "), "
+        + "FOREIGN KEY (" + COL_ACCOUNT_GUID + ") REFERENCES " + TBL_CLIENTS + " (" + COL_ACCOUNT_GUID + "))";
+    db.execSQL(createCommandsTableSql);
   }
 
   @Override
   public void onUpgrade(SQLiteDatabase db, int oldVersion, int newVersion) {
     // For now we'll just drop and recreate the tables.
     db.execSQL("DROP TABLE IF EXISTS " + TBL_CLIENTS);
+    db.execSQL("DROP TABLE IF EXISTS " + TBL_COMMANDS);
     onCreate(db);
   }
 
+  public void wipeDB() {
+    SQLiteDatabase db = this.getCachedWritableDatabase();
+    onUpgrade(db, SCHEMA_VERSION, SCHEMA_VERSION);
+  }
+
+  public void wipeClientsTable() {
+    SQLiteDatabase db = this.getCachedWritableDatabase();
+    db.execSQL("DELETE FROM " + TBL_CLIENTS);
+  }
+
+  public void wipeCommandsTable() {
+    SQLiteDatabase db = this.getCachedWritableDatabase();
+    db.execSQL("DELETE FROM " + TBL_COMMANDS);
+  }
+
   // If a record with given GUID exists, we'll update it,
   // otherwise we'll insert it.
   public void store(String profileId, ClientRecord record) {
     SQLiteDatabase db = this.getCachedWritableDatabase();
 
     ContentValues cv = new ContentValues();
     cv.put(COL_ACCOUNT_GUID, record.guid);
     cv.put(COL_PROFILE, profileId);
     cv.put(COL_NAME, record.name);
     cv.put(COL_TYPE, record.type);
 
     String[] args = new String[] { record.guid, profileId };
-    int rowsUpdated = db.update(TBL_CLIENTS, cv, TBL_KEY, args);
+    int rowsUpdated = db.update(TBL_CLIENTS, cv, TBL_CLIENTS_KEY, args);
 
     if (rowsUpdated >= 1) {
       Logger.debug(LOG_TAG, "Replaced client record for row with accountGUID " + record.guid);
     } else {
       long rowId = db.insert(TBL_CLIENTS, null, cv);
       Logger.debug(LOG_TAG, "Inserted client record into row: " + rowId);
     }
   }
 
-  public Cursor fetch(String accountGuid, String profileId) throws NullCursorException {
-    String[] args = new String[] { accountGuid, profileId };
-    SQLiteDatabase db = this.getCachedReadableDatabase();
+  /**
+   * Store a command in the commands database if it doesn't already exist.
+   *
+   * @param accountGUID
+   * @param command - The command type
+   * @param args - A JSON string of args
+   * @throws NullCursorException
+   */
+  public void store(String accountGUID, String command, String args) throws NullCursorException {
+    Logger.trace(LOG_TAG, "Storing command " + command + " with args " + args);
+    SQLiteDatabase db = this.getCachedWritableDatabase();
 
-    return queryHelper.safeQuery(db, ".fetch", TBL_CLIENTS, TBL_COLUMNS, TBL_KEY, args);
+    ContentValues cv = new ContentValues();
+    cv.put(COL_ACCOUNT_GUID, accountGUID);
+    cv.put(COL_COMMAND, command);
+    if (args == null) {
+      cv.put(COL_ARGS, "[]");
+    } else {
+      cv.put(COL_ARGS, args);
+    }
+
+    Cursor cur = this.fetchSpecificCommand(accountGUID, command, args);
+    try {
+      if (cur.moveToFirst()) {
+        Logger.debug(LOG_TAG, "Command already exists in database.");
+        return;
+      }
+    } finally {
+      cur.close();
+    }
+
+    long rowId = db.insert(TBL_COMMANDS, null, cv);
+    Logger.debug(LOG_TAG, "Inserted command into row: " + rowId);
   }
 
-  public Cursor fetchAll() throws NullCursorException {
+  public Cursor fetchClientsCursor(String accountGUID, String profileId) throws NullCursorException {
+    String[] args = new String[] { accountGUID, profileId };
+    SQLiteDatabase db = this.getCachedReadableDatabase();
+
+    return queryHelper.safeQuery(db, ".fetchClientsCursor", TBL_CLIENTS, TBL_CLIENTS_COLUMNS, TBL_CLIENTS_KEY, args);
+  }
+
+  public Cursor fetchSpecificCommand(String accountGUID, String command, String commandArgs) throws NullCursorException {
+    String[] args = new String[] { accountGUID, command, commandArgs };
     SQLiteDatabase db = this.getCachedReadableDatabase();
 
-    return queryHelper.safeQuery(db, ".fetch", TBL_CLIENTS, TBL_COLUMNS, null, null);
+    return queryHelper.safeQuery(db, ".fetchSpecificCommand", TBL_COMMANDS, TBL_COMMANDS_COLUMNS, TBL_COMMANDS_KEY, args);
+  }
+
+  public Cursor fetchCommandsForClient(String accountGUID) throws NullCursorException {
+    String[] args = new String[] { accountGUID };
+    SQLiteDatabase db = this.getCachedReadableDatabase();
+
+    return queryHelper.safeQuery(db, ".fetchCommandsForClient", TBL_COMMANDS, TBL_COMMANDS_COLUMNS, TBL_COMMANDS_GUID_QUERY, args);
   }
 
-  public void delete(String accountGUID, String profileId) {
+  public Cursor fetchAllClients() throws NullCursorException {
+    SQLiteDatabase db = this.getCachedReadableDatabase();
+
+    return queryHelper.safeQuery(db, ".fetchAllClients", TBL_CLIENTS, TBL_CLIENTS_COLUMNS, null, null);
+  }
+
+  public Cursor fetchAllCommands() throws NullCursorException {
+    SQLiteDatabase db = this.getCachedReadableDatabase();
+
+    return queryHelper.safeQuery(db, ".fetchAllCommands", TBL_COMMANDS, TBL_COMMANDS_COLUMNS, null, null);
+  }
+
+  public void deleteClient(String accountGUID, String profileId) {
     String[] args = new String[] { accountGUID, profileId };
 
     SQLiteDatabase db = this.getCachedWritableDatabase();
-    db.delete(TBL_CLIENTS, TBL_KEY, args);
+    db.delete(TBL_CLIENTS, TBL_CLIENTS_KEY, args);
   }
 }
--- a/mobile/android/base/sync/repositories/android/ClientsDatabaseAccessor.java
+++ b/mobile/android/base/sync/repositories/android/ClientsDatabaseAccessor.java
@@ -1,19 +1,23 @@
 /* This Source Code Form is subject to the terms of the Mozilla Public
  * License, v. 2.0. If a copy of the MPL was not distributed with this
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 
 package org.mozilla.gecko.sync.repositories.android;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
+import org.json.simple.JSONArray;
+import org.mozilla.gecko.sync.CommandProcessor.Command;
 import org.mozilla.gecko.sync.repositories.NullCursorException;
 import org.mozilla.gecko.sync.repositories.domain.ClientRecord;
 
 import android.content.Context;
 import android.database.Cursor;
 
 public class ClientsDatabaseAccessor {
 
@@ -34,82 +38,131 @@ public class ClientsDatabaseAccessor {
   }
 
   public void store(Collection<ClientRecord> records) {
     for (ClientRecord record : records) {
       this.store(record);
     }
   }
 
-  public ClientRecord fetch(String accountGUID) throws NullCursorException {
-    Cursor cur = null;
+  public void store(String accountGUID, Command command) throws NullCursorException {
+    db.store(accountGUID, command.commandType, command.args.toJSONString());
+  }
+
+  public ClientRecord fetchClient(String accountGUID) throws NullCursorException {
+    final Cursor cur = db.fetchClientsCursor(accountGUID, getProfileId());
     try {
-      cur = db.fetch(accountGUID, getProfileId());
-
-      if (cur == null || !cur.moveToFirst()) {
+      if (!cur.moveToFirst()) {
         return null;
       }
       return recordFromCursor(cur);
     } finally {
-      if (cur != null) {
-        cur.close();
-      }
+      cur.close();
     }
   }
 
-  public Map<String, ClientRecord> fetchAll() throws NullCursorException {
-    HashMap<String, ClientRecord> map = new HashMap<String, ClientRecord>();
-    Cursor cur = null;
+  public Map<String, ClientRecord> fetchAllClients() throws NullCursorException {
+    final HashMap<String, ClientRecord> map = new HashMap<String, ClientRecord>();
+    final Cursor cur = db.fetchAllClients();
     try {
-      cur = db.fetchAll();
-      if (cur == null || !cur.moveToFirst()) {
+      if (!cur.moveToFirst()) {
         return Collections.unmodifiableMap(map);
       }
+
       while (!cur.isAfterLast()) {
         ClientRecord clientRecord = recordFromCursor(cur);
         map.put(clientRecord.guid, clientRecord);
         cur.moveToNext();
       }
-
       return Collections.unmodifiableMap(map);
     } finally {
-      if (cur != null) {
-        cur.close();
-      }
+      cur.close();
     }
   }
 
-  protected ClientRecord recordFromCursor(Cursor cur) {
+  public List<Command> fetchAllCommands() throws NullCursorException {
+    final List<Command> commands = new ArrayList<Command>();
+    final Cursor cur = db.fetchAllCommands();
+    try {
+      if (!cur.moveToFirst()) {
+        return Collections.unmodifiableList(commands);
+      }
+
+      while (!cur.isAfterLast()) {
+        Command command = commandFromCursor(cur);
+        commands.add(command);
+        cur.moveToNext();
+      }
+      return Collections.unmodifiableList(commands);
+    } finally {
+      cur.close();
+    }
+  }
+
+  public List<Command> fetchCommandsForClient(String accountGUID) throws NullCursorException {
+    final List<Command> commands = new ArrayList<Command>();
+    final Cursor cur = db.fetchCommandsForClient(accountGUID);
+    try {
+      if (!cur.moveToFirst()) {
+        return Collections.unmodifiableList(commands);
+      }
+
+      while(!cur.isAfterLast()) {
+        Command command = commandFromCursor(cur);
+        commands.add(command);
+        cur.moveToNext();
+      }
+      return Collections.unmodifiableList(commands);
+    } finally {
+      cur.close();
+    }
+  }
+
+  protected static ClientRecord recordFromCursor(Cursor cur) {
     String accountGUID = RepoUtils.getStringFromCursor(cur, ClientsDatabase.COL_ACCOUNT_GUID);
     String clientName = RepoUtils.getStringFromCursor(cur, ClientsDatabase.COL_NAME);
     String clientType = RepoUtils.getStringFromCursor(cur, ClientsDatabase.COL_TYPE);
     ClientRecord record = new ClientRecord(accountGUID);
     record.name = clientName;
     record.type = clientType;
     return record;
   }
 
+  protected static Command commandFromCursor(Cursor cur) {
+    String commandType = RepoUtils.getStringFromCursor(cur, ClientsDatabase.COL_COMMAND);
+    JSONArray commandArgs = RepoUtils.getJSONArrayFromCursor(cur, ClientsDatabase.COL_ARGS);
+    return new Command(commandType, commandArgs);
+  }
+
   public int clientsCount() {
-    Cursor cur;
     try {
-      cur = db.fetchAll();
+      final Cursor cur = db.fetchAllClients();
+      try {
+        return cur.getCount();
+      } finally {
+        cur.close();
+      }
     } catch (NullCursorException e) {
       return 0;
     }
-    try {
-      return cur.getCount();
-    } finally {
-      cur.close();
-    }
+
   }
 
   private String getProfileId() {
     return ClientsDatabaseAccessor.PROFILE_ID;
   }
 
-  public void wipe() {
-    db.wipe();
+  public void wipeDB() {
+    db.wipeDB();
+  }
+
+  public void wipeClientsTable() {
+    db.wipeClientsTable();
+  }
+
+  public void wipeCommandsTable() {
+    db.wipeCommandsTable();
   }
 
   public void close() {
     db.close();
   }
 }
--- a/mobile/android/base/sync/repositories/domain/ClientRecord.java
+++ b/mobile/android/base/sync/repositories/domain/ClientRecord.java
@@ -57,16 +57,19 @@ public class ClientRecord extends Record
     }
   }
 
   @Override
   protected void populatePayload(ExtendedJSONObject payload) {
     putPayload(payload, "id",   this.guid);
     putPayload(payload, "name", this.name);
     putPayload(payload, "type", this.type);
+    if (this.commands != null) {
+      payload.put("commands",  this.commands);
+    }
   }
 
   @Override
   public boolean equals(Object o) {
     if (!(o instanceof ClientRecord) || !super.equals(o)) {
       return false;
     }
 
--- a/mobile/android/base/sync/stage/SyncClientsEngineStage.java
+++ b/mobile/android/base/sync/stage/SyncClientsEngineStage.java
@@ -2,62 +2,68 @@
  * License, v. 2.0. If a copy of the MPL was not distributed with this
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 
 package org.mozilla.gecko.sync.stage;
 
 import java.io.UnsupportedEncodingException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.json.simple.JSONArray;
 import org.json.simple.JSONObject;
 import org.mozilla.gecko.sync.CommandProcessor;
+import org.mozilla.gecko.sync.CommandProcessor.Command;
 import org.mozilla.gecko.sync.CryptoRecord;
 import org.mozilla.gecko.sync.ExtendedJSONObject;
 import org.mozilla.gecko.sync.GlobalSession;
 import org.mozilla.gecko.sync.HTTPFailureException;
 import org.mozilla.gecko.sync.Logger;
 import org.mozilla.gecko.sync.NoCollectionKeysSetException;
 import org.mozilla.gecko.sync.Utils;
 import org.mozilla.gecko.sync.crypto.CryptoException;
 import org.mozilla.gecko.sync.crypto.KeyBundle;
 import org.mozilla.gecko.sync.delegates.ClientsDataDelegate;
 import org.mozilla.gecko.sync.net.BaseResource;
 import org.mozilla.gecko.sync.net.SyncStorageCollectionRequest;
 import org.mozilla.gecko.sync.net.SyncStorageRecordRequest;
 import org.mozilla.gecko.sync.net.SyncStorageResponse;
 import org.mozilla.gecko.sync.net.WBOCollectionRequestDelegate;
 import org.mozilla.gecko.sync.net.WBORequestDelegate;
+import org.mozilla.gecko.sync.repositories.NullCursorException;
 import org.mozilla.gecko.sync.repositories.android.ClientsDatabaseAccessor;
 import org.mozilla.gecko.sync.repositories.android.RepoUtils;
 import org.mozilla.gecko.sync.repositories.domain.ClientRecord;
 import org.mozilla.gecko.sync.repositories.domain.ClientRecordFactory;
 import org.mozilla.gecko.sync.repositories.domain.VersionConstants;
 
 import ch.boye.httpclientandroidlib.HttpStatus;
 
 public class SyncClientsEngineStage implements GlobalSyncStage {
-  public static final String LOG_TAG = "SyncClientsEngineStage";
-  public static final String COLLECTION_NAME = "clients";
-  public static final int CLIENTS_TTL_REFRESH = 604800000; // 7 days
+  private static final String LOG_TAG = "SyncClientsEngineStage";
+
+  public static final String COLLECTION_NAME       = "clients";
+  public static final int CLIENTS_TTL_REFRESH      = 604800000;   // 7 days in milliseconds.
   public static final int MAX_UPLOAD_FAILURE_COUNT = 5;
 
   protected final GlobalSession session;
   protected final ClientRecordFactory factory = new ClientRecordFactory();
   protected ClientUploadDelegate clientUploadDelegate;
   protected ClientDownloadDelegate clientDownloadDelegate;
 
   // Be sure to use this safely via getClientsDatabaseAccessor/closeDataAccessor.
   protected ClientsDatabaseAccessor db;
 
   protected volatile boolean shouldWipe;
   protected volatile boolean commandsProcessedShouldUpload;
   protected final AtomicInteger uploadAttemptsCount = new AtomicInteger();
+  protected final List<ClientRecord> toUpload = new ArrayList<ClientRecord>();
 
   public SyncClientsEngineStage(GlobalSession session) {
     if (session == null) {
       throw new IllegalArgumentException("session must not be null.");
     }
     this.session = session;
   }
 
@@ -111,16 +117,19 @@ public class SyncClientsEngineStage impl
     @Override
     public void handleRequestSuccess(SyncStorageResponse response) {
 
       // Hang onto the server's last modified timestamp to use
       // in X-If-Unmodified-Since for upload.
       session.config.persistServerClientsTimestamp(response.normalizedWeaveTimestamp());
       BaseResource.consumeEntity(response);
 
+      // Wipe the clients table if it still hasn't been wiped but needs to be.
+      wipeAndStore(null);
+
       // If we successfully downloaded all records but ours was not one of them
       // then reset the timestamp.
       if (!localAccountGUIDDownloaded) {
         Logger.info(LOG_TAG, "Local client GUID does not exist on the server. Upload timestamp will be reset.");
         session.config.persistServerClientRecordTimestamp(0);
       }
       localAccountGUIDDownloaded = false;
 
@@ -134,16 +143,23 @@ public class SyncClientsEngineStage impl
       }
 
       Logger.debug(LOG_TAG, "Database contains " + clientsCount + " clients.");
       Logger.debug(LOG_TAG, "Server response asserts " + response.weaveRecords() + " records.");
 
       // TODO: persist the response timestamp to know whether to download next time (Bug 726055).
       clientUploadDelegate = new ClientUploadDelegate();
       clientsDelegate.setClientsCount(clientsCount);
+
+      // If we upload remote records, checkAndUpload() will be called upon
+      // upload success in the delegate. Otherwise call checkAndUpload() now.
+      if (toUpload.size() > 0) {
+        uploadRemoteRecords(response.normalizedWeaveTimestamp());
+        return;
+      }
       checkAndUpload();
     }
 
     @Override
     public void handleRequestFailure(SyncStorageResponse response) {
       BaseResource.consumeEntity(response); // We don't need the response at all, and any exception handling shouldn't need the response body.
       localAccountGUIDDownloaded = false;
 
@@ -174,89 +190,116 @@ public class SyncClientsEngineStage impl
       try {
         r = (ClientRecord) factory.createRecord(record.decrypt());
         if (clientsDelegate.isLocalGUID(r.guid)) {
           Logger.info(LOG_TAG, "Local client GUID exists on server and was downloaded");
 
           localAccountGUIDDownloaded = true;
           session.config.persistServerClientRecordTimestamp(r.lastModified);
           processCommands(r.commands);
+        } else {
+          // Only need to store record if it isn't our local one.
+          wipeAndStore(r);
+          addCommands(r);
         }
         RepoUtils.logClient(r);
       } catch (Exception e) {
         session.abort(e, "Exception handling client WBO.");
         return;
       }
-      wipeAndStore(r);
     }
 
     @Override
     public KeyBundle keyBundle() {
       try {
         return session.keyBundleForCollection(COLLECTION_NAME);
       } catch (NoCollectionKeysSetException e) {
         session.abort(e, "No collection keys set.");
         return null;
       }
     }
   }
 
   public class ClientUploadDelegate extends WBORequestDelegate {
     protected static final String LOG_TAG = "ClientUploadDelegate";
+    public Long currentlyUploadingRecordTimestamp;
+    public boolean currentlyUploadingLocalRecord;
 
     @Override
     public String credentials() {
       return session.credentials();
     }
 
+    private void setUploadDetails(boolean isLocalRecord) {
+      // Use the timestamp for the whole collection per Sync storage 1.1 spec.
+      currentlyUploadingRecordTimestamp = session.config.getPersistedServerClientsTimestamp();
+      currentlyUploadingLocalRecord = isLocalRecord;
+    }
+
     @Override
     public String ifUnmodifiedSince() {
-      // Use the timestamp for the whole collection per Sync storage 1.1 spec.
-      Long timestampInMilliseconds = session.config.getPersistedServerClientsTimestamp();
+      Long timestampInMilliseconds = currentlyUploadingRecordTimestamp;
 
       // It's the first upload so we don't care about X-If-Unmodified-Since.
       if (timestampInMilliseconds == 0) {
         return null;
       }
 
       return Utils.millisecondsToDecimalSecondsString(timestampInMilliseconds);
     }
 
     @Override
     public void handleRequestSuccess(SyncStorageResponse response) {
       Logger.debug(LOG_TAG, "Upload succeeded.");
-      try {
-        commandsProcessedShouldUpload = false;
-        uploadAttemptsCount.set(0);
+      uploadAttemptsCount.set(0);
 
-        // Persist the timestamp for the record we just uploaded,
-        // and bump the collection timestamp, too.
-        long timestamp = response.normalizedWeaveTimestamp();
-        session.config.persistServerClientRecordTimestamp(timestamp);
-        session.config.persistServerClientsTimestamp(timestamp);
-        BaseResource.consumeEntity(response);
+      // X-Weave-Timestamp is the modified time of uploaded records.
+      // Always persist this.
+      final long responseTimestamp = response.normalizedWeaveTimestamp();
+      Logger.trace(LOG_TAG, "Timestamp from header is: " + responseTimestamp);
 
-        Logger.debug(LOG_TAG, "Timestamp is " + timestamp);
-      } catch (Exception e) {
-        session.abort(e, "Unable to fetch timestamp.");
+      if (responseTimestamp == -1) {
+        final String message = "Response did not contain a valid timestamp.";
+        session.abort(new RuntimeException(message), message);
         return;
       }
+
+      BaseResource.consumeEntity(response);
+      session.config.persistServerClientsTimestamp(responseTimestamp);
+
+      // If we're not uploading our record, we're done here; just
+      // clean up and finish.
+      if (!currentlyUploadingLocalRecord) {
+        // TODO: check failed uploads in body.
+        clearRecordsToUpload();
+        checkAndUpload();
+        return;
+      }
+
+      // If we're processing our record, we have a little more cleanup
+      // to do.
+      commandsProcessedShouldUpload = false;
+      session.config.persistServerClientRecordTimestamp(responseTimestamp);
       session.advance();
     }
 
     @Override
     public void handleRequestFailure(SyncStorageResponse response) {
       int statusCode = response.getStatusCode();
 
       // If upload failed because of `ifUnmodifiedSince` then there are new
       // commands uploaded to our record. We must download and process them first.
       if (!commandsProcessedShouldUpload ||
           statusCode == HttpStatus.SC_PRECONDITION_FAILED ||
           uploadAttemptsCount.incrementAndGet() > MAX_UPLOAD_FAILURE_COUNT) {
+
         Logger.debug(LOG_TAG, "Client upload failed. Aborting sync.");
+        if (!currentlyUploadingLocalRecord) {
+          clearRecordsToUpload(); // These will be redownloaded.
+        }
         BaseResource.consumeEntity(response); // The exception thrown should need the response body.
         session.abort(new HTTPFailureException(response), "Client upload failed.");
         return;
       }
       Logger.trace(LOG_TAG, "Retrying upload…");
       // Preconditions:
       // commandsProcessedShouldUpload == true &&
       // statusCode != 412 &&
@@ -293,17 +336,17 @@ public class SyncClientsEngineStage impl
   @Override
   public void resetLocal() {
     // Clear timestamps and local data.
     session.config.persistServerClientRecordTimestamp(0L);   // TODO: roll these into one.
     session.config.persistServerClientsTimestamp(0L);
 
     session.getClientsDelegate().setClientsCount(0);
     try {
-      getClientsDatabaseAccessor().wipe();
+      getClientsDatabaseAccessor().wipeDB();
     } finally {
       closeDataAccessor();
     }
   }
 
   @Override
   public void wipeLocal() throws Exception {
     // Nothing more to do.
@@ -351,81 +394,161 @@ public class SyncClientsEngineStage impl
     if (commands == null ||
         commands.size() == 0) {
       return;
     }
 
     commandsProcessedShouldUpload = true;
     CommandProcessor processor = CommandProcessor.getProcessor();
 
-    // TODO: Bug 715792 - Process commands here.
-    for (int i = 0; i < commands.size(); i++) {
-      processor.processCommand(new ExtendedJSONObject((JSONObject) commands.get(i)));
+    for (Object o : commands) {
+      processor.processCommand(new ExtendedJSONObject((JSONObject) o));
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  protected void addCommands(ClientRecord record) throws NullCursorException {
+    Logger.trace(LOG_TAG, "Adding commands to " + record.guid);
+    List<Command> commands = db.fetchCommandsForClient(record.guid);
+
+    if (commands == null || commands.size() == 0) {
+      Logger.trace(LOG_TAG, "No commands to add.");
+      return;
+    }
+
+    for (Command command : commands) {
+      JSONObject jsonCommand = command.asJSONObject();
+      if (record.commands == null) {
+        record.commands = new JSONArray();
+      }
+      record.commands.add(jsonCommand);
     }
+    toUpload.add(record);
+  }
+
+  @SuppressWarnings("unchecked")
+  protected void uploadRemoteRecords(long timestamp) {
+    Logger.trace(LOG_TAG, "In uploadRemoteRecords. Uploading " + toUpload.size() + " records" );
+
+    if (toUpload.size() == 1) {
+      ClientRecord record = toUpload.get(0);
+      Logger.debug(LOG_TAG, "Only 1 remote record to upload.");
+      Logger.debug(LOG_TAG, "Record last modified: " + record.lastModified);
+      CryptoRecord cryptoRecord = encryptClientRecord(record);
+      if (cryptoRecord != null) {
+        clientUploadDelegate.setUploadDetails(false);
+        this.uploadClientRecord(cryptoRecord);
+      }
+      return;
+    }
+
+    JSONArray cryptoRecords = new JSONArray();
+    for (ClientRecord record : toUpload) {
+      Logger.trace(LOG_TAG, "Record " + record.guid + " is being uploaded" );
+
+      CryptoRecord cryptoRecord = encryptClientRecord(record);
+      cryptoRecords.add(cryptoRecord.toJSONObject());
+    }
+    Logger.debug(LOG_TAG, "Uploading records: " + cryptoRecords.size());
+    clientUploadDelegate.setUploadDetails(false);
+    this.uploadClientRecords(cryptoRecords);
   }
 
   protected void checkAndUpload() {
     if (!shouldUpload()) {
       Logger.debug(LOG_TAG, "Not uploading client record.");
       session.advance();
       return;
     }
 
+    final ClientRecord localClient = newLocalClientRecord(session.getClientsDelegate());
+    clientUploadDelegate.setUploadDetails(true);
+    CryptoRecord cryptoRecord = encryptClientRecord(localClient);
+    if (cryptoRecord != null) {
+      this.uploadClientRecord(cryptoRecord);
+    }
+  }
+
+  protected CryptoRecord encryptClientRecord(ClientRecord recordToUpload) {
     // Generate CryptoRecord from ClientRecord to upload.
     final String encryptionFailure = "Couldn't encrypt new client record.";
-    final ClientRecord localClient = newLocalClientRecord(session.getClientsDelegate());
+
     try {
-      CryptoRecord cryptoRecord = localClient.getEnvelope();
+      CryptoRecord cryptoRecord = recordToUpload.getEnvelope();
       cryptoRecord.keyBundle = clientUploadDelegate.keyBundle();
-      cryptoRecord.encrypt();
-      this.uploadClientRecord(cryptoRecord);
+      return cryptoRecord.encrypt();
     } catch (UnsupportedEncodingException e) {
       session.abort(e, encryptionFailure + " Unsupported encoding.");
     } catch (CryptoException e) {
       session.abort(e, encryptionFailure);
     }
+    return null;
+  }
+
+  public void clearRecordsToUpload() {
+    try {
+      db.wipeCommandsTable();
+      toUpload.clear();
+    } finally {
+      db.close();
+    }
   }
 
   protected void downloadClientRecords() {
     shouldWipe = true;
     clientDownloadDelegate = makeClientDownloadDelegate();
 
     try {
-      URI getURI = session.config.collectionURI(COLLECTION_NAME, true);
-
-      SyncStorageCollectionRequest request = new SyncStorageCollectionRequest(getURI);
+      final URI getURI = session.config.collectionURI(COLLECTION_NAME, true);
+      final SyncStorageCollectionRequest request = new SyncStorageCollectionRequest(getURI);
       request.delegate = clientDownloadDelegate;
 
       Logger.trace(LOG_TAG, "Downloading client records.");
       request.get();
     } catch (URISyntaxException e) {
       session.abort(e, "Invalid URI.");
     }
   }
 
+  protected void uploadClientRecords(JSONArray records) {
+    Logger.trace(LOG_TAG, "Uploading client records " + records.toJSONString());
+    try {
+      final URI postURI = session.config.collectionURI(COLLECTION_NAME, false);
+      final SyncStorageRecordRequest request = new SyncStorageRecordRequest(postURI);
+      request.delegate = clientUploadDelegate;
+      request.post(records);
+    } catch (URISyntaxException e) {
+      session.abort(e, "Invalid URI.");
+    } catch (Exception e) {
+      session.abort(e, "Unable to parse body.");
+    }
+  }
+
   /**
    * Upload a client record via HTTP POST to the parent collection.
    */
   protected void uploadClientRecord(CryptoRecord record) {
     Logger.debug(LOG_TAG, "Uploading client record " + record.guid);
     try {
-      URI postURI = session.config.collectionURI(COLLECTION_NAME);
-      SyncStorageRecordRequest request = new SyncStorageRecordRequest(postURI);
+      final URI postURI = session.config.collectionURI(COLLECTION_NAME);
+      final SyncStorageRecordRequest request = new SyncStorageRecordRequest(postURI);
       request.delegate = clientUploadDelegate;
       request.post(record);
     } catch (URISyntaxException e) {
       session.abort(e, "Invalid URI.");
     }
   }
 
   protected ClientDownloadDelegate makeClientDownloadDelegate() {
     return new ClientDownloadDelegate();
   }
 
   protected void wipeAndStore(ClientRecord record) {
     ClientsDatabaseAccessor db = getClientsDatabaseAccessor();
     if (shouldWipe) {
-      db.wipe();
+      db.wipeClientsTable();
       shouldWipe = false;
     }
-    db.store(record);
+    if (record != null) {
+      db.store(record);
+    }
   }
 }