| #!/usr/bin/python3 |
| |
| # This program is free software; you can redistribute it and/or modify it under |
| # the terms of the GNU Lesser General Public License as published by the Free |
| # Software Foundation; either version 3 of the License, or (at your option) any |
| # later version. See http://www.gnu.org/copyleft/lgpl.html for the full text |
| # of the license. |
| |
| __author__ = 'Bastien Nocera' |
| __email__ = 'hadess@hadess.net' |
| __copyright__ = '(c) 2021 Red Hat Inc.' |
| __license__ = 'LGPL 3+' |
| |
| import unittest |
| import sys |
| import subprocess |
| import fcntl |
| import os |
| import time |
| |
| import taptestrunner |
| |
| try: |
| # Do all non-standard imports here so we can skip the tests if any |
| # needed packages are not available. |
| import dbus |
| import dbus.mainloop.glib |
| import dbusmock |
| from gi.repository import GLib |
| from gi.repository import Gio |
| from gi.repository import GObject |
| |
| dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) |
| |
| # XDG_DESKTOP_PORTAL_PATH = os.path.expanduser("~/.cache/jhbuild/build/xdg-desktop-portal/xdg-desktop-portal") |
| XDG_DESKTOP_PORTAL_PATH = "@libexecdir@/xdg-desktop-portal" |
| |
| class TestPowerProfileMonitorPortal(dbusmock.DBusTestCase): |
| '''Test GPowerProfileMonitorPortal''' |
| |
| @classmethod |
| def setUpClass(klass): |
| klass.start_system_bus() |
| klass.dbus_con = klass.get_dbus(True) |
| # Start session bus so that xdg-desktop-portal can run on it |
| klass.start_session_bus() |
| |
| def setUp(self): |
| try: |
| Gio.PowerProfileMonitor |
| except AttributeError: |
| raise unittest.SkipTest('Power Profile Monitor not in ' |
| 'introspection data. Requires ' |
| 'GObject-Introspection > 1.69.0') |
| try: |
| (self.p_mock, self.obj_ppd) = self.spawn_server_template( |
| 'power_profiles_daemon', {}, stdout=subprocess.PIPE) |
| except ModuleNotFoundError: |
| raise unittest.SkipTest("power-profiles-daemon dbusmock template not " |
| "found. Requires dbusmock > 0.23.1.") |
| # set log to nonblocking |
| flags = fcntl.fcntl(self.p_mock.stdout, fcntl.F_GETFL) |
| fcntl.fcntl(self.p_mock.stdout, fcntl.F_SETFL, flags | os.O_NONBLOCK) |
| self.power_saver_enabled = False |
| self.dbus_props = dbus.Interface(self.obj_ppd, dbus.PROPERTIES_IFACE) |
| try: |
| self.xdp = subprocess.Popen([XDG_DESKTOP_PORTAL_PATH]) |
| except FileNotFoundError: |
| self.p_mock.terminate() |
| self.p_mock.wait() |
| raise unittest.SkipTest("xdg-desktop-portal not available") |
| |
| try: |
| self.wait_for_bus_object('org.freedesktop.portal.Desktop', |
| '/org/freedesktop/portal/desktop') |
| except: |
| self.p_mock.terminate() |
| self.p_mock.wait() |
| raise |
| # subprocess.Popen(['gdbus', 'monitor', '--session', '--dest', 'org.freedesktop.portal.Desktop']) |
| |
| os.environ['GTK_USE_PORTAL'] = "1" |
| self.power_profile_monitor = Gio.PowerProfileMonitor.dup_default() |
| assert("GPowerProfileMonitorPortal" in str(self.power_profile_monitor)) |
| self.power_profile_monitor.connect("notify::power-saver-enabled", self.power_saver_enabled_cb) |
| self.mainloop = GLib.MainLoop() |
| self.main_context = self.mainloop.get_context() |
| |
| def tearDown(self): |
| self.p_mock.terminate() |
| self.p_mock.wait() |
| |
| def assertEventually(self, condition, message=None, timeout=50): |
| '''Assert that condition function eventually returns True. |
| |
| Timeout is in deciseconds, defaulting to 50 (5 seconds). message is |
| printed on failure. |
| ''' |
| while timeout >= 0: |
| context = GLib.MainContext.default() |
| while context.iteration(False): |
| pass |
| if condition(): |
| break |
| timeout -= 1 |
| time.sleep(0.1) |
| else: |
| self.fail(message or 'timed out waiting for ' + str(condition)) |
| |
| def power_saver_enabled_cb(self, spec, data): |
| self.power_saver_enabled = self.power_profile_monitor.get_power_saver_enabled() |
| self.main_context.wakeup() |
| |
| def test_power_profile_power_saver_enabled_portal(self): |
| '''power-saver-enabled property''' |
| |
| self.assertEqual(self.power_profile_monitor.get_power_saver_enabled(), False) |
| self.dbus_props.Set('net.hadess.PowerProfiles', 'ActiveProfile', dbus.String('power-saver', variant_level=1)) |
| self.assertEventually(lambda: self.power_saver_enabled == True, "power-saver didn't become enabled", 10) |
| |
| self.dbus_props.Set('net.hadess.PowerProfiles', 'ActiveProfile', dbus.String('balanced', variant_level=1)) |
| self.assertEventually(lambda: self.power_saver_enabled == False, "power-saver didn't become disabled", 10) |
| |
| def test_power_profile_power_saver_enabled_portal_default(self): |
| '''power-saver-enabled property default value''' |
| |
| self.dbus_props.Set('net.hadess.PowerProfiles', 'ActiveProfile', dbus.String('power-saver', variant_level=1)) |
| |
| # Create a new power profile monitor and check its property value is |
| # correct by default. |
| new_power_profile_monitor = GObject.new(GObject.type_from_name('GPowerProfileMonitorPortal')) |
| new_power_profile_monitor.init() |
| self.assertTrue(new_power_profile_monitor.get_power_saver_enabled()) |
| |
| except ImportError as e: |
| @unittest.skip("Cannot import %s" % e.name) |
| class TestPowerProfileMonitorPortal(unittest.TestCase): |
| def test_power_profile_power_saver_enabled_portal(self): |
| pass |
| |
| if __name__ == '__main__': |
| unittest.main(testRunner=taptestrunner.TAPTestRunner()) |