Skip to content

Commit

Permalink
Test for task category and category API
Browse files Browse the repository at this point in the history
  • Loading branch information
adamkankovsky committed Feb 6, 2024
1 parent f16571f commit 9bb5498
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
# Handle task category reporting.
#
# Copyright (C) 2024 Red Hat, Inc.
#
# This copyrighted material is made available to anyone wishing to use,
# modify, copy, or redistribute it subject to the terms and conditions of
# the GNU General Public License v.2, or (at your option) any later version.
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY expressed or implied, including the implied warranties of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details. You should have received a copy of the
# GNU General Public License along with this program; if not, write to the
# Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA. Any Red Hat trademarks that are incorporated in the
# source code or documentation are not subject to the GNU General Public
# License and may only be used or replicated with the express permission of
# Red Hat, Inc.
#
import unittest
from unittest.mock import Mock, call

from pyanaconda.core.constants import CATEGORY_SYSTEM, CATEGORY_STORAGE
from pyanaconda.installation import RunInstallationTask
from pyanaconda.installation_tasks import TaskQueue, Task

class TestRunInstallation(RunInstallationTask):

def __init__(self, payload, ksdata):
super().__init__(payload=payload, ksdata=ksdata)

def _prepare_configuration(self, payload, ksdata):
configuration_queue = TaskQueue("Configuration queue")

# connect progress reporting
configuration_queue.queue_started.connect(self._queue_started_cb)
configuration_queue.task_completed.connect(self._task_completed_cb)

# Creating task queues with categories
queue1 = TaskQueue(name="group2", task_category=CATEGORY_STORAGE)
task1 = Task("Test task 2", lambda: None)
queue1.append(task1)

configuration_queue.append(queue1)

return configuration_queue

def _prepare_installation(self, payload, ksdata):
installation_queue = TaskQueue("Installation queue")

# connect progress reporting
installation_queue.queue_started.connect(self._queue_started_cb)
installation_queue.task_completed.connect(self._task_completed_cb)

# Creating task queues with categories
queue1 = TaskQueue(name="group1", task_category=CATEGORY_SYSTEM)
task1 = Task("Test task 1", lambda: None)
queue1.append(task1)

installation_queue.append(queue1)

return installation_queue

class InstallManagerTestCase(unittest.TestCase):
"""Test the install category API"""

def test_task_category_reporting(self):
payload = Mock()
task = TestRunInstallation(ksdata=[], payload=payload)
interface = task.for_publication()

callback = Mock()
interface.CategoryChanged.connect(callback)
task.run()

callback.assert_has_calls([
call(CATEGORY_SYSTEM),
call(CATEGORY_STORAGE),
])
9 changes: 5 additions & 4 deletions tests/unit_tests/pyanaconda_tests/test_installation_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,9 +132,10 @@ def test_task_with_no_args(self):
def test_empty_task_queue(self):
"""Check that an empty task queue works correctly."""
# first check if empty task queue works correctly
task_queue = TaskQueue("foo", status_message="foo status message")
task_queue = TaskQueue("foo", status_message="foo status message", task_category="foo category")
assert task_queue.name == "foo"
assert task_queue.status_message == "foo status message"
assert task_queue.task_category == "foo category"
assert task_queue.task_count == 0
assert task_queue.queue_count == 0
assert task_queue.summary == dedent("""
Expand Down Expand Up @@ -186,20 +187,20 @@ def queue_completed_cb(*args):
assert self._queue_completed_count == 0

# create the group 1
group1 = TaskQueue(name="group1", status_message="processing group1")
group1 = TaskQueue(name="group1", status_message="processing group1", task_category="group1 category")
task1 = Task("increment var 1", self._increment_var1)
group1.append(task1)

# create the group 2
group2 = TaskQueue(name="group2", status_message="processing group2")
group2 = TaskQueue(name="group2", status_message="processing group2", task_category="group2 category")
task2a = Task("increment var 2", self._increment_var2)
group2.append(task2a)

task2b = Task("increment var 2", self._increment_var2)
group2.append(task2b)

# create the group 3
group3 = TaskQueue(name="group3", status_message="processing group3 (empty)")
group3 = TaskQueue(name="group3", status_message="processing group3 (empty)", task_category="group3 category")

# create the top level queue
queue1 = TaskQueue(name="queue1")
Expand Down

0 comments on commit 9bb5498

Please sign in to comment.