@SeanMooney
Created October 7, 2025 12:02
A comprehensive, multi-phase approach to introduce functional testing to the Watcher project

Watcher Functional Test Infrastructure Plan

Version: 1.0
Date: October 2025
Status: Planning Document

Executive Summary

This document outlines a comprehensive, multi-phase approach to introduce functional testing to the Watcher project, adapting proven patterns from OpenStack Nova and Placement while accounting for Watcher's unique architecture. The plan includes test reorganization, fixture development, base test infrastructure, Gabbi-based declarative API testing, and contributor documentation.

The functional test suite will support two complementary testing approaches:

  1. Python Functional Tests: For complex workflows, service integration, and multi-step scenarios
  2. Gabbi Tests: For declarative API contract testing, microversion validation, and HTTP behavior

Both approaches will reuse common fixtures (database, configuration, policy) to ensure consistency and minimize duplication.

Table of Contents

  1. Introduction
  2. Current State Analysis
  3. Goals and Objectives
  4. Watcher vs Nova: Key Differences
  5. Multi-Phase Implementation Plan
  6. Gabbi Test Integration
  7. Fixture Requirements Analysis
  8. Base Test Class Design
  9. Regression Test Infrastructure
  10. Contributor Documentation
  11. Testing Strategy
  12. Timeline and Milestones

Introduction

What are Functional Tests?

Functional tests sit between unit tests and integration tests:

  • Unit Tests: Test individual functions/classes in isolation with extensive mocking
  • Functional Tests: Test multiple components working together with minimal mocking
  • Integration Tests: Test the complete system with real external dependencies

Why Functional Tests for Watcher?

  1. Catch integration bugs that unit tests miss
  2. Test real workflows (audit → strategy → action plan → applier)
  3. Validate RPC interactions between services
  4. Test database migrations with real data
  5. Verify API contracts with real WSGI application
  6. Regression protection for complex bugs

Key Principle: Minimize Mocks, Maximize Reality

In functional tests:

  • ✅ Use real Watcher code (API, decision engine, applier)
  • ✅ Use real database operations (SQLite in-memory)
  • ✅ Use real RPC messaging (oslo.messaging fake driver)
  • ✅ Use real WSGI application (wsgi-intercept)
  • ❌ Mock external services (Nova, Glance, Ceilometer, Gnocchi, Placement)
  • ❌ Mock network I/O to external systems

Current State Analysis

Existing Test Structure

watcher/tests/
├── __init__.py
├── base.py                    # Base test class (TestCase, BaseTestCase)
├── conf_fixture.py            # ConfFixture, ConfReloadFixture  
├── policy_fixture.py          # PolicyFixture
├── fakes.py                   # Fake objects (FakePecanRequest, FakeService, etc.)
├── fixtures/
│   ├── __init__.py
│   └── watcher.py            # StandardLogging, KeystoneClient
├── config.py                  # Pecan test app config
├── db/
│   ├── base.py               # Database fixture and DbTestCase
│   └── utils.py              # Test data creation helpers
├── objects/
│   └── utils.py              # Object creation helpers
├── api/                       # API tests
├── common/                    # Common utility tests
├── decision_engine/           # Decision engine tests
├── applier/                   # Applier tests
└── ... (various test modules)

Existing Fixtures and Helpers

Already Available:

  1. watcher/tests/fixtures/watcher.py:

    • StandardLogging - Logging setup with OS_DEBUG support
    • KeystoneClient - Mock Keystone client
  2. watcher/tests/conf_fixture.py:

    • ConfFixture - Configuration management with SQLite defaults
    • ConfReloadFixture - Configuration reloading support
  3. watcher/tests/policy_fixture.py:

    • PolicyFixture - Policy enforcement testing
  4. watcher/tests/db/base.py:

    • Database - Database fixture with schema caching
    • DbTestCase - Base class for DB tests
  5. watcher/tests/db/utils.py:

    • get_test_goal(), create_test_goal()
    • get_test_audit(), create_test_audit()
    • get_test_audit_template(), create_test_audit_template()
    • get_test_action_plan(), create_test_action_plan()
    • get_test_action(), create_test_action()
    • get_test_strategy(), create_test_strategy()
    • get_test_service(), create_test_service()
    • Plus many more helpers for all Watcher objects
  6. watcher/tests/objects/utils.py:

    • Object-level test data creation helpers (similar to db/utils.py)
  7. watcher/tests/fakes.py:

    • FakePecanRequest, FakePecanResponse
    • FakeService
    • FakeResponse for HTTP mocking
  8. watcher/tests/base.py:

    • BaseTestCase - Base with StandardLogging
    • TestCase - Full unit test setup with context, policy, config

Current Test Infrastructure

Strengths:

  • ✅ Comprehensive unit test coverage
  • ✅ Excellent test data creation helpers (db/utils.py, objects/utils.py)
  • ✅ Proper logging fixture with OS_DEBUG support
  • ✅ Configuration fixtures already in place
  • ✅ Database fixture with schema caching
  • ✅ Policy testing support
  • ✅ Good test organization by module
  • ✅ Fake objects for common test scenarios

Gaps:

  • ❌ All tests are unit tests (extensive mocking)
  • ❌ No functional test infrastructure
  • ❌ No fixtures for external services (Nova, Gnocchi)
  • ❌ No RPC fixture (messaging already configured but no capture)
  • ❌ No notification capture fixture
  • ❌ No API fixture for functional testing (wsgi-intercept)
  • ❌ No service fixtures for starting DE/Applier
  • ❌ No regression test framework
  • ❌ Limited end-to-end testing

Key Insight: Leverage Existing Code

Important: Watcher already has excellent infrastructure that we can reuse:

  • Database fixture exists and works well
  • Test data helpers are comprehensive
  • Configuration management is solid
  • Logging setup is production-ready

Strategy: Extract and enhance existing code rather than rebuild from scratch.

Dependencies Analysis

Oslo Libraries (already in use):

  • oslo.config - Configuration management
  • oslo.db - Database abstraction
  • oslo.messaging - RPC and notifications
  • oslo.log - Logging
  • oslo.policy - Policy enforcement

External Services (need fixtures):

  • Nova - Compute operations (get instances, migrate, etc.)
  • Glance - Image service (rarely used directly)
  • Ceilometer - Metrics collection (legacy, optional)
  • Gnocchi - Metrics storage and aggregation
  • Placement - Resource provider inventory
  • Neutron - Network information (optional)
  • Cinder - Volume information (optional)

API Framework:

  • Pecan/WSGI - Different from Nova's approach
  • oslo.policy for RBAC
  • Uses API paste configuration

Goals and Objectives

Primary Goals

  1. Reorganize tests to support both unit and functional testing
  2. Create fixture infrastructure for external services
  3. Build base test classes for functional tests
  4. Integrate Gabbi tests for declarative API testing
  5. Establish regression test framework with documentation
  6. Document functional testing practices for contributors

Success Criteria

  • Tests reorganized into unit/ and functional/ subdirectories
  • Functional base test class with all required fixtures
  • At least 5 fixtures for external services
  • Gabbi test infrastructure operational with example YAML tests
  • At least 5 Gabbi YAML test files covering major API endpoints
  • At least 3 end-to-end Python functional tests
  • Regression test framework with README and example
  • Contributor documentation explaining functional vs unit tests and Gabbi tests
  • CI jobs running both Python and Gabbi functional tests
  • All existing tests still pass after reorganization

Reusable Components Strategy

OpenStack Style Guidelines

Important: Following OpenStack conventions:

  1. Keep __init__.py files empty - No __all__ exports per OpenStack style
  2. Explicit imports - Import directly: from watcher.tests.fixtures import database
  3. Shared fixtures - Design fixtures to work in both unit and functional tests where appropriate
  4. Minimal mocking in base classes - Only mock what's truly necessary for all tests

Extracting and Enhancing Existing Fixtures

Watcher already has excellent test infrastructure that should be extracted and enhanced for reuse. This is covered in the new Phase 0 below.

Shared vs Specialized Fixtures

Shared (both unit + functional):

  • StandardLogging - Both need proper logging with OS_DEBUG support
  • Database - Both use SQLite in-memory (unit tests can use it too)
  • ConfFixture / ConfReloadFixture - Both need configuration management
  • PolicyFixture - Both test policy enforcement

Functional-only:

  • RPCFixture - Unit tests mock RPC calls directly with mock.patch
  • NotificationFixture - Unit tests verify notifications are called, not content
  • NovaFixture, GnocchiFixture - Unit tests mock client methods directly
  • APIFixture - Unit tests use Pecan's test client, not full WSGI
  • ServiceFixture - Unit tests don't start actual services

Key Principle: Unit tests mock external dependencies directly at call sites. Functional tests use fixtures that implement limited but realistic behavior.
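
The contrast can be sketched with a toy example. Everything here is hypothetical and stdlib-only: `NovaClient`, `FakeNova`, and `rebalance` are illustrative stand-ins, not Watcher or novaclient APIs. The unit-style check patches the call sites and asserts on the calls; the functional-style check runs the same code against a small stateful fake and asserts on the resulting state.

```python
from unittest import mock


class NovaClient:
    """Hypothetical stand-in for an external compute client."""

    def get_instance(self, instance_id):
        raise RuntimeError("would perform network I/O")

    def live_migrate(self, instance_id, dest_hostname):
        raise RuntimeError("would perform network I/O")


def rebalance(client, instance_id, dest_hostname):
    """Toy code under test: migrate an instance unless already there."""
    instance = client.get_instance(instance_id)
    if instance["host"] == dest_hostname:
        return False
    client.live_migrate(instance_id, dest_hostname)
    return True


def unit_style():
    """Unit style: patch the call sites and assert on the calls made."""
    client = NovaClient()
    with mock.patch.object(client, "get_instance",
                           return_value={"host": "compute-1"}), \
            mock.patch.object(client, "live_migrate") as migrate:
        moved = rebalance(client, "vm-1", "compute-2")
        migrate.assert_called_once_with("vm-1", "compute-2")
    return moved


class FakeNova:
    """Functional style: a stateful fake with limited, realistic behavior."""

    def __init__(self):
        self.instances = {"vm-1": {"host": "compute-1"}}

    def get_instance(self, instance_id):
        return dict(self.instances[instance_id])

    def live_migrate(self, instance_id, dest_hostname):
        self.instances[instance_id]["host"] = dest_hostname


def functional_style():
    fake = FakeNova()
    first = rebalance(fake, "vm-1", "compute-2")
    second = rebalance(fake, "vm-1", "compute-2")  # already on compute-2
    return first, second, fake.instances["vm-1"]["host"]
```

The stateful fake catches a class of bug the patched version cannot: the second call only returns `False` because the first call actually changed the recorded host.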


Watcher vs Nova: Key Differences

Architectural Differences

| Aspect | Nova | Watcher |
|---|---|---|
| Database | API DB + multiple cell DBs | Single database |
| Services | compute, conductor, scheduler | watcher-api, watcher-decision-engine, watcher-applier |
| API Framework | Custom WSGI + Paste | Pecan + Paste |
| Primary Function | VM lifecycle management | Infrastructure optimization |
| External Deps | Glance, Neutron, Cinder, Placement | Nova, Gnocchi/Ceilometer, Placement |
| Concurrency | Eventlet (being removed) | Eventlet (will need threading) |

Fixture Requirements Comparison

Nova Needs:

  • PlacementFixture (resource providers)
  • CinderFixture (volumes)
  • NeutronFixture (networks, ports)
  • GlanceFixture (images)

Watcher Needs:

  • PlacementFixture (resource providers, same as Nova)
  • NovaFixture (instances, hosts, compute services)
  • GnocchiFixture (metrics, measures, aggregation)
  • CeilometerFixture (legacy metrics, optional)

API Testing Differences

Nova:

# Uses custom OSAPIFixture with wsgi-intercept
app = nova.api.openstack.compute.APIRouterV21()

Watcher:

# Uses Pecan framework
from watcher.api import app as pecan_app
app = pecan_app.setup_app()
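
Whichever framework builds it, the result is a WSGI callable, so a functional test can drive it in-process with no sockets involved. The sketch below shows that idea using only the standard library. `demo_app` is a hypothetical stand-in for the application object, the `/v1/audits` response shape is invented for illustration, and `call_wsgi` is an assumed helper name. In the plan itself this role is filled by wsgi-intercept.

```python
import io
import json
from wsgiref.util import setup_testing_defaults


def demo_app(environ, start_response):
    """Hypothetical stand-in for the WSGI app under test."""
    if environ["PATH_INFO"] == "/v1/audits":
        body = json.dumps({"audits": []}).encode("utf-8")
        status = "200 OK"
    else:
        body = json.dumps({"error": "not found"}).encode("utf-8")
        status = "404 Not Found"
    start_response(status, [("Content-Type", "application/json"),
                            ("Content-Length", str(len(body)))])
    return [body]


def call_wsgi(app, path, method="GET"):
    """Invoke a WSGI app in-process; return (status, headers, json_body)."""
    environ = {}
    setup_testing_defaults(environ)  # fills in the required WSGI keys
    environ["REQUEST_METHOD"] = method
    environ["PATH_INFO"] = path
    environ["wsgi.input"] = io.BytesIO(b"")
    captured = {}

    def start_response(status, headers, exc_info=None):
        captured["status"] = status
        captured["headers"] = dict(headers)

    body = b"".join(app(environ, start_response))
    return captured["status"], captured["headers"], json.loads(body)
```

Because the request never leaves the process, the full middleware and routing stack is exercised while the test stays fast and needs no free port.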

Multi-Phase Implementation Plan

Phase 0: Extract and Enhance Existing Fixtures (3 commits - NEW)

Goal: Extract reusable fixtures before reorganization, clean up imports, create helper functions.

Commit 0.1: Move Database fixture to fixtures/

Extract watcher/tests/db/base.py:Database to watcher/tests/fixtures/database.py:

Changes:

  1. Create watcher/tests/fixtures/database.py with enhanced Database fixture
  2. Update watcher/tests/db/base.py to import from new location
  3. Keep __init__.py files empty (OpenStack style - no __all__)

Note: Database fixture should work for both unit and functional tests, so place in shared location.

Commit 0.2: Create test helpers module

Create watcher/tests/helpers.py with common helper functions that work in both unit and functional tests.

These helpers wrap existing db/utils.py and objects/utils.py functions with:

  • Better defaults
  • Simplified APIs
  • Automatic cleanup registration (for functional tests)
  • Wait helpers for async operations

watcher/tests/helpers.py:

"""Helper functions for Watcher tests.

These helpers work in both unit and functional tests.

Usage:
    from watcher.tests import helpers
    
    # In unit test
    audit = helpers.create_test_audit(self.context)
    
    # In functional test
    audit = helpers.create_test_audit(self.context, name='my-audit')
    result = helpers.wait_for_audit_state(
        self.api, audit.uuid, 'SUCCEEDED')
"""

import time

from oslo_utils import uuidutils

from watcher import objects
from watcher.tests.db import utils as db_utils


def create_test_audit(context, **kwargs):
    """Create a test audit with sensible defaults.
    
    Args:
        context: Request context
        **kwargs: Override any audit fields
    
    Returns:
        Audit object
    """
    defaults = {
        'uuid': uuidutils.generate_uuid(),
        'name': 'test-audit',
        'audit_type': 'ONESHOT',
        'state': 'PENDING',
        'goal_id': kwargs.pop('goal_id', 1),
    }
    defaults.update(kwargs)
    
    audit = db_utils.create_test_audit(**defaults)
    return objects.Audit.get_by_uuid(context, audit.uuid)


def create_test_action_plan(context, **kwargs):
    """Create a test action plan.
    
    Args:
        context: Request context
        **kwargs: Override any action plan fields.
                  If audit_id not provided, creates an audit.
    
    Returns:
        ActionPlan object
    """
    defaults = {
        'uuid': uuidutils.generate_uuid(),
        'state': 'RECOMMENDED',
    }
    
    # Auto-create audit if needed
    if 'audit_id' not in kwargs:
        audit = create_test_audit(context)
        defaults['audit_id'] = audit.id
    
    defaults.update(kwargs)
    
    action_plan = db_utils.create_test_action_plan(**defaults)
    return objects.ActionPlan.get_by_uuid(context, action_plan.uuid)


def wait_for_audit_state(api_client, audit_uuid, expected_state,
                         timeout=30, fail_states=None):
    """Wait for audit to reach expected state.
    
    For functional tests that need to wait for async operations.
    
    Args:
        api_client: API client instance (e.g., self.api)
        audit_uuid: UUID of audit to monitor
        expected_state: State to wait for (e.g., 'SUCCEEDED')
        timeout: Maximum time to wait in seconds
        fail_states: States indicating failure (default: FAILED, CANCELLED)
    
    Returns:
        Final audit dict
    
    Raises:
        AssertionError: If timeout or failure state reached
    """
    if fail_states is None:
        fail_states = ['FAILED', 'CANCELLED']
    
    # Monotonic clock: wall-clock adjustments cannot skew the timeout
    end_time = time.monotonic() + timeout
    while time.monotonic() < end_time:
        audit = api_client.get_audit(audit_uuid)
        
        if audit['state'] == expected_state:
            return audit
        
        if audit['state'] in fail_states:
            raise AssertionError(
                'Audit %s failed with state: %s' %
                (audit_uuid, audit['state']))
        
        time.sleep(0.1)
    
    raise AssertionError(
        'Timeout waiting for audit %s to reach %s' %
        (audit_uuid, expected_state))


def wait_for_action_plan_state(api_client, action_plan_uuid, expected_state,
                               timeout=30, fail_states=None):
    """Wait for action plan to reach expected state.
    
    Args:
        api_client: API client instance
        action_plan_uuid: UUID of action plan
        expected_state: State to wait for
        timeout: Maximum wait time
        fail_states: Failure states
    
    Returns:
        Final action plan dict
    """
    if fail_states is None:
        fail_states = ['FAILED', 'CANCELLED']
    
    # Monotonic clock: wall-clock adjustments cannot skew the timeout
    end_time = time.monotonic() + timeout
    while time.monotonic() < end_time:
        ap = api_client.get_action_plan(action_plan_uuid)
        
        if ap['state'] == expected_state:
            return ap
        
        if ap['state'] in fail_states:
            raise AssertionError(
                'Action plan %s failed with state: %s' %
                (action_plan_uuid, ap['state']))
        
        time.sleep(0.1)
    
    raise AssertionError(
        'Timeout waiting for action plan %s to reach %s' %
        (action_plan_uuid, expected_state))
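
The two wait helpers share the same polling loop, so one possible refinement is a single generic helper that both delegate to. The following is a minimal sketch under assumptions: `wait_for_state` and its `fetch` callable are hypothetical names, and the resource is assumed to be a dict with a `state` key, as in the helpers above.

```python
import time


def wait_for_state(fetch, expected_state, timeout=30, interval=0.1,
                   fail_states=("FAILED", "CANCELLED")):
    """Poll fetch() until the returned dict reaches expected_state.

    fetch is any zero-argument callable returning a dict with a
    'state' key (hypothetical interface for this sketch).
    """
    deadline = time.monotonic() + timeout
    while True:
        resource = fetch()
        state = resource["state"]
        if state == expected_state:
            return resource
        if state in fail_states:
            raise AssertionError("Reached failure state: %s" % state)
        if time.monotonic() >= deadline:
            raise AssertionError(
                "Timeout waiting for state %s (last seen: %s)"
                % (expected_state, state))
        time.sleep(interval)
```

With this in place, `wait_for_audit_state` could reduce to a call such as `wait_for_state(lambda: api_client.get_audit(audit_uuid), expected_state, timeout)`. Checking the state before the deadline guarantees at least one poll even with a zero timeout.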

Commit 0.3: Cleanup existing fixture organization

  1. Keep watcher/tests/fixtures/__init__.py empty (no exports)
  2. Document fixture usage with module-level docstrings
  3. Ensure all existing fixtures follow OpenStack patterns
  4. Update a few unit tests to validate helpers work

Validation:

  • Run full unit test suite: tox -e py3
  • Verify no imports broke
  • Check that helpers work correctly

Phase 1: Test Reorganization (1 commit)

Goal: Reorganize existing tests without breaking anything.

Changes:

OLD:                              NEW:
watcher/tests/                    watcher/tests/
├── base.py                       ├── __init__.py
├── common/                       ├── unit/              # All existing tests
├── decision_engine/              │   ├── __init__.py
├── applier/                      │   ├── base.py        # Moved from tests/
├── api/                          │   ├── common/
├── db/                           │   ├── decision_engine/
└── ...                           │   ├── applier/
                                  │   ├── api/
                                  │   ├── db/
                                  │   └── ...
                                  ├── local_fixtures/    # New (empty for now)
                                  │   └── __init__.py
                                  └── functional/        # New (empty for now)
                                      └── __init__.py

Implementation Steps:

  1. Create new directory structure:
mkdir -p watcher/tests/unit
mkdir -p watcher/tests/local_fixtures
mkdir -p watcher/tests/functional
  2. Move all existing tests:
# Move base.py
git mv watcher/tests/base.py watcher/tests/unit/base.py

# Move all test packages, skipping the directories created in step 1
for dir in watcher/tests/*/; do
    case "$(basename "$dir")" in
        unit|functional|local_fixtures|__pycache__) continue ;;
    esac
    git mv "${dir%/}" watcher/tests/unit/
done
  3. Update imports in moved test files:
# OLD: from watcher.tests import base
# NEW: from watcher.tests.unit import base
  4. Update tox.ini to handle new structure:
[testenv]
# Run unit tests by default
commands =
    stestr run --test-path=./watcher/tests/unit {posargs}

[testenv:functional]
# New environment for functional tests
commands =
    stestr run --test-path=./watcher/tests/functional {posargs}
  5. Create empty __init__.py files in the new packages (no exports, per OpenStack style)

  6. Run full test suite to ensure nothing breaks
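
The import update is mechanical enough to script. The sketch below is one possible approach, not part of the plan: `rewrite_imports` and `KEEP_AT_TOP` are hypothetical names, and it assumes every module other than the three new packages ends up under `unit/`. It also rewrites dotted paths inside strings (for example `mock.patch` targets), which is usually what a move like this needs, but the result should still be reviewed by hand.

```python
import re

# Packages that stay directly under watcher/tests/ after the move
# (assumption for this sketch).
KEEP_AT_TOP = ("unit", "functional", "local_fixtures")

# "from watcher.tests import base" style imports.
_FROM_PKG = re.compile(r"^(\s*from\s+watcher\.tests)(\s+import\s+)", re.M)

# "watcher.tests.<something>" where <something> is not a kept package.
_DOTTED = re.compile(
    r"\bwatcher\.tests\.(?!(?:%s)\b)" % "|".join(KEEP_AT_TOP))


def rewrite_imports(source):
    """Point watcher.tests.* references at watcher.tests.unit.*."""
    source = _DOTTED.sub("watcher.tests.unit.", source)
    return _FROM_PKG.sub(r"\1.unit\2", source)
```

The negative lookahead makes the rewrite idempotent, so running it twice over the same file leaves already-updated imports alone.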

Commit Message:

Reorganize tests to support functional testing

Move all existing tests from watcher/tests/ to watcher/tests/unit/
to make room for functional tests. Create empty functional/ and
local_fixtures/ directories for future work.

This is a pure reorganization commit with no functional changes.
All existing tests should continue to pass.

Related-Bug: #XXXXXXX

Phase 2: Base Fixture Infrastructure (3-4 commits)

Goal: Create core fixtures needed by all functional tests.

Commit 2.1: Configuration and Database Fixtures

Files to Create:

watcher/tests/local_fixtures/__init__.py:

"""Local fixtures for Watcher functional tests.

Named 'local_fixtures' to avoid conflicts with the 'fixtures' package.

Per OpenStack style, this file is kept empty. Import fixtures directly:
    from watcher.tests.local_fixtures import database
    from watcher.tests.local_fixtures import rpc
"""

# Empty per OpenStack convention - import fixtures directly from their modules

watcher/tests/local_fixtures/conf.py:

"""Configuration fixture for Watcher tests."""

from oslo_config import cfg
from oslo_config import fixture as config_fixture

from watcher import conf as watcher_conf


CONF = cfg.CONF


class ConfFixture(config_fixture.Config):
    """Fixture to manage global conf settings for tests."""
    
    def setUp(self):
        super(ConfFixture, self).setUp()
        
        # Default group
        self.conf.set_default('debug', True)
        self.conf.set_default('host', 'test-host')
        
        # Database group
        self.conf.set_default('connection', 'sqlite://', group='database')
        self.conf.set_default('sqlite_synchronous', False, group='database')
        
        # API group
        self.conf.set_default('host', '127.0.0.1', group='api')
        self.conf.set_default('port', 9322, group='api')
        self.conf.set_default('max_limit', 1000, group='api')
        
        # Watcher-specific settings
        self.conf.set_default('conductor_topic', 'watcher.decision.control',
                              group='watcher_decision_engine')
        self.conf.set_default('conductor_topic', 'watcher.applier.control',
                              group='watcher_applier')
        
        # Disable periodic tasks in tests
        self.conf.set_default('periodic_interval', 0,
                              group='watcher_decision_engine')
        
        # Parse config
        watcher_conf.parse_args([], default_config_files=[], configure_db=False,
                                init_rpc=False)

watcher/tests/local_fixtures/database.py:

"""Database fixture for Watcher tests."""

import fixtures
from oslo_config import cfg
from oslo_db.sqlalchemy import enginefacade
from oslo_db.sqlalchemy import test_fixtures as db_fixtures

from watcher.db.sqlalchemy import api as db_api
from watcher.db.sqlalchemy import migration


CONF = cfg.CONF
DB_SCHEMA = {}  # Schema cache for speed


class Database(fixtures.Fixture):
    """Create a database fixture with SQLite.
    
    Uses in-memory SQLite with schema caching for fast test execution.
    """
    
    def setUp(self):
        super(Database, self).setUp()
        
        # Create new enginefacade for this test
        new_engine = enginefacade.transaction_context()
        
        # Replace global context manager
        self.useFixture(
            db_fixtures.ReplaceEngineFacadeFixture(
                db_api.get_context_manager(), new_engine))
        
        # Configure database
        db_api.configure(CONF)
        
        self.get_engine = db_api.get_engine
        self._apply_schema()
        self.addCleanup(self.cleanup)
    
    def _apply_schema(self):
        """Apply database schema (cached for speed)."""
        global DB_SCHEMA
        
        if not DB_SCHEMA:
            # First test: run migrations and cache
            engine = self.get_engine()
            conn = engine.connect()
            
            # Run migrations
            migration.upgrade('head')
            
            # Cache schema
            DB_SCHEMA['schema'] = "".join(
                line for line in conn.connection.iterdump())
            conn.close()
        else:
            # Subsequent tests: use cached schema
            engine = self.get_engine()
            conn = engine.connect()
            conn.connection.executescript(DB_SCHEMA['schema'])
            conn.close()
    
    def cleanup(self):
        """Dispose engine."""
        engine = self.get_engine()
        engine.dispose()

Commit Message:

Add configuration and database fixtures for functional tests

Introduce ConfFixture and Database fixtures as foundation for
functional testing. These fixtures provide:

- Test-specific configuration with sensible defaults
- In-memory SQLite database with schema caching
- Automatic cleanup after each test

The Database fixture caches the schema after the first test runs
migrations, making subsequent tests much faster.

Part of functional test infrastructure implementation.

Related-Bug: #XXXXXXX

Commit 2.2: RPC and Notification Fixtures

watcher/tests/local_fixtures/rpc.py:

"""RPC fixture for Watcher tests."""

from unittest import mock

import fixtures
from oslo_config import cfg
import oslo_messaging as messaging
from oslo_messaging import conffixture as messaging_conffixture

from watcher.common import rpc


CONF = cfg.CONF


class RPCFixture(fixtures.Fixture):
    """Set up RPC with fake:// transport for testing."""
    
    def __init__(self, *exmods):
        super(RPCFixture, self).__init__()
        self.exmods = list(exmods)
        self._buses = {}
    
    def _fake_create_transport(self, url):
        """Create or return cached fake transport."""
        # Collapse all connections to single bus for testing
        url = None
        
        if url not in self._buses:
            self._buses[url] = messaging.get_rpc_transport(
                CONF, url=url,
                allowed_remote_exmods=rpc.get_allowed_exmods())
        return self._buses[url]
    
    def setUp(self):
        super(RPCFixture, self).setUp()
        self.addCleanup(rpc.cleanup)
        
        # Configure fake transport
        self.messaging_conf = messaging_conffixture.ConfFixture(CONF)
        self.messaging_conf.transport_url = 'fake:/'
        self.useFixture(self.messaging_conf)
        
        # Patch transport creation
        self.useFixture(fixtures.MonkeyPatch(
            'watcher.common.rpc.create_transport',
            self._fake_create_transport))
        
        # Initialize RPC
        with mock.patch('watcher.common.rpc.get_transport_url') as mock_gtu:
            mock_gtu.return_value = None
            rpc.init(CONF)
        
        # Cleanup in-flight messages between tests
        def cleanup_rpc_messages():
            if hasattr(messaging._drivers, 'impl_fake'):
                messaging._drivers.impl_fake.FakeExchangeManager._exchanges = {}
        
        self.addCleanup(cleanup_rpc_messages)


class CastAsCallFixture(fixtures.Fixture):
    """Make RPC casts behave as calls for synchronous testing.
    
    This converts fire-and-forget casts into synchronous calls,
    making tests deterministic.
    """
    
    def setUp(self):
        super(CastAsCallFixture, self).setUp()
        
        # Stub out cast to use call instead
        self.useFixture(fixtures.MonkeyPatch(
            'oslo_messaging.RPCClient.cast',
            messaging.RPCClient.call))
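
The effect of swapping cast for call can be seen with a toy client. `ToyRPCClient` and `ToyServer` below are hypothetical classes written for this sketch and are not the oslo.messaging API. They only model the difference between a queued, fire-and-forget message and a synchronous invocation.

```python
from unittest import mock


class ToyServer:
    """Hypothetical RPC endpoint that records handled requests."""

    def __init__(self):
        self.triggered = []

    def trigger_audit(self, ctxt, audit_uuid):
        self.triggered.append(audit_uuid)
        return audit_uuid


class ToyRPCClient:
    """Hypothetical client; not the oslo.messaging RPCClient."""

    def __init__(self, server):
        self._server = server
        self.pending = []

    def call(self, ctxt, method, **kwargs):
        # Synchronous: run the handler now and return its result.
        return getattr(self._server, method)(ctxt, **kwargs)

    def cast(self, ctxt, method, **kwargs):
        # Asynchronous: queue the message and return immediately.
        self.pending.append((ctxt, method, kwargs))


def demo():
    server = ToyServer()
    client = ToyRPCClient(server)

    client.cast({}, "trigger_audit", audit_uuid="a1")
    before = list(server.triggered)  # nothing has been handled yet

    # Replace cast with call on the class, as the fixture does.
    with mock.patch.object(ToyRPCClient, "cast", ToyRPCClient.call):
        client.cast({}, "trigger_audit", audit_uuid="a2")
    after = list(server.triggered)  # handled before cast() returned

    return before, after
```

With the patch active, a test can assert on the server-side effect immediately after the cast, with no polling or sleeping.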

watcher/tests/local_fixtures/notifications.py:

"""Notification fixture for Watcher tests."""

import collections
import functools
import threading

import fixtures
from oslo_log import log as logging
import oslo_messaging
from oslo_serialization import jsonutils
from oslo_utils import timeutils

from watcher.common import rpc


LOG = logging.getLogger(__name__)


class _Sub(object):
    """Subscription helper for waiting on notifications."""
    
    def __init__(self):
        self._cond = threading.Condition()
        self._notifications = []
    
    def received(self, notification):
        with self._cond:
            self._notifications.append(notification)
            self._cond.notify_all()
    
    def wait_n(self, n, event_type, timeout):
        """Wait until at least n notifications received."""
        with timeutils.StopWatch(timeout) as timer:
            with self._cond:
                while len(self._notifications) < n:
                    if timer.expired():
                        raise AssertionError(
                            "Notification %s not received within %s seconds" %
                            (event_type, timeout))
                    self._cond.wait(timer.leftover())
                
                return list(self._notifications)


class FakeVersionedNotifier(object):
    """Captures versioned notifications for verification."""
    
    def __init__(self, transport, publisher_id, serializer=None):
        self.transport = transport
        self.publisher_id = publisher_id
        self._serializer = serializer or \
            oslo_messaging.serializer.NoOpSerializer()
        self.versioned_notifications = []
        self.subscriptions = collections.defaultdict(_Sub)
        
        # Create notification methods
        for priority in ['debug', 'info', 'warn', 'error', 'critical']:
            setattr(self, priority,
                    functools.partial(self._notify, priority.upper()))
    
    def prepare(self, publisher_id=None):
        if publisher_id is None:
            publisher_id = self.publisher_id
        return self.__class__(self.transport, publisher_id,
                             serializer=self._serializer)
    
    def _notify(self, priority, ctxt, event_type, payload):
        """Capture notification."""
        payload = self._serializer.serialize_entity(ctxt, payload)
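        # Mimic the real serialization path so that payloads which cannot
        # be converted to primitives surface here; the result is unused.
        jsonutils.to_primitive(payload)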
        
        notification = {
            'publisher_id': self.publisher_id,
            'priority': priority,
            'event_type': event_type,
            'payload': payload,
        }
        self.versioned_notifications.append(notification)
        self.subscriptions[event_type].received(notification)
    
    def wait_for_versioned_notifications(self, event_type, n_events=1,
                                        timeout=10.0):
        """Wait for notifications with timeout."""
        return self.subscriptions[event_type].wait_n(
            n_events, event_type, timeout)
    
    def reset(self):
        self.versioned_notifications.clear()
        self.subscriptions.clear()


class NotificationFixture(fixtures.Fixture):
    """Fixture to capture oslo.messaging notifications."""
    
    def __init__(self, test):
        super(NotificationFixture, self).__init__()
        self.test = test
    
    def setUp(self):
        super(NotificationFixture, self).setUp()
        self.addCleanup(self.reset)
        
        # Create fake notifier
        self.fake_versioned_notifier = FakeVersionedNotifier(
            rpc.NOTIFIER.transport,
            rpc.NOTIFIER.publisher_id,
            serializer=getattr(rpc.NOTIFIER, '_serializer', None))
        
        # Stub out global notifier
        self.test.useFixture(fixtures.MonkeyPatch(
            'watcher.common.rpc.NOTIFIER',
            self.fake_versioned_notifier))
    
    def reset(self):
        self.fake_versioned_notifier.reset()
    
    def wait_for_versioned_notifications(self, event_type, n_events=1,
                                        timeout=10.0):
        return self.fake_versioned_notifier.wait_for_versioned_notifications(
            event_type, n_events, timeout)
    
    @property
    def versioned_notifications(self):
        return self.fake_versioned_notifier.versioned_notifications
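
The waiting behavior rests on a condition variable: the emitting thread appends and notifies, and the test thread sleeps until enough notifications have arrived or the timeout expires. A stdlib-only sketch of that mechanism follows. `Subscription` is a simplified, hypothetical counterpart of `_Sub`, and the `audit.create` event type is illustrative only.

```python
import threading
import time


class Subscription:
    """Collect notifications and let a test block until enough arrive."""

    def __init__(self):
        self._cond = threading.Condition()
        self._items = []

    def received(self, notification):
        with self._cond:
            self._items.append(notification)
            self._cond.notify_all()

    def wait_n(self, n, timeout):
        deadline = time.monotonic() + timeout
        with self._cond:
            while len(self._items) < n:
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    raise AssertionError(
                        "expected %d notifications, got %d"
                        % (n, len(self._items)))
                # Releases the lock while sleeping; wakes on notify_all().
                self._cond.wait(remaining)
            return list(self._items)


def demo():
    sub = Subscription()
    # A timer thread stands in for a service emitting a notification.
    worker = threading.Timer(
        0.05, sub.received, args=({"event_type": "audit.create"},))
    worker.start()
    try:
        return sub.wait_n(1, timeout=5.0)
    finally:
        worker.join()
```

Re-checking the count in a loop after every wake-up guards against spurious wake-ups and against notifications that arrived before the wait began.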

Commit 2.3: External Service Fixtures (Nova)

watcher/tests/local_fixtures/nova.py:

"""Nova fixture for Watcher tests."""

import copy

import fixtures
from oslo_utils import uuidutils

from watcher.tests.local_fixtures import conf as conf_fixture


class NovaFixture(fixtures.Fixture):
    """Mock Nova API for Watcher tests.
    
    Provides stateful mocking of Nova compute operations including:
    - Listing compute nodes and services
    - Getting instance details
    - Live migration operations
    - Cold migration operations
    - Instance actions
    """
    
    # Pre-defined test instances
    INSTANCE_1 = {
        'id': '73b09e16-35b7-4922-804e-e8f5d9b740fc',
        'name': 'instance-1',
        'status': 'ACTIVE',
        'OS-EXT-SRV-ATTR:host': 'compute-1',
        'OS-EXT-SRV-ATTR:hypervisor_hostname': 'compute-1',
        'flavor': {'id': '1', 'name': 'm1.small'},
        'tenant_id': 'test-project',
    }
    
    INSTANCE_2 = {
        'id': 'cef19ce0-0ca2-11e6-a747-00012c99e920',
        'name': 'instance-2',
        'status': 'ACTIVE',
        'OS-EXT-SRV-ATTR:host': 'compute-2',
        'OS-EXT-SRV-ATTR:hypervisor_hostname': 'compute-2',
        'flavor': {'id': '2', 'name': 'm1.medium'},
        'tenant_id': 'test-project',
    }
    
    # Pre-defined compute nodes
    COMPUTE_NODE_1 = {
        'id': 1,
        'hypervisor_hostname': 'compute-1',
        'state': 'up',
        'status': 'enabled',
        'vcpus': 16,
        'vcpus_used': 2,
        'memory_mb': 32768,
        'memory_mb_used': 4096,
        'local_gb': 500,
        'local_gb_used': 50,
    }
    
    COMPUTE_NODE_2 = {
        'id': 2,
        'hypervisor_hostname': 'compute-2',
        'state': 'up',
        'status': 'enabled',
        'vcpus': 16,
        'vcpus_used': 4,
        'memory_mb': 32768,
        'memory_mb_used': 8192,
        'local_gb': 500,
        'local_gb_used': 100,
    }
    
    def __init__(self, test):
        super(NovaFixture, self).__init__()
        self.test = test
        self._instances = {}
        self._compute_nodes = {}
        self._services = {}
    
    def setUp(self):
        super(NovaFixture, self).setUp()
        
        # Add default instances and compute nodes
        self._instances[self.INSTANCE_1['id']] = copy.deepcopy(self.INSTANCE_1)
        self._instances[self.INSTANCE_2['id']] = copy.deepcopy(self.INSTANCE_2)
        self._compute_nodes[self.COMPUTE_NODE_1['id']] = copy.deepcopy(self.COMPUTE_NODE_1)
        self._compute_nodes[self.COMPUTE_NODE_2['id']] = copy.deepcopy(self.COMPUTE_NODE_2)
        
        # Mock nova client
        self.test.useFixture(fixtures.MonkeyPatch(
            'watcher.common.clients.nova.NovaClient.get_compute_node_list',
            self.get_compute_node_list))
        self.test.useFixture(fixtures.MonkeyPatch(
            'watcher.common.clients.nova.NovaClient.get_instance_list',
            self.get_instance_list))
        self.test.useFixture(fixtures.MonkeyPatch(
            'watcher.common.clients.nova.NovaClient.get_instance',
            self.get_instance))
        self.test.useFixture(fixtures.MonkeyPatch(
            'watcher.common.clients.nova.NovaClient.live_migrate',
            self.live_migrate))
    
    def get_compute_node_list(self):
        """Mock getting compute node list."""
        # Return copies so callers cannot mutate fixture state by accident
        return copy.deepcopy(list(self._compute_nodes.values()))

    def get_instance_list(self):
        """Mock getting instance list."""
        return copy.deepcopy(list(self._instances.values()))
    
    def get_instance(self, instance_id):
        """Mock getting single instance."""
        if instance_id not in self._instances:
            raise Exception('InstanceNotFound: %s' % instance_id)
        return copy.deepcopy(self._instances[instance_id])
    
    def live_migrate(self, instance_id, dest_hostname, block_migration=False):
        """Mock live migration."""
        if instance_id not in self._instances:
            raise Exception('InstanceNotFound: %s' % instance_id)
        
        instance = self._instances[instance_id]
        instance['OS-EXT-SRV-ATTR:host'] = dest_hostname
        instance['OS-EXT-SRV-ATTR:hypervisor_hostname'] = dest_hostname
        return True
    
    def add_instance(self, instance_dict):
        """Add a custom instance to the fixture."""
        instance_id = instance_dict.get('id') or uuidutils.generate_uuid()
        instance_dict['id'] = instance_id
        self._instances[instance_id] = instance_dict
        return instance_dict
    
    def add_compute_node(self, node_dict):
        """Add a custom compute node to the fixture."""
        node_id = node_dict.get('id') or len(self._compute_nodes) + 1
        node_dict['id'] = node_id
        self._compute_nodes[node_id] = node_dict
        return node_dict
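
The fixture replaces methods on the client class with bound methods of the fixture itself. The sketch below illustrates why that works, using only the standard library: a bound method stored as a class attribute is not rebound, so the fake receives no extra `self` from the client instance. `Client` and `FakeNova` are hypothetical stand-ins, and `mock.patch.object` is used in place of `fixtures.MonkeyPatch` purely to keep the example dependency-free.

```python
from unittest import mock


class Client:
    """Hypothetical stand-in for the real Nova client wrapper."""

    def get_instance_list(self):
        raise RuntimeError('would call the real Nova API')


class FakeNova:
    """Hypothetical stateful fake, similar in spirit to NovaFixture."""

    def __init__(self):
        self._instances = {'uuid-1': {'id': 'uuid-1', 'host': 'compute-1'}}

    def get_instance_list(self):
        # Copy each dict so callers cannot mutate the fake's state
        return [dict(i) for i in self._instances.values()]


fake = FakeNova()

# Replace the class attribute for the duration of the block only
with mock.patch.object(Client, 'get_instance_list', fake.get_instance_list):
    patched_result = Client().get_instance_list()

# Outside the block the original method is back in place
try:
    Client().get_instance_list()
    restored = False
except RuntimeError:
    restored = True
```

Because the patch is undone when the block exits, one test's fake state cannot leak into the next test, which is the same guarantee the fixture cleanup is meant to give.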

Commit 2.4: External Service Fixtures (Gnocchi)

watcher/tests/local_fixtures/gnocchi.py:

"""Gnocchi fixture for Watcher tests."""

import copy

import fixtures
from oslo_utils import uuidutils


class GnocchiFixture(fixtures.Fixture):
    """Mock Gnocchi API for Watcher tests.
    
    Provides stateful mocking of Gnocchi metric operations:
    - Resource listing and details
    - Metric measures
    - Aggregation operations
    """
    
    # Pre-defined resources (instances)
    RESOURCE_INSTANCE_1 = {
        'id': '73b09e16-35b7-4922-804e-e8f5d9b740fc',
        'type': 'instance',
        'project_id': 'test-project',
        'host': 'compute-1',
        'metrics': {
            'cpu_util': 'cpu-metric-1',
            'memory.usage': 'memory-metric-1',
        }
    }
    
    # Pre-defined measures
    CPU_MEASURES = [
        {'timestamp': '2025-10-07T10:00:00', 'value': 25.5},
        {'timestamp': '2025-10-07T10:01:00', 'value': 30.2},
        {'timestamp': '2025-10-07T10:02:00', 'value': 28.7},
    ]
    
    MEMORY_MEASURES = [
        {'timestamp': '2025-10-07T10:00:00', 'value': 2048},
        {'timestamp': '2025-10-07T10:01:00', 'value': 2100},
        {'timestamp': '2025-10-07T10:02:00', 'value': 2050},
    ]
    
    def __init__(self, test):
        super(GnocchiFixture, self).__init__()
        self.test = test
        self._resources = {}
        self._measures = {}
    
    def setUp(self):
        super(GnocchiFixture, self).setUp()
        
        # Add default resources
        res_id = self.RESOURCE_INSTANCE_1['id']
        self._resources[res_id] = copy.deepcopy(self.RESOURCE_INSTANCE_1)
        
        # Add default measures
        self._measures['cpu-metric-1'] = copy.deepcopy(self.CPU_MEASURES)
        self._measures['memory-metric-1'] = copy.deepcopy(self.MEMORY_MEASURES)
        
        # Mock gnocchi client
        self.test.useFixture(fixtures.MonkeyPatch(
            'watcher.common.clients.gnocchi.GnocchiClient.get_resources',
            self.get_resources))
        self.test.useFixture(fixtures.MonkeyPatch(
            'watcher.common.clients.gnocchi.GnocchiClient.get_measures',
            self.get_measures))
        self.test.useFixture(fixtures.MonkeyPatch(
            'watcher.common.clients.gnocchi.GnocchiClient.aggregate_measures',
            self.aggregate_measures))
    
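    def get_resources(self, resource_type='instance', details=True):
        """Mock getting resources.

        ``details`` is accepted but ignored by this mock.
        """
        # Return copies so callers cannot mutate fixture state by accident
        return [copy.deepcopy(r) for r in self._resources.values()
                if r['type'] == resource_type]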
    
    def get_measures(self, metric_id, start=None, stop=None, aggregation='mean'):
        """Mock getting metric measures."""
        if metric_id not in self._measures:
            return []
        return copy.deepcopy(self._measures[metric_id])
    
    def aggregate_measures(self, metrics, aggregation='mean', start=None, stop=None):
        """Mock aggregating measures."""
        # Simple average for testing
        all_values = []
        for metric_id in metrics:
            if metric_id in self._measures:
                all_values.extend([m['value'] for m in self._measures[metric_id]])
        
        if not all_values:
            return []
        
        avg_value = sum(all_values) / len(all_values)
        return [{'timestamp': '2025-10-07T10:00:00', 'value': avg_value}]
    
    def add_resource(self, resource_dict):
        """Add custom resource to fixture."""
        res_id = resource_dict.get('id') or uuidutils.generate_uuid()
        resource_dict['id'] = res_id
        self._resources[res_id] = resource_dict
        return resource_dict
    
    def set_measures(self, metric_id, measures):
        """Set measures for a metric."""
        self._measures[metric_id] = copy.deepcopy(measures)
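
As a worked check of what `aggregate_measures` returns for the default data, the sketch below repeats its arithmetic outside the fixture. `pooled_mean` is a hypothetical helper written for this illustration; the values are copied from `CPU_MEASURES` and `MEMORY_MEASURES`.

```python
import math

# Values copied from CPU_MEASURES and MEMORY_MEASURES above
cpu_values = [25.5, 30.2, 28.7]
memory_values = [2048, 2100, 2050]


def pooled_mean(*series):
    """Average every value from every series, as the mock does."""
    values = [v for s in series for v in s]
    return sum(values) / len(values)


cpu_only = pooled_mean(cpu_values)             # 84.4 / 3, about 28.13
mixed = pooled_mean(cpu_values, memory_values)  # 6282.4 / 6, about 1047.07

cpu_matches = math.isclose(cpu_only, 84.4 / 3)
mixed_matches = math.isclose(mixed, 6282.4 / 6)
```

The second result mixes CPU percentages with memory values, so it has no meaningful unit. Tests that rely on the mock's aggregation should probably pass metrics of a single kind per call.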

Phase 3: API and Service Fixtures (2 commits)

Commit 3.1: API Fixture for Pecan

watcher/tests/local_fixtures/api.py:

"""API fixture for Watcher tests."""

import fixtures
from oslo_config import cfg
from oslo_utils.fixture import uuidsentinel
from wsgi_intercept import interceptor

from watcher.api import app as watcher_app
from watcher.tests.local_fixtures import conf as conf_fixture
from watcher.tests.functional.api import client


CONF = cfg.CONF


class APIFixture(fixtures.Fixture):
    """Create a Watcher API server as a fixture.
    
    Runs real Pecan WSGI application using wsgi-intercept.
    """
    
    def __init__(self, api_version='v1'):
        super(APIFixture, self).__init__()
        self.api_version = api_version
    
    def setUp(self):
        super(APIFixture, self).setUp()
        
        # Unique hostname for wsgi-intercept
        hostname = str(uuidsentinel.watcher_api_host)
        port = 9322
        endpoint = 'http://%s:%s/' % (hostname, port)
        
        # Set debug mode
        self.useFixture(conf_fixture.ConfPatcher(debug=True))
        
        # Disable auth for testing
        self.useFixture(conf_fixture.ConfPatcher(
            auth_strategy='noauth',
            group='api'))
        
        # Load Pecan WSGI app
        app_conf = {
            'app': {
                'root': 'watcher.api.controllers.root.RootController',
                'modules': ['watcher.api'],
            }
        }
        app = watcher_app.setup_app(config=app_conf)
        
        # Install wsgi-intercept
        intercept = interceptor.RequestsInterceptor(
            lambda: app, url=endpoint)
        intercept.install_intercept()
        self.addCleanup(intercept.uninstall_intercept)
        
        # Create API clients
        base_url = 'http://%(host)s:%(port)s/%(version)s' % {
            'host': hostname,
            'port': port,
            'version': self.api_version
        }
        
        self.api = client.WatcherApiClient('user', base_url,
                                          project_id='test-project')
        self.admin_api = client.WatcherApiClient('admin', base_url,
                                                 project_id='test-project',
                                                 is_admin=True)

watcher/tests/functional/api/client.py:

"""API client for Watcher functional tests."""

import requests
from oslo_serialization import jsonutils


class WatcherApiClient(object):
    """Simple HTTP client for Watcher API."""
    
    def __init__(self, user_id, base_url, project_id='test-project',
                 is_admin=False):
        self.user_id = user_id
        self.project_id = project_id
        self.is_admin = is_admin
        self.base_url = base_url
    
    def _request(self, method, url, body=None, **kwargs):
        """Make HTTP request.

        Supported kwargs: ``headers`` (extra request headers) and
        ``params`` (query-string filters).
        """
        if method not in ('GET', 'POST', 'PATCH', 'DELETE'):
            raise ValueError('Unsupported method: %s' % method)

        headers = {
            'X-Auth-Token': 'fake-token',
            'X-User-Id': self.user_id,
            'X-Project-Id': self.project_id,
            'X-Roles': 'admin' if self.is_admin else 'member',
            'Content-Type': 'application/json',
            'Accept': 'application/json',
        }
        headers.update(kwargs.get('headers', {}))

        full_url = self.base_url + url

        # json=None sends no body, so a single call covers every method.
        # params must be forwarded, otherwise list filters are dropped.
        return requests.request(
            method, full_url, json=body, headers=headers,
            params=kwargs.get('params'))
    
    def get(self, url, **kwargs):
        """GET request."""
        return self._request('GET', url, **kwargs)
    
    def post(self, url, body, **kwargs):
        """POST request."""
        return self._request('POST', url, body=body, **kwargs)
    
    def patch(self, url, body, **kwargs):
        """PATCH request."""
        return self._request('PATCH', url, body=body, **kwargs)
    
    def delete(self, url, **kwargs):
        """DELETE request."""
        return self._request('DELETE', url, **kwargs)
    
    # Helper methods for common operations
    
    def create_audit(self, audit_dict):
        """Create an audit."""
        response = self.post('/audits', audit_dict)
        response.raise_for_status()
        return response.json()
    
    def get_audit(self, audit_uuid):
        """Get audit details."""
        response = self.get('/audits/%s' % audit_uuid)
        response.raise_for_status()
        return response.json()
    
    def list_audits(self, **filters):
        """List audits."""
        response = self.get('/audits', params=filters)
        response.raise_for_status()
        return response.json()['audits']
    
    def delete_audit(self, audit_uuid):
        """Delete an audit."""
        response = self.delete('/audits/%s' % audit_uuid)
        response.raise_for_status()
    
    def get_action_plan(self, action_plan_uuid):
        """Get action plan details."""
        response = self.get('/action_plans/%s' % action_plan_uuid)
        response.raise_for_status()
        return response.json()
    
    def list_action_plans(self, **filters):
        """List action plans."""
        response = self.get('/action_plans', params=filters)
        response.raise_for_status()
        return response.json()['action_plans']
    
    def start_action_plan(self, action_plan_uuid):
        """Start action plan execution."""
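        # PENDING is assumed to be the state that starts an action plan,
        # based on the Watcher API's allowed state transitions
        body = [{'op': 'replace', 'path': '/state', 'value': 'PENDING'}]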
        response = self.patch('/action_plans/%s' % action_plan_uuid, body)
        response.raise_for_status()
        return response.json()
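
The PATCH helpers in this client send JSON Patch documents (RFC 6902). The sketch below shows what a `replace` operation means for a flat resource. `apply_replace` is a hypothetical helper written for this illustration, not part of Watcher, and it handles only top-level paths.

```python
def apply_replace(resource, patch):
    """Apply JSON Patch 'replace' ops to top-level keys of a dict."""
    result = dict(resource)
    for op in patch:
        if op['op'] != 'replace':
            raise ValueError('only replace is sketched here: %s' % op['op'])
        key = op['path'].lstrip('/')
        if key not in result:
            # RFC 6902: the target of a replace must already exist
            raise KeyError(key)
        result[key] = op['value']
    return result


audit = {'uuid': 'audit-1', 'name': 'test-audit'}
patch = [{'op': 'replace', 'path': '/name', 'value': 'updated-audit-name'}]
updated = apply_replace(audit, patch)
```

The original dictionary is left untouched, which mirrors how the API returns the updated representation rather than mutating the request body.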

Commit 3.2: Service Fixture

watcher/tests/local_fixtures/service.py:

"""Service fixture for Watcher tests."""

import threading
from unittest import mock

import fixtures

from watcher.common import context


class ServiceFixture(fixtures.Fixture):
    """Run a Watcher service as a test fixture.
    
    Starts services in background threads for functional testing.
    """
    
    def __init__(self, binary, host=None, **kwargs):
        """Initialize service fixture.
        
        Args:
            binary: Service binary name (watcher-decision-engine, watcher-applier)
            host: Host name for the service
            **kwargs: Additional service arguments
        """
        super(ServiceFixture, self).__init__()
        self.binary = binary
        self.host = host or 'test-host'
        self.kwargs = kwargs
    
    def setUp(self):
        super(ServiceFixture, self).setUp()
        
        # Create admin context
        self.ctxt = context.make_context(is_admin=True)
        
        # Mock context creation
        mock_ctx = mock.MagicMock(return_value=self.ctxt)
        self.useFixture(fixtures.MonkeyPatch(
            'watcher.common.context.make_context',
            mock_ctx))
        
        # Import and start the appropriate service
        if self.binary == 'watcher-decision-engine':
            from watcher.decision_engine import manager as de_manager
            self.manager = de_manager.DecisionEngineManager()
        elif self.binary == 'watcher-applier':
            from watcher.applier import manager as applier_manager
            self.manager = applier_manager.ApplierManager()
        else:
            raise ValueError('Unknown binary: %s' % self.binary)
        
        # Start in background thread
        self.thread = threading.Thread(
            target=self._run_service,
            daemon=True)
        self.thread.start()
        
        self.addCleanup(self._cleanup)
    
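    def _run_service(self):
        """Run service in thread."""
        # Record any failure so tests can inspect it instead of losing it
        self.exception = None
        try:
            # Service managers typically have a run() or start() method
            if hasattr(self.manager, 'run'):
                self.manager.run()
            elif hasattr(self.manager, 'start'):
                self.manager.start()
        except Exception as exc:
            # An exception is expected when the service is stopped, but
            # keeping it makes unexpected startup failures visible
            self.exception = exc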
    
    def _cleanup(self):
        """Stop service and join thread."""
        if hasattr(self.manager, 'stop'):
            self.manager.stop()
        # Give thread time to finish
        self.thread.join(timeout=5)
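
The start, stop and join sequence above relies on the manager's `stop()` actually ending its loop. A minimal standard-library sketch of that contract follows; `FakeManager` is a hypothetical stand-in, and real Watcher managers may stop differently.

```python
import threading


class FakeManager:
    """Hypothetical manager whose run() loops until stop() is called."""

    def __init__(self):
        self._stop_event = threading.Event()
        self.iterations = 0

    def run(self):
        # wait() returns True as soon as the event is set, ending the loop
        while not self._stop_event.wait(timeout=0.01):
            self.iterations += 1

    def stop(self):
        self._stop_event.set()


manager = FakeManager()
thread = threading.Thread(target=manager.run, daemon=True)
thread.start()

# Same order as the fixture cleanup: signal first, then join with a timeout
manager.stop()
thread.join(timeout=5)
stopped_cleanly = not thread.is_alive()
```

If a manager has no cooperative stop signal, `join(timeout=5)` simply returns after five seconds and the daemon thread keeps running, so checking `is_alive()` after the join is a cheap way to detect that case.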

Phase 4: Base Functional Test Class (1 commit)

watcher/tests/functional/base.py:

"""Base classes for Watcher functional tests."""

from oslo_config import cfg
from oslo_log import log as logging
from oslotest import base

from watcher.common import context
from watcher.tests import local_fixtures as watcher_fixtures


CONF = cfg.CONF
LOG = logging.getLogger(__name__)


class WatcherFunctionalTestCase(base.BaseTestCase):
    """Base class for Watcher functional tests.
    
    Provides:
    - Database (SQLite in-memory)
    - RPC (oslo.messaging fake driver)
    - Notifications
    - API server
    - External service mocks (Nova, Gnocchi, etc.)
    
    Functional tests should inherit from this class and can selectively
    override service setup if needed.
    """
    
    # Class attributes
    USES_DB = True
    STUB_RPC = True
    START_DECISION_ENGINE = False
    START_APPLIER = False
    
    def setUp(self):
        super(WatcherFunctionalTestCase, self).setUp()
        
        # Configuration
        self.useFixture(watcher_fixtures.ConfFixture(CONF))
        
        # Database
        if self.USES_DB:
            self.useFixture(watcher_fixtures.Database())
        
        # RPC and notifications
        if self.STUB_RPC:
            self.useFixture(watcher_fixtures.RPCFixture())
            # Use the test notification driver; self.flags() registers a
            # cleanup so the override does not leak into other tests
            self.flags(driver=['test'],
                       group='oslo_messaging_notifications')
            # Make RPC casts synchronous for deterministic tests
            self.useFixture(watcher_fixtures.CastAsCallFixture())
        
        # Notification capture
        self.notifier = self.useFixture(
            watcher_fixtures.NotificationFixture(self))
        
        # External services
        self.nova = self.useFixture(watcher_fixtures.NovaFixture(self))
        self.gnocchi = self.useFixture(watcher_fixtures.GnocchiFixture(self))
        # Placement fixture can be added when needed:
        # self.placement = self.useFixture(PlacementFixture())
        
        # API
        self.api_fixture = self.useFixture(watcher_fixtures.APIFixture())
        self.api = self.api_fixture.api
        self.admin_api = self.api_fixture.admin_api
        
        # Start services if requested
        if self.START_DECISION_ENGINE:
            self.start_service('watcher-decision-engine')
        if self.START_APPLIER:
            self.start_service('watcher-applier')
        
        # Create admin context for tests
        self.context = context.make_context(is_admin=True)
    
    def flags(self, **kw):
        """Override flag variables for a test.
        
        Example:
            self.flags(periodic_interval=10,
                      group='watcher_decision_engine')
        """
        group = kw.pop('group', None)
        for k, v in kw.items():
            CONF.set_override(k, v, group)
            self.addCleanup(CONF.clear_override, k, group)
    
    def start_service(self, binary, host=None, **kwargs):
        """Start a Watcher service.
        
        Args:
            binary: Service name (watcher-decision-engine, watcher-applier)
            host: Host name for the service
            **kwargs: Additional arguments for service
        
        Returns:
            ServiceFixture instance
        """
        if host is not None:
            self.flags(host=host)
        
        svc = self.useFixture(
            watcher_fixtures.ServiceFixture(binary, host, **kwargs))
        return svc
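
The `flags()` helper pairs every override with a cleanup. The sketch below shows the same pattern with the standard library only, with a plain dictionary standing in for `CONF`. `SETTINGS` and `FlagsExample` are hypothetical names used for illustration.

```python
import io
import unittest

# Hypothetical stand-in for the global CONF object
SETTINGS = {'periodic_interval': 60}


class FlagsExample(unittest.TestCase):

    def flags(self, **kw):
        """Override settings and restore the old values on cleanup."""
        for key, value in kw.items():
            # Capture the current value now; it is restored after the test
            self.addCleanup(SETTINGS.__setitem__, key, SETTINGS[key])
            SETTINGS[key] = value

    def test_override_is_visible(self):
        self.flags(periodic_interval=10)
        self.assertEqual(10, SETTINGS['periodic_interval'])


suite = unittest.defaultTestLoader.loadTestsFromTestCase(FlagsExample)
result = unittest.TextTestRunner(stream=io.StringIO()).run(suite)
value_after_run = SETTINGS['periodic_interval']
```

After the run the setting is back at its original value, so a later test sees the default even though the earlier test changed it.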

Phase 5: Gabbi Test Infrastructure (3 commits)

Goal: Add a Gabbi-based declarative API testing framework.

Gabbi tests provide declarative YAML-based HTTP testing, ideal for API behavior and microversion testing. They complement Python-based functional tests by focusing on API contracts.
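
To make the declarative idea concrete, the sketch below drives a tiny in-process WSGI application from a list of test descriptions, using only the standard library. It is a miniature of the concept and not how Gabbi is implemented; `app`, `CASES` and `run_case` are hypothetical names.

```python
import json
from wsgiref.util import setup_testing_defaults


def app(environ, start_response):
    """Hypothetical WSGI app exposing a version document at the root."""
    if environ['PATH_INFO'] == '/':
        body = json.dumps({'versions': [{'id': 'v1'}]}).encode()
        start_response('200 OK', [('Content-Type', 'application/json')])
        return [body]
    start_response('404 Not Found', [('Content-Type', 'application/json')])
    return [b'{}']


# Each case is data, not code: a name, a request and an expectation
CASES = [
    {'name': '200 at API root', 'GET': '/', 'status': 200},
    {'name': '404 at unknown endpoint', 'GET': '/barnabas', 'status': 404},
]


def run_case(case):
    """Call the app in-process and compare the status with the case."""
    environ = {}
    setup_testing_defaults(environ)
    environ['REQUEST_METHOD'] = 'GET'
    environ['PATH_INFO'] = case['GET']
    captured = {}

    def start_response(status, headers, exc_info=None):
        captured['status'] = int(status.split()[0])

    b''.join(app(environ, start_response))
    return captured['status'] == case['status']


results = {case['name']: run_case(case) for case in CASES}
```

Adding a new check means adding one more dictionary to `CASES`, which is the property that makes the YAML format attractive for API contract tests.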

Commit 5.1: Gabbi Test Loader and Base Fixture

Files to Create:

watcher/tests/functional/test_api_gabbi.py:

"""Gabbi test loader for Watcher API tests.

Loads declarative YAML tests from the gabbits/ directory.
"""

import os

from gabbi import driver
from oslotest import output
import wsgi_intercept

from watcher.tests.functional.fixtures import capture
from watcher.tests.functional.fixtures import gabbi as gabbi_fixtures


# Enforce strict response headers (native str)
wsgi_intercept.STRICT_RESPONSE_HEADERS = True

# Directory containing YAML test files
TESTS_DIR = 'gabbits'


def load_tests(loader, tests, pattern):
    """Provide TestSuite to the discovery process.
    
    This is the standard Python unittest load_tests protocol.
    Called by test runners (stestr, unittest discover).
    
    :param loader: unittest.TestLoader
    :param tests: Existing TestSuite (ignored)
    :param pattern: Pattern for test discovery (ignored)
    :returns: TestSuite containing Gabbi tests
    """
    test_dir = os.path.join(os.path.dirname(__file__), TESTS_DIR)
    
    # Per-test fixtures (clean output/logging per test)
    inner_fixtures = [
        output.CaptureOutput,
        capture.Logging,
    ]
    
    # Build test suite from YAML files
    return driver.build_tests(
        test_dir,                           # Directory with YAML files
        loader,                             # unittest.TestLoader
        host=None,                          # No real host (wsgi-intercept)
        test_loader_name=__name__,          # Module name for test naming
        intercept=gabbi_fixtures.setup_app, # App factory function
        inner_fixtures=inner_fixtures,      # Per-test fixtures
        fixture_module=gabbi_fixtures       # Module with GabbiFixture classes
    )

watcher/tests/functional/fixtures/gabbi.py:

"""Gabbi fixtures for Watcher API testing.

Provides GabbiFixture classes for declarative YAML-based API tests.
"""

import os

from gabbi import fixture
from oslo_config import cfg
from oslo_config import fixture as config_fixture
from oslo_log.fixture import logging_error
from oslo_policy import opts as policy_opts
from oslo_utils import uuidutils
from oslotest import output

from watcher.api import app as watcher_app
from watcher.common import context
from watcher import conf as watcher_conf
from watcher.tests import fixtures as db_fixtures
from watcher.tests.functional.fixtures import capture
from watcher.tests.unit import policy_fixture


# Global CONF for setup_app workaround
# (gabbi requires app factory to be zero-argument function)
CONF = None


def setup_app():
    """App factory for gabbi.
    
    Called by gabbi to get the WSGI application under test.
    Uses wsgi-intercept to route HTTP calls to in-process app.
    """
    global CONF
    
    # Create Pecan WSGI application
    # Note: Watcher uses custom Pecan config, not deploy.loadapp like Placement
    from watcher.api import config as api_config
    import pecan
    
    pecan_config = pecan.configuration.conf_from_dict(api_config.PECAN_CONFIG)
    app_conf = dict(pecan_config.app)
    
    # Disable auth for testing
    app_conf['enable_acl'] = False
    
    app = pecan.make_app(
        app_conf.pop('root'),
        logging=getattr(pecan_config, 'logging', {}),
        debug=True,
        **app_conf
    )
    
    return app


class APIFixture(fixture.GabbiFixture):
    """Base fixture for Watcher Gabbi tests.
    
    Sets up:
    - Configuration
    - Database (SQLite in-memory)
    - Policy
    - Logging and output capture
    - Environment variables for test data
    
    This fixture runs once per YAML file (start_fixture before first test,
    stop_fixture after last test).
    """
    
    def start_fixture(self):
        """Called once before any tests in a YAML file run."""
        global CONF
        
        # Set up logging and output capture
        self.standard_logging_fixture = capture.Logging()
        self.standard_logging_fixture.setUp()
        self.output_stream_fixture = output.CaptureOutput()
        self.output_stream_fixture.setUp()
        self.logging_error_fixture = (
            logging_error.get_logging_handle_error_fixture())
        self.logging_error_fixture.setUp()
        self.warnings_fixture = capture.WarningsFixture()
        self.warnings_fixture.setUp()
        
        # Create isolated config (don't use global CONF)
        self.conf_fixture = config_fixture.Config(cfg.ConfigOpts())
        self.conf_fixture.setUp()
        watcher_conf.register_opts(self.conf_fixture.conf)
        
        # Configure API with no auth
        self.conf_fixture.config(group='api', auth_strategy='noauth')
        self.conf_fixture.config(group='api', host='127.0.0.1')
        self.conf_fixture.config(group='api', port=9322)
        
        # Configure policy (no scope enforcement for tests)
        policy_opts.set_defaults(self.conf_fixture.conf)
        self.conf_fixture.config(
            group='oslo_policy',
            enforce_scope=False,
            enforce_new_defaults=False,
        )
        
        # Set up database
        self.placement_db_fixture = db_fixtures.Database(
            self.conf_fixture, set_config=True)
        self.placement_db_fixture.setUp()
        
        # Create context for fixture data creation
        self.context = context.make_context(is_admin=True)
        
        # Empty config files list (don't read /etc/watcher/watcher.conf)
        self.conf_fixture.conf([], default_config_files=[])
        
        # Set up policy fixture
        self.policy_fixture = policy_fixture.PolicyFixture(
            self.conf_fixture)
        self.policy_fixture.setUp()
        
        # Set up environment variables for use in YAML tests
        # These are substituted into test data via $ENVIRON['VAR_NAME']
        self._setup_environment_variables()
        
        # Store config globally for setup_app()
        CONF = self.conf_fixture.conf
    
    def _setup_environment_variables(self):
        """Set environment variables for YAML test data."""
        # Audit related
        os.environ['AUDIT_UUID'] = uuidutils.generate_uuid()
        os.environ['AUDIT_UUID_2'] = uuidutils.generate_uuid()
        os.environ['AUDIT_NAME'] = 'test-audit-%s' % uuidutils.generate_uuid()[:8]
        
        # Action plan related
        os.environ['ACTION_PLAN_UUID'] = uuidutils.generate_uuid()
        os.environ['ACTION_UUID'] = uuidutils.generate_uuid()
        
        # Strategy and goal
        os.environ['STRATEGY_UUID'] = uuidutils.generate_uuid()
        os.environ['GOAL_UUID'] = uuidutils.generate_uuid()
        os.environ['GOAL_NAME'] = 'test-goal'
        
        # Service
        os.environ['SERVICE_NAME'] = 'watcher-decision-engine'
        os.environ['SERVICE_HOST'] = 'test-host'
        
        # Project and user
        os.environ['PROJECT_ID'] = uuidutils.generate_uuid()
        os.environ['USER_ID'] = uuidutils.generate_uuid()
        
        # Compute resources (for strategies)
        os.environ['INSTANCE_UUID'] = uuidutils.generate_uuid()
        os.environ['COMPUTE_NODE_UUID'] = uuidutils.generate_uuid()
        os.environ['SRC_NODE'] = 'compute-1'
        os.environ['DEST_NODE'] = 'compute-2'
    
    def stop_fixture(self):
        """Called after all tests in a YAML file complete."""
        global CONF
        
        # Clean up the fixtures; configuration goes last because the
        # database and policy fixtures were built on top of it
        self.placement_db_fixture.cleanUp()
        self.policy_fixture.cleanUp()
        self.warnings_fixture.cleanUp()
        self.logging_error_fixture.cleanUp()
        self.output_stream_fixture.cleanUp()
        self.standard_logging_fixture.cleanUp()
        self.conf_fixture.cleanUp()
        
        CONF = None

watcher/tests/functional/fixtures/capture.py:

"""Fixtures for capturing logs and filtering warnings.

Similar to Placement's capture fixtures, adapted for Watcher.
"""

import logging
import warnings

import fixtures
from oslotest import log
from sqlalchemy import exc as sqla_exc


class NullHandler(logging.Handler):
    """Custom NullHandler that formats records.
    
    Used to detect formatting errors in debug logs even when
    logs aren't captured.
    """
    
    def handle(self, record):
        self.format(record)
    
    def emit(self, record):
        pass
    
    def createLock(self):
        self.lock = None


class Logging(log.ConfigureLogging):
    """Logging fixture for tests.
    
    - Captures logs for later inspection
    - Ensures DEBUG logs are formatted even if not captured
    """
    
    def __init__(self):
        super(Logging, self).__init__()
        # Default to INFO if not otherwise set
        if self.level is None:
            self.level = logging.INFO
    
    def setUp(self):
        super(Logging, self).setUp()
        if self.level > logging.DEBUG:
            handler = NullHandler()
            self.useFixture(fixtures.LogHandler(handler, nuke_handlers=False))
            handler.setLevel(logging.DEBUG)


class WarningsFixture(fixtures.Fixture):
    """Filter or escalate certain warnings during test runs.
    
    Add additional entries as required. Remove when obsolete.
    """
    
    def setUp(self):
        super(WarningsFixture, self).setUp()
        
        self._original_warning_filters = warnings.filters[:]
        
        warnings.simplefilter("once", DeprecationWarning)
        
        # Ignore policy scope warnings (new RBAC system)
        warnings.filterwarnings(
            'ignore',
            message="Policy .* failed scope check",
            category=UserWarning)
        
        # Escalate invalid UUID warnings to errors
        warnings.filterwarnings('error', message=".*invalid UUID.*")
        
        # Prevent introducing unmapped columns
        warnings.filterwarnings(
            'error',
            category=sqla_exc.SAWarning)
        
        # Configure SQLAlchemy warnings
        warnings.filterwarnings(
            'ignore',
            category=sqla_exc.SADeprecationWarning)
        
        warnings.filterwarnings(
            'error',
            module='watcher',
            category=sqla_exc.SADeprecationWarning)
        
        self.addCleanup(self._reset_warning_filters)
    
    def _reset_warning_filters(self):
        warnings.filters[:] = self._original_warning_filters
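
The fixture depends on two properties of the standard `warnings` module: filters added later are checked first, and the filter list can be saved and restored by slice assignment. A minimal sketch of both, using generic messages rather than the Watcher-specific ones:

```python
import warnings

# Save the current filters so they can be restored afterwards
saved_filters = warnings.filters[:]

# Broad rule first, narrow rule second: the later call wins when both match
warnings.filterwarnings('ignore', category=DeprecationWarning)
warnings.filterwarnings(
    'error', message='critical', category=DeprecationWarning)

try:
    # Only the broad "ignore" rule matches, so nothing happens
    warnings.warn('harmless notice', DeprecationWarning)
    ignored = True

    try:
        # The message starts with "critical", so the "error" rule applies
        warnings.warn('critical problem', DeprecationWarning)
        escalated = False
    except DeprecationWarning:
        escalated = True
finally:
    # Same restore technique as _reset_warning_filters()
    warnings.filters[:] = saved_filters

restored = warnings.filters == saved_filters
```

This ordering is why the fixture can ignore `SADeprecationWarning` globally and still escalate it for the `watcher` module: the more specific filter is registered last.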

Commit 5.2: Example Gabbi YAML Tests

Files to Create:

Create the watcher/tests/functional/gabbits/ directory with example YAML tests:

watcher/tests/functional/gabbits/basic-http.yaml:

# Basic HTTP behavior tests for Watcher API

fixtures:
    - APIFixture

defaults:
    request_headers:
        x-auth-token: admin
        accept: application/json
        openstack-api-version: infra-optim 1.0

tests:
- name: 404 at unknown endpoint
  GET: /barnabas
  status: 404

- name: 200 at API root
  GET: /
  status: 200
  response_json_paths:
      $.versions[0].id: v1

- name: 200 at v1 root
  GET: /v1
  status: 200
  response_json_paths:
      $.id: v1
      $.media_types[0].base: application/json

watcher/tests/functional/gabbits/audit-lifecycle.yaml:

# Audit lifecycle API tests

fixtures:
    - APIFixture

defaults:
    request_headers:
        x-auth-token: admin
        accept: application/json
        content-type: application/json
        openstack-api-version: infra-optim 1.0

tests:
- name: list audits empty
  GET: /v1/audits
  response_json_paths:
      $.audits: []

- name: create audit
  POST: /v1/audits
  data:
      name: $ENVIRON['AUDIT_NAME']
      audit_type: ONESHOT
      goal: dummy
  status: 201
  response_headers:
      location: //v1/audits/[a-f0-9-]+/
  response_json_paths:
      $.uuid: /^[a-f0-9-]+$/
      $.name: $ENVIRON['AUDIT_NAME']
      $.audit_type: ONESHOT
      $.state: PENDING

- name: get audit
  GET: $LOCATION
  response_json_paths:
      $.uuid: $HISTORY['create audit'].$RESPONSE['$.uuid']
      $.name: $ENVIRON['AUDIT_NAME']
      $.state: PENDING
      $.goal_uuid: /^[a-f0-9-]+$/

- name: list audits has one
  GET: /v1/audits
  response_json_paths:
      $.audits[0].uuid: $HISTORY['create audit'].$RESPONSE['$.uuid']

- name: patch audit name
  PATCH: /v1/audits/$HISTORY['create audit'].$RESPONSE['$.uuid']
  request_headers:
      content-type: application/json
  data:
      - op: replace
        path: /name
        value: updated-audit-name
  status: 200
  response_json_paths:
      $.name: updated-audit-name

- name: delete audit
  DELETE: /v1/audits/$HISTORY['create audit'].$RESPONSE['$.uuid']
  status: 204

- name: get deleted audit 404
  GET: /v1/audits/$HISTORY['create audit'].$RESPONSE['$.uuid']
  status: 404

watcher/tests/functional/gabbits/microversions.yaml:

# Microversion testing

fixtures:
    - APIFixture

defaults:
    request_headers:
        x-auth-token: admin
        accept: application/json

tests:
- name: no version header defaults to 1.0
  GET: /v1
  response_headers:
      openstack-api-version: "infra-optim 1.0"

- name: explicit version 1.0
  GET: /v1
  request_headers:
      openstack-api-version: "infra-optim 1.0"
  response_headers:
      openstack-api-version: "infra-optim 1.0"

- name: latest version
  GET: /v1
  request_headers:
      openstack-api-version: "infra-optim latest"
  response_headers:
      # Check that response has a valid version
      openstack-api-version: /infra-optim \d+\.\d+/

- name: invalid version rejected
  GET: /v1
  request_headers:
      openstack-api-version: "infra-optim 999.999"
  status: 406
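The four cases above describe a small negotiation rule: no header falls back to the minimum version, `latest` resolves to the maximum, and an out-of-range version is rejected with 406. A minimal sketch of that rule follows. It is not Watcher's implementation; `negotiate`, `NotAcceptable`, and the `MAX_VERSION` value are assumptions made for illustration:

```python
SERVICE_TYPE = "infra-optim"
MIN_VERSION = (1, 0)
MAX_VERSION = (1, 4)  # hypothetical maximum, for illustration only


class NotAcceptable(Exception):
    """Stands in for an HTTP 406 response in this sketch."""


def negotiate(header_value=None):
    """Return the (major, minor) version selected by the header value."""
    requested = None
    # The header may list several services, separated by commas.
    for entry in (header_value or "").split(","):
        fields = entry.split()
        if len(fields) == 2 and fields[0] == SERVICE_TYPE:
            requested = fields[1]
    if requested is None:
        return MIN_VERSION  # no header for this service: use the default
    if requested == "latest":
        return MAX_VERSION
    major, _, minor = requested.partition(".")
    if not (major.isdigit() and minor.isdigit()):
        raise ValueError("malformed version: %s" % requested)
    version = (int(major), int(minor))
    if not MIN_VERSION <= version <= MAX_VERSION:
        raise NotAcceptable("unsupported version: %s" % requested)
    return version


if __name__ == "__main__":
    print(negotiate())
    print(negotiate("infra-optim latest"))
```

Comparing versions as integer tuples avoids the string-comparison trap where `1.10` would sort before `1.9`.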

Commit 5.3: Specialized Gabbi Fixtures

watcher/tests/functional/fixtures/gabbi.py (additions):

# Add to existing gabbi.py file

class AuditFixture(APIFixture):
    """APIFixture with pre-created audit data.
    
    Creates:
    - A goal
    - A strategy
    - An audit template
    - A pending audit
    
    Useful for action plan and applier tests.
    """
    
    def start_fixture(self):
        # Call parent to set up base infrastructure
        super(AuditFixture, self).start_fixture()
        
        # Import helpers
        from watcher.tests.db import utils as db_utils
        
        # Create goal
        goal = db_utils.create_test_goal(
            name=os.environ['GOAL_NAME'],
            uuid=os.environ['GOAL_UUID']
        )
        
        # Create strategy
        strategy = db_utils.create_test_strategy(
            name='dummy',
            uuid=os.environ['STRATEGY_UUID'],
            goal_id=goal.id
        )
        
        # Create audit template
        audit_template = db_utils.create_test_audit_template(
            name='test-template',
            goal_id=goal.id,
            strategy_id=strategy.id
        )
        
        # Create audit
        audit = db_utils.create_test_audit(
            uuid=os.environ['AUDIT_UUID'],
            name=os.environ['AUDIT_NAME'],
            audit_type='ONESHOT',
            state='PENDING',
            goal_id=goal.id,
            strategy_id=strategy.id,
            audit_template_id=audit_template.id
        )
        
        # Store IDs for tests
        os.environ['AUDIT_TEMPLATE_UUID'] = audit_template.uuid


class ActionPlanFixture(AuditFixture):
    """APIFixture with pre-created action plan data.
    
    Extends AuditFixture and adds:
    - A recommended action plan
    - Actions in the plan
    """
    
    def start_fixture(self):
        # Call parent to set up audit
        super(ActionPlanFixture, self).start_fixture()
        
        from watcher import objects
        from watcher.tests.db import utils as db_utils
        
        # Get audit ID from environment
        audit_obj = objects.Audit.get_by_uuid(
            self.context, os.environ['AUDIT_UUID'])
        
        # Create action plan
        action_plan = db_utils.create_test_action_plan(
            uuid=os.environ['ACTION_PLAN_UUID'],
            audit_id=audit_obj.id,
            state='RECOMMENDED'
        )
        
        # Create action
        action = db_utils.create_test_action(
            uuid=os.environ['ACTION_UUID'],
            action_plan_id=action_plan.id,
            action_type='migrate',
            state='PENDING',
            input_parameters={
                'migration_type': 'live',
                'source_node': os.environ['SRC_NODE'],
                'destination_node': os.environ['DEST_NODE'],
                'resource_id': os.environ['INSTANCE_UUID'],
            }
        )
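Both fixtures read names and UUIDs from `os.environ`, so those variables have to be populated before `start_fixture()` reads them. A minimal sketch of one way to do that, assuming the base `APIFixture` does not already seed them; `seed_gabbi_environ`, the variable groupings, and the default values are hypothetical:

```python
import os
import uuid

# Variables that get a fresh UUID every time a fixture starts.
UUID_VARS = ("GOAL_UUID", "STRATEGY_UUID", "AUDIT_UUID",
             "ACTION_PLAN_UUID", "ACTION_UUID", "INSTANCE_UUID")

# Variables with fixed illustrative values (hypothetical defaults).
NAME_VARS = {"GOAL_NAME": "dummy", "AUDIT_NAME": "test-audit",
             "SRC_NODE": "compute-1", "DEST_NODE": "compute-2"}


def seed_gabbi_environ(environ=os.environ):
    """Populate the variables the fixtures and YAML files refer to."""
    for var in UUID_VARS:
        # Regenerated on each call so YAML files do not share records.
        environ[var] = str(uuid.uuid4())
    for var, value in NAME_VARS.items():
        # Keep any value the caller exported before running the tests.
        environ.setdefault(var, value)
    return environ


if __name__ == "__main__":
    env = seed_gabbi_environ({})
    print(sorted(env))
```

Calling this at the top of `APIFixture.start_fixture()` would make the same values available to the Python fixtures and to `$ENVIRON[...]` substitutions in the YAML files.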

Commit Message:

Add Gabbi-based declarative API testing infrastructure

Introduce Gabbi test framework for declarative YAML-based API
testing. Gabbi tests complement Python functional tests by
focusing on:

- API behavior and contracts
- HTTP status codes and headers
- Microversion negotiation
- Request/response JSON structure

Key components:
- test_api_gabbi.py: Gabbi test loader
- fixtures/gabbi.py: APIFixture, AuditFixture, ActionPlanFixture
- fixtures/capture.py: Logging and warning capture
- gabbits/*.yaml: Example declarative tests

Gabbi tests use wsgi-intercept to route HTTP calls to an
in-process Pecan WSGI application, providing fast execution
without network overhead.

Part of functional test infrastructure implementation.

Related-Bug: #XXXXXXX

Phase 6: Example Functional Tests (2 commits)

Commit 6.1: Basic API Functional Test

watcher/tests/functional/test_api_audits.py:

"""Functional tests for Watcher audit API."""

from oslo_utils import uuidutils

from watcher.tests.functional import base


class TestAuditAPI(base.WatcherFunctionalTestCase):
    """Test audit API operations with real database and API."""
    
    def test_create_audit(self):
        """Test creating an audit via API."""
        # Create audit request
        audit_dict = {
            'audit_type': 'ONESHOT',
            'goal': 'server_consolidation',
            'name': 'test-audit',
        }
        
        # Create via API
        audit = self.api.create_audit(audit_dict)
        
        # Verify response
        self.assertIsNotNone(audit.get('uuid'))
        self.assertEqual('test-audit', audit['name'])
        self.assertEqual('PENDING', audit['state'])
        
        # Verify in database
        from watcher.objects import audit as audit_obj
        
        db_audit = audit_obj.Audit.get_by_uuid(
            self.context, audit['uuid'])
        self.assertEqual('test-audit', db_audit.name)
        self.assertEqual('server_consolidation', db_audit.goal.name)
    
    def test_list_audits(self):
        """Test listing audits via API."""
        # Create two audits
        audit1 = self.api.create_audit({
            'audit_type': 'ONESHOT',
            'goal': 'server_consolidation',
            'name': 'audit-1',
        })
        audit2 = self.api.create_audit({
            'audit_type': 'CONTINUOUS',
            'goal': 'thermal_optimization',
            'name': 'audit-2',
        })
        
        # List all audits
        audits = self.api.list_audits()
        
        # Verify
        self.assertEqual(2, len(audits))
        audit_uuids = [a['uuid'] for a in audits]
        self.assertIn(audit1['uuid'], audit_uuids)
        self.assertIn(audit2['uuid'], audit_uuids)
    
    def test_delete_audit(self):
        """Test deleting an audit via API."""
        # Create audit
        audit = self.api.create_audit({
            'audit_type': 'ONESHOT',
            'goal': 'server_consolidation',
            'name': 'test-audit',
        })
        audit_uuid = audit['uuid']
        
        # Delete
        self.api.delete_audit(audit_uuid)
        
        # Verify deleted
        from watcher import exception
        from watcher.objects import audit as audit_obj
        
        self.assertRaises(
            exception.AuditNotFound,
            audit_obj.Audit.get_by_uuid,
            self.context, audit_uuid)
    
    def test_create_audit_with_notification(self):
        """Test that audit creation emits notification."""
        # Create audit
        audit_dict = {
            'audit_type': 'ONESHOT',
            'goal': 'server_consolidation',
            'name': 'test-audit',
        }
        audit = self.api.create_audit(audit_dict)
        
        # Wait for notification
        notifications = self.notifier.wait_for_versioned_notifications(
            'audit.create', n_events=1, timeout=10.0)
        
        # Verify notification content
        self.assertEqual(1, len(notifications))
        payload = notifications[0]['payload']
        self.assertEqual(audit['uuid'],
                        payload['watcher_object.data']['uuid'])
        self.assertEqual('test-audit',
                        payload['watcher_object.data']['name'])
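`test_create_audit_with_notification` depends on a fixture that records notifications and lets the test block until enough of them arrive. The sketch below shows one way such a capture object could work, using a condition variable instead of sleeping. It is an illustration only: `NotificationCapture` and `record` are hypothetical names, and the real fixture would hook into the oslo.messaging notifier rather than being called directly.

```python
import threading


class NotificationCapture:
    """Thread-safe store for emitted notifications (sketch)."""

    def __init__(self):
        self._cond = threading.Condition()
        self.versioned_notifications = []

    def record(self, event_type, payload):
        """Called by the (stubbed) notifier whenever an event is emitted."""
        with self._cond:
            self.versioned_notifications.append(
                {"event_type": event_type, "payload": payload})
            self._cond.notify_all()

    def wait_for_versioned_notifications(self, event_type,
                                         n_events=1, timeout=10.0):
        """Block until n_events of event_type arrive or timeout expires."""
        def matching():
            return [n for n in self.versioned_notifications
                    if n["event_type"] == event_type]

        with self._cond:
            self._cond.wait_for(
                lambda: len(matching()) >= n_events, timeout=timeout)
            return matching()


if __name__ == "__main__":
    capture = NotificationCapture()
    # Emit from another thread shortly after the wait starts.
    threading.Timer(0.05, capture.record,
                    args=("audit.create", {"uuid": "abc"})).start()
    events = capture.wait_for_versioned_notifications(
        "audit.create", timeout=2.0)
    print(len(events))
```

On timeout the method returns whatever matched so far, so the test's own `assertEqual(1, len(notifications))` reports the failure.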

Commit 6.2: End-to-End Workflow Test

watcher/tests/functional/test_workflows.py:

"""End-to-end workflow functional tests."""

import time

from watcher.tests.functional import base


class TestAuditWorkflow(base.WatcherFunctionalTestCase):
    """Test complete audit → strategy → action plan workflow."""
    
    START_DECISION_ENGINE = True
    START_APPLIER = True
    
    def test_oneshot_audit_workflow(self):
        """Test complete workflow for ONESHOT audit.
        
        This test exercises:
        1. Creating an audit via API
        2. Decision engine picking up audit
        3. Strategy execution
        4. Action plan creation
        5. Applier executing actions
        """
        # Step 1: Create audit
        audit_dict = {
            'audit_type': 'ONESHOT',
            'goal': 'dummy',  # Use dummy strategy for testing
            'name': 'workflow-test',
        }
        audit = self.api.create_audit(audit_dict)
        audit_uuid = audit['uuid']
        
        # Step 2: Wait for audit to complete
        # Decision engine should pick it up and execute strategy
        for i in range(50):  # 5 seconds max
            audit = self.api.get_audit(audit_uuid)
            if audit['state'] in ('SUCCEEDED', 'FAILED', 'CANCELLED'):
                break
            time.sleep(0.1)
        
        self.assertEqual(
            'SUCCEEDED', audit['state'],
            'Audit did not complete successfully')
        
        # Step 3: Verify action plan was created
        action_plans = self.api.list_action_plans(audit_uuid=audit_uuid)
        self.assertGreater(
            len(action_plans), 0, 'No action plan created')
        
        action_plan = action_plans[0]
        self.assertIsNotNone(action_plan.get('uuid'))
        
        # Step 4: Trigger action plan
        self.api.start_action_plan(action_plan['uuid'])
        
        # Step 5: Wait for action plan execution
        for i in range(50):
            action_plan = self.api.get_action_plan(action_plan['uuid'])
            if action_plan['state'] in ('SUCCEEDED', 'FAILED', 'CANCELLED'):
                break
            time.sleep(0.1)
        
        self.assertEqual(
            'SUCCEEDED', action_plan['state'],
            'Action plan did not execute successfully')
        
        # Step 6: Verify notifications
        # Should have: audit.create, audit.update (x2), action_plan.create, etc.
        all_notifications = self.notifier.versioned_notifications
        audit_notifications = [
            n for n in all_notifications
            if n['event_type'].startswith('audit.')]
        self.assertGreater(len(audit_notifications), 0)
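The workflow test polls twice with the same `for`/`sleep` loop, and a loop that runs out of iterations only fails later, at the state assertion. A small helper with an explicit deadline keeps the wait in one place and fails with a clear message. This is a sketch; `wait_for_state` is a hypothetical name, not part of the planned base class:

```python
import time


def wait_for_state(fetch, terminal_states, timeout=5.0, interval=0.1):
    """Poll fetch() until its 'state' is terminal, or fail on timeout.

    fetch is any zero-argument callable returning a dict with a
    'state' key, e.g. lambda: self.api.get_audit(audit_uuid).
    """
    deadline = time.monotonic() + timeout
    while True:
        resource = fetch()
        if resource["state"] in terminal_states:
            return resource
        if time.monotonic() >= deadline:
            raise AssertionError(
                "timed out after %.1fs in state %s"
                % (timeout, resource["state"]))
        time.sleep(interval)


if __name__ == "__main__":
    # Stand-in for an API client: returns a new state on each call.
    states = iter(["PENDING", "ONGOING", "SUCCEEDED"])
    final = wait_for_state(lambda: {"state": next(states)},
                           ("SUCCEEDED", "FAILED", "CANCELLED"),
                           interval=0.01)
    print(final["state"])
```

In the test above, Step 2 would become `audit = wait_for_state(lambda: self.api.get_audit(audit_uuid), ('SUCCEEDED', 'FAILED', 'CANCELLED'))`. Using `time.monotonic()` bounds the wait by elapsed time rather than by iteration count, so slow API calls do not stretch the timeout.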

Phase 6: Regression Test Framework (1 commit)

Directory Structure

watcher/tests/functional/regressions/
├── __init__.py
├── README.rst
└── test_bug_example.py  # Stub example

watcher/tests/functional/regressions/README.rst:

================================
Tests for Specific Regressions
================================

This directory contains regression tests for specific bugs reported in Launchpad.
Each test is designed to reproduce a bug and verify that it has been fixed.

Purpose
=======

Regression tests serve as long-term protection against bugs reoccurring. When a
significant bug is fixed, we create a functional test that:

1. Reproduces the exact conditions that triggered the bug
2. Verifies the bug is fixed
3. Prevents the bug from being reintroduced

These tests are MORE important than regular functional tests because they represent
real-world problems that affected users.

When to Create a Regression Test
=================================

Create a regression test when:

- A bug requires complex setup (multiple services, specific state)
- The bug involves interaction between multiple components
- The bug is non-obvious and could easily be reintroduced
- The bug caused significant user impact

Do NOT create regression tests for:

- Simple one-line fixes that are covered by unit tests
- Bugs in test code itself
- Documentation bugs

Writing Regression Tests
=========================

File Naming
-----------

Regression test files MUST be named: ``test_bug_<launchpad_id>.py``

Example: ``test_bug_1234567.py`` for bug #1234567

Class Naming
------------

Use a descriptive class name that explains what the bug was:

.. code-block:: python

    # Good
    class TestAuditFailsWithEmptyStrategy(base.WatcherFunctionalTestCase):
        """Regression test for bug #1234567."""

    # Bad
    class TestBug1234567(base.WatcherFunctionalTestCase):
        """Test for bug."""

Test Structure
--------------

Each regression test should have:

1. **Comprehensive docstring** explaining:

   - What the bug was
   - How to reproduce it
   - What the fix was
   - Why this test prevents regression

2. **Self-contained setup** with explicit fixtures

3. **Minimal inheritance** - inherit directly from base test class

4. **Clear test steps** with comments

Example Template
================

.. code-block:: python

    """Regression test for bug #1234567.

    Description of what the bug was and how it manifested to users.
    """

    from watcher.tests.functional import base


    class TestDescriptiveName(base.WatcherFunctionalTestCase):
        """Regression test for bug #1234567.
        
        Before the fix: Describe broken behavior
        After the fix: Describe correct behavior
        
        Root cause: Explain technical cause
        
        The test verifies: What this test checks
        """
        
        def setUp(self):
            super(TestDescriptiveName, self).setUp()
            # Explicit fixture setup
            # Any special configuration
        
        def test_specific_scenario(self):
            """Test the specific scenario that triggered the bug."""
            # Step 1: Setup condition
            # Step 2: Trigger bug scenario
            # Step 3: Verify fix works
            pass

Writing Tests Before the Bug is Fixed
======================================

When possible, write the test to demonstrate the bug BEFORE fixing it:

1. Write test that reproduces broken behavior
2. Assert the current (broken) behavior
3. Comment out expected (correct) assertions
4. Commit with "Related-Bug: #XXXXXX"
5. Fix the bug in production code
6. Update test: swap assertions (broken → commented, expected → active)
7. Commit with "Closes-Bug: #XXXXXX"

Example:

.. code-block:: python

    def test_audit_with_empty_strategy(self):
        """Test audit doesn't fail with empty strategy."""
        audit = self.api.create_audit({'goal': 'dummy'})
        
        # BUG: Currently fails with 500 error
        # This demonstrates the broken behavior:
        response = self.api.get_audit(audit['uuid'])
        self.assertEqual(500, response.status_code)
        
        # EXPECTED (commented out until bug is fixed):
        # response = self.api.get_audit(audit['uuid'])
        # self.assertEqual(200, response.status_code)
        # self.assertEqual('SUCCEEDED', response.json()['state'])

Then after the fix:

.. code-block:: python

    def test_audit_with_empty_strategy(self):
        """Test audit doesn't fail with empty strategy."""
        audit = self.api.create_audit({'goal': 'dummy'})
        
        # BUG FIXED: Now returns success
        # Old assertion (demonstrated broken behavior):
        # response = self.api.get_audit(audit['uuid'])
        # self.assertEqual(500, response.status_code)
        
        # Correct behavior after fix:
        response = self.api.get_audit(audit['uuid'])
        self.assertEqual(200, response.status_code)
        self.assertEqual('SUCCEEDED', response.json()['state'])

Stability Over Reuse
====================

Regression tests prioritize STABILITY over code reuse:

**Good:**

- Explicit fixture setup in setUp()
- Minimal inheritance (just base test class)
- Self-contained test methods
- Clear, verbose assertions

**Bad:**

- Deep inheritance from other test classes
- Hidden fixture dependencies
- Relying on helper methods that might change
- Terse, unclear assertions

The goal is that regression tests should continue to work even if other
test infrastructure changes significantly.

Example: Explicit Fixtures
---------------------------

.. code-block:: python

    # Good - Explicit and stable
    class TestBug123(base.WatcherFunctionalTestCase):
        def setUp(self):
            super(TestBug123, self).setUp()
            # Clear what fixtures this test uses
            self.useFixture(watcher_fixtures.NovaFixture(self))
            self.useFixture(watcher_fixtures.GnocchiFixture(self))

    # Bad - Hidden dependencies
    class TestBug123(SomeOtherTestClass):
        # What fixtures does SomeOtherTestClass set up?
        # If it changes, this test breaks even though bug hasn't regressed
        pass

Running Regression Tests
=========================

Run all regression tests:

.. code-block:: bash

    tox -e functional -- watcher.tests.functional.regressions

Run specific regression test:

.. code-block:: bash

    tox -e functional -- watcher.tests.functional.regressions.test_bug_1234567

With debug logging:

.. code-block:: bash

    OS_DEBUG=1 tox -e functional -- watcher.tests.functional.regressions.test_bug_1234567

watcher/tests/functional/regressions/test_bug_example.py:

"""Example regression test (stub).

This is a template showing how to write regression tests for Watcher.
Replace this with actual bug regression tests.
"""

from watcher.tests.functional import base


class TestExampleRegression(base.WatcherFunctionalTestCase):
    """Example regression test structure.
    
    This stub demonstrates the structure of a regression test.
    
    In a real regression test, you would:
    
    1. Describe the bug in detail in the docstring
    2. Explain how to reproduce it
    3. Document the fix
    4. Write test that verifies fix prevents regression
    
    Example:
    --------
    
    Bug #1234567: Audit fails when goal has no strategies
    
    Before the fix: Creating an audit with a goal that has no available
    strategies would cause the decision engine to crash with an
    unhandled exception.
    
    After the fix: The audit completes with state FAILED and an
    appropriate error message.
    
    Root cause: The strategy selector didn't handle the case where
    no strategies were available for a goal.
    
    The test verifies: Creating an audit with an empty strategy list
    results in FAILED state with proper error message, not a crash.
    """
    
    def setUp(self):
        super(TestExampleRegression, self).setUp()
        
        # Explicit fixture setup
        # For regression tests, make all fixtures explicit even if
        # the base class provides them. This ensures long-term stability.
        
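        # Example: Override configuration for this specific test.
        # Left commented out because ``some_option`` is a placeholder;
        # overriding an unregistered option would likely make setUp()
        # fail before the test can be skipped.
        # self.flags(some_option='specific_value',
        #            group='watcher_decision_engine')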
    
    def test_example_scenario(self):
        """Test the specific scenario that triggered the bug.
        
        This is where you reproduce the exact conditions that caused
        the bug and verify that it's fixed.
        """
        # Step 1: Set up preconditions
        # Create any necessary database records, etc.
        
        # Step 2: Trigger the scenario that caused the bug
        # E.g., create an audit, start an action plan, etc.
        
        # Step 3: Verify the fix works
        # Assert the correct behavior, not the broken behavior
        
        # For demonstration purposes only:
        self.skipTest("This is an example stub, not a real test")

Phase 7: Contributor Documentation (1 commit)

doc/source/contributor/functional-testing.rst:

==================
Functional Testing
==================

This guide explains how to write and run functional tests for Watcher,
including both Python-based and Gabbi (YAML-based) tests.

What Are Functional Tests?
===========================

Functional tests verify multiple components working together with minimal
mocking. They sit between unit tests and full integration tests:

+----------------+------------------+-------------------+------------------+
| Test Type      | Scope            | Mocking           | Speed            |
+================+==================+===================+==================+
| Unit           | Single function  | Extensive mocking | Very fast        |
|                | or class         |                   |                  |
+----------------+------------------+-------------------+------------------+
| Functional     | Multiple         | External services | Fast             |
|                | components       | only              |                  |
+----------------+------------------+-------------------+------------------+
| Integration    | Complete system  | Minimal or none   | Slow             |
+----------------+------------------+-------------------+------------------+

When to Write Functional Tests
===============================

Write functional tests when:

- Testing workflows spanning multiple components
- Verifying RPC interactions between services
- Testing database migrations with real data
- Reproducing complex bugs (regression tests)
- Validating API contracts with real WSGI application

When to Write Unit Tests Instead
=================================

Write unit tests when:

- Testing a single function or method
- Testing edge cases and error conditions
- Testing algorithmic logic
- Mock dependencies are simple and clear

Key Differences from Unit Tests
================================

Functional vs Unit Tests
-------------------------

+------------------+------------------------------+------------------------------------+
| Aspect           | Unit Tests                   | Functional Tests                   |
+==================+==============================+====================================+
| **Location**     | ``watcher/tests/unit/``      | ``watcher/tests/functional/``      |
+------------------+------------------------------+------------------------------------+
| **Base Class**   | ``WatcherTestCase`` in       | ``WatcherFunctionalTestCase`` in   |
|                  | ``watcher.tests.unit.base``  | ``watcher.tests.functional.base``  |
+------------------+------------------------------+------------------------------------+
| **Mocking**      | Extensive - mock             | Minimal - only external            |
|                  | everything except the        | services (Nova, Gnocchi,           |
|                  | code under test              | etc.)                              |
+------------------+------------------------------+------------------------------------+
| **Database**     | Mocked or no database        | Real SQLite in-memory              |
+------------------+------------------------------+------------------------------------+
| **RPC**          | Mocked                       | Real oslo.messaging fake           |
|                  |                              | driver                             |
+------------------+------------------------------+------------------------------------+
| **API**          | Mock API calls               | Real Pecan WSGI app via            |
|                  |                              | wsgi-intercept                     |
+------------------+------------------------------+------------------------------------+
| **Services**     | Not started                  | Can start decision engine,         |
|                  |                              | applier services                   |
+------------------+------------------------------+------------------------------------+

Writing Your First Functional Test
===================================

Basic Template
--------------

.. code-block:: python

    """Functional tests for audit operations."""

    from watcher.tests.functional import base


    class TestAuditOperations(base.WatcherFunctionalTestCase):
        """Test audit creation and execution."""
        
        def test_create_audit(self):
            """Test creating an audit via API."""
            # Create audit
            audit = self.api.create_audit({
                'audit_type': 'ONESHOT',
                'goal': 'server_consolidation',
                'name': 'test-audit',
            })
            
            # Verify
            self.assertEqual('PENDING', audit['state'])
            self.assertEqual('test-audit', audit['name'])

Test Structure
--------------

1. Import from ``watcher.tests.functional.base``
2. Inherit from ``WatcherFunctionalTestCase``
3. Use ``self.api`` or ``self.admin_api`` for API operations
4. Use ``self.context`` for direct database operations
5. Use fixtures (``self.nova``, ``self.gnocchi``) for external services

Available Test Fixtures
========================

The base functional test class provides these fixtures automatically:

API Clients
-----------

- ``self.api`` - Regular user API client
- ``self.admin_api`` - Admin user API client

Database
--------

- In-memory SQLite database with real Watcher schema
- Automatically cleaned up after each test

RPC
---

- ``oslo.messaging`` with ``fake://`` transport
- Synchronous for deterministic testing

External Services
-----------------

- ``self.nova`` - Nova API mock (``NovaFixture``)
- ``self.gnocchi`` - Gnocchi API mock (``GnocchiFixture``)

Notifications
-------------

- ``self.notifier`` - Notification capture fixture

Starting Services
-----------------

Set class attributes to auto-start services:

.. code-block:: python

    class TestWithServices(base.WatcherFunctionalTestCase):
        START_DECISION_ENGINE = True
        START_APPLIER = True

Or start manually:

.. code-block:: python

    def setUp(self):
        super().setUp()
        self.start_service('watcher-decision-engine')

Working with External Service Fixtures
=======================================

Nova Fixture
------------

The Nova fixture provides mocked Nova API operations:

.. code-block:: python

    def test_with_nova_instances(self):
        """Test using Nova fixture."""
        # Use pre-defined instances
        instances = self.nova.get_instance_list()
        self.assertEqual(2, len(instances))
        
        # Add custom instance
        custom_instance = {
            'id': 'custom-id',
            'name': 'custom-instance',
            'status': 'ACTIVE',
            'OS-EXT-SRV-ATTR:host': 'custom-host',
        }
        self.nova.add_instance(custom_instance)
        
        # Simulate live migration
        self.nova.live_migrate('instance-id', 'dest-host')

Gnocchi Fixture
---------------

The Gnocchi fixture provides mocked metrics:

.. code-block:: python

    def test_with_gnocchi_metrics(self):
        """Test using Gnocchi fixture."""
        # Get measures
        measures = self.gnocchi.get_measures('cpu-metric-1')
        self.assertGreater(len(measures), 0)
        
        # Set custom measures
        custom_measures = [
            {'timestamp': '2025-10-07T10:00:00', 'value': 50.0},
            {'timestamp': '2025-10-07T10:01:00', 'value': 55.0},
        ]
        self.gnocchi.set_measures('custom-metric', custom_measures)

Verifying Notifications
========================

Use the notification fixture to verify events:

.. code-block:: python

    def test_audit_notification(self):
        """Test that audit creation emits notification."""
        # Perform operation
        audit = self.api.create_audit({'goal': 'dummy'})
        
        # Wait for notification
        notifications = self.notifier.wait_for_versioned_notifications(
            'audit.create', n_events=1, timeout=10.0)
        
        # Verify
        self.assertEqual(1, len(notifications))
        self.assertEqual(
            audit['uuid'],
            notifications[0]['payload']['watcher_object.data']['uuid'])

Configuration Overrides
=======================

Override configuration for specific tests:

.. code-block:: python

    def test_with_custom_config(self):
        """Test with custom configuration."""
        # Override for this test only
        self.flags(period_interval=60, group='watcher_decision_engine')
        
        # Configuration automatically restored after test

Running Functional Tests
=========================

Run All Functional Tests
-------------------------

.. code-block:: bash

    tox -e functional

Run Specific Test Module
-------------------------

.. code-block:: bash

    tox -e functional -- watcher.tests.functional.test_api_audits

Run Specific Test
-----------------

.. code-block:: bash

    tox -e functional -- watcher.tests.functional.test_api_audits.TestAuditAPI.test_create_audit

With Debug Logging
------------------

.. code-block:: bash

    OS_DEBUG=1 tox -e functional -- watcher.tests.functional.test_api_audits

Best Practices
==============

DO
--

- ✅ Test complete workflows
- ✅ Use real Watcher code (API, DB, RPC)
- ✅ Use fixtures for external services
- ✅ Verify notifications when appropriate
- ✅ Test both success and failure scenarios
- ✅ Add regression tests for complex bugs

DON'T
-----

- ❌ Mock Watcher's own code
- ❌ Test implementation details
- ❌ Write tests that depend on timing
- ❌ Leave resources uncleaned (fixtures handle this)
- ❌ Skip error cases

Example: Complete Workflow Test
================================

.. code-block:: python

    """Example of comprehensive workflow test."""

    import time

    from watcher.tests.functional import base


    class TestCompleteWorkflow(base.WatcherFunctionalTestCase):
        """Test end-to-end audit workflow."""
        
        START_DECISION_ENGINE = True
        START_APPLIER = True
        
        def test_audit_to_execution(self):
            """Test complete workflow from audit to action execution."""
            # Step 1: Create audit
            audit = self.api.create_audit({
                'audit_type': 'ONESHOT',
                'goal': 'dummy',
                'name': 'workflow-test',
            })
            audit_uuid = audit['uuid']
            
            # Step 2: Wait for audit to complete
            for i in range(50):
                audit = self.api.get_audit(audit_uuid)
                if audit['state'] in ('SUCCEEDED', 'FAILED'):
                    break
                time.sleep(0.1)
            
            self.assertEqual('SUCCEEDED', audit['state'])
            
            # Step 3: Get action plan
            action_plans = self.api.list_action_plans(audit_uuid=audit_uuid)
            self.assertGreater(len(action_plans), 0)
            
            # Step 4: Execute action plan
            action_plan = action_plans[0]
            self.api.start_action_plan(action_plan['uuid'])
            
            # Step 5: Wait for completion
            for i in range(50):
                action_plan = self.api.get_action_plan(action_plan['uuid'])
                if action_plan['state'] in ('SUCCEEDED', 'FAILED'):
                    break
                time.sleep(0.1)
            
            self.assertEqual('SUCCEEDED', action_plan['state'])
            
            # Step 6: Verify notifications
            notifications = self.notifier.versioned_notifications
            audit_events = [
                n for n in notifications
                if n['event_type'].startswith('audit.')]
            self.assertGreater(len(audit_events), 0)

Troubleshooting
===============

Test Hangs
----------

If a test hangs:

1. Check if you're waiting for a service that isn't started
2. Verify RPC ``CastAsCallFixture`` is enabled (it is by default)
3. Check for deadlocks in service interaction

Database Errors
---------------

If you see database errors:

1. Ensure ``USES_DB = True`` in your test class
2. Check that migrations are up to date
3. Verify schema cache isn't corrupted (delete and regenerate)

Import Errors
-------------

If fixtures can't be imported:

1. Check that you're importing from ``watcher.tests.local_fixtures``
2. Verify ``__init__.py`` exports the fixture
3. Ensure fixture file is in the correct location

Gabbi Tests (YAML-Based API Testing)
======================================

In addition to Python functional tests, Watcher supports Gabbi tests for
declarative API testing.

What Are Gabbi Tests?
---------------------

Gabbi tests are YAML-based HTTP API tests. They provide a simple, declarative
way to test REST APIs without writing Python code.

When to Use Gabbi Tests
------------------------

Use Gabbi tests for:

- **API behavior**: HTTP status codes, headers, response structure
- **Microversion testing**: Version-specific API behavior
- **Simple CRUD operations**: Create, read, update, delete endpoints
- **Error responses**: Invalid requests and error handling

Use Python functional tests for:

- **Complex workflows**: Multi-step operations spanning services
- **Asynchronous operations**: Waiting for state changes
- **Service integration**: Decision engine and applier interaction
- **Complex assertions**: Beyond HTTP response validation

Writing Gabbi Tests
-------------------

Create a YAML file in ``watcher/tests/functional/gabbits/``:

.. code-block:: yaml

    # audit-create.yaml
    fixtures:
        - APIFixture
    
    defaults:
        request_headers:
            x-auth-token: admin
            accept: application/json
            content-type: application/json
            openstack-api-version: infra-optim 1.0
    
    tests:
    - name: create audit
      POST: /v1/audits
      data:
          name: test-audit
          audit_type: ONESHOT
          goal: dummy
      status: 201
      response_headers:
          location: //v1/audits/[a-f0-9-]+/
      response_json_paths:
          $.name: test-audit
          $.state: PENDING

**YAML Structure:**

- ``fixtures``: List of fixture class names (from ``fixtures/gabbi.py``)
- ``defaults``: Default values applied to every test in the file (here, request headers)
- ``tests``: Sequential list of test cases

**Test Case Fields:**

- ``name``: Descriptive test name
- ``GET/POST/PUT/PATCH/DELETE``: HTTP method and URL
- ``request_headers``: Request headers (optional)
- ``data``: Request body as JSON (optional)
- ``status``: Expected HTTP status code
- ``response_headers``: Expected response headers (regex allowed)
- ``response_json_paths``: JSONPath assertions on response body

Using Environment Variables
~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Fixtures set environment variables that tests can reference:

.. code-block:: yaml

    tests:
    - name: create audit with UUID
      POST: /v1/audits
      data:
          name: $ENVIRON['AUDIT_NAME']
          uuid: $ENVIRON['AUDIT_UUID']
          audit_type: ONESHOT
          goal: dummy
      status: 201

Referencing Previous Tests
~~~~~~~~~~~~~~~~~~~~~~~~~~~

Tests can reference responses from earlier tests:

.. code-block:: yaml

    tests:
    - name: create audit
      POST: /v1/audits
      data:
          name: test-audit
          audit_type: ONESHOT
          goal: dummy
      status: 201
    
    - name: get created audit
      GET: $LOCATION  # Uses Location header from previous test
      response_json_paths:
          # Reference previous test's response
          $.uuid: $HISTORY['create audit'].$RESPONSE['$.uuid']
          $.name: test-audit

Gabbi Fixtures
--------------

Gabbi fixtures extend ``gabbi.fixture.GabbiFixture`` and run once per YAML file.

**Available Fixtures:**

- ``APIFixture``: Base fixture, empty database
- ``AuditFixture``: Pre-creates goal, strategy, audit
- ``ActionPlanFixture``: Pre-creates goal, strategy, audit, action plan

**Example Using Pre-Created Data:**

.. code-block:: yaml

    # action-plan.yaml
    fixtures:
        - ActionPlanFixture  # Has pre-created action plan
    
    tests:
    - name: get action plan
      GET: /v1/action_plans/$ENVIRON['ACTION_PLAN_UUID']
      status: 200
      response_json_paths:
          $.uuid: $ENVIRON['ACTION_PLAN_UUID']
          $.state: RECOMMENDED

Testing Microversions
----------------------

Test microversion-specific behavior:

.. code-block:: yaml

    # microversions.yaml
    tests:
    - name: old version rejects new field
      POST: /v1/audits
      request_headers:
          openstack-api-version: "infra-optim 1.4"
      data:
          audit_type: ONESHOT
          goal: dummy
          new_field: value  # Added in 1.5
      status: 400
    
    - name: new version accepts new field
      POST: /v1/audits
      request_headers:
          openstack-api-version: "infra-optim 1.5"
      data:
          audit_type: ONESHOT
          goal: dummy
          new_field: value
      status: 201

Running Gabbi Tests
-------------------

Run all gabbi tests:

.. code-block:: bash

    tox -e functional -- test_api_gabbi

Run specific YAML file:

.. code-block:: bash

    tox -e functional -- test_api_gabbi.AuditLifecycleGabbits

With debug logging:

.. code-block:: bash

    OS_DEBUG=1 tox -e functional -- test_api_gabbi

Gabbi Test Organization
-----------------------

**File Naming:**

- Use descriptive names: ``audit-lifecycle.yaml``, not ``test1.yaml``
- One file per API resource or concept
- Tests in a file run sequentially; files run in parallel

**Test Naming:**

- Use descriptive names that explain what is tested
- Good: ``"create audit with invalid goal returns 400"``
- Bad: ``"test 1"``

**File Organization:**

.. code-block:: text

    watcher/tests/functional/gabbits/
    ├── basic-http.yaml          # Basic API behavior
    ├── audit-lifecycle.yaml     # Audit CRUD
    ├── action-plan.yaml         # Action plan operations
    ├── microversions.yaml       # Version testing
    ├── goal.yaml                # Goal endpoints
    ├── strategy.yaml            # Strategy endpoints
    └── service.yaml             # Service endpoints

Gabbi vs Python Tests
---------------------

**Use Gabbi when:**

- Testing single API endpoints
- Verifying HTTP status codes and headers
- Testing microversions
- Checking error responses
- The test fits a declarative request/response format

**Use Python when:**

- Testing multi-step workflows
- Waiting for asynchronous operations
- Managing complex state across requests
- Testing service-to-service interaction
- Assertions need full Python capabilities

Further Reading
===============

- :doc:`regression-testing` - Writing regression tests
- :doc:`/contributor/testing` - General testing guidelines
- `Gabbi Documentation <https://gabbi.readthedocs.io/>`_
- `OpenStack Functional Testing Guide <https://docs.openstack.org/nova/latest/contributor/testing/functional-tests.html>`_
- `Placement Gabbi Tests <https://opendev.org/openstack/placement/src/branch/master/placement/tests/functional/gabbits>`_

Phase 8: CI Integration (1 commit)

Update tox.ini:

[tox]
minversion = 3.18.0
envlist = py3,functional,pep8
skipsdist = False

[testenv]
usedevelop = True
install_command = pip install -c{env:TOX_CONSTRAINTS_FILE:https://releases.openstack.org/constraints/upper/master} {opts} {packages}
setenv =
    VIRTUAL_ENV={envdir}
    LANGUAGE=en_US
    LC_ALL=en_US.utf-8
    OS_STDOUT_CAPTURE=1
    OS_STDERR_CAPTURE=1
    OS_TEST_TIMEOUT=160
    PYTHONDONTWRITEBYTECODE=1
deps =
    -r{toxinidir}/requirements.txt
    -r{toxinidir}/test-requirements.txt
passenv =
    OS_DEBUG
commands =
    stestr --test-path=./watcher/tests/unit run {posargs}

[testenv:functional{,-py310,-py311,-py312}]
description =
    Run functional tests for Watcher.
setenv =
    {[testenv]setenv}
deps =
    {[testenv]deps}
commands =
    stestr --test-path=./watcher/tests/functional run {posargs}
    stestr slowest

[testenv:functional-regression]
description =
    Run regression tests only.
setenv =
    {[testenv:functional]setenv}
deps =
    {[testenv:functional]deps}
commands =
    stestr --test-path=./watcher/tests/functional/regressions run {posargs}

Update .zuul.yaml:

- job:
    name: watcher-functional
    parent: openstack-tox-functional-py312
    description: |
      Run functional tests for the Watcher project.
    required-projects:
      - openstack/watcher
    irrelevant-files:
      - ^.*\.rst$
      - ^doc/.*$
      - ^releasenotes/.*$
      - ^watcher/locale/.*$
    vars:
      zuul_work_dir: src/opendev.org/openstack/watcher
      tox_envlist: functional
    timeout: 1800

- job:
    name: watcher-functional-regression
    parent: watcher-functional
    description: |
      Run regression tests for the Watcher project.
    vars:
      tox_envlist: functional-regression

- project:
    check:
      jobs:
        - watcher-functional
    gate:
      jobs:
        - watcher-functional
    periodic:
      jobs:
        - watcher-functional-regression

Gabbi Test Integration

What are Gabbi Tests?

Gabbi is a YAML-based declarative HTTP testing framework used extensively in OpenStack, particularly in the Placement service. It provides a clean, readable way to test REST APIs without writing Python code.

Why Gabbi for Watcher?

  1. API Contract Testing: Verify API behavior, status codes, headers
  2. Microversion Testing: Test API version negotiation and version-specific behavior
  3. Declarative Syntax: Easy to read and write, even for non-Python developers
  4. Fast Execution: Uses wsgi-intercept for in-process HTTP calls
  5. Sequential Tests: Tests in a YAML file run in order, allowing state progression
  6. Proven Pattern: Widely used in OpenStack (Placement, Ironic, Cyborg)

Gabbi vs Python Functional Tests

| Aspect | Gabbi Tests | Python Functional Tests |
|--------|-------------|-------------------------|
| Format | YAML | Python code |
| Use Case | API contracts, HTTP behavior | Complex workflows, integration |
| State Management | Environment variables, $HISTORY | Python objects, fixtures |
| Learning Curve | Low (declarative) | Medium (requires Python) |
| Expressiveness | Limited to HTTP assertions | Full Python capabilities |
| Best For | API behavior, microversions | Multi-service interactions |

Architecture

┌─────────────────────────────────────────────────────┐
│  Gabbi YAML Test Files (gabbits/*.yaml)             │
│  - Declarative HTTP test definitions                │
│  - Environment variable substitution                │
│  - Sequential test ordering within files            │
└──────────────────────────┬──────────────────────────┘
                           │
┌──────────────────────────▼──────────────────────────┐
│  Test Loader (test_api_gabbi.py)                    │
│  - gabbi.driver.build_tests()                       │
│  - Discovers YAML files in gabbits/                 │
│  - Creates Python test cases                        │
└──────────────────────────┬──────────────────────────┘
                           │
┌──────────────────────────▼──────────────────────────┐
│  Gabbi Fixture Layer (fixtures/gabbi.py)            │
│  - APIFixture (base, no pre-created data)           │
│  - AuditFixture (with goals, strategies)            │
│  - ActionPlanFixture (with action plans)            │
└──────────────────────────┬──────────────────────────┘
                           │
      ┌────────────────────┼────────────────────┐
      │                    │                    │
┌─────▼──────┐  ┌─────────▼────────┐  ┌───────▼──────┐
│ Config     │  │ Database         │  │ Pecan App    │
│ (noauth)   │  │ (SQLite in-mem)  │  │ (wsgi-       │
│            │  │                  │  │ intercept)   │
└────────────┘  └──────────────────┘  └──────────────┘

Key Design Decisions

1. Fixture Reuse

Gabbi fixtures reuse components from Python functional tests:

Shared Components:

  • Database fixture (from watcher.tests.fixtures)
  • PolicyFixture (from watcher.tests.unit.policy_fixture)
  • ConfFixture (oslo.config fixture)
  • Test data helpers (watcher.tests.db.utils)

Gabbi-Specific:

  • capture.Logging - Log capture for gabbi tests
  • capture.WarningsFixture - Warning filtering
  • gabbi.APIFixture - GabbiFixture base class
  • Environment variable setup

Why This Approach:

  • ✅ Avoids duplication of database/config setup
  • ✅ Ensures consistency between test types
  • ✅ Leverages existing test data creation helpers
  • ✅ Simplifies maintenance

2. Test Organization

watcher/tests/functional/
├── test_api_gabbi.py        # Gabbi test loader (load_tests protocol)
├── test_api_audits.py       # Python functional tests for audits
├── test_workflows.py        # Python functional tests for workflows
├── fixtures/
│   ├── gabbi.py            # Gabbi-specific fixtures (APIFixture, etc.)
│   └── capture.py          # Logging/warning fixtures for gabbi
├── gabbits/                # Gabbi YAML test files
│   ├── basic-http.yaml
│   ├── audit-lifecycle.yaml
│   ├── action-plan.yaml
│   ├── microversions.yaml
│   ├── goal.yaml
│   ├── strategy.yaml
│   └── service.yaml
└── regressions/            # Python regression tests
    └── test_bug_*.py

Organization Principles:

  • Gabbi tests: One YAML file per API resource or concept
  • Python tests: One file per major functional area
  • Regressions: Bug-specific regression tests (Python only)

3. Fixture Hierarchy

GabbiFixture (from gabbi package)
    │
    ├── APIFixture (base)
    │   - Database
    │   - Config (noauth)
    │   - Policy
    │   - Environment variables (UUIDs)
    │   - No pre-created data
    │
    ├── AuditFixture (extends APIFixture)
    │   - Pre-creates: goal, strategy, audit template, audit
    │   - For tests that need existing audit data
    │
    └── ActionPlanFixture (extends AuditFixture)
        - Pre-creates: action plan, actions
        - For tests that need existing action plan data
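
The hierarchy above can be sketched in a few lines of Python. This is a minimal illustration, not the planned implementation: the environment variable names and the `created` list are assumptions made for the example, and the real fixtures would also set up configuration, the database, and policy. The sketch falls back to a stand-in base class with the same `start_fixture`/`stop_fixture` hooks when gabbi is not installed.

```python
import os
import uuid

try:
    from gabbi import fixture
    GabbiFixture = fixture.GabbiFixture
except ImportError:
    class GabbiFixture:
        """Stand-in exposing the same start/stop hooks as gabbi's base class."""

        def __enter__(self):
            self.start_fixture()

        def __exit__(self, exc_type, value, traceback):
            self.stop_fixture()

        def start_fixture(self):
            pass

        def stop_fixture(self):
            pass


class APIFixture(GabbiFixture):
    """Base fixture: exports identifiers, creates no data."""

    def start_fixture(self):
        self.created = []        # hypothetical record of pre-created objects
        self._exported = []      # environment keys to remove on teardown
        self._export('AUDIT_NAME', 'test-audit')
        self._export('AUDIT_UUID', str(uuid.uuid4()))

    def stop_fixture(self):
        for key in self._exported:
            os.environ.pop(key, None)

    def _export(self, key, value):
        # YAML tests read these values through $ENVIRON['KEY'].
        os.environ[key] = value
        self._exported.append(key)


class AuditFixture(APIFixture):
    """Adds the objects an existing audit depends on."""

    def start_fixture(self):
        super().start_fixture()
        self._export('GOAL_UUID', str(uuid.uuid4()))
        self.created += ['goal', 'strategy', 'audit_template', 'audit']


class ActionPlanFixture(AuditFixture):
    """Adds an action plan and its actions on top of AuditFixture."""

    def start_fixture(self):
        super().start_fixture()
        self._export('ACTION_PLAN_UUID', str(uuid.uuid4()))
        self.created += ['action_plan', 'actions']
```

Each subclass calls `super().start_fixture()` first, so a YAML file that lists only `ActionPlanFixture` still gets everything the lower layers provide.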

Fixture Selection Guidelines:

| Test Scenario | Use Fixture | Reason |
|---------------|-------------|--------|
| Create goal | APIFixture | No pre-data needed |
| Create audit | APIFixture | No pre-data needed |
| List audits | APIFixture or AuditFixture | Depends on test |
| Create action plan | AuditFixture | Needs existing audit |
| Start action plan | ActionPlanFixture | Needs existing plan |
| Microversion tests | APIFixture | Independent of data |

4. Pecan WSGI App Setup

Challenge: Watcher uses Pecan, not a deploy.loadapp() pattern like Placement.

Solution: Custom setup_app() function:

def setup_app():
    """Create Pecan WSGI app for gabbi tests."""
    from watcher.api import config as api_config
    import pecan
    
    # Load Pecan config
    pecan_config = pecan.configuration.conf_from_dict(
        api_config.PECAN_CONFIG)
    app_conf = dict(pecan_config.app)
    
    # Disable ACL for testing
    app_conf['enable_acl'] = False
    
    # Create app
    app = pecan.make_app(
        app_conf.pop('root'),
        logging=getattr(pecan_config, 'logging', {}),
        debug=True,
        **app_conf
    )
    
    return app

Key Points:

  • Disables ACL (enable_acl=False) - policy still enforced via PolicyFixture
  • Uses debug mode for better error messages
  • Returns raw Pecan app (no additional middleware needed for tests)

Example Gabbi Test Anatomy

gabbits/audit-lifecycle.yaml:

# Fixtures to use (from fixtures/gabbi.py)
fixtures:
    - APIFixture

# Default headers for all tests
defaults:
    request_headers:
        x-auth-token: admin
        accept: application/json
        content-type: application/json
        openstack-api-version: infra-optim 1.0

# Sequential tests (run in order)
tests:
- name: list audits empty
  GET: /v1/audits
  response_json_paths:
      $.audits: []

- name: create audit
  POST: /v1/audits
  data:
      name: $ENVIRON['AUDIT_NAME']  # Environment variable
      audit_type: ONESHOT
      goal: dummy
  status: 201
  response_headers:
      location: //v1/audits/[a-f0-9-]+/  # Regex match
  response_json_paths:
      $.name: $ENVIRON['AUDIT_NAME']
      $.state: PENDING

- name: get audit
  GET: $LOCATION  # Uses Location header from previous test
  response_json_paths:
      # Reference previous test response
      $.uuid: $HISTORY['create audit'].$RESPONSE['$.uuid']
      $.name: $ENVIRON['AUDIT_NAME']

Key Features:

  1. Environment Variables: $ENVIRON['AUDIT_NAME'] - set by fixture
  2. Response References: $LOCATION - uses previous response Location header
  3. History: $HISTORY['test-name'].$RESPONSE['$.uuid'] - reference prior test data
  4. JSONPath: $.audits[0].uuid - assert JSON structure
  5. Regex: //v1/audits/[a-f0-9-]+/ - a value wrapped in /.../ is matched as a regular expression

Test Execution and Parallelism

stestr Configuration

.stestr.conf update:

[DEFAULT]
test_path=./watcher/tests/unit
top_dir=./

# Gabbi test grouping
# Ensures tests from the same YAML file run in the same process
# (maintains test ordering within a file)
group_regex=watcher\.tests\.functional\.test_api_gabbi(?:\.|_)([^_]+)

How It Works:

  • Tests within a YAML file run sequentially (maintains order)
  • Different YAML files run in parallel (for speed)
  • Pattern extracts YAML filename from test name
  • All tests with same capture group run together

Example:

Test name: watcher.tests.functional.test_api_gabbi.AuditLifecycleGabbits.test_001_list_audits_empty
Captures: AuditLifecycleGabbits.test (the group stops at the first underscore; the AuditLifecycleGabbits prefix comes from audit-lifecycle.yaml)

All AuditLifecycleGabbits tests run in one process sequentially.
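
The grouping can be checked with plain `re`. The sketch below applies the `group_regex` from above to a few test IDs; `GoalGabbits` and the `test_workflows` ID are hypothetical names used only for illustration. Note that `[^_]+` runs up to the first underscore, so the captured text is the class name followed by `.test`. Tests from the same class still share one key, which is all the grouping needs.

```python
import re
from collections import defaultdict

GROUP_REGEX = re.compile(
    r"watcher\.tests\.functional\.test_api_gabbi(?:\.|_)([^_]+)")


def group_key(test_id):
    """Return the captured grouping key, or None for ungrouped tests."""
    match = GROUP_REGEX.match(test_id)
    return match.group(1) if match else None


PREFIX = "watcher.tests.functional."
TEST_IDS = [
    PREFIX + "test_api_gabbi.AuditLifecycleGabbits.test_001_list_audits_empty",
    PREFIX + "test_api_gabbi.AuditLifecycleGabbits.test_002_create_audit",
    PREFIX + "test_api_gabbi.GoalGabbits.test_001_list_goals",
    PREFIX + "test_workflows.TestAuditWorkflow.test_full_cycle",
]

# Bucket the test IDs the way a group-aware scheduler would.
groups = defaultdict(list)
for test_id in TEST_IDS:
    groups[group_key(test_id)].append(test_id)

for key, members in groups.items():
    print(key, len(members))
```

Tests that do not match the pattern, such as the Python workflow test, fall under `None` here, meaning they are not tied to any group and can be scheduled individually.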

Documentation Strategy

For API Documentation

Gabbi tests can serve as executable API documentation:

  1. Clear test names: Describe what API does
  2. Complete examples: Show request/response structure
  3. Version tests: Document microversion behavior
  4. Error cases: Show error responses

Example:

tests:
- name: create audit with invalid goal returns 400
  DESC: |
    When creating an audit with a non-existent goal,
    the API returns 400 Bad Request with a clear error message.
  POST: /v1/audits
  data:
      audit_type: ONESHOT
      goal: non-existent-goal
  status: 400
  response_json_paths:
      $.errors[0].code: InvalidGoal

Contributor Documentation

New section in doc/source/contributor/functional-testing.rst:

Gabbi Tests
===========

Gabbi tests are declarative YAML-based API tests. Use them for:

- API behavior verification
- Microversion testing
- HTTP status code checks
- Response header validation

Writing Gabbi Tests
-------------------

Create a YAML file in watcher/tests/functional/gabbits/:

.. code-block:: yaml

    fixtures:
        - APIFixture
    
    defaults:
        request_headers:
            x-auth-token: admin
            accept: application/json
    
    tests:
    - name: create audit
      POST: /v1/audits
      data:
          audit_type: ONESHOT
          goal: dummy
      status: 201

Running Gabbi Tests
-------------------

.. code-block:: bash

    # Run all gabbi tests
    tox -e functional -- test_api_gabbi
    
    # Run specific YAML file
    tox -e functional -- test_api_gabbi.AuditLifecycleGabbits

Microversion Testing Pattern

Goal: Test that new API features are only available in appropriate microversions.

gabbits/microversions.yaml:

fixtures:
    - APIFixture

tests:
- name: old microversion rejects new field
  DESC: |
    Feature X was added in microversion 1.5.
    Requests with older versions should reject the new field.
  POST: /v1/audits
  request_headers:
      openstack-api-version: "infra-optim 1.4"
  data:
      audit_type: ONESHOT
      goal: dummy
      new_field: value  # Only valid in 1.5+
  status: 400
  response_json_paths:
      $.errors[0].detail: /.*new_field.*not supported.*version 1.4/

- name: new microversion accepts new field
  POST: /v1/audits
  request_headers:
      openstack-api-version: "infra-optim 1.5"
  data:
      audit_type: ONESHOT
      goal: dummy
      new_field: value  # Valid in 1.5+
  status: 201
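
What these two tests exercise is a version comparison on the `openstack-api-version` header. The sketch below shows that comparison in isolation, assuming the `infra-optim <major>.<minor>` header format used above. `FIELD_ADDED_IN` and `accepts_new_field` are hypothetical names for this example and are not Watcher's actual implementation.

```python
def parse_api_version(header_value, service_type="infra-optim"):
    """Parse 'infra-optim 1.4' into the integer tuple (1, 4)."""
    service, _, version = header_value.strip().partition(" ")
    if service != service_type:
        raise ValueError("unexpected service type: %r" % service)
    major, minor = version.strip().split(".")
    return int(major), int(minor)


# Hypothetical: the microversion in which 'new_field' was introduced.
FIELD_ADDED_IN = (1, 5)


def accepts_new_field(header_value):
    """True when the requested microversion is at least FIELD_ADDED_IN."""
    return parse_api_version(header_value) >= FIELD_ADDED_IN


for header in ("infra-optim 1.4", "infra-optim 1.5", "infra-optim 1.10"):
    print(header, accepts_new_field(header))
```

Comparing integer tuples rather than strings matters once minor versions reach two digits: as strings, "1.10" sorts before "1.5", while as tuples (1, 10) is correctly newer than (1, 5). A gabbi test pinned to a two-digit minor version is a cheap way to catch that class of bug.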

Benefits Summary

For Watcher Project:

  1. Faster API Testing: Declarative tests are quick to write
  2. Better Coverage: Easy to add tests for all API endpoints
  3. Microversion Validation: Explicit version behavior testing
  4. Documentation: Tests serve as API usage examples
  5. Reduced Maintenance: Less Python code to maintain
  6. OpenStack Alignment: Follows Placement/Ironic patterns

For Contributors:

  1. Low Barrier: No Python expertise needed for basic API tests
  2. Clear Intent: YAML format is self-documenting
  3. Quick Feedback: Fast test execution
  4. Easy Review: Diff shows exactly what API behavior changed

Integration with Python Functional Tests

Complementary, Not Replacement:

| Test Type | Best For | Example |
|-----------|----------|---------|
| Gabbi | API contracts, single requests | "POST /v1/audits returns 201" |
| Python | Multi-step workflows | "Audit → Strategy → Action Plan → Execute" |
| Gabbi | Microversion behavior | "Field X only in version 1.5+" |
| Python | Service integration | "Decision engine creates action plan via RPC" |
| Gabbi | Error responses | "Invalid goal returns 400" |
| Python | Complex state | "Action plan retries on failure" |

Recommendation: Use both test types, selecting the best tool for each scenario.
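
On the Python side, multi-step workflow tests spend most of their time waiting for an audit or action plan to reach a terminal state. A small polling helper with an explicit deadline keeps that waiting bounded and makes timeouts fail loudly. This is a sketch only: `wait_for_state` and `FakeAPI` are hypothetical names, and the fake client stands in for whatever API client the base test class ends up providing.

```python
import time


def wait_for_state(fetch, uuid, done_states=("SUCCEEDED", "FAILED"),
                   timeout=5.0, interval=0.1):
    """Poll fetch(uuid) until its 'state' is terminal or the deadline passes."""
    deadline = time.monotonic() + timeout
    while True:
        resource = fetch(uuid)
        if resource["state"] in done_states:
            return resource
        if time.monotonic() >= deadline:
            raise TimeoutError(
                "%s still in state %s after %.1fs"
                % (uuid, resource["state"], timeout))
        time.sleep(interval)


class FakeAPI:
    """Stand-in client that walks through a fixed sequence of states."""

    def __init__(self, states):
        self._states = iter(states)
        self._last = None

    def get_audit(self, uuid):
        # Keep returning the final state once the sequence is exhausted.
        self._last = next(self._states, self._last)
        return {"uuid": uuid, "state": self._last}


api = FakeAPI(["PENDING", "ONGOING", "SUCCEEDED"])
audit = wait_for_state(api.get_audit, "audit-1", interval=0.01)
print(audit["state"])
```

Raising on timeout, instead of falling through to an assertion on the last observed state, separates "the audit failed" from "the audit never finished" in test output.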


Fixture Requirements Analysis

Required Fixtures Summary

| Fixture | Priority | Complexity | Notes |
|---------|----------|------------|-------|
| ConfFixture | P0 | Low | Configuration management |
| Database | P0 | Medium | SQLite with schema caching |
| RPCFixture | P0 | Low | oslo.messaging fake driver |
| CastAsCallFixture | P0 | Low | Synchronous RPC |
| NotificationFixture | P0 | Medium | Notification capture with threading |
| APIFixture | P0 | High | Pecan WSGI with wsgi-intercept |
| ServiceFixture | P0 | Medium | Start DE/Applier services |
| NovaFixture | P0 | Medium | Mock Nova API |
| GnocchiFixture | P0 | Medium | Mock Gnocchi API |
| PlacementFixture | P1 | High | Can reuse from placement project |
| CeilometerFixture | P2 | Low | Optional, legacy |

Watcher-Specific Considerations

  1. API Framework (Pecan)

    • Watcher uses Pecan instead of custom WSGI
    • Need to use watcher.api.app.setup_app() to get WSGI app
    • Auth middleware may differ from Nova

  2. Services

    • Decision Engine - runs strategies, creates action plans
    • Applier - executes actions
    • Both need ServiceFixture support

  3. Data Model

    • Audit templates
    • Audits (ONESHOT, CONTINUOUS)
    • Goals
    • Strategies
    • Action plans
    • Actions

  4. External Dependencies

    • Nova - primary data source for compute
    • Gnocchi - metrics and aggregation
    • Placement - resource providers (less critical than for Nova)
    • Ceilometer - legacy metrics (optional)

Timeline and Milestones

Recommended Timeline

| Phase | Duration | Commits | Description |
|-------|----------|---------|-------------|
| Phase 0 | 1 week | 3 | Extract existing fixtures, create helpers |
| Phase 1 | 1 week | 1 | Test reorganization |
| Phase 2 | 2 weeks | 3-4 | Core fixtures (conf, db, rpc, notifications) |
| Phase 3 | 2 weeks | 2 | API and service fixtures |
| Phase 4 | 1 week | 1 | Base functional test class |
| Phase 5 | 1.5 weeks | 3 | Gabbi test infrastructure |
| Phase 6 | 2 weeks | 2 | Example Python functional tests |
| Phase 7 | 1 week | 1 | Regression test framework |
| Phase 8 | 1.5 weeks | 1 | Contributor documentation (including Gabbi) |
| Phase 9 | 1 week | 1 | CI integration |
| Total | 14 weeks | 18-19 commits | |

Milestones

M0: Preparation (Week 1) - Existing fixtures extracted, helpers created
M1: Foundation (Week 4) - Test reorganization + core fixtures complete
M2: Infrastructure (Week 7) - All fixtures and base class complete
M3: Gabbi Integration (Week 8.5) - Gabbi tests operational
M4: Validation (Week 11.5) - Example tests demonstrate functionality
M5: Production Ready (Week 14) - Docs complete, CI running


Testing Strategy

Validation at Each Phase

Phase 1: Test Reorganization

  • Run full unit test suite: tox -e py3
  • Verify 100% of tests still pass
  • Check import paths are correct

Phase 2-3: Fixture Development

  • Write unit tests for each fixture
  • Test fixtures independently
  • Verify cleanup happens correctly

Phase 4: Base Test Class

  • Create simple smoke test
  • Verify all fixtures initialize
  • Test service startup

Phase 5: Example Tests

  • Run examples repeatedly (check for flakiness)
  • Measure test execution time
  • Validate notifications captured correctly

Phase 6-7: Documentation

  • Review with contributors
  • Test examples in documentation
  • Verify README instructions

Phase 8: CI Integration

  • Run in Zuul
  • Check job timeout (should be < 30 minutes)
  • Verify reporting works

Success Metrics

  • All existing unit tests pass after reorganization
  • Functional tests complete in < 5 minutes locally
  • Zero test flakiness (run 100 times, 100% pass rate)
  • CI job completes in < 30 minutes
  • Code coverage for functional tests > 60%

Appendix: Key Implementation Details

A. Pecan API Application Setup

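# Watcher's API app setup differs from Nova
import pecan

from watcher.api import app as watcher_app

# Pecan configuration
app_conf = {
    'app': {
        'root': 'watcher.api.controllers.root.RootController',
        'modules': ['watcher.api'],
        'debug': True,
    }
}

# Create WSGI app. Assumption: setup_app() reads attributes such as
# config.app, so the dict is wrapped in a Pecan Config object first.
app = watcher_app.setup_app(
    config=pecan.configuration.conf_from_dict(app_conf))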

B. Service Manager Initialization

# Decision Engine Manager
from watcher.decision_engine import manager as de_manager
decision_engine_mgr = de_manager.DecisionEngineManager()

# Applier Manager
from watcher.applier import manager as applier_manager
applier_mgr = applier_manager.ApplierManager()

C. Database Schema Caching

# Use SQLite iterdump for schema caching
def _cache_schema(connection):
    schema_sql = "".join(line for line in connection.iterdump())
    return schema_sql

# Apply cached schema
def _apply_cached_schema(connection, schema_sql):
    connection.executescript(schema_sql)
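
A runnable sketch of the caching idea, using only the standard `sqlite3` module. The single `audits` table is a simplified stand-in for Watcher's real schema, and `new_test_db` is a hypothetical helper name. In the real fixture the template database would be built once by running the migrations.

```python
import sqlite3


def cache_schema(connection):
    """Dump the database as a SQL script (iterdump also includes any rows)."""
    return "\n".join(connection.iterdump())


def apply_cached_schema(connection, schema_sql):
    """Replay a cached SQL script into a fresh connection."""
    connection.executescript(schema_sql)


# Build the template once; a stand-in for running the real migrations.
template = sqlite3.connect(":memory:")
template.execute(
    "CREATE TABLE audits (id INTEGER PRIMARY KEY, uuid TEXT, state TEXT)")
template.commit()
SCHEMA_CACHE = cache_schema(template)
template.close()


def new_test_db():
    """Return a fresh in-memory database populated from the cache."""
    conn = sqlite3.connect(":memory:")
    apply_cached_schema(conn, SCHEMA_CACHE)
    return conn


# Each test gets its own database, so writes do not leak between tests.
db_a = new_test_db()
db_b = new_test_db()
db_a.execute(
    "INSERT INTO audits (uuid, state) VALUES (?, ?)", ("a1", "PENDING"))
db_a.commit()
count_a = db_a.execute("SELECT COUNT(*) FROM audits").fetchone()[0]
count_b = db_b.execute("SELECT COUNT(*) FROM audits").fetchone()[0]
print(count_a, count_b)
```

Replaying one cached script is cheaper than re-running migrations per test, and each in-memory connection keeps tests isolated from one another.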

D. Threading vs Eventlet

# Use standard threading, not eventlet (being removed from OpenStack)
import threading
import queue

# For services
thread = threading.Thread(target=service.start, daemon=True)
thread.start()

# For synchronization
condition = threading.Condition()
with condition:
    condition.wait(timeout=10)
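
A bare `condition.wait()` can return on timeout or a spurious wakeup without the awaited event having happened, so tests normally wait on a predicate. The sketch below shows that pattern with `threading.Condition.wait_for`. `NotificationCollector` and the event type strings are hypothetical, chosen only to illustrate how a notification fixture might let a test block until a given event arrives.

```python
import threading


class NotificationCollector:
    """Collects events from worker threads and lets tests wait for one."""

    def __init__(self):
        self._condition = threading.Condition()
        self._events = []

    def notify(self, event_type, payload=None):
        # Called from a service thread when a notification is emitted.
        with self._condition:
            self._events.append(
                {"event_type": event_type, "payload": payload})
            self._condition.notify_all()

    def wait_for(self, event_type, timeout=10.0):
        """Return True once event_type has been seen, False on timeout."""
        def _seen():
            return any(e["event_type"] == event_type for e in self._events)

        with self._condition:
            return self._condition.wait_for(_seen, timeout=timeout)


collector = NotificationCollector()
worker = threading.Thread(
    target=collector.notify, args=("audit.create.end",), daemon=True)
worker.start()

found = collector.wait_for("audit.create.end", timeout=5.0)
missing = collector.wait_for("audit.delete.end", timeout=0.1)
worker.join()
print(found, missing)
```

Because the predicate is re-checked under the lock, the wait returns immediately if the event arrived before the test started waiting, which removes a common source of flakiness.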

Next Steps

  1. Review and Approve Plan: Get feedback from Watcher team
  2. Create Story/Tasks: Break down into Launchpad stories
  3. Begin Implementation: Start with Phase 1 (test reorganization)
  4. Iterative Review: Review each commit before proceeding
  5. Documentation Updates: Keep docs in sync with implementation

Questions for Discussion

  1. Should we support Ceilometer fixture or focus only on Gnocchi?
  2. Do we need Placement fixture immediately or can it wait?
  3. Should functional tests run in gate (check/gate) or only periodic?
  4. What's the minimum test coverage threshold for functional tests?
  5. Should we add performance/benchmark tests alongside functional tests?

End of Planning Document

This plan provides a comprehensive roadmap for introducing functional testing to Watcher using proven patterns from Nova, adapted for Watcher's unique architecture and requirements.
