Version: 1.0
Date: October 2025
Status: Planning Document
This document outlines a comprehensive, multi-phase approach to introduce functional testing to the Watcher project, adapting proven patterns from OpenStack Nova and Placement while accounting for Watcher's unique architecture. The plan includes test reorganization, fixture development, base test infrastructure, Gabbi-based declarative API testing, and contributor documentation.
The functional test suite will support two complementary testing approaches:
- Python Functional Tests: For complex workflows, service integration, and multi-step scenarios
- Gabbi Tests: For declarative API contract testing, microversion validation, and HTTP behavior
Both approaches will reuse common fixtures (database, configuration, policy) to ensure consistency and minimize duplication.
- Introduction
- Current State Analysis
- Goals and Objectives
- Watcher vs Nova: Key Differences
- Multi-Phase Implementation Plan
- Gabbi Test Integration
- Fixture Requirements Analysis
- Base Test Class Design
- Regression Test Infrastructure
- Contributor Documentation
- Testing Strategy
- Timeline and Milestones
Functional tests sit between unit tests and integration tests:
- Unit Tests: Test individual functions/classes in isolation with extensive mocking
- Functional Tests: Test multiple components working together with minimal mocking
- Integration Tests: Test the complete system with real external dependencies
- Catch integration bugs that unit tests miss
- Test real workflows (audit → strategy → action plan → applier)
- Validate RPC interactions between services
- Test database migrations with real data
- Verify API contracts with real WSGI application
- Regression protection for complex bugs
In functional tests:
- ✅ Use real Watcher code (API, decision engine, applier)
- ✅ Use real database operations (SQLite in-memory)
- ✅ Use real RPC messaging (oslo.messaging fake driver)
- ✅ Use real WSGI application (wsgi-intercept)
- ❌ Mock external services (Nova, Glance, Ceilometer, Gnocchi, Placement)
- ❌ Mock network I/O to external systems
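To make the policy concrete, here is a minimal sketch of what a functional test could look like under these rules. The base class, API client helper, and NovaFixture it leans on are the infrastructure proposed later in this plan, so the names and signatures are illustrative, not existing code.

# Illustrative sketch only: assumes the WatcherFunctionalTestCase base class
# and NovaFixture proposed later in this plan.
from watcher.tests.functional import base


class TestAuditCreationSketch(base.WatcherFunctionalTestCase):

    def test_create_audit_with_real_api_and_db(self):
        # Real Pecan WSGI app, real SQLite database, real fake-driver RPC.
        # The only thing faked is the external Nova service.
        self.nova.add_compute_node({'hypervisor_hostname': 'compute-3',
                                    'state': 'up', 'status': 'enabled',
                                    'vcpus': 8, 'memory_mb': 16384,
                                    'local_gb': 200})
        audit = self.api.create_audit({'audit_type': 'ONESHOT',
                                       'goal': 'dummy',
                                       'name': 'mock-policy-demo'})
        self.assertEqual('PENDING', audit['state'])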
watcher/tests/
├── __init__.py
├── base.py # Base test class (TestCase, BaseTestCase)
├── conf_fixture.py # ConfFixture, ConfReloadFixture
├── policy_fixture.py # PolicyFixture
├── fakes.py # Fake objects (FakePecanRequest, FakeService, etc.)
├── fixtures/
│ ├── __init__.py
│ └── watcher.py # StandardLogging, KeystoneClient
├── config.py # Pecan test app config
├── db/
│ ├── base.py # Database fixture and DbTestCase
│ └── utils.py # Test data creation helpers
├── objects/
│ └── utils.py # Object creation helpers
├── api/ # API tests
├── common/ # Common utility tests
├── decision_engine/ # Decision engine tests
├── applier/ # Applier tests
└── ... (various test modules)
Already Available:
- watcher/tests/fixtures/watcher.py:
  - StandardLogging - Logging setup with OS_DEBUG support
  - KeystoneClient - Mock Keystone client
- watcher/tests/conf_fixture.py:
  - ConfFixture - Configuration management with SQLite defaults
  - ConfReloadFixture - Configuration reloading support
- watcher/tests/policy_fixture.py:
  - PolicyFixture - Policy enforcement testing
- watcher/tests/db/base.py:
  - Database - Database fixture with schema caching
  - DbTestCase - Base class for DB tests
- watcher/tests/db/utils.py:
  - get_test_goal(), create_test_goal()
  - get_test_audit(), create_test_audit()
  - get_test_audit_template(), create_test_audit_template()
  - get_test_action_plan(), create_test_action_plan()
  - get_test_action(), create_test_action()
  - get_test_strategy(), create_test_strategy()
  - get_test_service(), create_test_service()
  - Plus many more helpers for all Watcher objects
- watcher/tests/objects/utils.py:
  - Object-level test data creation helpers (similar to db/utils.py)
- watcher/tests/fakes.py:
  - FakePecanRequest, FakePecanResponse
  - FakeService
  - FakeResponse for HTTP mocking
- watcher/tests/base.py:
  - BaseTestCase - Base with StandardLogging
  - TestCase - Full unit test setup with context, policy, config
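For orientation, this is roughly how those helpers are combined in today's unit tests; the helper names come from the list above, and the exact keyword arguments may differ slightly from the real db/utils.py signatures.

# Sketch of current usage; exact arguments may vary in the real db/utils.py.
from watcher.tests.db import base as db_base
from watcher.tests.db import utils as db_utils


class TestAuditRoundTrip(db_base.DbTestCase):

    def test_audit_references_goal(self):
        goal = db_utils.create_test_goal()
        audit = db_utils.create_test_audit(goal_id=goal.id)
        self.assertEqual(goal.id, audit.goal_id)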
Strengths:
- ✅ Comprehensive unit test coverage
- ✅ Excellent test data creation helpers (db/utils.py, objects/utils.py)
- ✅ Proper logging fixture with OS_DEBUG support
- ✅ Configuration fixtures already in place
- ✅ Database fixture with schema caching
- ✅ Policy testing support
- ✅ Good test organization by module
- ✅ Fake objects for common test scenarios
Gaps:
- ❌ All tests are unit tests (extensive mocking)
- ❌ No functional test infrastructure
- ❌ No fixtures for external services (Nova, Gnocchi)
- ❌ No RPC fixture (messaging already configured but no capture)
- ❌ No notification capture fixture
- ❌ No API fixture for functional testing (wsgi-intercept)
- ❌ No service fixtures for starting DE/Applier
- ❌ No regression test framework
- ❌ Limited end-to-end testing
Important: Watcher already has excellent infrastructure that we can reuse:
- Database fixture exists and works well
- Test data helpers are comprehensive
- Configuration management is solid
- Logging setup is production-ready
Strategy: Extract and enhance existing code rather than rebuild from scratch.
Oslo Libraries (already in use):
- oslo.config - Configuration management
- oslo.db - Database abstraction
- oslo.messaging - RPC and notifications
- oslo.log - Logging
- oslo.policy - Policy enforcement
External Services (need fixtures):
- Nova - Compute operations (get instances, migrate, etc.)
- Glance - Image service (rarely used directly)
- Ceilometer - Metrics collection (legacy, optional)
- Gnocchi - Metrics storage and aggregation
- Placement - Resource provider inventory
- Neutron - Network information (optional)
- Cinder - Volume information (optional)
API Framework:
- Pecan/WSGI - Different from Nova's approach
- oslo.policy for RBAC
- Uses API paste configuration
- Reorganize tests to support both unit and functional testing
- Create fixture infrastructure for external services
- Build base test classes for functional tests
- Integrate Gabbi tests for declarative API testing
- Establish regression test framework with documentation
- Document functional testing practices for contributors
- Tests reorganized into unit/ and functional/ subdirectories
- Functional base test class with all required fixtures
- At least 5 fixtures for external services
- Gabbi test infrastructure operational with example YAML tests
- At least 5 Gabbi YAML test files covering major API endpoints
- At least 3 end-to-end Python functional tests
- Regression test framework with README and example
- Contributor documentation explaining functional vs unit tests and Gabbi tests
- CI jobs running both Python and Gabbi functional tests
- All existing tests still pass after reorganization
Important: Following OpenStack conventions:
- Keep __init__.py files empty - No __all__ exports per OpenStack style
- Explicit imports - Import directly: from watcher.tests.fixtures import database
- Shared fixtures - Design fixtures to work in both unit and functional tests where appropriate
- Minimal mocking in base classes - Only mock what's truly necessary for all tests
Watcher already has excellent test infrastructure that should be extracted and enhanced for reuse. This is covered in the new Phase 0 below.
Shared (both unit + functional):
- StandardLogging - Both need proper logging with OS_DEBUG support
- Database - Both use SQLite in-memory (unit tests can use it too)
- ConfFixture/ConfReloadFixture - Both need configuration management
- PolicyFixture - Both test policy enforcement
Functional-only:
- RPCFixture - Unit tests mock RPC calls directly with mock.patch
- NotificationFixture - Unit tests verify notifications are called, not content
- NovaFixture, GnocchiFixture - Unit tests mock client methods directly
- APIFixture - Unit tests use Pecan's test client, not full WSGI
- ServiceFixture - Unit tests don't start actual services
Key Principle: Unit tests mock external dependencies directly at call sites. Functional tests use fixtures that implement limited but realistic behavior.
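As a concrete contrast, a unit test patches the client method right where it is called, while a functional test simply enables the stateful fixture once in setUp (see the NovaFixture in Phase 2). The patched client path below is the one used by the NovaFixture sketch later in this plan, so treat it as an assumption of this document rather than a verified Watcher API.

# Unit-test style: patch the external client at its call site.
# The patched path matches the one used by NovaFixture later in this plan.
import unittest
from unittest import mock


class TestUnitStyleMocking(unittest.TestCase):

    @mock.patch('watcher.common.clients.nova.NovaClient.get_instance_list')
    def test_sees_mocked_instances(self, mock_list):
        mock_list.return_value = []
        # The code under test would call NovaClient.get_instance_list()
        # here and receive the mocked empty list.
        self.assertEqual([], mock_list())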
| Aspect | Nova | Watcher |
|---|---|---|
| Database | API DB + multiple cell DBs | Single database |
| Services | compute, conductor, scheduler | watcher-api, watcher-decision-engine, watcher-applier |
| API Framework | Custom WSGI + Paste | Pecan + Paste |
| Primary Function | VM lifecycle management | Infrastructure optimization |
| External Deps | Glance, Neutron, Cinder, Placement | Nova, Gnocchi/Ceilometer, Placement |
| Concurrency | Eventlet (being removed) | Eventlet (will need threading) |
Nova Needs:
- PlacementFixture (resource providers)
- CinderFixture (volumes)
- NeutronFixture (networks, ports)
- GlanceFixture (images)
Watcher Needs:
- PlacementFixture (resource providers, same as Nova)
- NovaFixture (instances, hosts, compute services)
- GnocchiFixture (metrics, measures, aggregation)
- CeilometerFixture (legacy metrics, optional)
Nova:
# Uses custom OSAPIFixture with wsgi-intercept
app = nova.api.openstack.compute.APIRouterV21()
Watcher:
# Uses Pecan framework
from watcher.api import app as pecan_app
app = pecan_app.setup_app()
Goal: Extract reusable fixtures before reorganization, clean up imports, create helper functions.
Extract watcher/tests/db/base.py:Database to watcher/tests/fixtures/database.py:
Changes:
- Create watcher/tests/fixtures/database.py with enhanced Database fixture
- Update watcher/tests/db/base.py to import from new location
- Keep __init__.py files empty (OpenStack style - no __all__)
Note: Database fixture should work for both unit and functional tests, so place in shared location.
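One possible shape for the compatibility shim in watcher/tests/db/base.py after the extraction; how much of DbTestCase stays behind is an implementation detail, so treat this as a sketch.

# watcher/tests/db/base.py (after Phase 0) - sketch of the compatibility shim.
# Existing unit tests keep importing Database from here, but the actual
# implementation now lives in the shared fixtures package.
from watcher.tests.fixtures import database as database_fixture

# Preserve the old name so `from watcher.tests.db.base import Database`
# keeps working.
Database = database_fixture.Database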
Create watcher/tests/helpers.py with common helper functions that work in both unit and functional tests.
These helpers wrap existing db/utils.py and objects/utils.py functions with:
- Better defaults
- Simplified APIs
- Automatic cleanup registration (for functional tests)
- Wait helpers for async operations
watcher/tests/helpers.py:
"""Helper functions for Watcher tests.
These helpers work in both unit and functional tests.
Usage:
from watcher.tests import helpers
# In unit test
audit = helpers.create_test_audit(self.context)
# In functional test
audit = helpers.create_test_audit(self.context, name='my-audit')
result = helpers.wait_for_audit_state(
self.api, audit.uuid, 'SUCCEEDED')
"""
import time
from oslo_utils import uuidutils
from watcher import objects
from watcher.tests.db import utils as db_utils
def create_test_audit(context, **kwargs):
"""Create a test audit with sensible defaults.
Args:
context: Request context
**kwargs: Override any audit fields
Returns:
Audit object
"""
defaults = {
'uuid': uuidutils.generate_uuid(),
'name': 'test-audit',
'audit_type': 'ONESHOT',
'state': 'PENDING',
'goal_id': kwargs.pop('goal_id', 1),
}
defaults.update(kwargs)
audit = db_utils.create_test_audit(**defaults)
return objects.Audit.get_by_uuid(context, audit.uuid)
def create_test_action_plan(context, **kwargs):
"""Create a test action plan.
Args:
context: Request context
**kwargs: Override any action plan fields.
If audit_id not provided, creates an audit.
Returns:
ActionPlan object
"""
defaults = {
'uuid': uuidutils.generate_uuid(),
'state': 'RECOMMENDED',
}
# Auto-create audit if needed
if 'audit_id' not in kwargs:
audit = create_test_audit(context)
defaults['audit_id'] = audit.id
defaults.update(kwargs)
action_plan = db_utils.create_test_action_plan(**defaults)
return objects.ActionPlan.get_by_uuid(context, action_plan.uuid)
def wait_for_audit_state(api_client, audit_uuid, expected_state,
timeout=30, fail_states=None):
"""Wait for audit to reach expected state.
For functional tests that need to wait for async operations.
Args:
api_client: API client instance (e.g., self.api)
audit_uuid: UUID of audit to monitor
expected_state: State to wait for (e.g., 'SUCCEEDED')
timeout: Maximum time to wait in seconds
fail_states: States indicating failure (default: FAILED, CANCELLED)
Returns:
Final audit dict
Raises:
AssertionError: If timeout or failure state reached
"""
if fail_states is None:
fail_states = ['FAILED', 'CANCELLED']
end_time = time.time() + timeout
while time.time() < end_time:
audit = api_client.get_audit(audit_uuid)
if audit['state'] == expected_state:
return audit
if audit['state'] in fail_states:
raise AssertionError(
'Audit %s failed with state: %s' %
(audit_uuid, audit['state']))
time.sleep(0.1)
raise AssertionError(
'Timeout waiting for audit %s to reach %s' %
(audit_uuid, expected_state))
def wait_for_action_plan_state(api_client, action_plan_uuid, expected_state,
timeout=30, fail_states=None):
"""Wait for action plan to reach expected state.
Args:
api_client: API client instance
action_plan_uuid: UUID of action plan
expected_state: State to wait for
timeout: Maximum wait time
fail_states: Failure states
Returns:
Final action plan dict
"""
if fail_states is None:
fail_states = ['FAILED', 'CANCELLED']
end_time = time.time() + timeout
while time.time() < end_time:
ap = api_client.get_action_plan(action_plan_uuid)
if ap['state'] == expected_state:
return ap
if ap['state'] in fail_states:
raise AssertionError(
'Action plan %s failed with state: %s' %
(action_plan_uuid, ap['state']))
time.sleep(0.1)
raise AssertionError(
'Timeout waiting for action plan %s to reach %s' %
        (action_plan_uuid, expected_state))
- Keep watcher/tests/fixtures/__init__.py empty (no exports)
- Document fixture usage with module-level docstrings
- Ensure all existing fixtures follow OpenStack patterns
- Update a few unit tests to validate helpers work
Validation:
- Run full unit test suite: tox -e py3
- Verify no imports broke
- Check that helpers work correctly
Goal: Reorganize existing tests without breaking anything.
Changes:
OLD: NEW:
watcher/tests/ watcher/tests/
├── base.py ├── __init__.py
├── common/ ├── unit/ # All existing tests
├── decision_engine/ │ ├── __init__.py
├── applier/ │ ├── base.py # Moved from tests/
├── api/ │ ├── common/
├── db/ │ ├── decision_engine/
└── ... │ ├── applier/
│ ├── api/
│ ├── db/
│ └── ...
├── local_fixtures/ # New (empty for now)
│ └── __init__.py
└── functional/ # New (empty for now)
└── __init__.py
Implementation Steps:
- Create new directory structure:
mkdir -p watcher/tests/unit
mkdir -p watcher/tests/local_fixtures
mkdir -p watcher/tests/functional
- Move all existing tests:
# Move base.py
git mv watcher/tests/base.py watcher/tests/unit/base.py
# Move all test modules
for dir in watcher/tests/*/; do
if [[ "$dir" != *"__pycache__"* ]]; then
git mv "$dir" watcher/tests/unit/
fi
done
- Update imports in moved test files:
# OLD: from watcher.tests import base
# NEW: from watcher.tests.unit import base
- Update tox.ini to handle new structure:
[testenv]
# Run unit tests by default
commands =
stestr run --test-path=./watcher/tests/unit {posargs}
[testenv:functional]
# New environment for functional tests
commands =
stestr run --test-path=./watcher/tests/functional {posargs}
- Create empty __init__.py files (no exports, per OpenStack convention)
- Run full test suite to ensure nothing breaks
Commit Message:
Reorganize tests to support functional testing
Move all existing tests from watcher/tests/ to watcher/tests/unit/
to make room for functional tests. Create empty functional/ and
local_fixtures/ directories for future work.
This is a pure reorganization commit with no functional changes.
All existing tests should continue to pass.
Related-Bug: #XXXXXXX
Goal: Create core fixtures needed by all functional tests.
Files to Create:
watcher/tests/local_fixtures/__init__.py:
"""Local fixtures for Watcher functional tests.
Named 'local_fixtures' to avoid conflicts with the 'fixtures' package.
Per OpenStack style, this file is kept empty. Import fixtures directly:
from watcher.tests.local_fixtures import database
from watcher.tests.local_fixtures import rpc
"""
# Empty per OpenStack convention - import fixtures directly from their modules
watcher/tests/local_fixtures/conf.py:
"""Configuration fixture for Watcher tests."""
from oslo_config import cfg
from oslo_config import fixture as config_fixture
from watcher import conf as watcher_conf
CONF = cfg.CONF
class ConfFixture(config_fixture.Config):
"""Fixture to manage global conf settings for tests."""
def setUp(self):
super(ConfFixture, self).setUp()
# Default group
self.conf.set_default('debug', True)
self.conf.set_default('host', 'test-host')
# Database group
self.conf.set_default('connection', 'sqlite://', group='database')
self.conf.set_default('sqlite_synchronous', False, group='database')
# API group
self.conf.set_default('host', '127.0.0.1', group='api')
self.conf.set_default('port', 9322, group='api')
self.conf.set_default('max_limit', 1000, group='api')
# Watcher-specific settings
self.conf.set_default('conductor_topic', 'watcher.decision.control',
group='watcher_decision_engine')
self.conf.set_default('conductor_topic', 'watcher.applier.control',
group='watcher_applier')
# Disable periodic tasks in tests
self.conf.set_default('periodic_interval', 0,
group='watcher_decision_engine')
# Parse config
watcher_conf.parse_args([], default_config_files=[], configure_db=False,
                                init_rpc=False)
watcher/tests/local_fixtures/database.py:
"""Database fixture for Watcher tests."""
import fixtures
from oslo_config import cfg
from oslo_db.sqlalchemy import enginefacade
from oslo_db.sqlalchemy import test_fixtures as db_fixtures
from watcher.db.sqlalchemy import api as db_api
from watcher.db.sqlalchemy import migration
CONF = cfg.CONF
DB_SCHEMA = {} # Schema cache for speed
class Database(fixtures.Fixture):
"""Create a database fixture with SQLite.
Uses in-memory SQLite with schema caching for fast test execution.
"""
def setUp(self):
super(Database, self).setUp()
# Create new enginefacade for this test
new_engine = enginefacade.transaction_context()
# Replace global context manager
self.useFixture(
db_fixtures.ReplaceEngineFacadeFixture(
db_api.get_context_manager(), new_engine))
# Configure database
db_api.configure(CONF)
self.get_engine = db_api.get_engine
self._apply_schema()
self.addCleanup(self.cleanup)
def _apply_schema(self):
"""Apply database schema (cached for speed)."""
global DB_SCHEMA
if not DB_SCHEMA:
# First test: run migrations and cache
engine = self.get_engine()
conn = engine.connect()
# Run migrations
migration.upgrade('head')
# Cache schema
DB_SCHEMA['schema'] = "".join(
line for line in conn.connection.iterdump())
conn.close()
else:
# Subsequent tests: use cached schema
engine = self.get_engine()
conn = engine.connect()
conn.connection.executescript(DB_SCHEMA['schema'])
conn.close()
def cleanup(self):
"""Dispose engine."""
engine = self.get_engine()
        engine.dispose()
Commit Message:
Add configuration and database fixtures for functional tests
Introduce ConfFixture and Database fixtures as foundation for
functional testing. These fixtures provide:
- Test-specific configuration with sensible defaults
- In-memory SQLite database with schema caching
- Automatic cleanup after each test
The Database fixture caches the schema after the first test runs
migrations, making subsequent tests much faster.
Part of functional test infrastructure implementation.
Related-Bug: #XXXXXXX
watcher/tests/local_fixtures/rpc.py:
"""RPC fixture for Watcher tests."""
import fixtures
from oslo_config import cfg
import oslo_messaging as messaging
from oslo_messaging import conffixture as messaging_conffixture
from unittest import mock
from watcher.common import rpc
CONF = cfg.CONF
class RPCFixture(fixtures.Fixture):
"""Set up RPC with fake:// transport for testing."""
def __init__(self, *exmods):
super(RPCFixture, self).__init__()
self.exmods = list(exmods)
self._buses = {}
def _fake_create_transport(self, url):
"""Create or return cached fake transport."""
# Collapse all connections to single bus for testing
url = None
if url not in self._buses:
self._buses[url] = messaging.get_rpc_transport(
CONF, url=url,
allowed_remote_exmods=rpc.get_allowed_exmods())
return self._buses[url]
def setUp(self):
super(RPCFixture, self).setUp()
self.addCleanup(rpc.cleanup)
# Configure fake transport
self.messaging_conf = messaging_conffixture.ConfFixture(CONF)
self.messaging_conf.transport_url = 'fake:/'
self.useFixture(self.messaging_conf)
# Patch transport creation
self.useFixture(fixtures.MonkeyPatch(
'watcher.common.rpc.create_transport',
self._fake_create_transport))
# Initialize RPC
with mock.patch('watcher.common.rpc.get_transport_url') as mock_gtu:
mock_gtu.return_value = None
rpc.init(CONF)
# Cleanup in-flight messages between tests
def cleanup_rpc_messages():
if hasattr(messaging._drivers, 'impl_fake'):
messaging._drivers.impl_fake.FakeExchangeManager._exchanges = {}
self.addCleanup(cleanup_rpc_messages)
class CastAsCallFixture(fixtures.Fixture):
"""Make RPC casts behave as calls for synchronous testing.
This converts fire-and-forget casts into synchronous calls,
making tests deterministic.
"""
def setUp(self):
super(CastAsCallFixture, self).setUp()
# Stub out cast to use call instead
self.useFixture(fixtures.MonkeyPatch(
'oslo_messaging.RPCClient.cast',
            messaging.RPCClient.call))
watcher/tests/local_fixtures/notifications.py:
"""Notification fixture for Watcher tests."""
import collections
import functools
import threading
import fixtures
from oslo_log import log as logging
import oslo_messaging
from oslo_serialization import jsonutils
from oslo_utils import timeutils
from watcher.common import rpc
LOG = logging.getLogger(__name__)
class _Sub(object):
"""Subscription helper for waiting on notifications."""
def __init__(self):
self._cond = threading.Condition()
self._notifications = []
def received(self, notification):
with self._cond:
self._notifications.append(notification)
self._cond.notify_all()
def wait_n(self, n, event_type, timeout):
"""Wait until at least n notifications received."""
with timeutils.StopWatch(timeout) as timer:
with self._cond:
while len(self._notifications) < n:
if timer.expired():
raise AssertionError(
"Notification %s not received within %s seconds" %
(event_type, timeout))
self._cond.wait(timer.leftover())
return list(self._notifications)
class FakeVersionedNotifier(object):
"""Captures versioned notifications for verification."""
def __init__(self, transport, publisher_id, serializer=None):
self.transport = transport
self.publisher_id = publisher_id
self._serializer = serializer or \
oslo_messaging.serializer.NoOpSerializer()
self.versioned_notifications = []
self.subscriptions = collections.defaultdict(_Sub)
# Create notification methods
for priority in ['debug', 'info', 'warn', 'error', 'critical']:
setattr(self, priority,
functools.partial(self._notify, priority.upper()))
def prepare(self, publisher_id=None):
if publisher_id is None:
publisher_id = self.publisher_id
return self.__class__(self.transport, publisher_id,
serializer=self._serializer)
def _notify(self, priority, ctxt, event_type, payload):
"""Capture notification."""
payload = self._serializer.serialize_entity(ctxt, payload)
jsonutils.to_primitive(payload)
notification = {
'publisher_id': self.publisher_id,
'priority': priority,
'event_type': event_type,
'payload': payload,
}
self.versioned_notifications.append(notification)
self.subscriptions[event_type].received(notification)
def wait_for_versioned_notifications(self, event_type, n_events=1,
timeout=10.0):
"""Wait for notifications with timeout."""
return self.subscriptions[event_type].wait_n(
n_events, event_type, timeout)
def reset(self):
self.versioned_notifications.clear()
self.subscriptions.clear()
class NotificationFixture(fixtures.Fixture):
"""Fixture to capture oslo.messaging notifications."""
def __init__(self, test):
self.test = test
def setUp(self):
super(NotificationFixture, self).setUp()
self.addCleanup(self.reset)
# Create fake notifier
self.fake_versioned_notifier = FakeVersionedNotifier(
rpc.NOTIFIER.transport,
rpc.NOTIFIER.publisher_id,
serializer=getattr(rpc.NOTIFIER, '_serializer', None))
# Stub out global notifier
self.test.useFixture(fixtures.MonkeyPatch(
'watcher.common.rpc.NOTIFIER',
self.fake_versioned_notifier))
def reset(self):
self.fake_versioned_notifier.reset()
def wait_for_versioned_notifications(self, event_type, n_events=1,
timeout=10.0):
return self.fake_versioned_notifier.wait_for_versioned_notifications(
event_type, n_events, timeout)
@property
def versioned_notifications(self):
        return self.fake_versioned_notifier.versioned_notifications
watcher/tests/local_fixtures/nova.py:
"""Nova fixture for Watcher tests."""
import copy
import fixtures
from oslo_utils import uuidutils
class NovaFixture(fixtures.Fixture):
"""Mock Nova API for Watcher tests.
Provides stateful mocking of Nova compute operations including:
- Listing compute nodes and services
- Getting instance details
- Live migration operations
- Cold migration operations
- Instance actions
"""
# Pre-defined test instances
INSTANCE_1 = {
'id': '73b09e16-35b7-4922-804e-e8f5d9b740fc',
'name': 'instance-1',
'status': 'ACTIVE',
'OS-EXT-SRV-ATTR:host': 'compute-1',
'OS-EXT-SRV-ATTR:hypervisor_hostname': 'compute-1',
'flavor': {'id': '1', 'name': 'm1.small'},
'tenant_id': 'test-project',
}
INSTANCE_2 = {
'id': 'cef19ce0-0ca2-11e6-a747-00012c99e920',
'name': 'instance-2',
'status': 'ACTIVE',
'OS-EXT-SRV-ATTR:host': 'compute-2',
'OS-EXT-SRV-ATTR:hypervisor_hostname': 'compute-2',
'flavor': {'id': '2', 'name': 'm1.medium'},
'tenant_id': 'test-project',
}
# Pre-defined compute nodes
COMPUTE_NODE_1 = {
'id': 1,
'hypervisor_hostname': 'compute-1',
'state': 'up',
'status': 'enabled',
'vcpus': 16,
'vcpus_used': 2,
'memory_mb': 32768,
'memory_mb_used': 4096,
'local_gb': 500,
'local_gb_used': 50,
}
COMPUTE_NODE_2 = {
'id': 2,
'hypervisor_hostname': 'compute-2',
'state': 'up',
'status': 'enabled',
'vcpus': 16,
'vcpus_used': 4,
'memory_mb': 32768,
'memory_mb_used': 8192,
'local_gb': 500,
'local_gb_used': 100,
}
def __init__(self, test):
super(NovaFixture, self).__init__()
self.test = test
self._instances = {}
self._compute_nodes = {}
self._services = {}
def setUp(self):
super(NovaFixture, self).setUp()
# Add default instances and compute nodes
self._instances[self.INSTANCE_1['id']] = copy.deepcopy(self.INSTANCE_1)
self._instances[self.INSTANCE_2['id']] = copy.deepcopy(self.INSTANCE_2)
self._compute_nodes[self.COMPUTE_NODE_1['id']] = copy.deepcopy(self.COMPUTE_NODE_1)
self._compute_nodes[self.COMPUTE_NODE_2['id']] = copy.deepcopy(self.COMPUTE_NODE_2)
# Mock nova client
self.test.useFixture(fixtures.MonkeyPatch(
'watcher.common.clients.nova.NovaClient.get_compute_node_list',
self.get_compute_node_list))
self.test.useFixture(fixtures.MonkeyPatch(
'watcher.common.clients.nova.NovaClient.get_instance_list',
self.get_instance_list))
self.test.useFixture(fixtures.MonkeyPatch(
'watcher.common.clients.nova.NovaClient.get_instance',
self.get_instance))
self.test.useFixture(fixtures.MonkeyPatch(
'watcher.common.clients.nova.NovaClient.live_migrate',
self.live_migrate))
def get_compute_node_list(self):
"""Mock getting compute node list."""
return list(self._compute_nodes.values())
def get_instance_list(self):
"""Mock getting instance list."""
return list(self._instances.values())
def get_instance(self, instance_id):
"""Mock getting single instance."""
if instance_id not in self._instances:
raise Exception('InstanceNotFound: %s' % instance_id)
return copy.deepcopy(self._instances[instance_id])
def live_migrate(self, instance_id, dest_hostname, block_migration=False):
"""Mock live migration."""
if instance_id not in self._instances:
raise Exception('InstanceNotFound: %s' % instance_id)
instance = self._instances[instance_id]
instance['OS-EXT-SRV-ATTR:host'] = dest_hostname
instance['OS-EXT-SRV-ATTR:hypervisor_hostname'] = dest_hostname
return True
def add_instance(self, instance_dict):
"""Add a custom instance to the fixture."""
instance_id = instance_dict.get('id') or uuidutils.generate_uuid()
instance_dict['id'] = instance_id
self._instances[instance_id] = instance_dict
return instance_dict
def add_compute_node(self, node_dict):
"""Add a custom compute node to the fixture."""
node_id = node_dict.get('id') or len(self._compute_nodes) + 1
node_dict['id'] = node_id
self._compute_nodes[node_id] = node_dict
        return node_dict
watcher/tests/local_fixtures/gnocchi.py:
"""Gnocchi fixture for Watcher tests."""
import copy
import fixtures
from oslo_utils import uuidutils
class GnocchiFixture(fixtures.Fixture):
"""Mock Gnocchi API for Watcher tests.
Provides stateful mocking of Gnocchi metric operations:
- Resource listing and details
- Metric measures
- Aggregation operations
"""
# Pre-defined resources (instances)
RESOURCE_INSTANCE_1 = {
'id': '73b09e16-35b7-4922-804e-e8f5d9b740fc',
'type': 'instance',
'project_id': 'test-project',
'host': 'compute-1',
'metrics': {
'cpu_util': 'cpu-metric-1',
'memory.usage': 'memory-metric-1',
}
}
# Pre-defined measures
CPU_MEASURES = [
{'timestamp': '2025-10-07T10:00:00', 'value': 25.5},
{'timestamp': '2025-10-07T10:01:00', 'value': 30.2},
{'timestamp': '2025-10-07T10:02:00', 'value': 28.7},
]
MEMORY_MEASURES = [
{'timestamp': '2025-10-07T10:00:00', 'value': 2048},
{'timestamp': '2025-10-07T10:01:00', 'value': 2100},
{'timestamp': '2025-10-07T10:02:00', 'value': 2050},
]
def __init__(self, test):
super(GnocchiFixture, self).__init__()
self.test = test
self._resources = {}
self._measures = {}
def setUp(self):
super(GnocchiFixture, self).setUp()
# Add default resources
res_id = self.RESOURCE_INSTANCE_1['id']
self._resources[res_id] = copy.deepcopy(self.RESOURCE_INSTANCE_1)
# Add default measures
self._measures['cpu-metric-1'] = copy.deepcopy(self.CPU_MEASURES)
self._measures['memory-metric-1'] = copy.deepcopy(self.MEMORY_MEASURES)
# Mock gnocchi client
self.test.useFixture(fixtures.MonkeyPatch(
'watcher.common.clients.gnocchi.GnocchiClient.get_resources',
self.get_resources))
self.test.useFixture(fixtures.MonkeyPatch(
'watcher.common.clients.gnocchi.GnocchiClient.get_measures',
self.get_measures))
self.test.useFixture(fixtures.MonkeyPatch(
'watcher.common.clients.gnocchi.GnocchiClient.aggregate_measures',
self.aggregate_measures))
def get_resources(self, resource_type='instance', details=True):
"""Mock getting resources."""
resources = [r for r in self._resources.values()
if r['type'] == resource_type]
return resources
def get_measures(self, metric_id, start=None, stop=None, aggregation='mean'):
"""Mock getting metric measures."""
if metric_id not in self._measures:
return []
return copy.deepcopy(self._measures[metric_id])
def aggregate_measures(self, metrics, aggregation='mean', start=None, stop=None):
"""Mock aggregating measures."""
# Simple average for testing
all_values = []
for metric_id in metrics:
if metric_id in self._measures:
all_values.extend([m['value'] for m in self._measures[metric_id]])
if not all_values:
return []
avg_value = sum(all_values) / len(all_values)
return [{'timestamp': '2025-10-07T10:00:00', 'value': avg_value}]
def add_resource(self, resource_dict):
"""Add custom resource to fixture."""
res_id = resource_dict.get('id') or uuidutils.generate_uuid()
resource_dict['id'] = res_id
self._resources[res_id] = resource_dict
return resource_dict
def set_measures(self, metric_id, measures):
"""Set measures for a metric."""
        self._measures[metric_id] = copy.deepcopy(measures)
watcher/tests/local_fixtures/api.py:
"""API fixture for Watcher tests."""
import fixtures
from oslo_config import cfg
from oslo_config import fixture as config_fixture
from oslo_utils.fixture import uuidsentinel
from wsgi_intercept import interceptor
from watcher.api import app as watcher_app
from watcher.tests.functional.api import client
CONF = cfg.CONF
class APIFixture(fixtures.Fixture):
"""Create a Watcher API server as a fixture.
Runs real Pecan WSGI application using wsgi-intercept.
"""
def __init__(self, api_version='v1'):
super(APIFixture, self).__init__()
self.api_version = api_version
def setUp(self):
super(APIFixture, self).setUp()
# Unique hostname for wsgi-intercept
hostname = str(uuidsentinel.watcher_api_host)
port = 9322
endpoint = 'http://%s:%s/' % (hostname, port)
        # Set debug mode and disable auth for testing.
        # local_fixtures/conf.py only defines ConfFixture, so use
        # oslo.config's Config fixture directly for per-test overrides.
        conf_patch = self.useFixture(config_fixture.Config(CONF))
        conf_patch.config(debug=True)
        conf_patch.config(auth_strategy='noauth', group='api')
# Load Pecan WSGI app
app_conf = {
'app': {
'root': 'watcher.api.controllers.root.RootController',
'modules': ['watcher.api'],
}
}
app = watcher_app.setup_app(config=app_conf)
# Install wsgi-intercept
intercept = interceptor.RequestsInterceptor(
lambda: app, url=endpoint)
intercept.install_intercept()
self.addCleanup(intercept.uninstall_intercept)
# Create API clients
base_url = 'http://%(host)s:%(port)s/%(version)s' % {
'host': hostname,
'port': port,
'version': self.api_version
}
self.api = client.WatcherApiClient('user', base_url,
project_id='test-project')
self.admin_api = client.WatcherApiClient('admin', base_url,
project_id='test-project',
                                                 is_admin=True)
watcher/tests/functional/api/client.py:
"""API client for Watcher functional tests."""
import requests
from oslo_serialization import jsonutils
class WatcherApiClient(object):
"""Simple HTTP client for Watcher API."""
def __init__(self, user_id, base_url, project_id='test-project',
is_admin=False):
self.user_id = user_id
self.project_id = project_id
self.is_admin = is_admin
self.base_url = base_url
def _request(self, method, url, body=None, **kwargs):
"""Make HTTP request."""
headers = {
'X-Auth-Token': 'fake-token',
'X-User-Id': self.user_id,
'X-Project-Id': self.project_id,
'X-Roles': 'admin' if self.is_admin else 'member',
'Content-Type': 'application/json',
'Accept': 'application/json',
}
headers.update(kwargs.get('headers', {}))
full_url = self.base_url + url
        if method == 'GET':
            # Pass query parameters through for list operations
            response = requests.get(full_url, headers=headers,
                                    params=kwargs.get('params'))
elif method == 'POST':
response = requests.post(full_url, json=body, headers=headers)
elif method == 'PATCH':
response = requests.patch(full_url, json=body, headers=headers)
elif method == 'DELETE':
response = requests.delete(full_url, headers=headers)
else:
raise ValueError('Unsupported method: %s' % method)
return response
def get(self, url, **kwargs):
"""GET request."""
return self._request('GET', url, **kwargs)
def post(self, url, body, **kwargs):
"""POST request."""
return self._request('POST', url, body=body, **kwargs)
def patch(self, url, body, **kwargs):
"""PATCH request."""
return self._request('PATCH', url, body=body, **kwargs)
def delete(self, url, **kwargs):
"""DELETE request."""
return self._request('DELETE', url, **kwargs)
# Helper methods for common operations
def create_audit(self, audit_dict):
"""Create an audit."""
response = self.post('/audits', audit_dict)
response.raise_for_status()
return response.json()
def get_audit(self, audit_uuid):
"""Get audit details."""
response = self.get('/audits/%s' % audit_uuid)
response.raise_for_status()
return response.json()
def list_audits(self, **filters):
"""List audits."""
response = self.get('/audits', params=filters)
response.raise_for_status()
return response.json()['audits']
def delete_audit(self, audit_uuid):
"""Delete an audit."""
response = self.delete('/audits/%s' % audit_uuid)
response.raise_for_status()
def get_action_plan(self, action_plan_uuid):
"""Get action plan details."""
response = self.get('/action_plans/%s' % action_plan_uuid)
response.raise_for_status()
return response.json()
def list_action_plans(self, **filters):
"""List action plans."""
response = self.get('/action_plans', params=filters)
response.raise_for_status()
return response.json()['action_plans']
def start_action_plan(self, action_plan_uuid):
"""Start action plan execution."""
body = [{'op': 'replace', 'path': '/state', 'value': 'TRIGGERED'}]
response = self.patch('/action_plans/%s' % action_plan_uuid, body)
response.raise_for_status()
        return response.json()
watcher/tests/local_fixtures/service.py:
"""Service fixture for Watcher tests."""
import fixtures
import threading
from unittest import mock
from watcher.common import context
class ServiceFixture(fixtures.Fixture):
"""Run a Watcher service as a test fixture.
Starts services in background threads for functional testing.
"""
def __init__(self, binary, host=None, **kwargs):
"""Initialize service fixture.
Args:
binary: Service binary name (watcher-decision-engine, watcher-applier)
host: Host name for the service
**kwargs: Additional service arguments
"""
super(ServiceFixture, self).__init__()
self.binary = binary
self.host = host or 'test-host'
self.kwargs = kwargs
def setUp(self):
super(ServiceFixture, self).setUp()
# Create admin context
self.ctxt = context.make_context(is_admin=True)
# Mock context creation
mock_ctx = mock.MagicMock(return_value=self.ctxt)
self.useFixture(fixtures.MonkeyPatch(
'watcher.common.context.make_context',
mock_ctx))
# Import and start the appropriate service
if self.binary == 'watcher-decision-engine':
from watcher.decision_engine import manager as de_manager
self.manager = de_manager.DecisionEngineManager()
elif self.binary == 'watcher-applier':
from watcher.applier import manager as applier_manager
self.manager = applier_manager.ApplierManager()
else:
raise ValueError('Unknown binary: %s' % self.binary)
# Start in background thread
self.thread = threading.Thread(
target=self._run_service,
daemon=True)
self.thread.start()
self.addCleanup(self._cleanup)
def _run_service(self):
"""Run service in thread."""
try:
# Service managers typically have a run() or start() method
if hasattr(self.manager, 'run'):
self.manager.run()
elif hasattr(self.manager, 'start'):
self.manager.start()
except Exception:
# Expected when service is stopped
pass
def _cleanup(self):
"""Stop service and join thread."""
if hasattr(self.manager, 'stop'):
self.manager.stop()
# Give thread time to finish
        self.thread.join(timeout=5)
watcher/tests/functional/base.py:
"""Base classes for Watcher functional tests."""
import fixtures
from oslo_config import cfg
from oslo_log import log as logging
from oslotest import base
from watcher.common import context
from watcher.tests.local_fixtures import api as api_fixtures
from watcher.tests.local_fixtures import conf as conf_fixtures
from watcher.tests.local_fixtures import database as db_fixtures
from watcher.tests.local_fixtures import gnocchi as gnocchi_fixtures
from watcher.tests.local_fixtures import notifications as notification_fixtures
from watcher.tests.local_fixtures import nova as nova_fixtures
from watcher.tests.local_fixtures import rpc as rpc_fixtures
from watcher.tests.local_fixtures import service as service_fixtures
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
class WatcherFunctionalTestCase(base.BaseTestCase):
"""Base class for Watcher functional tests.
Provides:
- Database (SQLite in-memory)
- RPC (oslo.messaging fake driver)
- Notifications
- API server
- External service mocks (Nova, Gnocchi, etc.)
Functional tests should inherit from this class and can selectively
override service setup if needed.
"""
# Class attributes
USES_DB = True
STUB_RPC = True
START_DECISION_ENGINE = False
START_APPLIER = False
    def setUp(self):
        super(WatcherFunctionalTestCase, self).setUp()
        # Configuration
        self.useFixture(conf_fixtures.ConfFixture(CONF))
        # Database
        if self.USES_DB:
            self.useFixture(db_fixtures.Database())
        # RPC and notifications
        if self.STUB_RPC:
            self.useFixture(rpc_fixtures.RPCFixture())
            # Use fake notification driver
            CONF.set_default('driver', ['test'],
                             group='oslo_messaging_notifications')
            # Make RPC casts synchronous for deterministic tests
            self.useFixture(rpc_fixtures.CastAsCallFixture())
        # Notification capture
        self.notifier = self.useFixture(
            notification_fixtures.NotificationFixture(self))
        # External services
        self.nova = self.useFixture(nova_fixtures.NovaFixture(self))
        self.gnocchi = self.useFixture(gnocchi_fixtures.GnocchiFixture(self))
        # Placement fixture can be added when needed:
        # self.placement = self.useFixture(PlacementFixture())
        # API
        self.api_fixture = self.useFixture(api_fixtures.APIFixture())
        self.api = self.api_fixture.api
        self.admin_api = self.api_fixture.admin_api
        # Start services if requested
        if self.START_DECISION_ENGINE:
            self.start_service('watcher-decision-engine')
        if self.START_APPLIER:
            self.start_service('watcher-applier')
        # Create admin context for tests
        self.context = context.make_context(is_admin=True)
def flags(self, **kw):
"""Override flag variables for a test.
Example:
self.flags(periodic_interval=10,
group='watcher_decision_engine')
"""
group = kw.pop('group', None)
for k, v in kw.items():
CONF.set_override(k, v, group)
self.addCleanup(CONF.clear_override, k, group)
def start_service(self, binary, host=None, **kwargs):
"""Start a Watcher service.
Args:
binary: Service name (watcher-decision-engine, watcher-applier)
host: Host name for the service
**kwargs: Additional arguments for service
Returns:
ServiceFixture instance
"""
if host is not None:
self.flags(host=host)
        svc = self.useFixture(
            service_fixtures.ServiceFixture(binary, host, **kwargs))
        return svc
Goal: Add Gabbi-based declarative API testing framework.
Gabbi tests provide declarative YAML-based HTTP testing, ideal for API behavior and microversion testing. They complement Python-based functional tests by focusing on API contracts.
Files to Create:
watcher/tests/functional/test_api_gabbi.py:
"""Gabbi test loader for Watcher API tests.
Loads declarative YAML tests from the gabbits/ directory.
"""
import os
from gabbi import driver
from oslotest import output
import wsgi_intercept
from watcher.tests.functional.fixtures import capture
from watcher.tests.functional.fixtures import gabbi as gabbi_fixtures
# Enforce strict response headers (native str)
wsgi_intercept.STRICT_RESPONSE_HEADERS = True
# Directory containing YAML test files
TESTS_DIR = 'gabbits'
def load_tests(loader, tests, pattern):
"""Provide TestSuite to the discovery process.
This is the standard Python unittest load_tests protocol.
Called by test runners (stestr, unittest discover).
:param loader: unittest.TestLoader
:param tests: Existing TestSuite (ignored)
:param pattern: Pattern for test discovery (ignored)
:returns: TestSuite containing Gabbi tests
"""
test_dir = os.path.join(os.path.dirname(__file__), TESTS_DIR)
# Per-test fixtures (clean output/logging per test)
inner_fixtures = [
output.CaptureOutput,
capture.Logging,
]
# Build test suite from YAML files
return driver.build_tests(
test_dir, # Directory with YAML files
loader, # unittest.TestLoader
host=None, # No real host (wsgi-intercept)
test_loader_name=__name__, # Module name for test naming
intercept=gabbi_fixtures.setup_app, # App factory function
inner_fixtures=inner_fixtures, # Per-test fixtures
fixture_module=gabbi_fixtures # Module with GabbiFixture classes
    )
watcher/tests/functional/fixtures/gabbi.py:
"""Gabbi fixtures for Watcher API testing.
Provides GabbiFixture classes for declarative YAML-based API tests.
"""
import os
from gabbi import fixture
from oslo_config import cfg
from oslo_config import fixture as config_fixture
from oslo_log.fixture import logging_error
from oslo_policy import opts as policy_opts
from oslo_utils import uuidutils
from oslotest import output
from watcher.api import app as watcher_app
from watcher.common import context
from watcher import conf as watcher_conf
from watcher.tests.fixtures import database as db_fixtures
from watcher.tests.functional.fixtures import capture
from watcher.tests.unit import policy_fixture
# Global CONF for setup_app workaround
# (gabbi requires app factory to be zero-argument function)
CONF = None
def setup_app():
"""App factory for gabbi.
Called by gabbi to get the WSGI application under test.
Uses wsgi-intercept to route HTTP calls to in-process app.
"""
global CONF
# Create Pecan WSGI application
# Note: Watcher uses custom Pecan config, not deploy.loadapp like Placement
from watcher.api import config as api_config
import pecan
pecan_config = pecan.configuration.conf_from_dict(api_config.PECAN_CONFIG)
app_conf = dict(pecan_config.app)
# Disable auth for testing
app_conf['enable_acl'] = False
app = pecan.make_app(
app_conf.pop('root'),
logging=getattr(pecan_config, 'logging', {}),
debug=True,
**app_conf
)
return app
class APIFixture(fixture.GabbiFixture):
"""Base fixture for Watcher Gabbi tests.
Sets up:
- Configuration
- Database (SQLite in-memory)
- Policy
- Logging and output capture
- Environment variables for test data
This fixture runs once per YAML file (start_fixture before first test,
stop_fixture after last test).
"""
def start_fixture(self):
"""Called once before any tests in a YAML file run."""
global CONF
# Set up logging and output capture
self.standard_logging_fixture = capture.Logging()
self.standard_logging_fixture.setUp()
self.output_stream_fixture = output.CaptureOutput()
self.output_stream_fixture.setUp()
self.logging_error_fixture = (
logging_error.get_logging_handle_error_fixture())
self.logging_error_fixture.setUp()
self.warnings_fixture = capture.WarningsFixture()
self.warnings_fixture.setUp()
# Create isolated config (don't use global CONF)
self.conf_fixture = config_fixture.Config(cfg.ConfigOpts())
self.conf_fixture.setUp()
watcher_conf.register_opts(self.conf_fixture.conf)
# Configure API with no auth
self.conf_fixture.config(group='api', auth_strategy='noauth')
self.conf_fixture.config(group='api', host='127.0.0.1')
self.conf_fixture.config(group='api', port=9322)
# Configure policy (no scope enforcement for tests)
policy_opts.set_defaults(self.conf_fixture.conf)
self.conf_fixture.config(
group='oslo_policy',
enforce_scope=False,
enforce_new_defaults=False,
)
# Set up database
        self.db_fixture = db_fixtures.Database(
            self.conf_fixture, set_config=True)
        self.db_fixture.setUp()
# Create context for fixture data creation
self.context = context.make_context(is_admin=True)
# Empty config files list (don't read /etc/watcher/watcher.conf)
self.conf_fixture.conf([], default_config_files=[])
# Set up policy fixture
self.policy_fixture = policy_fixture.PolicyFixture(
self.conf_fixture)
self.policy_fixture.setUp()
# Set up environment variables for use in YAML tests
# These are substituted into test data via $ENVIRON['VAR_NAME']
self._setup_environment_variables()
# Store config globally for setup_app()
CONF = self.conf_fixture.conf
def _setup_environment_variables(self):
"""Set environment variables for YAML test data."""
# Audit related
os.environ['AUDIT_UUID'] = uuidutils.generate_uuid()
os.environ['AUDIT_UUID_2'] = uuidutils.generate_uuid()
os.environ['AUDIT_NAME'] = 'test-audit-%s' % uuidutils.generate_uuid()[:8]
# Action plan related
os.environ['ACTION_PLAN_UUID'] = uuidutils.generate_uuid()
os.environ['ACTION_UUID'] = uuidutils.generate_uuid()
# Strategy and goal
os.environ['STRATEGY_UUID'] = uuidutils.generate_uuid()
os.environ['GOAL_UUID'] = uuidutils.generate_uuid()
os.environ['GOAL_NAME'] = 'test-goal'
# Service
os.environ['SERVICE_NAME'] = 'watcher-decision-engine'
os.environ['SERVICE_HOST'] = 'test-host'
# Project and user
os.environ['PROJECT_ID'] = uuidutils.generate_uuid()
os.environ['USER_ID'] = uuidutils.generate_uuid()
# Compute resources (for strategies)
os.environ['INSTANCE_UUID'] = uuidutils.generate_uuid()
os.environ['COMPUTE_NODE_UUID'] = uuidutils.generate_uuid()
os.environ['SRC_NODE'] = 'compute-1'
os.environ['DEST_NODE'] = 'compute-2'
def stop_fixture(self):
"""Called after all tests in a YAML file complete."""
global CONF
# Clean up all fixtures in reverse order
        self.db_fixture.cleanUp()
self.policy_fixture.cleanUp()
self.warnings_fixture.cleanUp()
self.logging_error_fixture.cleanUp()
self.output_stream_fixture.cleanUp()
self.standard_logging_fixture.cleanUp()
self.conf_fixture.cleanUp()
        CONF = None
watcher/tests/functional/fixtures/capture.py:
"""Fixtures for capturing logs and filtering warnings.
Similar to Placement's capture fixtures, adapted for Watcher.
"""
import logging
import warnings
import fixtures
from oslo_log.fixture import logging_error as log_fixture
from oslotest import log
from sqlalchemy import exc as sqla_exc
class NullHandler(logging.Handler):
"""Custom NullHandler that formats records.
Used to detect formatting errors in debug logs even when
logs aren't captured.
"""
def handle(self, record):
self.format(record)
def emit(self, record):
pass
def createLock(self):
self.lock = None
class Logging(log.ConfigureLogging):
"""Logging fixture for tests.
- Captures logs for later inspection
- Ensures DEBUG logs are formatted even if not captured
"""
def __init__(self):
super(Logging, self).__init__()
# Default to INFO if not otherwise set
if self.level is None:
self.level = logging.INFO
def setUp(self):
super(Logging, self).setUp()
if self.level > logging.DEBUG:
handler = NullHandler()
self.useFixture(fixtures.LogHandler(handler, nuke_handlers=False))
handler.setLevel(logging.DEBUG)
class WarningsFixture(fixtures.Fixture):
"""Filter or escalate certain warnings during test runs.
Add additional entries as required. Remove when obsolete.
"""
def setUp(self):
super(WarningsFixture, self).setUp()
self._original_warning_filters = warnings.filters[:]
warnings.simplefilter("once", DeprecationWarning)
# Ignore policy scope warnings (new RBAC system)
warnings.filterwarnings(
'ignore',
message="Policy .* failed scope check",
category=UserWarning)
# Escalate invalid UUID warnings to errors
warnings.filterwarnings('error', message=".*invalid UUID.*")
# Prevent introducing unmapped columns
warnings.filterwarnings(
'error',
category=sqla_exc.SAWarning)
# Configure SQLAlchemy warnings
warnings.filterwarnings(
'ignore',
category=sqla_exc.SADeprecationWarning)
warnings.filterwarnings(
'error',
module='watcher',
category=sqla_exc.SADeprecationWarning)
self.addCleanup(self._reset_warning_filters)
def _reset_warning_filters(self):
        warnings.filters[:] = self._original_warning_filters
Files to Create:
Create watcher/tests/functional/gabbits/ directory with example YAML tests:
watcher/tests/functional/gabbits/basic-http.yaml:
# Basic HTTP behavior tests for Watcher API
fixtures:
- APIFixture
defaults:
request_headers:
x-auth-token: admin
accept: application/json
openstack-api-version: infra-optim 1.0
tests:
- name: 404 at unknown endpoint
GET: /barnabas
status: 404
- name: 200 at API root
GET: /
status: 200
response_json_paths:
$.versions[0].id: v1
- name: 200 at v1 root
GET: /v1
status: 200
response_json_paths:
$.id: v1
      $.media_types[0].base: application/json
watcher/tests/functional/gabbits/audit-lifecycle.yaml:
# Audit lifecycle API tests
fixtures:
- APIFixture
defaults:
request_headers:
x-auth-token: admin
accept: application/json
content-type: application/json
openstack-api-version: infra-optim 1.0
tests:
- name: list audits empty
GET: /v1/audits
response_json_paths:
$.audits: []
- name: create audit
POST: /v1/audits
data:
name: $ENVIRON['AUDIT_NAME']
audit_type: ONESHOT
goal: dummy
status: 201
response_headers:
location: //v1/audits/[a-f0-9-]+/
response_json_paths:
$.uuid: /^[a-f0-9-]+$/
$.name: $ENVIRON['AUDIT_NAME']
$.audit_type: ONESHOT
$.state: PENDING
- name: get audit
GET: $LOCATION
response_json_paths:
$.uuid: $HISTORY['create audit'].$RESPONSE['$.uuid']
$.name: $ENVIRON['AUDIT_NAME']
$.state: PENDING
$.goal_uuid: /^[a-f0-9-]+$/
- name: list audits has one
GET: /v1/audits
response_json_paths:
$.audits[0].uuid: $HISTORY['create audit'].$RESPONSE['$.uuid']
- name: patch audit name
PATCH: /v1/audits/$HISTORY['create audit'].$RESPONSE['$.uuid']
request_headers:
content-type: application/json
data:
- op: replace
path: /name
value: updated-audit-name
status: 200
response_json_paths:
$.name: updated-audit-name
- name: delete audit
DELETE: /v1/audits/$HISTORY['create audit'].$RESPONSE['$.uuid']
status: 204
- name: get deleted audit 404
GET: /v1/audits/$HISTORY['create audit'].$RESPONSE['$.uuid']
  status: 404
watcher/tests/functional/gabbits/microversions.yaml:
# Microversion testing
fixtures:
- APIFixture
defaults:
request_headers:
x-auth-token: admin
accept: application/json
tests:
- name: no version header defaults to 1.0
GET: /v1
response_headers:
openstack-api-version: "infra-optim 1.0"
- name: explicit version 1.0
GET: /v1
request_headers:
openstack-api-version: "infra-optim 1.0"
response_headers:
openstack-api-version: "infra-optim 1.0"
- name: latest version
GET: /v1
request_headers:
openstack-api-version: "infra-optim latest"
response_headers:
# Check that response has a valid version
openstack-api-version: /infra-optim \d+\.\d+/
- name: invalid version rejected
GET: /v1
request_headers:
openstack-api-version: "infra-optim 999.999"
  status: 406
watcher/tests/functional/fixtures/gabbi.py (additions):
# Add to existing gabbi.py file
class AuditFixture(APIFixture):
"""APIFixture with pre-created audit data.
Creates:
- A goal
- A strategy
- An audit template
- A pending audit
Useful for action plan and applier tests.
"""
def start_fixture(self):
# Call parent to set up base infrastructure
super(AuditFixture, self).start_fixture()
# Import helpers
from watcher.tests.db import utils as db_utils
from watcher import objects
# Create goal
goal = db_utils.create_test_goal(
name=os.environ['GOAL_NAME'],
uuid=os.environ['GOAL_UUID']
)
# Create strategy
strategy = db_utils.create_test_strategy(
name='dummy',
uuid=os.environ['STRATEGY_UUID'],
goal_id=goal.id
)
# Create audit template
audit_template = db_utils.create_test_audit_template(
name='test-template',
goal_id=goal.id,
strategy_id=strategy.id
)
# Create audit
audit = db_utils.create_test_audit(
uuid=os.environ['AUDIT_UUID'],
name=os.environ['AUDIT_NAME'],
audit_type='ONESHOT',
state='PENDING',
goal_id=goal.id,
strategy_id=strategy.id,
audit_template_id=audit_template.id
)
# Store IDs for tests
os.environ['AUDIT_TEMPLATE_UUID'] = audit_template.uuid
class ActionPlanFixture(AuditFixture):
"""APIFixture with pre-created action plan data.
Extends AuditFixture and adds:
- A recommended action plan
- Actions in the plan
"""
def start_fixture(self):
# Call parent to set up audit
super(ActionPlanFixture, self).start_fixture()
        from watcher import objects
        from watcher.tests.db import utils as db_utils
# Get audit ID from environment
audit_obj = objects.Audit.get_by_uuid(
self.context, os.environ['AUDIT_UUID'])
# Create action plan
action_plan = db_utils.create_test_action_plan(
uuid=os.environ['ACTION_PLAN_UUID'],
audit_id=audit_obj.id,
state='RECOMMENDED'
)
# Create action
action = db_utils.create_test_action(
uuid=os.environ['ACTION_UUID'],
action_plan_id=action_plan.id,
action_type='migrate',
state='PENDING',
input_parameters={
'migration_type': 'live',
'source_node': os.environ['SRC_NODE'],
'destination_node': os.environ['DEST_NODE'],
'resource_id': os.environ['INSTANCE_UUID'],
}
        )
Commit Message:
Add Gabbi-based declarative API testing infrastructure
Introduce Gabbi test framework for declarative YAML-based API
testing. Gabbi tests complement Python functional tests by
focusing on:
- API behavior and contracts
- HTTP status codes and headers
- Microversion negotiation
- Request/response JSON structure
Key components:
- test_api_gabbi.py: Gabbi test loader
- fixtures/gabbi.py: APIFixture, AuditFixture, ActionPlanFixture
- fixtures/capture.py: Logging and warning capture
- gabbits/*.yaml: Example declarative tests
Gabbi tests use wsgi-intercept to route HTTP calls to an
in-process Pecan WSGI application, providing fast execution
without network overhead.
Part of functional test infrastructure implementation.
Related-Bug: #XXXXXXX
watcher/tests/functional/test_api_audits.py:
"""Functional tests for Watcher audit API."""
from oslo_utils import uuidutils
from watcher.tests.functional import base
class TestAuditAPI(base.WatcherFunctionalTestCase):
"""Test audit API operations with real database and API."""
def test_create_audit(self):
"""Test creating an audit via API."""
# Create audit request
audit_dict = {
'audit_type': 'ONESHOT',
'goal': 'server_consolidation',
'name': 'test-audit',
}
# Create via API
audit = self.api.create_audit(audit_dict)
# Verify response
self.assertIsNotNone(audit.get('uuid'))
self.assertEqual('test-audit', audit['name'])
self.assertEqual('PENDING', audit['state'])
# Verify in database
        from watcher.objects import audit as audit_obj
db_audit = audit_obj.Audit.get_by_uuid(
self.context, audit['uuid'])
self.assertEqual('test-audit', db_audit.name)
self.assertEqual('server_consolidation', db_audit.goal.name)
def test_list_audits(self):
"""Test listing audits via API."""
# Create two audits
audit1 = self.api.create_audit({
'audit_type': 'ONESHOT',
'goal': 'server_consolidation',
'name': 'audit-1',
})
audit2 = self.api.create_audit({
'audit_type': 'CONTINUOUS',
'goal': 'thermal_optimization',
'name': 'audit-2',
})
# List all audits
audits = self.api.list_audits()
# Verify
self.assertEqual(2, len(audits))
audit_uuids = [a['uuid'] for a in audits]
self.assertIn(audit1['uuid'], audit_uuids)
self.assertIn(audit2['uuid'], audit_uuids)
def test_delete_audit(self):
"""Test deleting an audit via API."""
# Create audit
audit = self.api.create_audit({
'audit_type': 'ONESHOT',
'goal': 'server_consolidation',
'name': 'test-audit',
})
audit_uuid = audit['uuid']
# Delete
self.api.delete_audit(audit_uuid)
# Verify deleted
from watcher import exception
from watcher.objects import audit as audit_obj
self.assertRaises(
exception.AuditNotFound,
audit_obj.Audit.get_by_uuid,
self.context, audit_uuid)
def test_create_audit_with_notification(self):
"""Test that audit creation emits notification."""
# Create audit
audit_dict = {
'audit_type': 'ONESHOT',
'goal': 'server_consolidation',
'name': 'test-audit',
}
audit = self.api.create_audit(audit_dict)
# Wait for notification
notifications = self.notifier.wait_for_versioned_notifications(
'audit.create', n_events=1, timeout=10.0)
# Verify notification content
self.assertEqual(1, len(notifications))
payload = notifications[0]['payload']
self.assertEqual(audit['uuid'],
payload['watcher_object.data']['uuid'])
self.assertEqual('test-audit',
payload['watcher_object.data']['name'])
watcher/tests/functional/test_workflows.py:
"""End-to-end workflow functional tests."""
import time
from watcher.tests.functional import base
class TestAuditWorkflow(base.WatcherFunctionalTestCase):
"""Test complete audit → strategy → action plan workflow."""
START_DECISION_ENGINE = True
START_APPLIER = True
def test_oneshot_audit_workflow(self):
"""Test complete workflow for ONESHOT audit.
This test exercises:
1. Creating an audit via API
2. Decision engine picking up audit
3. Strategy execution
4. Action plan creation
5. Applier executing actions
"""
# Step 1: Create audit
audit_dict = {
'audit_type': 'ONESHOT',
'goal': 'dummy', # Use dummy strategy for testing
'name': 'workflow-test',
}
audit = self.api.create_audit(audit_dict)
audit_uuid = audit['uuid']
# Step 2: Wait for audit to complete
# Decision engine should pick it up and execute strategy
for i in range(50): # 5 seconds max
audit = self.api.get_audit(audit_uuid)
if audit['state'] in ('SUCCEEDED', 'FAILED', 'CANCELLED'):
break
time.sleep(0.1)
self.assertEqual('SUCCEEDED', audit['state'],
'Audit did not complete successfully')
# Step 3: Verify action plan was created
action_plans = self.api.list_action_plans(audit_uuid=audit_uuid)
self.assertGreater(len(action_plans), 0,
'No action plan created')
action_plan = action_plans[0]
self.assertIsNotNone(action_plan.get('uuid'))
# Step 4: Trigger action plan
self.api.start_action_plan(action_plan['uuid'])
# Step 5: Wait for action plan execution
for i in range(50):
action_plan = self.api.get_action_plan(action_plan['uuid'])
if action_plan['state'] in ('SUCCEEDED', 'FAILED', 'CANCELLED'):
break
time.sleep(0.1)
self.assertEqual('SUCCEEDED', action_plan['state'],
'Action plan did not execute successfully')
# Step 6: Verify notifications
# Should have: audit.create, audit.update (x2), action_plan.create, etc.
all_notifications = self.notifier.versioned_notifications
audit_notifications = [n for n in all_notifications
if n['event_type'].startswith('audit.')]
self.assertGreater(len(audit_notifications), 0)
watcher/tests/functional/regressions/
├── __init__.py
├── README.rst
└── test_bug_example.py # Stub example
watcher/tests/functional/regressions/README.rst:
================================
Tests for Specific Regressions
================================
This directory contains regression tests for specific bugs reported in Launchpad.
Each test is designed to reproduce a bug and verify that it has been fixed.
Purpose
=======
Regression tests serve as long-term protection against bugs reoccurring. When a
significant bug is fixed, we create a functional test that:
1. Reproduces the exact conditions that triggered the bug
2. Verifies the bug is fixed
3. Prevents the bug from being reintroduced
These tests are MORE important than regular functional tests because they represent
real-world problems that affected users.
When to Create a Regression Test
=================================
Create a regression test when:
- A bug requires complex setup (multiple services, specific state)
- The bug involves interaction between multiple components
- The bug is non-obvious and could easily be reintroduced
- The bug caused significant user impact
Do NOT create regression tests for:
- Simple one-line fixes that are covered by unit tests
- Bugs in test code itself
- Documentation bugs
Writing Regression Tests
=========================
File Naming
-----------
Regression test files MUST be named: ``test_bug_<launchpad_id>.py``
Example: ``test_bug_1234567.py`` for bug #1234567
Class Naming
------------
Use a descriptive class name that explains what the bug was:
.. code-block:: python
# Good
class TestAuditFailsWithEmptyStrategy(base.WatcherFunctionalTestCase):
"""Regression test for bug #1234567."""
# Bad
class TestBug1234567(base.WatcherFunctionalTestCase):
"""Test for bug."""
Test Structure
--------------
Each regression test should have:
1. **Comprehensive docstring** explaining:
- What the bug was
- How to reproduce it
- What the fix was
- Why this test prevents regression
2. **Self-contained setup** with explicit fixtures
3. **Minimal inheritance** - inherit directly from base test class
4. **Clear test steps** with comments
Example Template
================
.. code-block:: python
"""Regression test for bug #1234567.
Description of what the bug was and how it manifested to users.
"""
from watcher.tests.functional import base
class TestDescriptiveName(base.WatcherFunctionalTestCase):
"""Regression test for bug #1234567.
Before the fix: Describe broken behavior
After the fix: Describe correct behavior
Root cause: Explain technical cause
The test verifies: What this test checks
"""
def setUp(self):
super(TestDescriptiveName, self).setUp()
# Explicit fixture setup
# Any special configuration
def test_specific_scenario(self):
"""Test the specific scenario that triggered the bug."""
# Step 1: Setup condition
# Step 2: Trigger bug scenario
# Step 3: Verify fix works
pass
Writing Tests Before the Bug is Fixed
======================================
When possible, write the test to demonstrate the bug BEFORE fixing it:
1. Write test that reproduces broken behavior
2. Assert the current (broken) behavior
3. Comment out expected (correct) assertions
4. Commit with "Related-Bug: #XXXXXX"
5. Fix the bug in production code
6. Update test: swap assertions (broken → commented, expected → active)
7. Commit with "Closes-Bug: #XXXXXX"
Example:
.. code-block:: python
def test_audit_with_empty_strategy(self):
"""Test audit doesn't fail with empty strategy."""
audit = self.api.create_audit({'goal': 'dummy'})
# BUG: Currently fails with 500 error
# This demonstrates the broken behavior:
response = self.api.get_audit(audit['uuid'])
self.assertEqual(500, response.status_code)
# EXPECTED (commented out until bug is fixed):
# response = self.api.get_audit(audit['uuid'])
# self.assertEqual(200, response.status_code)
# self.assertEqual('SUCCEEDED', response.json()['state'])
Then after the fix:
.. code-block:: python
def test_audit_with_empty_strategy(self):
"""Test audit doesn't fail with empty strategy."""
audit = self.api.create_audit({'goal': 'dummy'})
# BUG FIXED: Now returns success
# Old assertion (demonstrated broken behavior):
# response = self.api.get_audit(audit['uuid'])
# self.assertEqual(500, response.status_code)
# Correct behavior after fix:
response = self.api.get_audit(audit['uuid'])
self.assertEqual(200, response.status_code)
self.assertEqual('SUCCEEDED', response.json()['state'])
Stability Over Reuse
====================
Regression tests prioritize STABILITY over code reuse:
**Good:**
- Explicit fixture setup in setUp()
- Minimal inheritance (just base test class)
- Self-contained test methods
- Clear, verbose assertions
**Bad:**
- Deep inheritance from other test classes
- Hidden fixture dependencies
- Relying on helper methods that might change
- Terse, unclear assertions
The goal is that regression tests should continue to work even if other
test infrastructure changes significantly.
Example: Explicit Fixtures
---------------------------
.. code-block:: python
# Good - Explicit and stable
class TestBug123(base.WatcherFunctionalTestCase):
def setUp(self):
super(TestBug123, self).setUp()
# Clear what fixtures this test uses
self.useFixture(watcher_fixtures.NovaFixture(self))
self.useFixture(watcher_fixtures.GnocchiFixture(self))
# Bad - Hidden dependencies
class TestBug123(SomeOtherTestClass):
# What fixtures does SomeOtherTestClass set up?
# If it changes, this test breaks even though bug hasn't regressed
pass
Running Regression Tests
=========================
Run all regression tests:
.. code-block:: bash
tox -e functional -- watcher.tests.functional.regressions
Run specific regression test:
.. code-block:: bash
tox -e functional -- watcher.tests.functional.regressions.test_bug_1234567
With debug logging:
.. code-block:: bash
OS_DEBUG=1 tox -e functional -- watcher.tests.functional.regressions.test_bug_1234567
watcher/tests/functional/regressions/test_bug_example.py:
"""Example regression test (stub).
This is a template showing how to write regression tests for Watcher.
Replace this with actual bug regression tests.
"""
from watcher.tests.functional import base
class TestExampleRegression(base.WatcherFunctionalTestCase):
"""Example regression test structure.
This stub demonstrates the structure of a regression test.
In a real regression test, you would:
1. Describe the bug in detail in the docstring
2. Explain how to reproduce it
3. Document the fix
4. Write test that verifies fix prevents regression
Example:
--------
Bug #1234567: Audit fails when goal has no strategies
Before the fix: Creating an audit with a goal that has no available
strategies would cause the decision engine to crash with an
unhandled exception.
After the fix: The audit completes with state FAILED and an
appropriate error message.
Root cause: The strategy selector didn't handle the case where
no strategies were available for a goal.
The test verifies: Creating an audit with an empty strategy list
results in FAILED state with proper error message, not a crash.
"""
def setUp(self):
super(TestExampleRegression, self).setUp()
# Explicit fixture setup
# For regression tests, make all fixtures explicit even if
# the base class provides them. This ensures long-term stability.
# Example: Override configuration for this specific test
self.flags(some_option='specific_value',
group='watcher_decision_engine')
def test_example_scenario(self):
"""Test the specific scenario that triggered the bug.
This is where you reproduce the exact conditions that caused
the bug and verify that it's fixed.
"""
# Step 1: Set up preconditions
# Create any necessary database records, etc.
# Step 2: Trigger the scenario that caused the bug
# E.g., create an audit, start an action plan, etc.
# Step 3: Verify the fix works
# Assert the correct behavior, not the broken behavior
# For demonstration purposes only:
self.skipTest("This is an example stub, not a real test")
doc/source/contributor/functional-testing.rst:
==================
Functional Testing
==================
This guide explains how to write and run functional tests for Watcher,
including both Python-based and Gabbi (YAML-based) tests.
What Are Functional Tests?
===========================
Functional tests are integration tests that verify multiple components
working together with minimal mocking. They sit between unit tests and
full integration tests:
+----------------+------------------+-------------------+------------------+
| Test Type | Scope | Mocking | Speed |
+================+==================+===================+==================+
| Unit | Single function | Extensive mocking | Very fast |
| | or class | | |
+----------------+------------------+-------------------+------------------+
| Functional | Multiple | External services | Fast |
| | components | only | |
+----------------+------------------+-------------------+------------------+
| Integration | Complete system | Minimal or none | Slow |
+----------------+------------------+-------------------+------------------+
When to Write Functional Tests
===============================
Write functional tests when:
- Testing workflows spanning multiple components
- Verifying RPC interactions between services
- Testing database migrations with real data
- Reproducing complex bugs (regression tests)
- Validating API contracts with real WSGI application
When to Write Unit Tests Instead
=================================
Write unit tests when:
- Testing a single function or method
- Testing edge cases and error conditions
- Testing algorithmic logic
- Mock dependencies are simple and clear
Key Differences from Unit Tests
================================
Functional vs Unit Tests
-------------------------
+------------------+----------------------------+----------------------------+
| Aspect | Unit Tests | Functional Tests |
+==================+============================+============================+
| **Location** | ``watcher/tests/unit/`` | ``watcher/tests/functional/``|
+------------------+----------------------------+----------------------------+
| **Base Class** | ``watcher.tests.unit.base``| ``watcher.tests.functional``|
| | ``.WatcherTestCase`` | ``.base.WatcherFunctional`` |
| | | ``TestCase`` |
+------------------+----------------------------+----------------------------+
| **Mocking** | Extensive - mock | Minimal - only external |
| | everything except the | services (Nova, Gnocchi, |
| | code under test | etc.) |
+------------------+----------------------------+----------------------------+
| **Database** | Mocked or no database | Real SQLite in-memory |
+------------------+----------------------------+----------------------------+
| **RPC** | Mocked | Real oslo.messaging fake |
| | | driver |
+------------------+----------------------------+----------------------------+
| **API** | Mock API calls | Real Pecan WSGI app via |
| | | wsgi-intercept |
+------------------+----------------------------+----------------------------+
| **Services** | Not started | Can start decision engine, |
| | | applier services |
+------------------+----------------------------+----------------------------+
Writing Your First Functional Test
===================================
Basic Template
--------------
.. code-block:: python
"""Functional tests for audit operations."""
from watcher.tests.functional import base
class TestAuditOperations(base.WatcherFunctionalTestCase):
"""Test audit creation and execution."""
def test_create_audit(self):
"""Test creating an audit via API."""
# Create audit
audit = self.api.create_audit({
'audit_type': 'ONESHOT',
'goal': 'server_consolidation',
'name': 'test-audit',
})
# Verify
self.assertEqual('PENDING', audit['state'])
self.assertEqual('test-audit', audit['name'])
Test Structure
--------------
1. Import from ``watcher.tests.functional.base``
2. Inherit from ``WatcherFunctionalTestCase``
3. Use ``self.api`` or ``self.admin_api`` for API operations
4. Use ``self.context`` for direct database operations
5. Use fixtures (``self.nova``, ``self.gnocchi``) for external services
Available Test Fixtures
========================
The base functional test class provides these fixtures automatically:
API Clients
-----------
- ``self.api`` - Regular user API client
- ``self.admin_api`` - Admin user API client
Database
--------
- In-memory SQLite database with real Watcher schema
- Automatically cleaned up after each test
RPC
---
- ``oslo.messaging`` with ``fake://`` transport
- Synchronous for deterministic testing
External Services
-----------------
- ``self.nova`` - Nova API mock (``NovaFixture``)
- ``self.gnocchi`` - Gnocchi API mock (``GnocchiFixture``)
Notifications
-------------
- ``self.notifier`` - Notification capture fixture
Starting Services
-----------------
Set class attributes to auto-start services:
.. code-block:: python
class TestWithServices(base.WatcherFunctionalTestCase):
START_DECISION_ENGINE = True
START_APPLIER = True
Or start manually:
.. code-block:: python
def setUp(self):
super().setUp()
self.start_service('watcher-decision-engine')
Working with External Service Fixtures
=======================================
Nova Fixture
------------
The Nova fixture provides mocked Nova API operations:
.. code-block:: python
def test_with_nova_instances(self):
"""Test using Nova fixture."""
# Use pre-defined instances
instances = self.nova.get_instance_list()
self.assertEqual(2, len(instances))
# Add custom instance
custom_instance = {
'id': 'custom-id',
'name': 'custom-instance',
'status': 'ACTIVE',
'OS-EXT-SRV-ATTR:host': 'custom-host',
}
self.nova.add_instance(custom_instance)
# Simulate live migration
self.nova.live_migrate('instance-id', 'dest-host')
Gnocchi Fixture
---------------
The Gnocchi fixture provides mocked metrics:
.. code-block:: python
def test_with_gnocchi_metrics(self):
"""Test using Gnocchi fixture."""
# Get measures
measures = self.gnocchi.get_measures('cpu-metric-1')
self.assertGreater(len(measures), 0)
# Set custom measures
custom_measures = [
{'timestamp': '2025-10-07T10:00:00', 'value': 50.0},
{'timestamp': '2025-10-07T10:01:00', 'value': 55.0},
]
self.gnocchi.set_measures('custom-metric', custom_measures)
Verifying Notifications
========================
Use the notification fixture to verify events:
.. code-block:: python
def test_audit_notification(self):
"""Test that audit creation emits notification."""
# Perform operation
audit = self.api.create_audit({'goal': 'dummy'})
# Wait for notification
notifications = self.notifier.wait_for_versioned_notifications(
'audit.create', n_events=1, timeout=10.0)
# Verify
self.assertEqual(1, len(notifications))
self.assertEqual(audit['uuid'],
notifications[0]['payload']['uuid'])
Configuration Overrides
=======================
Override configuration for specific tests:
.. code-block:: python
def test_with_custom_config(self):
"""Test with custom configuration."""
# Override for this test only
self.flags(period_interval=60,
group='watcher_decision_engine')
# Configuration automatically restored after test
Running Functional Tests
=========================
Run All Functional Tests
-------------------------
.. code-block:: bash
tox -e functional
Run Specific Test Module
-------------------------
.. code-block:: bash
tox -e functional -- watcher.tests.functional.test_api_audits
Run Specific Test
-----------------
.. code-block:: bash
tox -e functional -- watcher.tests.functional.test_api_audits.TestAuditAPI.test_create_audit
With Debug Logging
------------------
.. code-block:: bash
OS_DEBUG=1 tox -e functional -- watcher.tests.functional.test_api_audits
Best Practices
==============
DO
--
- ✅ Test complete workflows
- ✅ Use real Watcher code (API, DB, RPC)
- ✅ Use fixtures for external services
- ✅ Verify notifications when appropriate
- ✅ Test both success and failure scenarios
- ✅ Add regression tests for complex bugs
DON'T
-----
- ❌ Mock Watcher's own code
- ❌ Test implementation details
- ❌ Write tests that depend on timing
- ❌ Leave resources uncleaned (fixtures handle this)
- ❌ Skip error cases
Example: Complete Workflow Test
================================
.. code-block:: python
"""Example of comprehensive workflow test."""
import time
from watcher.tests.functional import base
class TestCompleteWorkflow(base.WatcherFunctionalTestCase):
"""Test end-to-end audit workflow."""
START_DECISION_ENGINE = True
START_APPLIER = True
def test_audit_to_execution(self):
"""Test complete workflow from audit to action execution."""
# Step 1: Create audit
audit = self.api.create_audit({
'audit_type': 'ONESHOT',
'goal': 'dummy',
'name': 'workflow-test',
})
audit_uuid = audit['uuid']
# Step 2: Wait for audit to complete
for i in range(50):
audit = self.api.get_audit(audit_uuid)
if audit['state'] in ('SUCCEEDED', 'FAILED'):
break
time.sleep(0.1)
self.assertEqual('SUCCEEDED', audit['state'])
# Step 3: Get action plan
action_plans = self.api.list_action_plans(audit_uuid=audit_uuid)
self.assertGreater(len(action_plans), 0)
# Step 4: Execute action plan
action_plan = action_plans[0]
self.api.start_action_plan(action_plan['uuid'])
# Step 5: Wait for completion
for i in range(50):
action_plan = self.api.get_action_plan(action_plan['uuid'])
if action_plan['state'] in ('SUCCEEDED', 'FAILED'):
break
time.sleep(0.1)
self.assertEqual('SUCCEEDED', action_plan['state'])
# Step 6: Verify notifications
notifications = self.notifier.versioned_notifications
audit_events = [n for n in notifications
if n['event_type'].startswith('audit.')]
self.assertGreater(len(audit_events), 0)
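To avoid sprinkling raw ``time.sleep()`` loops through workflow tests, the
polling shown above can be factored into a small helper. The helper below is
an illustrative sketch; ``wait_for_state`` is not an existing Watcher API:

.. code-block:: python

    import time

    def wait_for_state(fetch, target_states, timeout=5.0, interval=0.1):
        """Poll fetch() until its 'state' is in target_states or timeout."""
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            resource = fetch()
            if resource['state'] in target_states:
                return resource
            time.sleep(interval)
        raise AssertionError(
            'Timed out waiting for states %s' % (target_states,))

    # Usage inside a test:
    # audit = wait_for_state(lambda: self.api.get_audit(audit_uuid),
    #                        ('SUCCEEDED', 'FAILED'))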
Troubleshooting
===============
Test Hangs
----------
If a test hangs:
1. Check if you're waiting for a service that isn't started
2. Verify RPC ``CastAsCallFixture`` is enabled (it is by default)
3. Check for deadlocks in service interaction
Database Errors
---------------
If you see database errors:
1. Ensure ``USES_DB = True`` in your test class
2. Check that migrations are up to date
3. Verify schema cache isn't corrupted (delete and regenerate)
Import Errors
-------------
If fixtures can't be imported:
1. Check that you're importing from ``watcher.tests.local_fixtures``
2. Verify ``__init__.py`` exports the fixture
3. Ensure fixture file is in the correct location
Gabbi Tests (YAML-Based API Testing)
======================================
In addition to Python functional tests, Watcher supports Gabbi tests for
declarative API testing.
What Are Gabbi Tests?
---------------------
Gabbi tests are YAML-based HTTP API tests. They provide a simple, declarative
way to test REST APIs without writing Python code.
When to Use Gabbi Tests
------------------------
Use Gabbi tests for:
- **API behavior**: HTTP status codes, headers, response structure
- **Microversion testing**: Version-specific API behavior
- **Simple CRUD operations**: Create, read, update, delete endpoints
- **Error responses**: Invalid requests and error handling
Use Python functional tests for:
- **Complex workflows**: Multi-step operations spanning services
- **Asynchronous operations**: Waiting for state changes
- **Service integration**: Decision engine and applier interaction
- **Complex assertions**: Beyond HTTP response validation
Writing Gabbi Tests
-------------------
Create a YAML file in ``watcher/tests/functional/gabbits/``:
.. code-block:: yaml
# audit-create.yaml
fixtures:
- APIFixture
defaults:
request_headers:
x-auth-token: admin
accept: application/json
content-type: application/json
openstack-api-version: infra-optim 1.0
tests:
- name: create audit
POST: /v1/audits
data:
name: test-audit
audit_type: ONESHOT
goal: dummy
status: 201
response_headers:
location: //v1/audits/[a-f0-9-]+/
response_json_paths:
$.name: test-audit
$.state: PENDING
**YAML Structure:**
- ``fixtures``: List of fixture class names (from ``fixtures/gabbi.py``)
- ``defaults``: Default headers for all tests in file
- ``tests``: Sequential list of test cases
**Test Case Fields:**
- ``name``: Descriptive test name
- ``GET/POST/PUT/PATCH/DELETE``: HTTP method and URL
- ``request_headers``: Request headers (optional)
- ``data``: Request body as JSON (optional)
- ``status``: Expected HTTP status code
- ``response_headers``: Expected response headers (regex allowed)
- ``response_json_paths``: JSONPath assertions on response body
Using Environment Variables
~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Fixtures set environment variables that tests can reference:
.. code-block:: yaml
tests:
- name: create audit with UUID
POST: /v1/audits
data:
name: $ENVIRON['AUDIT_NAME']
uuid: $ENVIRON['AUDIT_UUID']
audit_type: ONESHOT
goal: dummy
status: 201
Referencing Previous Tests
~~~~~~~~~~~~~~~~~~~~~~~~~~~
Tests can reference responses from earlier tests:
.. code-block:: yaml
tests:
- name: create audit
POST: /v1/audits
data:
name: test-audit
audit_type: ONESHOT
goal: dummy
status: 201
- name: get created audit
GET: $LOCATION # Uses Location header from previous test
response_json_paths:
# Reference previous test's response
$.uuid: $HISTORY['create audit'].$RESPONSE['$.uuid']
$.name: test-audit
Gabbi Fixtures
--------------
Gabbi fixtures extend ``gabbi.fixture.GabbiFixture`` and run once per YAML file.
**Available Fixtures:**
- ``APIFixture``: Base fixture, empty database
- ``AuditFixture``: Pre-creates goal, strategy, audit
- ``ActionPlanFixture``: Pre-creates goal, strategy, audit, action plan
**Example Using Pre-Created Data:**
.. code-block:: yaml
# action-plan.yaml
fixtures:
- ActionPlanFixture # Has pre-created action plan
tests:
- name: get action plan
GET: /v1/action_plans/$ENVIRON['ACTION_PLAN_UUID']
status: 200
response_json_paths:
$.uuid: $ENVIRON['ACTION_PLAN_UUID']
$.state: RECOMMENDED
Testing Microversions
----------------------
Test microversion-specific behavior:
.. code-block:: yaml
# microversions.yaml
tests:
- name: old version rejects new field
POST: /v1/audits
request_headers:
openstack-api-version: "infra-optim 1.4"
data:
audit_type: ONESHOT
goal: dummy
new_field: value # Added in 1.5
status: 400
- name: new version accepts new field
POST: /v1/audits
request_headers:
openstack-api-version: "infra-optim 1.5"
data:
audit_type: ONESHOT
goal: dummy
new_field: value
status: 201
Running Gabbi Tests
-------------------
Run all gabbi tests:
.. code-block:: bash
tox -e functional -- test_api_gabbi
Run specific YAML file:
.. code-block:: bash
tox -e functional -- test_api_gabbi.AuditLifecycleGabbits
With debug logging:
.. code-block:: bash
OS_DEBUG=1 tox -e functional -- test_api_gabbi
Gabbi Test Organization
-----------------------
**File Naming:**
- Use descriptive names: ``audit-lifecycle.yaml``, not ``test1.yaml``
- One file per API resource or concept
- Tests in a file run sequentially; files run in parallel
**Test Naming:**
- Use descriptive names that explain what is tested
- Good: ``"create audit with invalid goal returns 400"``
- Bad: ``"test 1"``
**File Organization:**
.. code-block:: text
watcher/tests/functional/gabbits/
├── basic-http.yaml # Basic API behavior
├── audit-lifecycle.yaml # Audit CRUD
├── action-plan.yaml # Action plan operations
├── microversions.yaml # Version testing
├── goal.yaml # Goal endpoints
├── strategy.yaml # Strategy endpoints
└── service.yaml # Service endpoints
Gabbi vs Python Tests
---------------------
**Use Gabbi when:**
- Testing single API endpoints
- Verifying HTTP status codes and headers
- Testing microversions
- Checking error responses
- Tests fit declarative format
**Use Python when:**
- Testing multi-step workflows
- Need to wait for async operations
- Complex state management required
- Service-to-service interaction
- Need full Python capabilities
Further Reading
===============
- :doc:`regression-testing` - Writing regression tests
- :doc:`/contributor/testing` - General testing guidelines
- `Gabbi Documentation <https://gabbi.readthedocs.io/>`_
- `OpenStack Functional Testing Guide <https://docs.openstack.org/nova/latest/contributor/testing/functional-tests.html>`_
- `Placement Gabbi Tests <https://opendev.org/openstack/placement/src/branch/master/placement/tests/functional/gabbits>`_
Update tox.ini:
[tox]
minversion = 3.18.0
envlist = py3,functional,pep8
skipsdist = False
[testenv]
usedevelop = True
install_command = pip install -c{env:TOX_CONSTRAINTS_FILE:https://releases.openstack.org/constraints/upper/master} {opts} {packages}
setenv =
VIRTUAL_ENV={envdir}
LANGUAGE=en_US
LC_ALL=en_US.utf-8
OS_STDOUT_CAPTURE=1
OS_STDERR_CAPTURE=1
OS_TEST_TIMEOUT=160
PYTHONDONTWRITEBYTECODE=1
deps =
-r{toxinidir}/requirements.txt
-r{toxinidir}/test-requirements.txt
passenv =
OS_DEBUG
commands =
stestr run --test-path=./watcher/tests/unit {posargs}
[testenv:functional{,-py310,-py311,-py312}]
description =
Run functional tests for Watcher.
setenv =
{[testenv]setenv}
deps =
{[testenv]deps}
commands =
stestr run --test-path=./watcher/tests/functional {posargs}
stestr slowest
[testenv:functional-regression]
description =
Run regression tests only.
setenv =
{[testenv:functional]setenv}
deps =
{[testenv:functional]deps}
commands =
stestr run --test-path=./watcher/tests/functional/regressions {posargs}
Update .zuul.yaml:
- job:
name: watcher-functional
parent: openstack-tox-functional-py312
description: |
Run functional tests for the Watcher project.
required-projects:
- openstack/watcher
irrelevant-files:
- ^.*\.rst$
- ^doc/.*$
- ^releasenotes/.*$
- ^watcher/locale/.*$
vars:
zuul_work_dir: src/opendev.org/openstack/watcher
tox_envlist: functional
timeout: 1800
- job:
name: watcher-functional-regression
parent: watcher-functional
description: |
Run regression tests for the Watcher project.
vars:
tox_envlist: functional-regression
- project:
check:
jobs:
- watcher-functional
gate:
jobs:
- watcher-functional
periodic:
jobs:
- watcher-functional-regression
Gabbi is a YAML-based declarative HTTP testing framework used extensively in OpenStack, particularly in the Placement service. It provides a clean, readable way to test REST APIs without writing Python code.
- API Contract Testing: Verify API behavior, status codes, headers
- Microversion Testing: Test API version negotiation and version-specific behavior
- Declarative Syntax: Easy to read and write, even for non-Python developers
- Fast Execution: Uses wsgi-intercept for in-process HTTP calls
- Sequential Tests: Tests in a YAML file run in order, allowing state progression
- Proven Pattern: Widely used in OpenStack (Placement, Ironic, Cyborg)
| Aspect | Gabbi Tests | Python Functional Tests |
|---|---|---|
| Format | YAML | Python code |
| Use Case | API contracts, HTTP behavior | Complex workflows, integration |
| State Management | Environment variables, $HISTORY | Python objects, fixtures |
| Learning Curve | Low (declarative) | Medium (requires Python) |
| Expressiveness | Limited to HTTP assertions | Full Python capabilities |
| Best For | API behavior, microversions | Multi-service interactions |
┌─────────────────────────────────────────────────────┐
│ Gabbi YAML Test Files (gabbits/*.yaml) │
│ - Declarative HTTP test definitions │
│ - Environment variable substitution │
│ - Sequential test ordering within files │
└──────────────────────────┬──────────────────────────┘
│
┌──────────────────────────▼──────────────────────────┐
│ Test Loader (test_api_gabbi.py) │
│ - gabbi.driver.build_tests() │
│ - Discovers YAML files in gabbits/ │
│ - Creates Python test cases │
└──────────────────────────┬──────────────────────────┘
│
┌──────────────────────────▼──────────────────────────┐
│ Gabbi Fixture Layer (fixtures/gabbi.py) │
│ - APIFixture (base, no pre-created data) │
│ - AuditFixture (with goals, strategies) │
│ - ActionPlanFixture (with action plans) │
└──────────────────────────┬──────────────────────────┘
│
┌────────────────────┼────────────────────┐
│ │ │
┌─────▼──────┐ ┌─────────▼────────┐ ┌───────▼──────┐
│ Config │ │ Database │ │ Pecan App │
│ (noauth) │ │ (SQLite in-mem) │ │ (wsgi- │
│ │ │ │ │ intercept) │
└────────────┘ └──────────────────┘ └──────────────┘
Gabbi fixtures reuse components from Python functional tests:
Shared Components:
- Database fixture (from watcher.tests.fixtures)
- PolicyFixture (from watcher.tests.unit.policy_fixture)
- ConfFixture (oslo.config fixture)
- Test data helpers (watcher.tests.db.utils)
Gabbi-Specific:
- capture.Logging - Log capture for gabbi tests
- capture.WarningsFixture - Warning filtering
- gabbi.APIFixture - GabbiFixture base class
- Environment variable setup
Why This Approach:
- ✅ Avoids duplication of database/config setup
- ✅ Ensures consistency between test types
- ✅ Leverages existing test data creation helpers
- ✅ Simplifies maintenance
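To make the reuse concrete, here is a minimal sketch of the planned APIFixture. It only relies on APIs known to exist (gabbi's GabbiFixture lifecycle, oslo.config's Config fixture, oslo.utils); the database and policy wiring is left as a comment because those shared fixture constructors are part of the earlier phases.

```python
import os

from gabbi import fixture
from oslo_config import cfg
from oslo_config import fixture as config_fixture
from oslo_utils import uuidutils

CONF = cfg.CONF


class APIFixture(fixture.GabbiFixture):
    """Per-YAML-file setup: config, database, policy and $ENVIRON values."""

    def start_fixture(self):
        # Isolated, auto-restored configuration for the whole YAML file.
        self.conf_fixture = config_fixture.Config(CONF)
        self.conf_fixture.setUp()

        # The real fixture would also stand up the shared Database and
        # PolicyFixture here, exactly as the Python functional tests do.

        # Values the YAML tests reference through $ENVIRON.
        os.environ['AUDIT_NAME'] = 'gabbi-audit'
        os.environ['AUDIT_UUID'] = uuidutils.generate_uuid()

    def stop_fixture(self):
        self.conf_fixture.cleanUp()
```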
watcher/tests/functional/
├── test_api_gabbi.py # Gabbi test loader (load_tests protocol)
├── test_api_audits.py # Python functional tests for audits
├── test_workflows.py # Python functional tests for workflows
├── fixtures/
│ ├── gabbi.py # Gabbi-specific fixtures (APIFixture, etc.)
│ └── capture.py # Logging/warning fixtures for gabbi
├── gabbits/ # Gabbi YAML test files
│ ├── basic-http.yaml
│ ├── audit-lifecycle.yaml
│ ├── action-plan.yaml
│ ├── microversions.yaml
│ ├── goal.yaml
│ ├── strategy.yaml
│ └── service.yaml
└── regressions/ # Python regression tests
└── test_bug_*.py
Organization Principles:
- Gabbi tests: One YAML file per API resource or concept
- Python tests: One file per major functional area
- Regressions: Bug-specific regression tests (Python only)
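For reference, the loader in test_api_gabbi.py can stay very small. The sketch below follows the Placement pattern and assumes the planned fixtures/gabbi.py module exposes the setup_app() factory and the fixture classes:

```python
"""Gabbi test loader (sketch of the planned test_api_gabbi.py)."""
import os

from gabbi import driver

from watcher.tests.functional.fixtures import gabbi as gabbi_fixtures

TESTS_DIR = os.path.join(os.path.dirname(__file__), 'gabbits')


def load_tests(loader, tests, pattern):
    """Standard unittest load_tests hook, discovered by stestr."""
    return driver.build_tests(
        TESTS_DIR,
        loader,
        host=None,                           # no real socket; use intercept
        intercept=gabbi_fixtures.setup_app,  # in-process Pecan WSGI app
        fixture_module=gabbi_fixtures,       # where APIFixture & friends live
    )
```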
GabbiFixture (from gabbi package)
│
├── APIFixture (base)
│ - Database
│ - Config (noauth)
│ - Policy
│ - Environment variables (UUIDs)
│ - No pre-created data
│
├── AuditFixture (extends APIFixture)
│ - Pre-creates: goal, strategy, audit template, audit
│ - For tests that need existing audit data
│
└── ActionPlanFixture (extends AuditFixture)
- Pre-creates: action plan, actions
- For tests that need existing action plan data
Fixture Selection Guidelines:
| Test Scenario | Use Fixture | Reason |
|---|---|---|
| Create goal | APIFixture | No pre-data needed |
| Create audit | APIFixture | No pre-data needed |
| List audits | APIFixture or AuditFixture | Depends on test |
| Create action plan | AuditFixture | Needs existing audit |
| Start action plan | ActionPlanFixture | Needs existing plan |
| Microversion tests | APIFixture | Independent of data |
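As an illustration of the hierarchy, AuditFixture could pre-create its data with the existing db_utils helpers. The import path assumes APIFixture lives in the planned fixtures/gabbi.py, and the keyword arguments are placeholders to be checked against the real helper signatures:

```python
import os

from watcher.tests.db import utils as db_utils
from watcher.tests.functional.fixtures.gabbi import APIFixture  # planned module


class AuditFixture(APIFixture):
    """APIFixture plus a pre-created goal, strategy, audit template, audit."""

    def start_fixture(self):
        super(AuditFixture, self).start_fixture()

        # Helper names come from watcher/tests/db/utils.py; keyword
        # arguments are illustrative only.
        goal = db_utils.create_test_goal()
        db_utils.create_test_strategy(goal_id=goal.id)
        db_utils.create_test_audit_template(goal_id=goal.id)
        db_utils.create_test_audit(
            uuid=os.environ['AUDIT_UUID'],  # exported by APIFixture
            goal_id=goal.id,
        )
```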
Challenge: Watcher uses Pecan, not a deploy.loadapp() pattern like Placement.
Solution: Custom setup_app() function:
def setup_app():
"""Create Pecan WSGI app for gabbi tests."""
from watcher.api import config as api_config
import pecan
# Load Pecan config
pecan_config = pecan.configuration.conf_from_dict(
api_config.PECAN_CONFIG)
app_conf = dict(pecan_config.app)
# Disable ACL for testing
app_conf['enable_acl'] = False
# Create app
app = pecan.make_app(
app_conf.pop('root'),
logging=getattr(pecan_config, 'logging', {}),
debug=True,
**app_conf
)
return app
Key Points:
- Disables ACL (enable_acl=False) - policy still enforced via PolicyFixture
- Returns raw Pecan app (no additional middleware needed for tests)
gabbits/audit-lifecycle.yaml:
# Fixtures to use (from fixtures/gabbi.py)
fixtures:
- APIFixture
# Default headers for all tests
defaults:
request_headers:
x-auth-token: admin
accept: application/json
content-type: application/json
openstack-api-version: infra-optim 1.0
# Sequential tests (run in order)
tests:
- name: list audits empty
GET: /v1/audits
response_json_paths:
$.audits: []
- name: create audit
POST: /v1/audits
data:
name: $ENVIRON['AUDIT_NAME'] # Environment variable
audit_type: ONESHOT
goal: dummy
status: 201
response_headers:
location: //v1/audits/[a-f0-9-]+/ # Regex match
response_json_paths:
$.name: $ENVIRON['AUDIT_NAME']
$.state: PENDING
- name: get audit
GET: $LOCATION # Uses Location header from previous test
response_json_paths:
# Reference previous test response
$.uuid: $HISTORY['create audit'].$RESPONSE['$.uuid']
$.name: $ENVIRON['AUDIT_NAME']
Key Features:
- Environment Variables:
$ENVIRON['AUDIT_NAME']- set by fixture - Response References:
$LOCATION- uses previous response Location header - History:
$HISTORY['test-name'].$RESPONSE['$.uuid']- reference prior test data - JSONPath:
$.audits[0].uuid- assert JSON structure - Regex:
/^[a-f0-9-]+$/- pattern matching
.stestr.conf update:
[DEFAULT]
test_path=./watcher/tests/unit
top_dir=./
# Gabbi test grouping
# Ensures tests from the same YAML file run in the same process
# (maintains test ordering within a file)
group_regex=watcher\.tests\.functional\.test_api_gabbi(?:\.|_)([^_]+)
How It Works:
- Tests within a YAML file run sequentially (maintains order)
- Different YAML files run in parallel (for speed)
- Pattern extracts YAML filename from test name
- All tests with same capture group run together
Example:
Test name: watcher.tests.functional.test_api_gabbi.AuditLifecycleGabbits.test_001_list_audits_empty
Captures: AuditLifecycleGabbits (from audit-lifecycle.yaml)
All AuditLifecycleGabbits tests run in one process sequentially.
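The pattern can be sanity-checked locally with nothing but the re module. The test ids below follow the example naming above; the ids gabbi actually generates may differ slightly, and stestr may key on the full match rather than the capture group, but the grouping effect is the same:

```python
import re

GROUP_RE = re.compile(
    r"watcher\.tests\.functional\.test_api_gabbi(?:\.|_)([^_]+)")

same_file_a = ("watcher.tests.functional.test_api_gabbi."
               "AuditLifecycleGabbits.test_001_list_audits_empty")
same_file_b = ("watcher.tests.functional.test_api_gabbi."
               "AuditLifecycleGabbits.test_002_create_audit")
other_file = ("watcher.tests.functional.test_api_gabbi."
              "BasicHttpGabbits.test_001_get_root")

# Tests generated from the same YAML file share a group key ...
assert GROUP_RE.search(same_file_a).group(1) == \
    GROUP_RE.search(same_file_b).group(1)
# ... while tests from a different YAML file get a different one.
assert GROUP_RE.search(same_file_a).group(1) != \
    GROUP_RE.search(other_file).group(1)
```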
Gabbi tests can serve as executable API documentation:
- Clear test names: Describe what API does
- Complete examples: Show request/response structure
- Version tests: Document microversion behavior
- Error cases: Show error responses
Example:
tests:
- name: create audit with invalid goal returns 400
DESC: |
When creating an audit with a non-existent goal,
the API returns 400 Bad Request with a clear error message.
POST: /v1/audits
data:
audit_type: ONESHOT
goal: non-existent-goal
status: 400
response_json_paths:
$.errors[0].code: InvalidGoal
New section in doc/source/contributor/functional-testing.rst:
Gabbi Tests
===========
Gabbi tests are declarative YAML-based API tests. Use them for:
- API behavior verification
- Microversion testing
- HTTP status code checks
- Response header validation
Writing Gabbi Tests
-------------------
Create a YAML file in watcher/tests/functional/gabbits/:
.. code-block:: yaml
fixtures:
- APIFixture
defaults:
request_headers:
x-auth-token: admin
accept: application/json
tests:
- name: create audit
POST: /v1/audits
data:
audit_type: ONESHOT
goal: dummy
status: 201
Running Gabbi Tests
-------------------
.. code-block:: bash
# Run all gabbi tests
tox -e functional -- test_api_gabbi
# Run specific YAML file
tox -e functional -- test_api_gabbi.AuditLifecycleGabbits
Goal: Test that new API features are only available in appropriate microversions.
gabbits/microversions.yaml:
fixtures:
- APIFixture
tests:
- name: old microversion rejects new field
DESC: |
Feature X was added in microversion 1.5.
Requests with older versions should reject the new field.
POST: /v1/audits
request_headers:
openstack-api-version: "infra-optim 1.4"
data:
audit_type: ONESHOT
goal: dummy
new_field: value # Only valid in 1.5+
status: 400
response_json_paths:
$.errors[0].detail: /.*new_field.*not supported.*version 1.4/
- name: new microversion accepts new field
POST: /v1/audits
request_headers:
openstack-api-version: "infra-optim 1.5"
data:
audit_type: ONESHOT
goal: dummy
new_field: value # Valid in 1.5+
status: 201
For Watcher Project:
- Faster API Testing: Declarative tests are quick to write
- Better Coverage: Easy to add tests for all API endpoints
- Microversion Validation: Explicit version behavior testing
- Documentation: Tests serve as API usage examples
- Reduced Maintenance: Less Python code to maintain
- OpenStack Alignment: Follows Placement/Ironic patterns
For Contributors:
- Low Barrier: No Python expertise needed for basic API tests
- Clear Intent: YAML format is self-documenting
- Quick Feedback: Fast test execution
- Easy Review: Diff shows exactly what API behavior changed
Complementary, Not Replacement:
| Test Type | Best For | Example |
|---|---|---|
| Gabbi | API contracts, single requests | "POST /v1/audits returns 201" |
| Python | Multi-step workflows | "Audit → Strategy → Action Plan → Execute" |
| Gabbi | Microversion behavior | "Field X only in version 1.5+" |
| Python | Service integration | "Decision engine creates action plan via RPC" |
| Gabbi | Error responses | "Invalid goal returns 400" |
| Python | Complex state | "Action plan retries on failure" |
Recommendation: Use both test types, selecting the best tool for each scenario.
| Fixture | Priority | Complexity | Notes |
|---|---|---|---|
| ConfFixture | P0 | Low | Configuration management |
| Database | P0 | Medium | SQLite with schema caching |
| RPCFixture | P0 | Low | oslo.messaging fake driver |
| CastAsCallFixture | P0 | Low | Synchronous RPC |
| NotificationFixture | P0 | Medium | Notification capture with threading |
| APIFixture | P0 | High | Pecan WSGI with wsgi-intercept |
| ServiceFixture | P0 | Medium | Start DE/Applier services |
| NovaFixture | P0 | Medium | Mock Nova API |
| GnocchiFixture | P0 | Medium | Mock Gnocchi API |
| PlacementFixture | P1 | High | Can reuse from placement project |
| CeilometerFixture | P2 | Low | Optional, legacy |
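As a sketch of what one of these mocks could look like, a NovaFixture might keep canned instance data in memory and monkey-patch Watcher's Nova helper. The patch target and method name below (watcher.common.nova_helper.NovaHelper.get_instance_list) are assumptions to be confirmed against the real helper during implementation:

```python
import copy

import fixtures


class NovaFixture(fixtures.Fixture):
    """Serve canned compute data instead of calling the real Nova API."""

    def __init__(self, test):
        super(NovaFixture, self).__init__()
        self.test = test
        self.instances = [
            {'id': 'fake-instance-1', 'status': 'ACTIVE',
             'OS-EXT-SRV-ATTR:host': 'compute-1'},
            {'id': 'fake-instance-2', 'status': 'ACTIVE',
             'OS-EXT-SRV-ATTR:host': 'compute-2'},
        ]

    def _setUp(self):
        # Assumed patch target; adjust to the real Nova helper entry point.
        self.useFixture(fixtures.MonkeyPatch(
            'watcher.common.nova_helper.NovaHelper.get_instance_list',
            lambda _self: copy.deepcopy(self.instances)))

    def add_instance(self, instance):
        self.instances.append(instance)

    def live_migrate(self, instance_id, dest_host):
        # Update the in-memory view to simulate a completed live migration.
        for inst in self.instances:
            if inst['id'] == instance_id:
                inst['OS-EXT-SRV-ATTR:host'] = dest_host
```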
- API Framework (Pecan)
- Watcher uses Pecan instead of custom WSGI
- Need to use watcher.api.app.setup_app() to get WSGI app
-
Services
- Decision Engine - runs strategies, creates action plans
- Applier - executes actions
- Both need ServiceFixture support
-
Data Model
- Audit templates
- Audits (ONESHOT, CONTINUOUS)
- Goals
- Strategies
- Action plans
- Actions
-
External Dependencies
- Nova - primary data source for compute
- Gnocchi - metrics and aggregation
- Placement - resource providers (less critical than for Nova)
- Ceilometer - legacy metrics (optional)
| Phase | Duration | Commits | Description |
|---|---|---|---|
| Phase 0 | 1 week | 3 | Extract existing fixtures, create helpers |
| Phase 1 | 1 week | 1 | Test reorganization |
| Phase 2 | 2 weeks | 3-4 | Core fixtures (conf, db, rpc, notifications) |
| Phase 3 | 2 weeks | 2 | API and service fixtures |
| Phase 4 | 1 week | 1 | Base functional test class |
| Phase 5 | 1.5 weeks | 3 | Gabbi test infrastructure |
| Phase 6 | 2 weeks | 2 | Example Python functional tests |
| Phase 7 | 1 week | 1 | Regression test framework |
| Phase 8 | 1.5 weeks | 1 | Contributor documentation (including Gabbi) |
| Phase 9 | 1 week | 1 | CI integration |
| Total | 14 weeks | 18-19 commits | |
M0: Preparation (Week 1) - Existing fixtures extracted, helpers created
M1: Foundation (Week 4) - Test reorganization + core fixtures complete
M2: Infrastructure (Week 8) - All fixtures and base class complete
M3: Gabbi Integration (Week 9.5) - Gabbi tests operational
M4: Validation (Week 11.5) - Example tests demonstrate functionality
M5: Production Ready (Week 14) - Docs complete, CI running
Phase 1: Test Reorganization
- Run full unit test suite: tox -e py3
- Verify 100% of tests still pass
- Check import paths are correct
Phase 2-3: Fixture Development
- Write unit tests for each fixture
- Test fixtures independently
- Verify cleanup happens correctly
Phase 4: Base Test Class
- Create simple smoke test
- Verify all fixtures initialize
- Test service startup
Phase 5: Example Tests
- Run examples repeatedly (check for flakiness)
- Measure test execution time
- Validate notifications captured correctly
Phase 6-7: Documentation
- Review with contributors
- Test examples in documentation
- Verify README instructions
Phase 8: CI Integration
- Run in Zuul
- Check job timeout (should be < 30 minutes)
- Verify reporting works
- All existing unit tests pass after reorganization
- Functional tests complete in < 5 minutes locally
- Zero test flakiness (run 100 times, 100% pass rate)
- CI job completes in < 30 minutes
- Code coverage for functional tests > 60%
# Watcher's API app setup differs from Nova
from watcher.api import app as watcher_app
# Pecan configuration
app_conf = {
'app': {
'root': 'watcher.api.controllers.root.RootController',
'modules': ['watcher.api'],
'debug': True,
}
}
# Create WSGI app
app = watcher_app.setup_app(config=app_conf)
# Decision Engine Manager
from watcher.decision_engine import manager as de_manager
decision_engine_mgr = de_manager.DecisionEngineManager()
# Applier Manager
from watcher.applier import manager as applier_manager
applier_mgr = applier_manager.ApplierManager()
# Use SQLite iterdump for schema caching
def _cache_schema(connection):
    schema_sql = "".join(line for line in connection.iterdump())
    return schema_sql
# Apply cached schema
def _apply_cached_schema(connection, schema_sql):
    connection.executescript(schema_sql)
# Use standard threading, not eventlet (being removed from OpenStack)
import threading
import queue
# For services
thread = threading.Thread(target=service.start, daemon=True)
thread.start()
# For synchronization
condition = threading.Condition()
with condition:
    condition.wait(timeout=10)
- Review and Approve Plan: Get feedback from Watcher team
- Create Story/Tasks: Break down into Launchpad stories
- Begin Implementation: Start with Phase 1 (test reorganization)
- Iterative Review: Review each commit before proceeding
- Documentation Updates: Keep docs in sync with implementation
- Should we support Ceilometer fixture or focus only on Gnocchi?
- Do we need Placement fixture immediately or can it wait?
- Should functional tests run in gate (check/gate) or only periodic?
- What's the minimum test coverage threshold for functional tests?
- Should we add performance/benchmark tests alongside functional tests?
End of Planning Document
This plan provides a comprehensive roadmap for introducing functional testing to Watcher using proven patterns from Nova, adapted for Watcher's unique architecture and requirements.