Bug 1197370 - Add chown method to adb.py, refactor chmod r=bc
authorEdwin Gao <egao@mozilla.com>
Fri, 12 Oct 2018 16:58:35 +0000
changeset 496647 53dfca556ff165528d145ebcaf64186ca274ccd3
parent 496646 2064477895c3d93ec55cab9dede17ba01a434b21
child 496648 74b2087ee696eb7369b65727b81fc67121789f7d
push id9984
push userffxbld-merge
push dateMon, 15 Oct 2018 21:07:35 +0000
treeherdermozilla-beta@183d27ea8570 [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewersbc
bugs1197370
milestone64.0a1
first release with
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
last release without
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
Bug 1197370 - Add chown method to adb.py, refactor chmod r=bc Behavior changes: - added method for ADBDevice class called chown - when initializing ADBDevice class, check if recursive flag is supported (similar to chmod -R) Other changes: - handling for situation where recursive is desired but -R flag is not supported is in place - changed behavior above situation to mirror chmod (creation of temporary file based on `self.ls` output, then executing script on device using adb Unit Tests: - unit tests to exercise attributes and common paths created. Would need further expansion of tests at some point. - additional mocking fixtures created. Differential Revision: https://phabricator.services.mozilla.com/D8128
testing/mozbase/mozdevice/mozdevice/adb.py
testing/mozbase/mozdevice/tests/conftest.py
testing/mozbase/mozdevice/tests/manifest.ini
testing/mozbase/mozdevice/tests/test_chown.py
--- a/testing/mozbase/mozdevice/mozdevice/adb.py
+++ b/testing/mozbase/mozdevice/mozdevice/adb.py
@@ -668,22 +668,33 @@ class ADBDevice(ADBCommand):
         # Do we have chmod -R?
         try:
             self._chmod_R = False
             re_recurse = re.compile(r'[-]R')
             chmod_output = self.shell_output("chmod --help", timeout=timeout)
             match = re_recurse.search(chmod_output)
             if match:
                 self._chmod_R = True
-        except (ADBError, ADBTimeoutError) as e:
-            self._logger.debug('Check chmod -R: %s' % e)
+        except ADBError as e:
+            self._logger.debug("Check chmod -R: {}".format(e))
             match = re_recurse.search(e.message)
             if match:
                 self._chmod_R = True
-        self._logger.info("Native chmod -R support: %s" % self._chmod_R)
+        self._logger.info("Native chmod -R support: {}".format(self._chmod_R))
+
+        # Do we have chown -R?
+        try:
+            self._chown_R = False
+            chown_output = self.shell_output("chown --help", timeout=timeout)
+            match = re_recurse.search(chown_output)
+            if match:
+                self._chown_R = True
+        except ADBError as e:
+            self._logger.debug("Check chown -R: {}".format(e))
+        self._logger.info("Native chown -R support: {}".format(self._chown_R))
 
         try:
             cleared = self.shell_bool('logcat -P ""', timeout=timeout)
         except ADBError:
             cleared = False
         if not cleared:
             self._logger.info("Unable to turn off logcat chatty")
 
@@ -1605,16 +1616,47 @@ class ADBDevice(ADBCommand):
         :type timeout: integer or None
         :raises: * ADBTimeoutError
                  * ADBError"""
 
         rv = self.command_output(["remount"], timeout=timeout)
         if not rv.startswith("remount succeeded"):
             raise ADBError("Unable to remount device")
 
+    def batch_execute(self, commands, timeout=None, root=False):
+        """Writes commands to a temporary file then executes on the device.
+
+        :param list commands_list: List of commands to be run by the shell.
+        :param timeout: The maximum time in
+            seconds for any spawned adb process to complete before throwing
+            an ADBTimeoutError.
+            This timeout is per adb call. The total time spent
+            may exceed this value. If it is not specified, the value
+            set in the ADBDevice constructor is used.
+        :type timeout: integer or None
+        :param bool root: Flag specifying if the command should
+            be executed as root.
+        :raises: * ADBTimeoutError
+                 * ADBRootError
+                 * ADBError
+        """
+        try:
+            tmpf = tempfile.NamedTemporaryFile(delete=False)
+            tmpf.write('\n'.join(commands))
+            tmpf.close()
+            script = '/sdcard/{}'.format(os.path.basename(tmpf.name))
+            self.push(tmpf.name, script)
+            self.shell_output('sh {}'.format(script), timeout=timeout,
+                              root=root)
+        finally:
+            if tmpf:
+                os.unlink(tmpf.name)
+            if script:
+                self.rm(script, timeout=timeout, root=root)
+
     def chmod(self, path, recursive=False, mask="777", timeout=None, root=False):
         """Recursively changes the permissions of a directory on the
         device.
 
         :param str path: The directory name on the device.
         :param bool recursive: Flag specifying if the command should be
             executed recursively.
         :param str mask: The octal permissions.
@@ -1645,54 +1687,88 @@ class ADBDevice(ADBCommand):
                            (path, recursive, mask, root))
         if self.is_path_internal_storage(path, timeout=timeout):
             # External storage on Android is case-insensitive and permissionless
             # therefore even with the proper privileges it is not possible
             # to change modes.
             self._logger.warning('Ignoring attempt to chmod external storage')
             return
 
-        if not recursive:
-            self.shell_output("chmod %s %s" % (mask, path),
-                              timeout=timeout, root=root)
+        # build up the command to be run based on capabilities.
+        command = ['chmod']
+
+        if recursive and self._chmod_R:
+            command.append('-R')
+
+        command.append(mask)
+
+        if recursive and not self._chmod_R:
+            paths = self.ls(path, recursive=True, timeout=timeout, root=root)
+            base = ' '.join(command)
+            commands = [' '.join([base, entry]) for entry in paths]
+            self.batch_execute(commands, timeout, root)
+        else:
+            command.append(path)
+            self.shell_output(cmd=' '.join(command), timeout=timeout, root=root)
+
+    def chown(self, path, owner, group=None, recursive=False, timeout=None, root=False):
+        """Run the chown command on the provided path.
+
+        :param str path: path name on the device.
+        :param str owner: new owner of the path.
+        :param group: optional parameter specifying the new group the path
+                      should belong to.
+        :type group: str or None
+        :param bool recursive: optional value specifying whether the command should
+                    operate on files and directories recursively.
+        :param timeout: The maximum time in
+            seconds for any spawned adb process to complete before throwing
+            an ADBTimeoutError.
+            This timeout is per adb call. The total time spent
+            may exceed this value. If it is not specified, the value
+            set in the ADBDevice constructor is used.
+        :type timeout: integer or None
+        :param bool root: Flag specifying if the command should
+            be executed as root.
+        :raises: * ADBTimeoutError
+                 * ADBRootError
+                 * ADBError
+        """
+        path = posixpath.normpath(path.strip())
+        if self.is_path_internal_storage(path, timeout=timeout):
+            self._logger.warning('Ignoring attempt to chown external storage')
             return
 
-        if self._chmod_R:
-            try:
-                self.shell_output("chmod -R %s %s" % (mask, path),
-                                  timeout=timeout, root=root)
-            except ADBError as e:
-                if e.message.find('No such file or directory') == -1:
-                    raise
-                self._logger.warning('chmod -R %s %s: Ignoring Error: %s' %
-                                     (mask, path, e.message))
-            return
-        # Obtain a list of the directories and files which match path
-        # and construct a shell script which explictly calls chmod on
-        # each of them.
-        entries = self.ls(path, recursive=recursive, timeout=timeout,
-                          root=root)
-        tmpf = None
-        chmodsh = None
-        try:
-            tmpf = tempfile.NamedTemporaryFile(delete=False)
-            for entry in entries:
-                tmpf.write('chmod %s %s\n' % (mask, entry))
-            tmpf.close()
-            chmodsh = '/data/local/tmp/%s' % os.path.basename(tmpf.name)
-            self.push(tmpf.name, chmodsh)
-            self.shell_output('chmod 777 %s' % chmodsh, timeout=timeout,
-                              root=root)
-            self.shell_output('sh -c %s' % chmodsh, timeout=timeout,
-                              root=root)
-        finally:
-            if tmpf:
-                os.unlink(tmpf.name)
-            if chmodsh:
-                self.rm(chmodsh, timeout=timeout, root=root)
+        # build up the command to be run based on capabilities.
+        command = ['chown']
+
+        if recursive and self._chown_R:
+            command.append('-R')
+
+        if group:
+            # officially supported notation is : but . has been checked with
+            # sdk 17 and it works.
+            command.append('{owner}.{group}'.format(owner=owner, group=group))
+        else:
+            command.append(owner)
+
+        if recursive and not self._chown_R:
+            # recursive desired, but chown -R is not supported natively.
+            # like with chmod, get the list of subpaths, put them into a script
+            # then run it with adb with one call.
+            paths = self.ls(path, recursive=True, timeout=timeout, root=root)
+            base = ' '.join(command)
+            commands = [' '.join([base, entry]) for entry in paths]
+
+            self.batch_execute(commands, timeout, root)
+        else:
+            # recursive or not, and chown -R is supported natively.
+            # command can simply be run as provided by the user.
+            command.append(path)
+            self.shell_output(cmd=' '.join(command), timeout=timeout, root=root)
 
     def _test_path(self, argument, path, timeout=None, root=False):
         """Performs path and file type checking.
 
         :param str argument: Command line argument to the test command.
         :param str path: The path or filename on the device.
         :param timeout: The maximum time in
             seconds for any spawned adb process to complete before
--- a/testing/mozbase/mozdevice/tests/conftest.py
+++ b/testing/mozbase/mozdevice/tests/conftest.py
@@ -71,21 +71,83 @@ def mock_shell_output(monkeypatch):
         """
         if 'pm list package error' in cmd:
             return 'Error: Could not access the Package Manager'
         elif 'pm list package none' in cmd:
             return ''
         elif 'pm list package' in cmd:
             apps = ["org.mozilla.fennec", "org.mozilla.geckoview_example"]
             return ('package:{}\n' * len(apps)).format(*apps)
+        else:
+            print(str(cmd))
+            return str(cmd)
 
     monkeypatch.setattr(mozdevice.ADBDevice, 'shell_output', shell_output_wrapper)
 
 
 @pytest.fixture(autouse=True)
+def mock_is_path_internal_storage(monkeypatch):
+    """Monkeypatches the ADBDevice.is_path_internal_storage() method call.
+
+    Instead of returning the outcome of whether the path provided is
+    internal storage or external, this will always return True.
+
+    :param object monkeypatch: pytest provided fixture for mocking.
+    """
+    def is_path_internal_storage_wrapper(object, path, timeout=None):
+        """Actual monkeypatch implementation of the is_path_internal_storage() call.
+
+        :param str path: The path to test.
+        :param timeout: The maximum time in
+            seconds for any spawned adb process to complete before
+            throwing an ADBTimeoutError.  This timeout is per adb call. The
+            total time spent may exceed this value. If it is not
+            specified, the value set in the ADBDevice constructor is used.
+        :returns: boolean
+
+        :raises: * ADBTimeoutError
+                 * ADBError
+        """
+        if 'internal_storage' in path:
+            return True
+        return False
+
+    monkeypatch.setattr(mozdevice.ADBDevice,
+                        'is_path_internal_storage', is_path_internal_storage_wrapper)
+
+
+@pytest.fixture(autouse=True)
+def mock_shell_bool(monkeypatch):
+    """Monkeypatches the ADBDevice.shell_bool() method call.
+
+    Instead of returning the output of an adb call, this method will
+    return appropriate string output. Content of the string output is
+    in line with the calling method's expectations.
+
+    :param object monkeypatch: pytest provided fixture for mocking.
+    """
+    def shell_bool_wrapper(object, cmd, env=None, cwd=None, timeout=None, root=False):
+        """Actual monkeypatch implementation of the shell_bool method call.
+
+        :param object object: placeholder object representing ADBDevice
+        :param str cmd: command to be executed
+        :param env: contains the environment variable
+        :type env: dict or None
+        :param cwd: The directory from which to execute.
+        :type cwd: str or None
+        :param timeout: unused parameter tp represent timeout threshold
+        :returns: string - string representation of a simulated call to adb
+        """
+        print(cmd)
+        return str(cmd)
+
+    monkeypatch.setattr(mozdevice.ADBDevice, 'shell_bool', shell_bool_wrapper)
+
+
+@pytest.fixture(autouse=True)
 def mock_adb_object():
     """Patches the __init__ method call when instantiating ADBAndroid.
 
     This is the key to test abstract methods implemented in ADBDevice.
     ADBAndroid normally requires instantiated objects including but not
     limited to ADBDevice in order to execute its commands.
 
     With a pytest-mock patch, we are able to mock the initialization of
--- a/testing/mozbase/mozdevice/tests/manifest.ini
+++ b/testing/mozbase/mozdevice/tests/manifest.ini
@@ -1,5 +1,6 @@
 [DEFAULT]
 subsuite = mozbase, os == "linux"
 skip-if = python == 3
 [test_socket_connection.py]
 [test_is_app_installed.py]
+[test_chown.py]
new file mode 100644
--- /dev/null
+++ b/testing/mozbase/mozdevice/tests/test_chown.py
@@ -0,0 +1,68 @@
+#!/usr/bin/env python
+
+from __future__ import absolute_import
+
+import logging
+
+from mock import patch
+
+import mozunit
+import pytest
+
+
+@pytest.mark.parametrize('boolean_value', [True, False])
+def test_set_chown_r_attribute(mock_adb_object, redirect_stdout_and_assert, boolean_value):
+    mock_adb_object._chown_R = boolean_value
+    assert mock_adb_object._chown_R == boolean_value
+
+
+def test_chown_path_internal(mock_adb_object, redirect_stdout_and_assert):
+    """Tests whether attempt to chown internal path is ignored"""
+    with patch.object(logging, 'getLogger') as mock_log:
+        mock_adb_object._logger = mock_log
+
+    testing_parameters = {
+        "owner": "someuser",
+        "path": "internal_storage",
+    }
+    expected = 'Ignoring attempt to chown external storage'
+    mock_adb_object.chown(**testing_parameters)
+    assert ''.join(mock_adb_object._logger.method_calls[0][1]) != ''
+    assert ''.join(mock_adb_object._logger.method_calls[0][1]) == expected
+
+
+def test_chown_one_path(mock_adb_object, redirect_stdout_and_assert):
+    """Tests the path where only one path is provided."""
+    # set up mock logging and self._chown_R attribute.
+    with patch.object(logging, 'getLogger') as mock_log:
+        mock_adb_object._logger = mock_log
+    mock_adb_object._chown_R = True
+
+    testing_parameters = {
+        "owner": "someuser",
+        "path": "/system",
+    }
+    command = 'chown {owner} {path}'.format(**testing_parameters)
+    testing_parameters['text'] = command
+    redirect_stdout_and_assert(mock_adb_object.chown, **testing_parameters)
+
+
+def test_chown_one_path_with_group(mock_adb_object, redirect_stdout_and_assert):
+    """Tests the path where group is provided."""
+    # set up mock logging and self._chown_R attribute.
+    with patch.object(logging, 'getLogger') as mock_log:
+        mock_adb_object._logger = mock_log
+    mock_adb_object._chown_R = True
+
+    testing_parameters = {
+        "owner": "someuser",
+        "path": "/system",
+        "group": "group_2",
+    }
+    command = 'chown {owner}.{group} {path}'.format(**testing_parameters)
+    testing_parameters['text'] = command
+    redirect_stdout_and_assert(mock_adb_object.chown, **testing_parameters)
+
+
+if __name__ == '__main__':
+    mozunit.main()