testing/marionette/client/marionette/tests/unit/test_wait.py
author Ryan VanderMeulen <ryanvm@gmail.com>
Wed, 11 Feb 2015 12:15:22 -0500
changeset 228662 fe79362ffaaf4d0e71f2cd9773c328e7c6239001
parent 228660 9ef91d4fcf3b3d91872ca47572e0a2088728a686
child 230642 ce483404c6bff629f1646cd706f928b8d9805f31
permissions -rw-r--r--
Backed out 12 changesets (bug 1107336) for Marionette harness bustage on a CLOSED TREE. Backed out changeset 5075f0063d70 (bug 1107336) Backed out changeset 9ef91d4fcf3b (bug 1107336) Backed out changeset 995911340bf7 (bug 1107336) Backed out changeset 4817d3cd3810 (bug 1107336) Backed out changeset dad798a5e595 (bug 1107336) Backed out changeset 4f60c437140c (bug 1107336) Backed out changeset f92a5c6256fa (bug 1107336) Backed out changeset 399a436c6e5f (bug 1107336) Backed out changeset 0d04801bb2f0 (bug 1107336) Backed out changeset 60d9d4d20971 (bug 1107336) Backed out changeset 9c4b171c23be (bug 1107336) Backed out changeset a07bb95eb209 (bug 1107336)

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

import sys
import time

import errors
import wait

from marionette_test import MarionetteTestCase
from wait import Wait

class TickingClock(object):
    def __init__(self, incr=1):
        self.ticks = 0
        self.increment = incr

    def sleep(self, dur):
        self.ticks += self.increment

    @property
    def now(self):
        return self.ticks

class SequenceClock(object):
    def __init__(self, times):
        self.times = times
        self.i = 0

    @property
    def now(self):
        if len(self.times) > self.i:
            self.i += 1
        return self.times[self.i - 1]

    def sleep(self, dur):
        pass

class MockMarionette(object):
    def __init__(self):
        self.waited = 0
        self.timeout = None

    def exception(self, e=None, wait=1):
        self.wait()
        if self.waited == wait:
            if e is None:
                e = Exception
            raise e

    def true(self, wait=1):
        self.wait()
        if self.waited == wait:
            return True
        return None

    def false(self, wait=1):
        self.wait()
        return False

    def none(self, wait=1):
        self.wait()
        return None

    def value(self, value, wait=1):
        self.wait()
        if self.waited == wait:
            return value
        return None

    def wait(self):
        self.waited += 1

def at_third_attempt(clock, end):
    return clock.now == 2

def now(clock, end):
    return True

class SystemClockTest(MarionetteTestCase):
    def setUp(self):
        super(SystemClockTest, self).setUp()
        self.clock = wait.SystemClock()

    def test_construction_initializes_time(self):
        self.assertEqual(self.clock._time, time)

    def test_sleep(self):
        start = time.time()
        self.clock.sleep(0.1)
        end = time.time() - start
        self.assertGreater(end, 0)

    def test_time_now(self):
        self.assertIsNotNone(self.clock.now)

class FormalWaitTest(MarionetteTestCase):
    def setUp(self):
        super(FormalWaitTest, self).setUp()
        self.m = MockMarionette()
        self.m.timeout = 123

    def test_construction_with_custom_timeout(self):
        wt = Wait(self.m, timeout=42)
        self.assertEqual(wt.timeout, 42)

    def test_construction_with_custom_interval(self):
        wt = Wait(self.m, interval=42)
        self.assertEqual(wt.interval, 42)

    def test_construction_with_custom_clock(self):
        c = TickingClock(1)
        wt = Wait(self.m, clock=c)
        self.assertEqual(wt.clock, c)

    def test_construction_with_custom_exception(self):
        wt = Wait(self.m, ignored_exceptions=Exception)
        self.assertIn(Exception, wt.exceptions)
        self.assertEqual(len(wt.exceptions), 1)

    def test_construction_with_custom_exception_list(self):
        exc = [Exception, ValueError]
        wt = Wait(self.m, ignored_exceptions=exc)
        for e in exc:
            self.assertIn(e, wt.exceptions)
        self.assertEqual(len(wt.exceptions), len(exc))

    def test_construction_with_custom_exception_tuple(self):
        exc = (Exception, ValueError)
        wt = Wait(self.m, ignored_exceptions=exc)
        for e in exc:
            self.assertIn(e, wt.exceptions)
        self.assertEqual(len(wt.exceptions), len(exc))

    def test_duplicate_exceptions(self):
        wt = Wait(self.m, ignored_exceptions=[Exception, Exception])
        self.assertIn(Exception, wt.exceptions)
        self.assertEqual(len(wt.exceptions), 1)

    def test_default_timeout(self):
        self.assertEqual(wait.DEFAULT_TIMEOUT, 5)

    def test_default_interval(self):
        self.assertEqual(wait.DEFAULT_INTERVAL, 0.1)

    def test_end_property(self):
        wt = Wait(self.m)
        self.assertIsNotNone(wt.end)

    def test_marionette_property(self):
        wt = Wait(self.m)
        self.assertEqual(wt.marionette, self.m)

    def test_clock_property(self):
        wt = Wait(self.m)
        self.assertIsInstance(wt.clock, wait.SystemClock)

    def test_timeout_inherited_from_marionette(self):
        wt = Wait(self.m)
        self.assertEqual(wt.timeout * 1000.0, self.m.timeout)

    def test_timeout_uses_default_if_marionette_timeout_is_none(self):
        self.m.timeout = None
        wt = Wait(self.m)
        self.assertEqual(wt.timeout, wait.DEFAULT_TIMEOUT)

class PredicatesTest(MarionetteTestCase):
    def test_until(self):
        c = wait.SystemClock()
        self.assertFalse(wait.until_pred(c, sys.maxint))
        self.assertTrue(wait.until_pred(c, 0))

class WaitUntilTest(MarionetteTestCase):
    def setUp(self):
        super(WaitUntilTest, self).setUp()

        self.m = MockMarionette()
        self.clock = TickingClock()
        self.wt = Wait(self.m, timeout=10, interval=1, clock=self.clock)

    def test_true(self):
        r = self.wt.until(lambda x: x.true())
        self.assertTrue(r)
        self.assertEqual(self.clock.ticks, 0)

    def test_true_within_timeout(self):
        r = self.wt.until(lambda x: x.true(wait=5))
        self.assertTrue(r)
        self.assertEqual(self.clock.ticks, 4)

    def test_timeout(self):
        with self.assertRaises(errors.TimeoutException):
            r = self.wt.until(lambda x: x.true(wait=15))
        self.assertEqual(self.clock.ticks, 10)

    def test_exception_raises_immediately(self):
        with self.assertRaises(TypeError):
            self.wt.until(lambda x: x.exception(e=TypeError))
        self.assertEqual(self.clock.ticks, 0)

    def test_ignored_exception(self):
        self.wt.exceptions = (TypeError,)
        with self.assertRaises(errors.TimeoutException):
            self.wt.until(lambda x: x.exception(e=TypeError))

    def test_ignored_exception_wrapped_in_timeoutexception(self):
        self.wt.exceptions = (TypeError,)

        exc = None
        try:
            self.wt.until(lambda x: x.exception(e=TypeError))
        except Exception as e:
            exc = e

        s = str(exc)
        self.assertIsNotNone(exc)
        self.assertIsInstance(exc, errors.TimeoutException)
        self.assertIn(", caused by %r" % TypeError, s)
        self.assertIn("self.wt.until(lambda x: x.exception(e=TypeError))", s)

    def test_ignored_exception_after_timeout_is_not_raised(self):
        with self.assertRaises(errors.TimeoutException):
            r = self.wt.until(lambda x: x.exception(wait=15))
        self.assertEqual(self.clock.ticks, 10)

    def test_keyboard_interrupt(self):
        with self.assertRaises(KeyboardInterrupt):
            self.wt.until(lambda x: x.exception(e=KeyboardInterrupt))

    def test_system_exit(self):
        with self.assertRaises(SystemExit):
            self.wt.until(lambda x: x.exception(SystemExit))

    def test_true_condition_returns_immediately(self):
        r = self.wt.until(lambda x: x.true())
        self.assertIsInstance(r, bool)
        self.assertTrue(r)
        self.assertEqual(self.clock.ticks, 0)

    def test_value(self):
        r = self.wt.until(lambda x: "foo")
        self.assertEqual(r, "foo")
        self.assertEqual(self.clock.ticks, 0)

    def test_custom_predicate(self):
        r = self.wt.until(lambda x: x.true(wait=2), is_true=at_third_attempt)
        self.assertTrue(r)
        self.assertEqual(self.clock.ticks, 1)

    def test_custom_predicate_times_out(self):
        with self.assertRaises(errors.TimeoutException):
            self.wt.until(lambda x: x.true(wait=4), is_true=at_third_attempt)

        self.assertEqual(self.clock.ticks, 2)

    def test_timeout_elapsed_duration(self):
        with self.assertRaisesRegexp(errors.TimeoutException,
                                     "Timed out after 2.0 seconds"):
            self.wt.until(lambda x: x.true(wait=4), is_true=at_third_attempt)

    def test_timeout_elapsed_rounding(self):
        wt = Wait(self.m, clock=SequenceClock([1, 0.01, 1]), timeout=0)
        with self.assertRaisesRegexp(errors.TimeoutException,
                                     "Timed out after 1.0 seconds"):
            wt.until(lambda x: x.true(), is_true=now)

    def test_message(self):
        self.wt.exceptions = (TypeError,)
        exc = None
        try:
            self.wt.until(lambda x: x.exception(e=TypeError), message="hooba")
        except errors.TimeoutException as e:
            exc = e

        result = str(exc)
        self.assertIn("seconds with message: hooba, caused by", result)

    def test_no_message(self):
        self.wt.exceptions = (TypeError,)
        exc = None
        try:
            self.wt.until(lambda x: x.exception(e=TypeError), message="")
        except errors.TimeoutException as e:
            exc = e

        result = str(exc)
        self.assertIn("seconds, caused by", result)

    def test_message_has_none_as_its_value(self):
        self.wt.exceptions = (TypeError,)
        exc = None
        try:
            self.wt.until(False, None, None)
        except errors.TimeoutException as e:
            exc = e

        result = str(exc)
        self.assertNotIn("with message:", result)
        self.assertNotIn("secondsNone", result)