@SeanMooney
Created October 7, 2025 10:13
Functional testing patterns for OpenStack projects.

OpenStack Nova Functional Test Infrastructure Guide

Version: 1.0
Date: October 2025
Author: AI-assisted analysis of Nova functional test infrastructure

This comprehensive guide documents Nova's functional test infrastructure and provides a blueprint for replicating this pattern in other OpenStack projects. It covers fixture architecture, RPC/messaging, database management, external service mocking, and CI/CD integration.


Table of Contents

  1. Overview
  2. Quick Start for New Contributors
  3. Architecture Principles
  4. Base Test Classes
  5. Database Fixtures (oslo.db)
  6. RPC and Messaging Fixtures (oslo.messaging)
  7. Notification Fixtures
  8. Configuration Fixtures (oslo.config)
  9. External Service Fixtures
  10. API Fixtures
  11. Complete Test Example
  12. Regression Tests
  13. Creating Reusable Test Infrastructure
  14. Tox Environment Configuration
  15. Zuul CI Integration
  16. Porting to Another Project
  17. Glossary of Terms

Overview

Nova's functional test infrastructure is built on a layered fixture architecture that provides:

  • In-memory SQLite databases for fast, isolated database testing
  • Fake RPC transport (oslo.messaging) for synchronous message passing
  • Mock external services (Cinder, Neutron, Glance, Placement)
  • Real API server via WSGI intercept
  • Versioned notification capture and verification
  • Multi-cell database support
  • Service lifecycle management

Key Components

┌─────────────────────────────────────────────────────────────┐
│                    Test Case (oslotest.base)                │
├─────────────────────────────────────────────────────────────┤
│                Configuration (oslo.config)                   │
│              ├─ ConfFixture: defaults for tests             │
│              └─ ConfPatcher: per-test overrides             │
├─────────────────────────────────────────────────────────────┤
│             Database Layer (oslo_db.enginefacade)            │
│     ├─ Database('api'): API DB (cell mappings, etc.)        │
│     ├─ CellDatabases: Per-cell DB with routing              │
│     └─ DB_SCHEMA cache: Fast schema application             │
├─────────────────────────────────────────────────────────────┤
│           RPC/Messaging (oslo.messaging)                     │
│     ├─ RPCFixture: Fake transport (fake://)                 │
│     ├─ CastAsCallFixture: Synchronous RPC casts             │
│     └─ CheatingSerializer: Preserve DB connection context   │
├─────────────────────────────────────────────────────────────┤
│               Notifications (oslo.messaging)                 │
│     ├─ NotificationFixture: Capture notifications           │
│     ├─ FakeNotifier: Legacy notification capture            │
│     └─ FakeVersionedNotifier: Versioned notification queue  │
├─────────────────────────────────────────────────────────────┤
│              External Service Mocking                        │
│     ├─ PlacementFixture: Real WSGI app with DB              │
│     ├─ CinderFixture: Volume operations mock                │
│     ├─ NeutronFixture: Network operations mock              │
│     └─ GlanceFixture: Image service mock                    │
├─────────────────────────────────────────────────────────────┤
│                   API Layer                                  │
│     ├─ OSAPIFixture: WSGI app with wsgi-intercept          │
│     └─ TestOpenStackClient: HTTP client for API calls       │
├─────────────────────────────────────────────────────────────┤
│                Service Management                            │
│     ├─ ServiceFixture: Run nova-compute, nova-scheduler     │
│     ├─ start_service(): Helper to start services            │
│     └─ Service lifecycle: start, stop, restart              │
└─────────────────────────────────────────────────────────────┘

Quick Start for New Contributors

I Just Want to Write a Test - Where Do I Start?

If you're new to functional testing and want to get started quickly:

1. Copy this minimal functional test template:

from myproject import test
from myproject.tests import local_fixtures


class TestMyFeature(test.TestCase):
    """Test my new feature."""
    
    def setUp(self):
        super().setUp()
        # Add fixtures you need
        self.useFixture(local_fixtures.Database())
        self.api = self.useFixture(local_fixtures.APIFixture()).api
    
    def test_my_feature(self):
        """Test that my feature works."""
        # Your test code here
        pass

2. Common questions answered:

  • "How do I mock Neutron?" → See External Service Fixtures
  • "How do I wait for something?" → See Creating Reusable Test Infrastructure
  • "How do I test RPC?" → See RPC and Messaging Fixtures

3. Next steps after your first test:

  • Read "Regression Tests" if fixing a bug
  • Read "Creating Reusable Test Infrastructure" if writing multiple tests
  • Read individual component sections as needed

4. Recommended reading order:

  • Overview (5 minutes)
  • Architecture Principles (10 minutes)
  • Complete Test Example (15 minutes)
  • Skip to "Porting to Another Project" for your project

Architecture Principles

1. Fixture Composition

Nova uses fixtures from the fixtures library extensively. Fixtures provide:

  • Setup/Teardown: Automatic cleanup via addCleanup()
  • Composition: useFixture() allows nesting
  • Isolation: Each test gets fresh state
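
The three properties above can be illustrated without Nova. The following is a minimal, stdlib-only sketch that imitates the setup/cleanup behavior of the fixtures library; `MiniFixture`, `TempValue`, and `Composite` are hypothetical names invented for this illustration and are not classes from fixtures or Nova.

```python
class MiniFixture:
    """Tiny stand-in for fixtures.Fixture (hypothetical, stdlib only)."""

    def __init__(self):
        self._cleanups = []

    def setUp(self):
        """Subclasses override this to establish state."""

    def addCleanup(self, func, *args):
        self._cleanups.append((func, args))

    def useFixture(self, fixture):
        # Nest another fixture; it is torn down together with this one.
        fixture.setUp()
        self.addCleanup(fixture.cleanUp)
        return fixture

    def cleanUp(self):
        # Run cleanups in reverse registration order, like a stack.
        while self._cleanups:
            func, args = self._cleanups.pop()
            func(*args)


class TempValue(MiniFixture):
    """Set a key in a dict for the lifetime of the fixture."""

    def __init__(self, store, key, value):
        super().__init__()
        self.store, self.key, self.value = store, key, value

    def setUp(self):
        self.store[self.key] = self.value
        self.addCleanup(self.store.pop, self.key, None)


class Composite(MiniFixture):
    """Compose two fixtures, as a test base class would."""

    def __init__(self, store):
        super().__init__()
        self.store = store

    def setUp(self):
        self.useFixture(TempValue(self.store, "db", "sqlite://"))
        self.useFixture(TempValue(self.store, "rpc", "fake:/"))


state = {}
fixture = Composite(state)
fixture.setUp()
during = dict(state)   # both nested fixtures are active
fixture.cleanUp()
after = dict(state)    # everything has been undone
```

Because the outer fixture registers each nested fixture's cleanup, a test only has to tear down the outermost fixture to return to a clean state.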

2. In-Memory Databases

# SQLite in-memory with schema caching for speed
CONF.set_default('connection', "sqlite://", group='database')
CONF.set_default('connection', "sqlite://", group='api_database')
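
In SQLAlchemy, a `sqlite://` URL with no path means an in-memory database. The isolation this gives can be sketched with the standard-library `sqlite3` module alone; the `new_test_db` helper and the `instances` table here are illustrative assumptions, not Nova's schema.

```python
import sqlite3


def new_test_db():
    """Return a fresh in-memory database, as each test would get."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE instances (uuid TEXT PRIMARY KEY)")
    return conn


db_a = new_test_db()
db_b = new_test_db()

# A write in one database is invisible to the other.
db_a.execute("INSERT INTO instances VALUES ('abc')")

rows_a = db_a.execute("SELECT uuid FROM instances").fetchall()
rows_b = db_b.execute("SELECT uuid FROM instances").fetchall()
```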

3. Fake RPC Transport

# oslo.messaging fake driver for synchronous testing
transport_url = 'fake:/'

4. Service Stubbing

External services are mocked at the API boundary, not with HTTP mocks:

# Stub the module-level API, not HTTP calls
self.test.stub_out('nova.volume.cinder.API.get', self.fake_get)
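
As a rough illustration of stubbing at the API boundary, the sketch below replaces a method on a client wrapper class rather than intercepting HTTP traffic. `VolumeAPI`, `attach_volume`, and `fake_get` are hypothetical stand-ins; the sketch uses `unittest.mock` instead of Nova's `stub_out` helper.

```python
from unittest import mock


class VolumeAPI:
    """Hypothetical wrapper around a remote volume service."""

    def get(self, context, volume_id):
        raise RuntimeError("would perform a real HTTP request")


def attach_volume(api, context, volume_id):
    """Code under test: talks only to the wrapper, never to HTTP."""
    volume = api.get(context, volume_id)
    if volume["status"] != "available":
        raise ValueError("volume is not available")
    return volume["id"]


def fake_get(self, context, volume_id):
    # Canned response returned instead of a network call.
    return {"id": volume_id, "status": "available"}


# Replace the method on the class for the duration of the block.
with mock.patch.object(VolumeAPI, "get", fake_get):
    attached = attach_volume(VolumeAPI(), context=None, volume_id="vol-1")
```

Stubbing at this level keeps the test independent of URL layouts and response encodings, at the cost of not exercising the HTTP client code.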

5. Cell Awareness

Nova's multi-cell architecture is reflected in tests:

  • API database: Cell mappings, host mappings
  • Cell databases: Instance data per cell
  • Context targeting: context.target_cell() switches databases

Base Test Classes

File: nova/test.py

The base test infrastructure starts with oslotest.base.BaseTestCase and layers Nova-specific functionality.

TestCase (Base for all Nova tests)

class TestCase(base.BaseTestCase):
    """Test case base class for all unit tests.
    
    Due to the slowness of DB access, please consider deriving from
    `NoDBTestCase` first.
    """
    USES_DB = True
    USES_DB_SELF = False
    REQUIRES_LOCKING = False
    STUB_RPC = True
    NUMBER_OF_CELLS = 1
    STUB_COMPUTE_ID = True
    
    def setUp(self):
        super(TestCase, self).setUp()
        
        # Fixture for isolated greenpool per test
        self.useFixture(nova_fixtures.IsolatedGreenPoolFixture(self.id()))
        
        # Standard logging with DEBUG support (OS_DEBUG=1)
        self.stdlog = self.useFixture(nova_fixtures.StandardLogging())
        
        # Locking for tests that need it (deprecated pattern)
        if self.REQUIRES_LOCKING:
            lock_path = self.useFixture(fixtures.TempDir()).path
            self.fixture = self.useFixture(
                config_fixture.Config(lockutils.CONF))
            self.fixture.config(lock_path=lock_path,
                                group='oslo_concurrency')
        
        # Configuration defaults
        self.useFixture(nova_fixtures.ConfFixture(CONF))
        
        # RPC setup
        if self.STUB_RPC:
            self.useFixture(nova_fixtures.RPCFixture('nova.test'))
            CONF.set_default('driver', ['test'],
                             group='oslo_messaging_notifications')
        
        # Object indirection API (for RPC serialization)
        objects_base.NovaObject.indirection_api = None
        
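        # Database setup
        # _setup_cells() and start_service() record their mappings here
        self.cell_mappings = {}
        self.host_mappings = {}
        if self.USES_DB: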
            self.useFixture(nova_fixtures.Database(database='api'))
            self._setup_cells()
            self.useFixture(nova_fixtures.DefaultFlavorsFixture())
        elif not self.USES_DB_SELF:
            self.useFixture(nova_fixtures.SingleCellSimple())
            self.useFixture(nova_fixtures.DatabasePoisonFixture())
        
        # Policy fixture
        self.policy = self.useFixture(nova_fixtures.PolicyFixture())
    
    def _setup_cells(self):
        """Setup a normal cellsv2 environment.
        
        This sets up the CellDatabase fixture with two cells, one cell0
        and one normal cell. CellMappings are created for both so that
        cells-aware code can find those two databases.
        """
        celldbs = nova_fixtures.CellDatabases()
        
        ctxt = context.get_context()
        fake_transport = 'fake://nowhere/'
        
        # cell0: special cell for instances that fail scheduling
        c0 = objects.CellMapping(
            context=ctxt,
            uuid=objects.CellMapping.CELL0_UUID,
            name='cell0',
            transport_url=fake_transport,
            database_connection=objects.CellMapping.CELL0_UUID)
        c0.create()
        self.cell_mappings[c0.name] = c0
        celldbs.add_cell_database(objects.CellMapping.CELL0_UUID)
        
        # cell1, cell2, ...: normal cells for instances
        for x in range(self.NUMBER_OF_CELLS):
            name = 'cell%i' % (x + 1)
            uuid = getattr(uuids, name)
            cell = objects.CellMapping(
                context=ctxt,
                uuid=uuid,
                name=name,
                transport_url=fake_transport,
                database_connection=uuid)
            cell.create()
            self.cell_mappings[name] = cell
            # cell1 is the default cell
            celldbs.add_cell_database(uuid, default=(x == 0))
        
        self.useFixture(celldbs)
    
    def start_service(self, name, host=None, cell_name=None, **kwargs):
        """Start a Nova service (compute, conductor, scheduler, etc.)
        
        :param name: Service name (compute, conductor, scheduler)
        :param host: Hostname for the service
        :param cell_name: Cell to run the service in
        :returns: The service object
        """
        cell = None
        if host is not None:
            self.useFixture(nova_fixtures.ConfPatcher(host=host))
        
        if name == 'compute' and self.USES_DB:
            ctxt = context.get_context()
            cell_name = cell_name or 'cell1'
            cell = self.cell_mappings[cell_name]
            if (host or name) not in self.host_mappings:
                hm = objects.HostMapping(context=ctxt,
                                         host=host or name,
                                         cell_mapping=cell)
                hm.create()
                self.host_mappings[hm.host] = hm
        
        svc = self.useFixture(
            nova_fixtures.ServiceFixture(name, host, cell=cell, **kwargs))
        return svc.service

NoDBTestCase (Fast unit tests)

class NoDBTestCase(TestCase):
    """Test case base class for tests that don't need the database.
    
    This makes tests run significantly faster. If possible, all new tests
    should derive from this class.
    """
    USES_DB = False

Database Fixtures

File: nova/tests/fixtures/nova.py

Nova uses oslo.db's enginefacade for database abstraction with in-memory SQLite.

Database Fixture (Single database)

class Database(fixtures.Fixture):
    """Create a database fixture.
    
    :param database: The type of database, 'main', or 'api'
    :param connection: The connection string to use (default: sqlite://)
    """
    
    def __init__(self, database='main', version=None, connection=None):
        super().__init__()
        assert database in {'main', 'api'}
        self.database = database
        self.version = version
        self.connection = connection
    
    def setUp(self):
        super().setUp()
        
        if self.database == 'main':
            if self.connection is not None:
                ctxt_mgr = main_db_api.create_context_manager(
                    connection=self.connection)
                self.get_engine = ctxt_mgr.writer.get_engine
            else:
                # Inject a new factory for each test
                new_engine = enginefacade.transaction_context()
                self.useFixture(
                    db_fixtures.ReplaceEngineFacadeFixture(
                        main_db_api.context_manager, new_engine))
                main_db_api.configure(CONF)
                self.get_engine = main_db_api.get_engine
        
        elif self.database == 'api':
            new_engine = enginefacade.transaction_context()
            self.useFixture(
                db_fixtures.ReplaceEngineFacadeFixture(
                    api_db_api.context_manager, new_engine))
            api_db_api.configure(CONF)
            self.get_engine = api_db_api.get_engine
        
        self._apply_schema()
        self.addCleanup(self.cleanup)
    
    def _apply_schema(self):
        """Apply database schema (cached for speed)"""
        global DB_SCHEMA
        if not DB_SCHEMA[(self.database, self.version)]:
            # Apply and cache schema
            engine = self.get_engine()
            conn = engine.connect()
            migration.db_sync(database=self.database, version=self.version)
            # Cache the schema as SQL
            DB_SCHEMA[(self.database, self.version)] = "".join(
                line for line in conn.connection.iterdump())
        else:
            # Apply the cached schema (much faster!)
            engine = self.get_engine()
            conn = engine.connect()
            conn.connection.executescript(
                DB_SCHEMA[(self.database, self.version)])
    
    def cleanup(self):
        engine = self.get_engine()
        engine.dispose()

Key Points:

  • Schema caching: The first test runs migrations, subsequent tests use cached SQL
  • In-memory SQLite: Fast and isolated; no data cleanup is needed because the database vanishes once its engine is disposed
  • enginefacade: oslo.db abstraction for transaction management
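
The schema-caching idea can be tried in isolation with the standard-library `sqlite3` module. This is a sketch under assumptions: `run_migrations` stands in for the slow migration step, and the `instances` table is invented for the example.

```python
import collections
import sqlite3

# Cached schema SQL keyed by (database, version); empty string when unset.
SCHEMA_CACHE = collections.defaultdict(str)
migration_runs = 0


def run_migrations(conn):
    """Stand-in for the slow migration step (hypothetical schema)."""
    global migration_runs
    migration_runs += 1
    conn.execute("CREATE TABLE instances (uuid TEXT PRIMARY KEY, host TEXT)")
    conn.execute("CREATE INDEX instances_host_idx ON instances (host)")
    conn.commit()


def fresh_database(database="main", version=None):
    """Return a new in-memory database with the schema applied."""
    conn = sqlite3.connect(":memory:")
    key = (database, version)
    if not SCHEMA_CACHE[key]:
        # First use: run the migrations, then dump the result as SQL text.
        run_migrations(conn)
        SCHEMA_CACHE[key] = "".join(conn.iterdump())
    else:
        # Later uses: replay the cached SQL instead of migrating again.
        conn.executescript(SCHEMA_CACHE[key])
    return conn


first = fresh_database()
second = fresh_database()
tables = second.execute(
    "SELECT name FROM sqlite_master WHERE type='table'").fetchall()
```

The second database has the same tables as the first even though the migration function ran only once.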

CellDatabases Fixture (Multi-cell)

class CellDatabases(fixtures.Fixture):
    """Create per-cell databases for testing.
    
    Usage::
        fix = CellDatabases()
        fix.add_cell_database('connection1')
        fix.add_cell_database('connection2', default=True)
        self.useFixture(fix)
    """
    
    def __init__(self):
        self._ctxt_mgrs = {}
        self._last_ctxt_mgr = None
        self._default_ctxt_mgr = None
        self._cell_lock = ReaderWriterLock()
    
    def add_cell_database(self, connection_str, default=False):
        """Add a cell database to the fixture.
        
        :param connection_str: Identifier for the database connection
        :param default: Whether this is the default cell
        """
        # Create a new context manager for the cell
        ctxt_mgr = main_db_api.create_context_manager()
        self._ctxt_mgrs[connection_str] = ctxt_mgr
        
        # The first DB access is local, so initialize with this
        self._last_ctxt_mgr = ctxt_mgr
        
        if self._default_ctxt_mgr is None or default:
            self._default_ctxt_mgr = ctxt_mgr
        
        # Apply schema
        def get_context_manager(context):
            return ctxt_mgr
        
        with fixtures.MonkeyPatch(
            'nova.db.main.api.get_context_manager',
            get_context_manager,
        ):
            engine = ctxt_mgr.writer.get_engine()
            engine.dispose()
            self._cache_schema(connection_str)
            conn = engine.connect()
            conn.connection.executescript(DB_SCHEMA[('main', None)])
    
    @contextlib.contextmanager
    def _wrap_target_cell(self, context, cell_mapping):
        """Context manager for cell targeting.
        
        This switches the global database state to point to the specified
        cell, allowing compute node code to work without cell awareness.
        """
        if cell_mapping:
            desired = self._ctxt_mgrs[cell_mapping.database_connection]
        else:
            desired = self._default_ctxt_mgr
        
        # Fast path: already in the right cell
        with self._cell_lock.read_lock():
            if self._last_ctxt_mgr == desired:
                with self._real_target_cell(context, cell_mapping) as c:
                    yield c
                    return
        
        # No exception seen yet; set below if the body raises
        raised_exc = None

        # Switch cells with write lock
        with self._cell_lock.write_lock():
            if cell_mapping is not None:
                self._last_ctxt_mgr = desired
        
        # Yield with read lock (allows other threads to work)
        with self._cell_lock.read_lock():
            try:
                with self._real_target_cell(context, cell_mapping) as ccontext:
                    yield ccontext
            except Exception as exc:
                raised_exc = exc
        
        # Restore default
        with self._cell_lock.write_lock():
            self._last_ctxt_mgr = self._default_ctxt_mgr
        
        if raised_exc:
            raise raised_exc
    
    def setUp(self):
        super(CellDatabases, self).setUp()
        self.addCleanup(self.cleanup)
        self._real_target_cell = context.target_cell
        
        # Monkey-patch database and RPC functions
        self.useFixture(fixtures.MonkeyPatch(
            'nova.db.main.api.get_context_manager',
            self._wrap_get_context_manager))
        self.useFixture(fixtures.MonkeyPatch(
            'nova.context.target_cell',
            self._wrap_target_cell))
        self.useFixture(fixtures.MonkeyPatch(
            'nova.rpc.get_server',
            self._wrap_get_server))
        self.useFixture(fixtures.MonkeyPatch(
            'nova.rpc.get_client',
            self._wrap_get_client))

Key Points:

  • Per-cell isolation: Each cell has its own in-memory database
  • Context targeting: context.target_cell() switches which DB to use
  • Thread-safe: ReaderWriterLock for concurrent cell access
  • RPC multiplexing: Each cell can have its own RPC bus
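
The routing idea behind context targeting can be sketched as follows. This is a deliberately simplified, single-threaded illustration: `CellRouter` is a hypothetical class, it omits the reader/writer locking and the RPC wrapping that the real fixture performs, and it uses plain `sqlite3` connections in place of enginefacade context managers.

```python
import contextlib
import sqlite3


class CellRouter:
    """Hypothetical sketch of per-cell database routing."""

    def __init__(self):
        self._dbs = {}
        self._default = None
        self._current = None

    def add_cell_database(self, name, default=False):
        conn = sqlite3.connect(":memory:")
        conn.execute("CREATE TABLE instances (uuid TEXT PRIMARY KEY)")
        self._dbs[name] = conn
        if self._default is None or default:
            self._default = conn
        self._current = self._default

    @contextlib.contextmanager
    def target_cell(self, name):
        # Point the "global" state at the requested cell ...
        self._current = self._dbs[name]
        try:
            yield self._current
        finally:
            # ... and fall back to the default cell afterwards.
            self._current = self._default

    def current(self):
        return self._current


router = CellRouter()
router.add_cell_database("cell0")
router.add_cell_database("cell1", default=True)

with router.target_cell("cell1") as db:
    db.execute("INSERT INTO instances VALUES ('abc')")

with router.target_cell("cell0") as db:
    in_cell0 = db.execute("SELECT count(*) FROM instances").fetchone()[0]
with router.target_cell("cell1") as db:
    in_cell1 = db.execute("SELECT count(*) FROM instances").fetchone()[0]
```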

Example: Database Setup

class MyFunctionalTest(test.TestCase):
    """Example showing database fixture usage"""
    
    def setUp(self):
        super().setUp()
        # Database is already set up by TestCase base class
        # self.cell_mappings contains cell0 and cell1
    
    def test_instance_in_cell(self):
        """Create an instance and verify it's in the database"""
        ctxt = context.get_admin_context()
        
        # Create an instance in cell1
        with context.target_cell(ctxt, self.cell_mappings['cell1']) as cctxt:
            instance = objects.Instance(
                context=cctxt,
                uuid=uuidutils.generate_uuid(),
                project_id='fake-project')
            instance.create()
        
        # Verify it's not in cell0
        with context.target_cell(ctxt, self.cell_mappings['cell0']) as cctxt:
            self.assertRaises(
                exception.InstanceNotFound,
                objects.Instance.get_by_uuid,
                cctxt, instance.uuid)

RPC and Messaging Fixtures

File: nova/tests/fixtures/nova.py

Nova uses oslo.messaging with a fake driver for synchronous RPC testing.

RPCFixture

class RPCFixture(fixtures.Fixture):
    """Set up RPC with the fake:// transport for testing."""
    
    def __init__(self, *exmods):
        super(RPCFixture, self).__init__()
        self.exmods = []
        self.exmods.extend(exmods)
        self._buses = {}
    
    def _fake_create_transport(self, url):
        """Create or return cached fake transport.
        
        NOTE: Currently collapses all connections to a single bus.
        This is how our tests expect things to work.
        """
        url = None  # Collapse all to single bus
        
        if url not in self._buses:
            exmods = rpc.get_allowed_exmods()
            self._buses[url] = messaging.get_rpc_transport(
                CONF,
                url=url,
                allowed_remote_exmods=exmods)
        return self._buses[url]
    
    def setUp(self):
        super(RPCFixture, self).setUp()
        self.addCleanup(rpc.cleanup)
        
        # Register exception modules
        rpc.add_extra_exmods(*self.exmods)
        self.addCleanup(rpc.clear_extra_exmods)
        
        # Configure fake transport
        self.messaging_conf = messaging_conffixture.ConfFixture(CONF)
        self.messaging_conf.transport_url = 'fake:/'
        self.useFixture(self.messaging_conf)
        
        # Patch transport creation
        self.useFixture(fixtures.MonkeyPatch(
            'nova.rpc.create_transport', self._fake_create_transport))
        
        # Initialize RPC
        with mock.patch('nova.rpc.get_transport_url') as mock_gtu:
            mock_gtu.return_value = None
            rpc.init(CONF)
        
        # Cleanup in-flight messages between tests
        def cleanup_in_flight_rpc_messages():
            messaging._drivers.impl_fake.FakeExchangeManager._exchanges = {}
        
        self.addCleanup(cleanup_in_flight_rpc_messages)

Configuration:

# ConfFixture (nova/tests/fixtures/conf.py) sets no RPC options.
# RPCFixture sets the fake transport (transport_url = 'fake:/') itself,
# so no additional configuration is needed.

CastAsCallFixture

class CastAsCallFixture(fixtures.Fixture):
    """Make RPC casts behave as calls for synchronous testing.
    
    Normally, RPC casts are fire-and-forget. This fixture makes them
    synchronous by converting them to calls, making tests deterministic.
    """
    
    def __init__(self, testcase):
        super().__init__()
        self.testcase = testcase
    
    @staticmethod
    def _stub_out(testcase, obj=None):
        if obj:
            orig_prepare = obj.prepare
        else:
            orig_prepare = messaging.RPCClient.prepare
        
        def prepare(self, *args, **kwargs):
            # Casts with fanout=True would throw errors if monkeypatched
            # to call method, so we override fanout to False
            if 'fanout' in kwargs:
                kwargs['fanout'] = False
            cctxt = orig_prepare(self, *args, **kwargs)
            CastAsCallFixture._stub_out(testcase, cctxt)  # Recurse!
            return cctxt
        
        if obj:
            cls = getattr(sys.modules[obj.__class__.__module__],
                          obj.__class__.__name__)
            testcase.stub_out('%s.%s.prepare' % (obj.__class__.__module__,
                                                 obj.__class__.__name__),
                              prepare)
            testcase.stub_out('%s.%s.cast' % (obj.__class__.__module__,
                                              obj.__class__.__name__),
                              cls.call)
        else:
            testcase.stub_out('oslo_messaging.RPCClient.prepare', prepare)
            testcase.stub_out('oslo_messaging.RPCClient.cast',
                              messaging.RPCClient.call)
    
    def setUp(self):
        super().setUp()
        self._stub_out(self.testcase)

Usage:

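# Subclasses of _IntegratedTestBase get the fixture automatically,
# controlled by a class attribute:
class MyIntegratedTest(integrated_helpers._IntegratedTestBase):
    CAST_AS_CALL = True  # Default in _IntegratedTestBase


# On a plain test.TestCase, install the fixture manually:
class MyTest(test.TestCase):
    def setUp(self):
        super().setUp()
        self.useFixture(nova_fixtures.CastAsCallFixture(self))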
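
The effect of turning casts into calls can be shown with a toy client. `ToyRPCClient` and `ComputeEndpoint` are hypothetical classes written for this sketch; they do not model oslo.messaging's API beyond the cast/call distinction, and the patching uses `unittest.mock` rather than `stub_out`.

```python
from unittest import mock


class ToyRPCClient:
    """Hypothetical client: cast() queues work, call() runs it at once."""

    def __init__(self, endpoint):
        self.endpoint = endpoint
        self.pending = []

    def call(self, method, **kwargs):
        return getattr(self.endpoint, method)(**kwargs)

    def cast(self, method, **kwargs):
        # Fire-and-forget: nothing runs until something drains the queue.
        self.pending.append((method, kwargs))


class ComputeEndpoint:
    def __init__(self):
        self.built = []

    def build_instance(self, uuid):
        self.built.append(uuid)


endpoint = ComputeEndpoint()
client = ToyRPCClient(endpoint)

client.cast("build_instance", uuid="a")
built_after_plain_cast = list(endpoint.built)    # nothing has run yet

# Swap cast for call so the work completes before cast() returns.
with mock.patch.object(ToyRPCClient, "cast", ToyRPCClient.call):
    client.cast("build_instance", uuid="b")
built_after_patched_cast = list(endpoint.built)
```

With the patch in place a test can assert on side effects immediately after the cast, without polling.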

CheatingSerializer (for Cell Databases)

class CheatingSerializer(rpc.RequestContextSerializer):
    """A messaging.RequestContextSerializer that helps with cells.
    
    Our normal serializer does not pass db_connection and mq_connection,
    for good reason. However, during tests, since we're all in the same
    process, we want cell-targeted RPC calls to preserve these values.
    """
    
    def serialize_context(self, context):
        """Serialize context with the db_connection inside."""
        values = super(CheatingSerializer, self).serialize_context(context)
        values['db_connection'] = context.db_connection
        values['mq_connection'] = context.mq_connection
        return values
    
    def deserialize_context(self, values):
        """Deserialize context and honor db_connection if present."""
        ctxt = super(CheatingSerializer, self).deserialize_context(values)
        ctxt.db_connection = values.pop('db_connection', None)
        ctxt.mq_connection = values.pop('mq_connection', None)
        return ctxt

This serializer is used automatically by the CellDatabases fixture.


Notification Fixtures

File: nova/tests/fixtures/notifications.py

Nova uses versioned notifications (oslo.messaging). The fixture captures notifications for verification.

NotificationFixture

class NotificationFixture(fixtures.Fixture):
    """Fixture to capture oslo.messaging notifications."""
    
    def __init__(self, test):
        self.test = test
    
    def setUp(self):
        super().setUp()
        self.addCleanup(self.reset)
        
        # Create fake notifiers
        self.fake_notifier = FakeNotifier(
            rpc.LEGACY_NOTIFIER.transport,
            rpc.LEGACY_NOTIFIER.publisher_id,
            serializer=getattr(rpc.LEGACY_NOTIFIER, '_serializer', None))
        
        self.fake_versioned_notifier = FakeVersionedNotifier(
            rpc.NOTIFIER.transport,
            rpc.NOTIFIER.publisher_id,
            serializer=getattr(rpc.NOTIFIER, '_serializer', None),
            test_case_id=self.test.id())
        
        # Stub out the global notifiers
        if rpc.LEGACY_NOTIFIER and rpc.NOTIFIER:
            self.test.stub_out('nova.rpc.LEGACY_NOTIFIER', self.fake_notifier)
            self.test.stub_out('nova.rpc.NOTIFIER', 
                               self.fake_versioned_notifier)
    
    def reset(self):
        self.fake_notifier.reset()
        self.fake_versioned_notifier.reset()
    
    def wait_for_versioned_notifications(
        self, event_type, n_events=1, timeout=10.0,
    ):
        """Wait for n_events of event_type to be emitted.
        
        :param event_type: Notification event type (e.g., 'instance.create.end')
        :param n_events: Number of events to wait for
        :param timeout: Timeout in seconds
        :returns: List of notification dicts
        """
        return self.fake_versioned_notifier.wait_for_versioned_notifications(
            event_type, n_events, timeout)
    
    @property
    def versioned_notifications(self):
        """List of all versioned notifications emitted."""
        return self.fake_versioned_notifier.versioned_notifications
    
    @property
    def notifications(self):
        """List of all legacy notifications emitted."""
        return self.fake_notifier.notifications

FakeVersionedNotifier

class FakeVersionedNotifier(FakeNotifier):
    """Captures versioned notifications with subscription support."""
    
    def __init__(
        self, transport, publisher_id, serializer=None, parent=None,
        test_case_id=None
    ):
        super().__init__(
            transport, publisher_id, serializer, test_case_id=test_case_id)
        if parent:
            self.versioned_notifications = parent.versioned_notifications
            self.subscriptions = parent.subscriptions
        else:
            self.versioned_notifications = []
            self.subscriptions = collections.defaultdict(_Sub)
    
    def _notify(self, priority, ctxt, event_type, payload):
        """Capture notification and notify subscribers."""
        sender_test_case_id = self._get_sender_test_case_id()
        
        # Prevent late notifications from finished tests
        if sender_test_case_id != self.test_case_id:
            raise RuntimeError(
                'FakeVersionedNotifier received %s notification from '
                'test case %s which differs from current test %s' %
                (event_type, sender_test_case_id, self.test_case_id))
        
        payload = self._serializer.serialize_entity(ctxt, payload)
        notification = {
            'publisher_id': self.publisher_id,
            'priority': priority,
            'event_type': event_type,
            'payload': payload,
        }
        self.versioned_notifications.append(notification)
        self.subscriptions[event_type].received(notification)
    
    def wait_for_versioned_notifications(
        self, event_type, n_events=1, timeout=10.0,
    ):
        """Wait for notifications with timeout."""
        return self.subscriptions[event_type].wait_n(
            n_events, event_type, timeout)

_Sub (Subscription helper)

class _Sub(object):
    """Allow a subscriber to efficiently wait for an event."""
    
    def __init__(self):
        self._cond = threading.Condition()
        self._notifications = []
    
    def received(self, notification):
        with self._cond:
            self._notifications.append(notification)
            self._cond.notify_all()
    
    def wait_n(self, n, event, timeout):
        """Wait until at least n notifications have been received."""
        with timeutils.StopWatch(timeout) as timer:
            with self._cond:
                while len(self._notifications) < n:
                    if timer.expired():
                        raise AssertionError(
                            "Notification %s hasn't been received" % event)
                    self._cond.wait(timer.leftover())
                
                return list(self._notifications)
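
The waiting pattern can be exercised on its own. The sketch below is a stdlib-only variant that uses `time.monotonic()` in place of `timeutils.StopWatch`; the `Subscription` class name and the sample payload are assumptions made for this example.

```python
import threading
import time


class Subscription:
    """Stdlib-only variant of the condition-based waiter (hypothetical)."""

    def __init__(self):
        self._cond = threading.Condition()
        self._items = []

    def received(self, item):
        with self._cond:
            self._items.append(item)
            self._cond.notify_all()

    def wait_n(self, n, timeout):
        """Block until n items have arrived or the timeout expires."""
        deadline = time.monotonic() + timeout
        with self._cond:
            while len(self._items) < n:
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    raise AssertionError(
                        "expected %d items, got %d" % (n, len(self._items)))
                self._cond.wait(remaining)
            return list(self._items)


sub = Subscription()
# Deliver one notification from another thread after a short delay.
emitter = threading.Timer(
    0.05, sub.received, args=({"event_type": "instance.create.end"},))
emitter.start()
events = sub.wait_n(1, timeout=5.0)
emitter.join()
```

Waiting on a condition variable wakes the test as soon as the event arrives, which avoids the fixed sleeps that make polling-based tests slow or flaky.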

Example: Notification Testing

def test_instance_create_notification(self):
    """Verify instance.create.end notification is emitted."""
    # Create instance via API
    server_req = self._build_server()
    server = self.api.post_server({'server': server_req})
    
    # Wait for notification
    notifications = self.notifier.wait_for_versioned_notifications(
        'instance.create.end', n_events=1, timeout=10.0)
    
    # Verify notification content
    self.assertEqual(1, len(notifications))
    self.assertEqual(server['id'], 
                     notifications[0]['payload']['nova_object.data']['uuid'])

Configuration Fixtures

File: nova/tests/fixtures/conf.py

Configuration uses oslo.config with test-specific defaults.

ConfFixture

class ConfFixture(config_fixture.Config):
    """Fixture to manage global conf settings."""
    
    def setUp(self):
        super(ConfFixture, self).setUp()
        
        # default group
        self.conf.set_default('compute_driver', 'fake.SmallFakeDriver')
        self.conf.set_default('host', 'fake-mini')
        self.conf.set_default('periodic_enable', False)
        
        # api_database group
        self.conf.set_default('connection', "sqlite://", group='api_database')
        self.conf.set_default('sqlite_synchronous', False,
                              group='api_database')
        
        # database group
        self.conf.set_default('connection', "sqlite://", group='database')
        self.conf.set_default('sqlite_synchronous', False, group='database')
        
        # key_manager group
        self.conf.set_default('backend',
                              'nova.keymgr.conf_key_mgr.ConfKeyManager',
                              group='key_manager')
        
        # wsgi group
        self.conf.set_default('api_paste_config',
                              paths.state_path_def('etc/nova/api-paste.ini'),
                              group='wsgi')
        
        # api group
        self.conf.set_default('response_validation', 'error', group='api')
        
        # notifications
        self.conf.set_default(
            'notification_format', "both", group="notifications")
        
        # oslo.limit
        self.conf.set_default('endpoint_id', 'ENDPOINT_ID', group='oslo_limit')
        
        config.parse_args([], default_config_files=[], configure_db=False,
                          init_rpc=False)

ConfPatcher (per-test overrides)

class ConfPatcher(fixtures.Fixture):
    """Fixture to patch and restore global CONF.
    
    Usage::
        self.useFixture(nova_fixtures.ConfPatcher(host='compute1'))
        self.useFixture(nova_fixtures.ConfPatcher(
            enabled_filters=['FilterA', 'FilterB'],
            group='filter_scheduler'))
    """
    
    def __init__(self, **kwargs):
        super(ConfPatcher, self).__init__()
        self.group = kwargs.pop('group', None)
        self.args = kwargs
    
    def setUp(self):
        super(ConfPatcher, self).setUp()
        for k, v in self.args.items():
            self.addCleanup(CONF.clear_override, k, self.group)
            CONF.set_override(k, v, self.group)

Example: Configuration

def test_with_custom_config(self):
    """Test with custom scheduler configuration."""
    # Override configuration for this test
    self.flags(enabled_filters=['ComputeFilter', 'ImagePropertiesFilter'],
               group='filter_scheduler')
    self.flags(disk_allocation_ratio=2.0)
    
    # Or use ConfPatcher
    self.useFixture(nova_fixtures.ConfPatcher(
        weight_classes=['nova.scheduler.weights.ram.RAMWeigher'],
        group='filter_scheduler'))
    
    # Configuration is automatically cleaned up after test
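
The override-then-restore behaviour that ConfPatcher and self.flags() rely on can be sketched with the standard library alone. This is a minimal illustration, not Nova's implementation: `FakeConf`, `ConfOverrideTest` and `run_example` are hypothetical names standing in for oslo.config's global CONF object and a Nova test case. Only `unittest.TestCase.addCleanup` is a real API here.

```python
import unittest


class FakeConf:
    """Hypothetical stand-in for a global oslo.config CONF object."""

    def __init__(self, **defaults):
        self._defaults = dict(defaults)
        self._overrides = {}

    def set_override(self, name, value):
        self._overrides[name] = value

    def clear_override(self, name):
        self._overrides.pop(name, None)

    def __getattr__(self, name):
        # Only reached when normal attribute lookup fails.
        if name.startswith('_'):
            raise AttributeError(name)
        if name in self._overrides:
            return self._overrides[name]
        try:
            return self._defaults[name]
        except KeyError:
            raise AttributeError(name)


CONF = FakeConf(host='fake-mini', periodic_enable=False)


class ConfOverrideTest(unittest.TestCase):

    def flags(self, **kwargs):
        """Override options for the duration of one test."""
        for name, value in kwargs.items():
            # Register the cleanup first so the override is removed even
            # if a later step of the test raises.
            self.addCleanup(CONF.clear_override, name)
            CONF.set_override(name, value)

    def test_override_is_visible(self):
        self.flags(host='compute1')
        self.assertEqual('compute1', CONF.host)


def run_example():
    """Run the test case, then report the value seen after cleanup."""
    suite = unittest.defaultTestLoader.loadTestsFromTestCase(
        ConfOverrideTest)
    result = unittest.TestResult()
    suite.run(result)
    return result.wasSuccessful(), CONF.host
```

Registering the cleanup before applying the override mirrors the order used in ConfPatcher.setUp() above, so a failure part-way through cannot leak configuration into the next test.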

External Service Fixtures

PlacementFixture

File: nova/tests/functional/fixtures.py

Placement is special: it runs the real Placement WSGI application against a real database, using a fixture imported from the placement repository.

class PlacementFixture(placement_fixtures.PlacementFixture):
    """A fixture to run Placement operations.
    
    Runs a local WSGI server with the Placement application using
    NoAuth middleware.
    """
    
    def setUp(self):
        super(PlacementFixture, self).setUp()
        
        # Fix socket options for wsgi-intercept
        self.useFixture(fixtures.MonkeyPatch(
            'keystoneauth1.session.TCPKeepAliveAdapter.init_poolmanager',
            adapters.HTTPAdapter.init_poolmanager))
        
        self._client = ka.Adapter(ks.Session(auth=None), raise_exc=False)
        
        # Monkey-patch Nova's scheduler report client
        self.useFixture(fixtures.MonkeyPatch(
            'nova.scheduler.client.report.SchedulerReportClient.get',
            self._fake_get))
        self.useFixture(fixtures.MonkeyPatch(
            'nova.scheduler.client.report.SchedulerReportClient.post',
            self._fake_post))
        self.useFixture(fixtures.MonkeyPatch(
            'nova.scheduler.client.report.SchedulerReportClient.put',
            self._fake_put))
        self.useFixture(fixtures.MonkeyPatch(
            'nova.scheduler.client.report.SchedulerReportClient.delete',
            self._fake_delete))
        
        self.api = PlacementApiClient(self)
    
    def _fake_get(self, client, url, version=None, global_request_id=None):
        headers = {'x-auth-token': self.token}
        self._update_headers_with_version(headers, version)
        return self._client.get(
            url,
            endpoint_override=self.endpoint,
            headers=headers)

Key Points:

  • Uses placement.tests.functional.fixtures.PlacementFixture as base
  • Real Placement WSGI app with SQLite database
  • wsgi-intercept for HTTP calls
  • NoAuth middleware (no Keystone)
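
The redirection idea behind the four monkey-patches above can be sketched without Nova or Placement installed. Everything in this sketch is hypothetical (`ReportClient`, `InProcessPlacement`, `patch_client`, the `rp-1` provider). It assumes the standard `OpenStack-API-Version: placement <version>` header format and uses `unittest.mock.patch.object` in place of `fixtures.MonkeyPatch`.

```python
from unittest import mock


class ReportClient:
    """Hypothetical client that would normally perform HTTP requests."""

    def get(self, url, version=None):
        raise RuntimeError('would perform a real HTTP request')


class InProcessPlacement:
    """Hypothetical in-process service that keeps real state."""

    def __init__(self):
        self.providers = {'rp-1': {'name': 'compute1'}}
        self.calls = []

    def handle_get(self, url, headers):
        self.calls.append((url, dict(headers)))
        key = url.rsplit('/', 1)[-1]
        if key in self.providers:
            return 200, self.providers[key]
        return 404, None

    def fake_get(self, client, url, version=None):
        # Same shape as the fixture's _fake_get: 'client' is the patched
        # instance, which the fake does not need.
        headers = {'x-auth-token': 'admin'}
        if version is not None:
            headers['OpenStack-API-Version'] = 'placement %s' % version
        return self.handle_get(url, headers)


def patch_client(service):
    """Return a patcher that routes ReportClient.get to the service."""
    def _get(client, url, version=None):
        return service.fake_get(client, url, version=version)
    # A plain function is used so that the client instance is passed
    # through as the first argument when the method is called.
    return mock.patch.object(ReportClient, 'get', _get)
```

Used as a context manager, `with patch_client(service): ReportClient().get('/resource_providers/rp-1')` reaches the in-process service, and the original method is restored on exit.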

CinderFixture

File: nova/tests/fixtures/cinder.py

Cinder is mocked at the API layer with stateful volume tracking.

class CinderFixture(fixtures.Fixture):
    """A fixture to mock volume operations (Cinder v3 API)."""
    
    # Volume IDs for common test scenarios
    SWAP_OLD_VOL = 'a07f71dc-8151-4e7d-a0cc-cd24a3f11113'
    SWAP_NEW_VOL = '227cc671-f30b-4488-96fd-7d0bf13648d8'
    MULTIATTACH_VOL = '4757d51f-54eb-4442-8684-3399a6431f67'
    IMAGE_BACKED_VOL = '6ca404f3-d844-4169-bb96-bc792f37de98'
    
    def __init__(self, test, az='nova'):
        super().__init__()
        self.test = test
        self.az = az
        # State tracking
        self.volumes = collections.defaultdict(dict)
        self.volume_to_attachment = collections.defaultdict(dict)
    
    def setUp(self):
        super().setUp()
        self._create_fakes()
    
    def _create_fakes(self):
        """Stub out all nova.volume.cinder.API methods."""
        self.useFixture(fixtures.MockPatch(
            'nova.volume.cinder.API.attachment_create',
            side_effect=self.fake_attachment_create, autospec=False))
        self.useFixture(fixtures.MockPatch(
            'nova.volume.cinder.API.attachment_update',
            side_effect=self.fake_attachment_update, autospec=False))
        self.useFixture(fixtures.MockPatch(
            'nova.volume.cinder.API.attachment_delete',
            side_effect=self.fake_attachment_delete, autospec=False))
        self.useFixture(fixtures.MockPatch(
            'nova.volume.cinder.API.get',
            side_effect=self.fake_get, autospec=False))
        # ... more methods
    
    def fake_attachment_create(
        self, context, volume_id, instance_uuid, connector=None,
        mountpoint=None
    ):
        """Mock attachment_create."""
        attachment_id = uuidutils.generate_uuid()
        attachment = {
            'id': attachment_id,
            'connection_info': {
                'driver_volume_type': 'fake_type',
                'data': {'foo': 'bar'}
            }
        }
        
        # Track attachment
        self.volume_to_attachment[volume_id][attachment_id] = {
            'id': attachment_id,
            'instance_uuid': instance_uuid,
            'connector': connector,
            'status': 'reserved',
        }
        
        return attachment
    
    def fake_get(self, context, volume_id, microversion=None):
        """Mock get volume."""
        return {
            'id': volume_id,
            'status': 'available',
            'size': 1,
            'attach_time': '',
            'availability_zone': self.az,
            'attachments': {},
            'multiattach': volume_id == self.MULTIATTACH_VOL,
        }

Key Points:

  • Stateful: tracks volumes, attachments, snapshots
  • Mocked at nova.volume.cinder.API (not HTTP)
  • Supports multi-attach, volume swap, etc.
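
The "stateful mock at the API layer" pattern can be illustrated with the standard library. This is a sketch under stated assumptions, not the CinderFixture itself: `VolumeAPI` and `StatefulVolumeFake` are hypothetical, and `unittest.mock.patch.object` with `side_effect` stands in for `fixtures.MockPatch(..., autospec=False)`.

```python
import collections
import uuid
from unittest import mock


class VolumeAPI:
    """Hypothetical wrapper that would normally call a volume service."""

    def attachment_create(self, context, volume_id, instance_uuid):
        raise RuntimeError('would contact the real volume service')

    def attachment_delete(self, context, attachment_id):
        raise RuntimeError('would contact the real volume service')


class StatefulVolumeFake:
    """Replace VolumeAPI methods with fakes that record state."""

    def __init__(self):
        self.volume_to_attachment = collections.defaultdict(dict)
        self._patchers = []

    def start(self):
        for name in ('attachment_create', 'attachment_delete'):
            # Without autospec the mock is not bound to the instance, so
            # the fake receives (context, ...) with no VolumeAPI 'self'.
            patcher = mock.patch.object(
                VolumeAPI, name, side_effect=getattr(self, 'fake_' + name))
            patcher.start()
            self._patchers.append(patcher)

    def stop(self):
        while self._patchers:
            self._patchers.pop().stop()

    def fake_attachment_create(self, context, volume_id, instance_uuid):
        attachment_id = str(uuid.uuid4())
        self.volume_to_attachment[volume_id][attachment_id] = {
            'id': attachment_id, 'instance_uuid': instance_uuid}
        return {'id': attachment_id}

    def fake_attachment_delete(self, context, attachment_id):
        for attachments in self.volume_to_attachment.values():
            attachments.pop(attachment_id, None)

    def volume_ids_for_instance(self, instance_uuid):
        return [
            volume_id
            for volume_id, attachments in self.volume_to_attachment.items()
            for attachment in attachments.values()
            if attachment['instance_uuid'] == instance_uuid
        ]
```

Because the fake keeps its own bookkeeping, a test can assert on the resulting state (which volumes are attached to which instance) rather than on the exact sequence of mock calls.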

NeutronFixture

File: nova/tests/fixtures/neutron.py

Neutron is mocked with port, network, and subnet state.

class NeutronFixture(fixtures.Fixture):
    """A fixture to boot instances with neutron ports."""
    
    tenant_id = nova_fixtures.PROJECT_ID
    
    # Default network
    network_1 = {
        'id': '3cb9bc59-5699-4588-a4b1-b87f96708bc6',
        'name': 'private',
        'subnets': [],
        'tenant_id': tenant_id,
        'provider:network_type': 'vxlan',
    }
    
    def __init__(self, test):
        super().__init__()
        self.test = test
        self._ports = {}
        self._networks = {self.network_1['id']: self.network_1}
    
    def setUp(self):
        super().setUp()
        
        # Disable vif_plugging_timeout for tests
        self.test.flags(vif_plugging_timeout=0)
        
        # Stub out Nova's Neutron API
        self.test.stub_out(
            'nova.network.neutron.get_client', self._get_client)
    
    def _get_client(self, context, admin=False):
        """Return a fake Neutron client."""
        admin = admin or context.is_admin and not context.auth_token
        return _FakeNeutronClient(self, admin)
    
    def create_port(self, body):
        """Mock neutronclient.v2_0.client.Client.create_port."""
        port_req = body.get('port')
        port_id = port_req.get('id') or uuidutils.generate_uuid()
        
        port = {
            'id': port_id,
            'network_id': port_req['network_id'],
            'tenant_id': port_req.get('tenant_id', self.tenant_id),
            'mac_address': port_req.get(
                'mac_address', 'fa:16:3e:xx:xx:xx'),
            'fixed_ips': port_req.get('fixed_ips', []),
            'status': 'ACTIVE',
            'binding:vif_type': 'ovs',
        }
        
        self._ports[port_id] = port
        return {'port': copy.deepcopy(port)}
    
    def show_port(self, port_id, **_params):
        """Mock neutronclient.v2_0.client.Client.show_port."""
        if port_id not in self._ports:
            raise neutron_client_exc.PortNotFoundClient()
        return {'port': copy.deepcopy(self._ports[port_id])}

Key Points:

  • Stateful: tracks ports, networks, subnets
  • Supports port binding, security groups, QoS
  • Mocked at nova.network.neutron.get_client
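
Both create_port and show_port above return `copy.deepcopy(...)` of the stored port. The sketch below, with a hypothetical `PortStore` class, shows why that matters: callers can mutate what they receive without corrupting the fake's internal state.

```python
import copy


class PortStore:
    """Hypothetical in-memory port store, similar in spirit to a fake
    network service that tracks ports by ID."""

    def __init__(self):
        self._ports = {}

    def create_port(self, body):
        # Copy on the way in so later changes to 'body' do not leak in.
        port = copy.deepcopy(body['port'])
        port.setdefault('fixed_ips', [])
        port['status'] = 'ACTIVE'
        self._ports[port['id']] = port
        # Copy on the way out so the caller cannot modify stored state.
        return {'port': copy.deepcopy(port)}

    def show_port(self, port_id):
        if port_id not in self._ports:
            raise KeyError(port_id)
        return {'port': copy.deepcopy(self._ports[port_id])}
```

A shallow copy would not be enough here, since nested lists such as `fixed_ips` would still be shared between the caller and the store.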

GlanceFixture

File: nova/tests/fixtures/glance.py

Glance is mocked with in-memory image storage.

class GlanceFixture(fixtures.Fixture):
    """A fixture for simulating Glance."""
    
    # Default test images
    image1 = {
        'id': '155d900f-4e14-4e4c-a73d-069cbf4541e6',
        'name': 'fakeimage123456',
        'created_at': '2011-01-01T01:02:03Z',
        'updated_at': '2011-01-01T01:02:03Z',
        'status': 'active',
        'properties': {
            'kernel_id': 'nokernel',
            'ramdisk_id': 'nokernel',
        },
        'min_ram': 0,
        'min_disk': 0,
        'size': 25165824,
    }
    
    def __init__(self, test):
        super().__init__()
        self.test = test
        self.images = {}
    
    def setUp(self):
        super().setUp()
        
        # Configure Glance endpoint
        self.test.useFixture(nova_fixtures.ConfPatcher(
            group='glance', api_servers=['http://localhost:9292']))
        
        # Stub out Glance API
        self.test.stub_out(
            'nova.image.glance.API.get_remote_image_service',
            lambda context, image_href: (self, image_href))
        self.test.stub_out(
            'nova.image.glance.get_default_image_service',
            lambda: self)
        
        # Pre-create default images
        self.create(None, self.image1)
        # ... more images
        
        self._imagedata = {}
    
    def create(self, context, metadata, data=None):
        """Create an image."""
        image_id = metadata.get('id', uuidutils.generate_uuid())
        metadata['id'] = image_id
        self.images[image_id] = copy.deepcopy(metadata)
        return self.images[image_id]
    
    def show(self, context, image_id, include_locations=False,
             show_deleted=True):
        """Get image metadata."""
        if image_id not in self.images:
            raise exception.ImageNotFound(image_id=image_id)
        return copy.deepcopy(self.images[image_id])

Key Points:

  • In-memory image storage
  • Supports image metadata, downloads
  • Mocked at nova.image.glance.API

API Fixtures

File: nova/tests/fixtures/nova.py

The API fixture runs a real Nova WSGI application using wsgi-intercept.

OSAPIFixture

class OSAPIFixture(fixtures.Fixture):
    """Create an OS API server as a fixture.
    
    This spawns an OS API server in a new greenthread. The fixture has
    a .api attribute which is a simple REST client.
    
    This fixture has the following clients:
        self.api - Project user with "member" role
        self.admin_api - Project user with "admin" role
        self.reader_api - Project user with "reader" role
    """
    
    def __init__(
        self, api_version='v2', project_id=PROJECT_ID,
        use_project_id_in_urls=False, stub_keystone=True,
    ):
        super(OSAPIFixture, self).__init__()
        self.api_version = api_version
        self.project_id = project_id
        self.use_project_id_in_urls = use_project_id_in_urls
        self.stub_keystone = stub_keystone
    
    def setUp(self):
        super(OSAPIFixture, self).setUp()
        
        # Unique hostname for wsgi-intercept
        hostname = uuidsentinel.osapi_host
        port = 80
        service_name = 'osapi_compute'
        endpoint = 'http://%s:%s/' % (hostname, port)
        
        self.useFixture(ConfPatcher(debug=True))
        
        if self.stub_keystone:
            self._stub_keystone()
        
        # Fix socket options for wsgi-intercept
        self.useFixture(fixtures.MonkeyPatch(
                'keystoneauth1.session.TCPKeepAliveAdapter.init_poolmanager',
                adapters.HTTPAdapter.init_poolmanager))
        
        # Load WSGI app
        loader = wsgi.Loader().load_app(service_name)
        app = lambda: loader
        
        # Register service
        wsgi_app._setup_service(CONF.host, service_name)
        
        # Install wsgi-intercept
        intercept = interceptor.RequestsInterceptor(app, url=endpoint)
        intercept.install_intercept()
        self.addCleanup(intercept.uninstall_intercept)
        
        # Create API clients
        base_url = 'http://%(host)s:%(port)s/%(api_version)s' % ({
            'host': hostname, 'port': port, 'api_version': self.api_version})
        if self.use_project_id_in_urls:
            base_url += '/' + self.project_id
        
        self.api = client.TestOpenStackClient(
            'fake', base_url, project_id=self.project_id,
            roles=['reader', 'member'])
        self.admin_api = client.TestOpenStackClient(
            'admin', base_url, project_id=self.project_id,
            roles=['reader', 'member', 'admin'])
        self.reader_api = client.TestOpenStackClient(
            'reader', base_url, project_id=self.project_id,
            roles=['reader'])
    
    def _stub_keystone(self):
        """Stub out authentication middleware."""
        self.useFixture(fixtures.MockPatch(
            'keystonemiddleware.auth_token.filter_factory',
            return_value=lambda _app: _app))
        
        # Stub out context middleware
        def fake_ctx(env, **kwargs):
            user_id = env['HTTP_X_AUTH_USER']
            project_id = env['HTTP_X_AUTH_PROJECT_ID']
            is_admin = user_id == 'admin'
            roles = env['HTTP_X_ROLES'].split(',')
            return context.RequestContext(
                user_id, project_id, is_admin=is_admin, roles=roles, **kwargs)
        
        self.useFixture(fixtures.MonkeyPatch(
            'nova.api.auth.NovaKeystoneContext._create_context', fake_ctx))

Key Points:

  • Real Nova WSGI app (not mocked!)
  • wsgi-intercept: HTTP requests stay in-process
  • Multiple clients with different roles
  • NoAuth: Keystone middleware is stubbed
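
To make "HTTP requests stay in-process" concrete, the sketch below calls a WSGI application directly, with no socket involved. It is an illustration only: `app` and `call_in_process` are hypothetical, the header names follow the fake_ctx stub above, and wsgi-intercept itself works differently (it patches the HTTP client library rather than requiring a helper like this).

```python
import json
from wsgiref.util import setup_testing_defaults


def app(environ, start_response):
    """Hypothetical WSGI app that trusts identity headers (no auth)."""
    user = environ.get('HTTP_X_AUTH_USER')
    roles = environ.get('HTTP_X_ROLES', '').split(',')
    body = json.dumps({
        'user': user,
        'is_admin': user == 'admin',
        'roles': roles,
        'path': environ['PATH_INFO'],
    }).encode('utf-8')
    start_response('200 OK', [('Content-Type', 'application/json'),
                              ('Content-Length', str(len(body)))])
    return [body]


def call_in_process(wsgi_app, path, headers=None, method='GET'):
    """Invoke a WSGI app directly and return (status, decoded JSON)."""
    environ = {}
    setup_testing_defaults(environ)
    environ['REQUEST_METHOD'] = method
    environ['PATH_INFO'] = path
    for name, value in (headers or {}).items():
        # WSGI exposes request headers as HTTP_<UPPER_SNAKE_CASE> keys.
        environ['HTTP_' + name.upper().replace('-', '_')] = value

    captured = {}

    def start_response(status, response_headers, exc_info=None):
        captured['status'] = status
        captured['headers'] = response_headers

    chunks = wsgi_app(environ, start_response)
    return captured['status'], json.loads(b''.join(chunks))
```

The full application stack (routing, validation, policy) still runs, which is what distinguishes this style of test from one that mocks the API handlers.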

TestOpenStackClient

File: nova/tests/functional/api/client.py

class TestOpenStackClient(object):
    """Simple client for making Nova API requests."""
    
    def __init__(self, user_id, base_url, project_id, roles=None):
        self.user_id = user_id
        self.project_id = project_id
        self.roles = ','.join(roles or ['member'])
        self.base_url = base_url
        self.microversion = None
    
    def api_request(self, url, method='GET', body=None, headers=None):
        """Make an API request."""
        # Copy so the caller's dict is not mutated by update() below
        headers = dict(headers or {})
        headers.update({
            'x-auth-user': self.user_id,
            'x-auth-project-id': self.project_id,
            'x-roles': self.roles,
        })
        
        if self.microversion:
            headers['X-OpenStack-Nova-API-Version'] = self.microversion
        
        full_url = self.base_url + url
        
        if method == 'GET':
            response = requests.get(full_url, headers=headers)
        elif method == 'POST':
            response = requests.post(full_url, json=body, headers=headers)
        elif method == 'PUT':
            response = requests.put(full_url, json=body, headers=headers)
        elif method == 'DELETE':
            response = requests.delete(full_url, headers=headers)
        else:
            # Fail loudly instead of hitting an unbound 'response' below
            raise ValueError('Unsupported HTTP method: %s' % method)
        
        return response
    
    def post_server(self, server_dict):
        """Create a server (instance)."""
        response = self.api_request('/servers', method='POST', 
                                     body=server_dict)
        return response.json()['server']
    
    def get_server(self, server_id):
        """Get server details."""
        response = self.api_request('/servers/%s' % server_id)
        return response.json()['server']

Complete Test Example

File: nova/tests/functional/test_servers.py (simplified)

Here's a complete example showing how all fixtures work together:

"""Example functional test for server operations."""

import time

from oslo_utils.fixture import uuidsentinel as uuids

from nova import context
from nova import objects
from nova import test
from nova.tests import fixtures as nova_fixtures
from nova.tests.functional import fixtures as func_fixtures
from nova.tests.functional import integrated_helpers


class ServerCreateTest(test.TestCase,
                        integrated_helpers.InstanceHelperMixin):
    """Test server create operations with full stack."""
    
    # Class attributes
    api_major_version = 'v2.1'
    microversion = 'latest'
    ADMIN_API = False
    
    def setUp(self):
        super(ServerCreateTest, self).setUp()
        
        # Configuration
        self.flags(compute_driver='fake.SmallFakeDriver')
        
        # External service fixtures
        self.useFixture(nova_fixtures.RealPolicyFixture())
        self.glance = self.useFixture(nova_fixtures.GlanceFixture(self))
        self.neutron = self.useFixture(nova_fixtures.NeutronFixture(self))
        self.cinder = self.useFixture(nova_fixtures.CinderFixture(self))
        self.placement = self.useFixture(
            func_fixtures.PlacementFixture()).api
        
        # Notification fixture
        self.notifier = self.useFixture(
            nova_fixtures.NotificationFixture(self))
        
        # API fixture
        self.api_fixture = self.useFixture(nova_fixtures.OSAPIFixture(
            api_version='v2.1'))
        self.api = self.api_fixture.api
        self.api.microversion = self.microversion
        self.admin_api = self.api_fixture.admin_api
        self.admin_api.microversion = self.microversion
        
        # Start services
        self.start_service('conductor')
        self.scheduler = self.start_service('scheduler')
        self.compute = self.start_service('compute', host='compute1')
    
    def _build_server(self, name='test-server', image_uuid=None, 
                      flavor_id=None, networks=None):
        """Helper to build server request."""
        return {
            'name': name,
            'imageRef': image_uuid or nova_fixtures.GlanceFixture.image1['id'],
            'flavorRef': flavor_id or '1',
            'networks': networks or [{'uuid': 
                nova_fixtures.NeutronFixture.network_1['id']}],
        }
    
    def _wait_for_state_change(self, server, expected_status, max_retries=50):
        """Helper to wait for server status."""
        for i in range(max_retries):
            server = self.api.get_server(server['id'])
            if server['status'] == expected_status:
                return server
            if server['status'] == 'ERROR':
                self.fail('Server went to ERROR state')
            time.sleep(0.1)
        self.fail('Timed out waiting for server to reach %s' % expected_status)
    
    def test_create_server(self):
        """Test basic server creation."""
        # Build server request
        server_req = self._build_server()
        
        # Create server via API. The create response is a minimal
        # representation without 'status', so only check that an ID came back.
        server = self.api.post_server({'server': server_req})
        self.assertIn('id', server)
        
        # Wait for ACTIVE
        server = self._wait_for_state_change(server, 'ACTIVE')
        self.assertEqual('ACTIVE', server['status'])
        
        # Verify in database
        ctxt = context.get_admin_context()
        instance = objects.Instance.get_by_uuid(ctxt, server['id'])
        self.assertEqual('active', instance.vm_state)
        self.assertEqual('compute1', instance.host)
        
        # Verify notification was emitted
        notifications = self.notifier.wait_for_versioned_notifications(
            'instance.create.end', n_events=1, timeout=10.0)
        self.assertEqual(1, len(notifications))
        self.assertEqual(server['id'], 
            notifications[0]['payload']['nova_object.data']['uuid'])
    
    def test_create_server_with_volume(self):
        """Test server creation with Cinder volume."""
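        # Use an arbitrary volume ID; the fixture's fake_get() reports any
        # ID as an 'available' volume, so nothing needs to be created first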
        volume_id = '9c6d9c2d-7a8f-4c80-938d-3bf062b8d489'
        
        # Build server with block_device_mapping_v2
        server_req = self._build_server()
        server_req['block_device_mapping_v2'] = [{
            'boot_index': 0,
            'uuid': volume_id,
            'source_type': 'volume',
            'destination_type': 'volume',
        }]
        server_req['imageRef'] = ''  # No image for boot-from-volume
        
        # Create server
        server = self.api.post_server({'server': server_req})
        server = self._wait_for_state_change(server, 'ACTIVE')
        
        # Verify volume attachment
        attachments = self.cinder.volume_to_attachment[volume_id]
        self.assertEqual(1, len(attachments))
        
        # Get the attachment
        attachment = list(attachments.values())[0]
        self.assertEqual(server['id'], attachment['instance_uuid'])
        self.assertIsNotNone(attachment['connector'])
    
    def test_create_server_placement_allocation(self):
        """Test that placement allocations are created."""
        server_req = self._build_server()
        server = self.api.post_server({'server': server_req})
        server = self._wait_for_state_change(server, 'ACTIVE')
        
        # Get resource provider UUID for compute1
        compute_rp_uuid = self._get_provider_uuid_by_host('compute1')
        
        # Verify allocation in Placement
        allocations_url = '/allocations/%s' % server['id']
        allocations = self.placement.get(allocations_url).body
        
        self.assertIn(compute_rp_uuid, 
                      allocations['allocations'])
        self.assertIn('VCPU', 
                      allocations['allocations'][compute_rp_uuid]['resources'])
    
    def test_delete_server(self):
        """Test server deletion."""
        # Create server
        server_req = self._build_server()
        server = self.api.post_server({'server': server_req})
        server = self._wait_for_state_change(server, 'ACTIVE')
        
        # Delete server
        self.api.delete_server(server['id'])
        
        # Wait for it to be gone
        self._wait_until_deleted(server)
        
        # Verify notification
        notifications = self.notifier.wait_for_versioned_notifications(
            'instance.delete.end', n_events=1, timeout=10.0)
        self.assertEqual(1, len(notifications))
        
        # Verify allocation is cleaned up in Placement
        allocations_url = '/allocations/%s' % server['id']
        allocations = self.placement.get(allocations_url).body
        self.assertEqual({}, allocations['allocations'])
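
The polling done by `_wait_for_state_change` in the example above can be factored into a generic helper. The following is a sketch with hypothetical names (`wait_for`, `fetch`, `is_done`, `is_failed`). It bounds the wait by elapsed time rather than by a retry count, which is a design choice of this sketch and not something the example above does.

```python
import time


def wait_for(fetch, is_done, is_failed=None, timeout=5.0, interval=0.1):
    """Poll fetch() until is_done(value) is true.

    Raises AssertionError if is_failed(value) becomes true, and
    TimeoutError if the deadline passes first.
    """
    # time.monotonic() is unaffected by system clock adjustments.
    deadline = time.monotonic() + timeout
    while True:
        value = fetch()
        if is_done(value):
            return value
        if is_failed is not None and is_failed(value):
            raise AssertionError('reached failure state: %r' % (value,))
        if time.monotonic() >= deadline:
            raise TimeoutError('condition not met within %.1fs' % timeout)
        time.sleep(interval)
```

In a test like the one above, `fetch` could be `lambda: self.api.get_server(server['id'])`, with `is_done` checking for the expected status and `is_failed` checking for `'ERROR'`.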

Regression Tests

Overview

Regression tests are specialized functional tests designed to reproduce and prevent specific bugs from reoccurring. Nova maintains a dedicated nova/tests/functional/regressions/ directory for these tests, which provides long-term stability and clear documentation of historical bugs.

Key Principles

  1. Bug Reproduction First: Write the test to reproduce the bug before fixing it
  2. Explicit Dependencies: Make all dependencies clear in the test code. Reuse fixtures and helper functions, but do so explicitly rather than through deep inheritance chains
  3. Pragmatic Reuse: Use stable fixtures (e.g., NeutronFixture) and helpers (e.g., InstanceHelperMixin) to avoid duplication, while keeping dependencies visible
  4. Self-Contained Setup: Set up the full stack in setUp() with explicit fixture declarations
  5. Clear Documentation: Include detailed docstrings explaining the bug, its impact, and the fix
  6. Stable Over Time: Avoid hidden dependencies so tests remain valid as unrelated code evolves

File: nova/tests/functional/regressions/README.rst

================================
 Tests for Specific Regressions
================================

When we have a bug reported by end users that we can write a full
stack reproduce on, we should. And we should keep a regression test
for that bug in our tree. It can be deleted at some future date if
needed, but largely should not be changed.

Writing Regression Tests
========================

- These should be full stack tests which inherit from
  nova.test.TestCase directly or with explicit stable mixins
  (e.g., InstanceHelperMixin). This prevents coupling with other tests
  while allowing pragmatic reuse of stable helpers.

- They should setup a full stack cloud in their setUp via fixtures.
  All fixture usage should be explicit in the test's setUp() method.

- Reusing stable fixtures (RealPolicyFixture, NeutronFixture, etc.) and
  helper functions (integrated_helpers) is encouraged, but dependencies
  must be explicit and visible in the test code.

- They should each live in a file which is named test_bug_######.py

- Avoid deep inheritance chains that hide dependencies. The goal is
  stability: changes to unrelated test infrastructure should not break
  regression tests.

Writing Tests Before the Bug is Fixed
=====================================

When possible, write the regression test to demonstrate the bug before
fixing it:

1. Write test that reproduces the broken behavior
2. Assert the current (broken) behavior
3. Comment out the expected (correct) assertions
4. Commit the test with "Related-Bug: #XXXXXX"
5. Fix the bug in production code
6. Update test: swap assertions (broken → commented, expected → active)
7. Commit with "Closes-Bug: #XXXXXX"

This approach provides clear documentation of the bug lifecycle and
verifies that the fix actually works.

Directory Structure

<project>/tests/functional/regressions/
├── __init__.py
├── README.rst
├── test_bug_1234567.py
├── test_bug_1234568.py
└── test_bug_1234569.py

Naming Convention

  • File: test_bug_<launchpad_bug_id>.py
  • Class: Descriptive name explaining the bug scenario
  • Test method: test_<specific_scenario>
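
A project adopting this convention could check it mechanically. The sketch below is hypothetical (the function names and the ignore list are assumptions, and it assumes purely numeric bug IDs). It extracts the bug ID from a file name and lists files in a regressions directory that do not follow the pattern.

```python
import re

# Matches names such as test_bug_1234567.py and captures the numeric ID.
_BUG_FILE_RE = re.compile(r'^test_bug_(\d+)\.py$')

# Files expected in the directory that are not tests (an assumption).
_IGNORED = frozenset({'__init__.py', 'README.rst'})


def bug_id_from_filename(filename):
    """Return the bug ID encoded in a file name, or None if absent."""
    match = _BUG_FILE_RE.match(filename)
    return int(match.group(1)) if match else None


def misnamed_files(filenames):
    """Return the sorted names that violate the naming convention."""
    return sorted(
        name for name in filenames
        if name not in _IGNORED and bug_id_from_filename(name) is None)
```

For a real directory, the input could come from `os.listdir()` on the regressions path.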

The Lifecycle of a Regression Test

Phase 1: Pre-Fix (Reproducing the Bug)

When a bug is reported, the first step is to write a regression test that reproduces the broken behavior. At this stage:

  1. Write the test to demonstrate the bug
  2. Include assertions that show the current broken behavior
  3. Add commented-out assertions that show the expected correct behavior
  4. Document the bug thoroughly in the docstring

Example: Pre-Fix State

File: nova/tests/functional/regressions/test_bug_1234567.py

# Copyright 2024 ACME Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import time

from nova import test
from nova.tests import fixtures as nova_fixtures


class TestDeleteServerWithReservedVolumes(test.TestCase):
    """Regression test for bug #1234567.
    
    When deleting a server that failed to schedule, reserved volumes
    are not being cleaned up in Cinder, leaving volumes stuck in
    'attaching' state.
    
    The bug occurs because:
    1. Server create fails to schedule (no valid host)
    2. Server goes to ERROR state
    3. User deletes the server
    4. API deletes the server locally (no compute to contact)
    5. Volume reservations are never cleaned up
    
    Expected behavior:
    - Volume attachments should be deleted during local delete
    - Volumes should return to 'available' state
    """
    
    def setUp(self):
        super(TestDeleteServerWithReservedVolumes, self).setUp()
        self.useFixture(nova_fixtures.RealPolicyFixture())
        self.useFixture(nova_fixtures.NeutronFixture(self))
        self.useFixture(nova_fixtures.GlanceFixture(self))
        self.cinder = self.useFixture(nova_fixtures.CinderFixture(self))
        
        api_fixture = self.useFixture(nova_fixtures.OSAPIFixture(
            api_version='v2.1'))
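        self.api = api_fixture.api
        # 'networks': 'none' is only accepted from microversion 2.37 onward
        self.api.microversion = 'latest'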
        
        self.start_service('conductor')
        self.start_service('scheduler')
        # NOTE: Intentionally NOT starting compute so instance fails to schedule
        
        self.useFixture(nova_fixtures.CastAsCallFixture(self))
    
    def test_delete_error_server_cleans_up_volume_attachments(self):
        """Test that volume attachments are cleaned up on local delete."""
        # Use a pre-existing volume
        volume_id = '9c6d9c2d-7a8f-4c80-938d-3bf062b8d489'
        
        # Create a boot-from-volume server
        server_req = {
            'name': 'test-server',
            'flavorRef': '1',  # required by the server create API
            'networks': 'none',
            'block_device_mapping_v2': [{
                'boot_index': 0,
                'uuid': volume_id,
                'source_type': 'volume',
                'destination_type': 'volume',
            }],
        }
        server = self.api.post_server({'server': server_req})
        server_id = server['id']
        
        # Wait for server to go to ERROR (no valid host)
        server = self._wait_for_state_change(server, 'ERROR')
        
        # Verify volume attachment was created
        self.assertIn(volume_id,
                      self.cinder.volume_ids_for_instance(server_id))
        
        # Delete the server (local delete since no compute)
        self.api.delete_server(server_id)
        self._wait_until_deleted(server)
        
        # BUG: Volume attachment is NOT cleaned up
        # This assertion demonstrates the broken behavior:
        self.assertIn(volume_id,
                      self.cinder.volume_ids_for_instance(server_id))
        
        # EXPECTED (commented out until bug is fixed):
        # self.assertNotIn(volume_id,
        #                  self.cinder.volume_ids_for_instance(server_id))
    
    def _wait_for_state_change(self, server, expected_status):
        """Helper to wait for server status change."""
        for i in range(50):
            server = self.api.get_server(server['id'])
            if server['status'] == expected_status:
                return server
            time.sleep(0.1)
        self.fail('Timed out waiting for server to reach %s' % expected_status)
    
    def _wait_until_deleted(self, server):
        """Helper to wait for server deletion."""
        # implementation omitted for brevity
        pass

At this stage, the test passes because it asserts the current (broken) behavior; the commented-out assertion records what the correct behavior should be. This test is committed to the repository with:

git add nova/tests/functional/regressions/test_bug_1234567.py
git commit -m "Add regression test for bug 1234567

This test reproduces the issue where volume attachments are not
cleaned up when deleting a server that failed to schedule.

The test currently asserts the broken behavior and has the
expected correct assertion commented out. This will be updated
when the bug is fixed.

Related-Bug: #1234567"

Phase 2: Fixing the Bug

While fixing the bug, the regression test serves as verification that the fix works. The fix commit includes:

  1. The actual bug fix in the production code
  2. Update to the regression test: swap the assertions

Example: Fix Commit

Changes to: nova/tests/functional/regressions/test_bug_1234567.py

        # Delete the server (local delete since no compute)
        self.api.delete_server(server_id)
        self._wait_until_deleted(server)
        
        # BUG FIXED: Volume attachment is now cleaned up correctly
        # Old assertion (demonstrating broken behavior):
        # self.assertIn(volume_id,
        #               self.cinder.volume_ids_for_instance(server_id))
        
        # Correct behavior:
        self.assertNotIn(volume_id,
                         self.cinder.volume_ids_for_instance(server_id))

The fix commit message:

git commit -m "Fix volume attachment cleanup on local delete

When a server fails to schedule and is then deleted, we perform
a local delete in the API. This was not cleaning up volume
attachments in Cinder, leaving volumes in 'attaching' state.

This fix ensures that during local delete, we iterate through
all volume attachments and delete them via the Cinder API.

The regression test is updated to assert the correct behavior.

Closes-Bug: #1234567"

Phase 3: Post-Fix (Prevention)

After the fix is merged, the regression test serves its long-term purpose: preventing the bug from reoccurring. If anyone modifies the delete path in a way that reintroduces the bug, the regression test will fail immediately in CI.
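
As a toy illustration of this phase (not Nova code), the sketch below models the delete path as a plain function with a hypothetical `cleanup_attachments` switch. Turning the switch off stands in for a later change that reintroduces the bug, and the regression check then fails.

```python
import unittest


def local_delete(attachments, server_id, cleanup_attachments=True):
    """Toy delete path: return the attachments left after the delete."""
    if cleanup_attachments:
        return {vol: srv for vol, srv in attachments.items()
                if srv != server_id}
    return dict(attachments)  # bug reintroduced: nothing is cleaned up


def make_regression_test(cleanup_attachments):
    """Build a regression test bound to one variant of the delete path."""
    class ToyRegressionTest(unittest.TestCase):
        def test_attachment_removed(self):
            remaining = local_delete({'vol-1': 'server-1'}, 'server-1',
                                     cleanup_attachments)
            self.assertNotIn('vol-1', remaining,
                             'Volume attachment should be cleaned up')
    return ToyRegressionTest


def run(test_class):
    """Run a test class and return its unittest.TestResult."""
    suite = unittest.defaultTestLoader.loadTestsFromTestCase(test_class)
    result = unittest.TestResult()
    suite.run(result)
    return result
```

Running `run(make_regression_test(True))` succeeds, while `run(make_regression_test(False))` reports one failure, which is the signal CI would surface.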

Real-World Examples from Nova

Example 1: Using _IntegratedTestBase with Override

File: nova/tests/functional/regressions/test_bug_1404867.py

from nova.tests import fixtures as nova_fixtures
from nova.tests.functional import integrated_helpers


class DeleteWithReservedVolumes(integrated_helpers._IntegratedTestBase):
    """Test deleting of an instance in error state that has a reserved volume.

    This test boots a server from volume which will fail to be scheduled,
    ending up in ERROR state with no host assigned and then deletes the server.

    Since the server failed to be scheduled, a local delete should run which
    will make sure that reserved volumes at the API layer are properly cleaned
    up.

    The regression is that Nova would not clean up the reserved volumes and
    the volume would be stuck in 'attaching' state.
    """
    api_major_version = 'v2.1'
    microversion = 'latest'

    def _setup_compute_service(self):
        # Override to NOT start compute, ensuring scheduling failure
        pass

    def test_delete_with_reserved_volumes_new(self):
        # Create a server which should go to ERROR state
        volume_id = nova_fixtures.CinderFixture.IMAGE_BACKED_VOL
        server = self._create_error_server(volume_id)
        server_id = server['id']

        # Volume attachment should exist
        self.assertIn(volume_id,
                      self.cinder.volume_ids_for_instance(server_id))

        # Delete the server
        self.api.delete_server(server['id'])

        # The volume attachment should be cleaned up
        self.assertNotIn(volume_id,
                         self.cinder.volume_ids_for_instance(server_id))

Key Observations:

  1. Inherits from _IntegratedTestBase: A stable base class from integrated_helpers that provides common setup
  2. Explicit override: _setup_compute_service() override makes the test's special requirements visible
  3. Uses inherited helpers: Methods like _create_error_server() from the base class (explicit reuse)
  4. Clear documentation: Docstring explains the bug, its cause, and the fix
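
The override in observation 2 is an instance of the template-method pattern: the base class calls a hook during setup, and the subclass replaces the hook with a no-op. The classes below are simplified stand-ins written for this illustration and do not reproduce the real `_IntegratedTestBase`.

```python
class IntegratedBaseSketch:
    """Simplified stand-in for a base class with overridable setup hooks."""

    def __init__(self):
        self.started = []
        self._setup_services()

    def _setup_services(self):
        # The base class decides the order; subclasses override the hooks.
        self._setup_scheduler_service()
        self._setup_compute_service()

    def _setup_scheduler_service(self):
        self.started.append('scheduler')

    def _setup_compute_service(self):
        self.started.append('compute')


class NoComputeSketch(IntegratedBaseSketch):
    def _setup_compute_service(self):
        # Deliberately start no compute so scheduling cannot succeed.
        pass
```

Because the override is a named method in the test class itself, a reader can see at a glance which part of the inherited setup the test changes.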

Example 2: Minimal Inheritance with Explicit Fixtures

File: nova/tests/functional/regressions/test_bug_1522536.py

from nova import test
from nova.tests import fixtures as nova_fixtures
from nova.tests.functional.api import client


class TestServerGet(test.TestCase):
    """Regression test for bug #1522536.
    
    Before fixing this bug, getting a numeric id caused a 500
    error. After the fix it returns a 404, which is expected.
    """
    REQUIRES_LOCKING = True

    def setUp(self):
        super(TestServerGet, self).setUp()
        # Explicit fixture setup - clear what this test needs
        self.useFixture(nova_fixtures.RealPolicyFixture())
        self.useFixture(nova_fixtures.NeutronFixture(self))
        self.useFixture(nova_fixtures.GlanceFixture(self))
        api_fixture = self.useFixture(nova_fixtures.OSAPIFixture(
            api_version='v2.1'))

        self.api = api_fixture.api

        # Explicit service startup
        self.start_service('conductor')
        self.start_service('scheduler')
        self.compute = self.start_service('compute')

        # Explicit helper fixture
        self.useFixture(nova_fixtures.CastAsCallFixture(self))

        self.image_id = self.api.get_images()[0]['id']
        self.flavor_id = self.api.get_flavors()[0]['id']

    def test_id_overlap(self):
        """Regression test for bug #1522536."""
        server = dict(name='server1',
                      imageRef=self.image_id,
                      flavorRef=self.flavor_id)
        self.api.post_server({'server': server})
        # Should raise 404 (not 500) when getting numeric id
        self.assertRaises(client.OpenStackApiNotFoundException,
                          self.api.get_server, 1)

Key Observations:

  1. Direct inheritance: Only from test.TestCase - minimal dependencies
  2. All fixtures explicit: Every useFixture() call is visible in setUp()
  3. Reuses stable fixtures: NeutronFixture, GlanceFixture, CastAsCallFixture - all explicit
  4. Simple and clear: Anyone can understand dependencies by reading setUp()

Example 3: Quota Cleanup

File: nova/tests/functional/regressions/test_bug_1670627.py

This test ensures that when a server that failed to schedule is deleted from cell0, its quota usage is properly decremented.

class TestDeleteFromCell0CheckQuota(test.TestCase):
    """Regression test for quota cleanup when deleting from cell0.
    
    In Ocata, servers that fail to schedule are placed in cell0.
    When deleted, quota was not being properly cleaned up because
    the delete happened in cell0 but the quota reservation was in
    the main database.
    """
    
    def test_delete_error_instance_in_cell0_and_check_quota(self):
        # Get starting quota
        starting_usage = self.api.get_limits()

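        # Create server (will fail to schedule, go to ERROR in cell0).
        # server_req is the usual request body (name, imageRef, flavorRef);
        # its construction and setUp() are omitted from this excerpt.
        server = self.api.post_server({'server': server_req})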
        self._wait_for_instance_status(server['id'], 'ERROR')

        # Verify quota was incremented
        current_usage = self.api.get_limits()
        self.assertEqual(starting_usage['totalInstancesUsed'] + 1,
                         current_usage['totalInstancesUsed'])

        # Delete the server
        self.api.delete_server(server['id'])
        self._wait_for_instance_delete(server['id'])

        # BUG FIXED: Quota should be decremented
        ending_usage = self.api.get_limits()
        self.assertEqual(starting_usage['totalInstancesUsed'],
                         ending_usage['totalInstancesUsed'])

Porting to Mystique: Complete Example

Let's create a complete example for our fictional "mystique" networking service.

Scenario: Bug #7890123 - Network port not cleaned up when network delete fails

File: mystique/tests/functional/regressions/test_bug_7890123.py

Phase 1: Pre-Fix (Reproducing the Bug)

# Copyright 2025 ACME Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

"""Regression test for bug 7890123.

When deleting a network fails due to an external error (e.g., Neutron
timeout), any ports that were successfully deleted as part of the cascade
delete are not being restored or tracked properly.

This leaves the system in an inconsistent state where:
1. The network still exists in Mystique's database
2. The ports are deleted from Neutron
3. Users cannot reconnect to the network because port creation fails

Expected behavior:
- If network delete fails, rollback port deletions OR
- Mark ports as deleted in Mystique's database to reflect reality
"""

import fixtures

from mystique import test
from mystique.tests import local_fixtures as mystique_fixtures


class TestNetworkDeletePortCleanup(test.TestCase):
    """Regression test for bug #7890123.
    
    This test verifies that when a network delete operation fails
    after some ports have been deleted, the system handles the
    inconsistency correctly.
    """
    
    def setUp(self):
        super(TestNetworkDeletePortCleanup, self).setUp()
        
        # Configuration
        self.useFixture(mystique_fixtures.ConfFixture())
        
        # Database
        self.useFixture(mystique_fixtures.Database())
        
        # RPC
        self.useFixture(mystique_fixtures.RPCFixture())
        
        # External services
        self.neutron = self.useFixture(
            mystique_fixtures.NeutronFixture(self))
        
        # API
        self.api_fixture = self.useFixture(
            mystique_fixtures.APIFixture())
        self.api = self.api_fixture.api
        self.admin_api = self.api_fixture.admin_api
        
        # Start services
        self.start_service('mystique-server')
    
    def test_network_delete_failure_port_cleanup(self):
        """Test port cleanup when network delete fails.
        
        This test demonstrates bug #7890123 where ports are left in an
        inconsistent state when network deletion fails partway through.
        """
        # Create a network
        network_req = {
            'name': 'test-network',
            'admin_state_up': True,
        }
        network = self.api.create_network(network_req)
        network_id = network['id']
        
        # Create two ports on the network
        port1 = self.api.create_port({
            'network_id': network_id,
            'name': 'test-port-1'
        })
        port2 = self.api.create_port({
            'network_id': network_id,
            'name': 'test-port-2'
        })
        
        # Verify ports exist
        ports = self.api.list_ports(network_id=network_id)
        self.assertEqual(2, len(ports))
        
        # Make the Neutron fixture fail the network delete after it has
        # already removed the network's ports
        def fake_delete_network(network_id):
            # Delete ports first (this is what Neutron does)
            for port in list(self.neutron._ports.values()):
                if port['network_id'] == network_id:
                    del self.neutron._ports[port['id']]
            
            # Then fail to delete the network
            raise Exception("Neutron timeout during network delete")
        
        # Patch the fixture instance so the fake is called with only the
        # network ID (a function patched onto the class would also receive
        # the fixture instance as its first argument)
        self.useFixture(fixtures.MockPatchObject(
            self.neutron, 'delete_network',
            side_effect=fake_delete_network))
        
        # Attempt to delete the network (should fail)
        exc = self.assertRaises(
            Exception,
            self.admin_api.delete_network,
            network_id)
        self.assertIn('timeout', str(exc))
        
        # BUG: Ports are deleted in Neutron but still shown by Mystique API
        # This demonstrates the broken behavior:
        ports = self.api.list_ports(network_id=network_id)
        self.assertEqual(2, len(ports),
                         "Bug: Ports still shown even though deleted in Neutron")
        
        # Verify ports are actually gone from Neutron
        self.assertEqual(0, len(self.neutron._ports))
        
        # EXPECTED BEHAVIOR (commented out until bug is fixed):
        # After a failed network delete, ports should be marked as deleted
        # or the system should be in a consistent state
        
        # ports = self.api.list_ports(network_id=network_id)
        # self.assertEqual(0, len(ports),
        #                  "Ports should be marked deleted to match Neutron")
        
        # OR we should have a way to sync/reconcile the state:
        # self.admin_api.sync_network_ports(network_id)
        # ports = self.api.list_ports(network_id=network_id)
        # self.assertEqual(0, len(ports))

Commit message for pre-fix:

Add regression test for bug 7890123

This test reproduces the issue where ports are left in an inconsistent
state when network deletion fails in Neutron after ports have been
deleted.

The test currently asserts the broken behavior (ports still visible in
Mystique even though deleted in Neutron) and has the expected correct
behavior commented out.

This will be updated when the bug is fixed.

Related-Bug: #7890123

Phase 2: The Fix

After investigation, the team decides to implement a reconciliation mechanism. The fix involves:

  1. Adding a _sync_ports_with_neutron() method to the network manager
  2. Calling this during network delete failure recovery
  3. Updating the test to verify the fix
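
The guide does not show `_sync_ports_with_neutron()` itself. A minimal sketch of the reconciliation idea follows. It assumes that Mystique's port records and Neutron's ports can both be represented as dictionaries keyed by port ID; the function name and data shapes are illustrative assumptions.

```python
def sync_ports_with_neutron(local_ports, neutron_ports, network_id):
    """Drop local port records on network_id that Neutron no longer has.

    Both arguments map port ID -> port dict. Returns the sorted IDs of
    the stale records that were removed from local_ports.
    """
    stale = [
        port_id for port_id, port in local_ports.items()
        if port['network_id'] == network_id and port_id not in neutron_ports
    ]
    for port_id in stale:
        del local_ports[port_id]
    return sorted(stale)
```

Only records on the affected network are touched, so a failed delete of one network cannot remove port records that belong to another.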

Updated test file (showing the changes):

    def test_network_delete_failure_port_cleanup(self):
        """Test port cleanup when network delete fails.
        
        This test verifies that when network deletion fails in Neutron
        after ports have been deleted, Mystique reconciles its port
        state with Neutron to maintain consistency.
        """
        # ... setup code unchanged ...
        
        # Attempt to delete the network (should fail)
        exc = self.assertRaises(
            Exception,
            self.admin_api.delete_network,
            network_id)
        self.assertIn('timeout', str(exc))
        
        # BUG FIXED: After the failed delete, the system reconciles port state
        # Old assertion (demonstrated broken behavior):
        # ports = self.api.list_ports(network_id=network_id)
        # self.assertEqual(2, len(ports),
        #                  "Bug: Ports still shown even though deleted in Neutron")
        
        # Correct behavior: Ports are automatically reconciled
        ports = self.api.list_ports(network_id=network_id)
        self.assertEqual(0, len(ports),
                         "Ports reconciled with Neutron state")
        
        # Verify the network still exists (delete failed)
        network = self.api.show_network(network_id)
        self.assertEqual(network_id, network['id'])
        
        # Verify we can now successfully delete the empty network
        self.admin_api.delete_network(network_id)
        self.assertRaises(
            Exception,  # NetworkNotFound
            self.api.show_network,
            network_id)

Commit message for fix:

Fix port consistency after failed network delete

When a network delete operation fails in Neutron after ports have
been deleted, Mystique's database was left with stale port records.
This caused confusion and prevented proper cleanup.

This fix adds a reconciliation mechanism that:
1. Detects when Neutron delete fails
2. Queries Neutron for actual port state
3. Updates Mystique's database to match
4. Allows retry of the delete operation

The regression test is updated to assert the correct behavior.

Closes-Bug: #7890123

Best Practices for Regression Tests

1. Explicit Dependencies Over Hidden Inheritance

The key principle: Make dependencies explicit. It's perfectly acceptable to reuse fixtures and helper functions, but do so explicitly rather than through deep inheritance chains.

Good - Explicit Fixture Reuse:

from nova import test
from nova.tests import fixtures as nova_fixtures
from nova.tests.functional import integrated_helpers


class TestBug123(test.TestCase, integrated_helpers.InstanceHelperMixin):
    """Direct inheritance from base TestCase with explicit mixin usage.
    
    Uses InstanceHelperMixin for stable helper methods like
    _wait_for_state_change() and _build_server().
    """
    
    def setUp(self):
        super(TestBug123, self).setUp()
        # Explicitly set up fixtures - clear what dependencies exist
        self.useFixture(nova_fixtures.RealPolicyFixture())
        self.useFixture(nova_fixtures.NeutronFixture(self))
        self.glance = self.useFixture(nova_fixtures.GlanceFixture(self))
        self.cinder = self.useFixture(nova_fixtures.CinderFixture(self))
    
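    def test_something(self):
        # Use helpers from mixin - explicit and clear.
        # _build_server() only builds the request body, so the server is
        # created through the API (self.api setup omitted for brevity).
        server = self.api.post_server({'server': self._build_server()})
        self._wait_for_state_change(server, 'ACTIVE')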

Good - Reusing Standalone Helpers:

from nova import test
from nova.tests.functional import integrated_helpers


class TestBug456(test.TestCase):
    """Using stable helper functions from integrated_helpers module."""
    
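    def test_something(self):
        # Explicit call to a standalone helper function.
        # NOTE: these module-level helper names are illustrative; check
        # integrated_helpers in your tree for what is actually available.
        rp_uuid = integrated_helpers.get_provider_uuid_by_host(
            self.placement, 'compute1')
        
        # Clear what's being used and where it comes from
        # (server is assumed to have been created earlier in the test)
        allocations = integrated_helpers.get_allocations_for_server(
            self.placement, server['id'])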

Avoid - Deep Inheritance Chains:

class TestBug789(SomeSpecificTestClass):
    """Deep inheritance hides dependencies.
    
    Problem: If SomeSpecificTestClass changes its setUp(), fixture
    usage, or is refactored/removed, this test breaks even though
    the bug hasn't regressed.
    """
    pass

Why This Matters:

  1. Stability: When dependencies are explicit, changes to unrelated test infrastructure don't break regression tests
  2. Clarity: Anyone reading the test can see exactly what it depends on
  3. Pragmatism: Reusing stable fixtures and helpers avoids duplication (DRY principle)
  4. Maintainability: If a shared fixture changes, it's clear which tests are affected

What's Safe to Reuse:

Fixtures (via self.useFixture()):

self.useFixture(nova_fixtures.RealPolicyFixture())
self.useFixture(nova_fixtures.NeutronFixture(self))

Stable mixins (like InstanceHelperMixin):

class TestBug123(test.TestCase, integrated_helpers.InstanceHelperMixin):
    pass

Standalone helper functions:

from nova.tests.functional.api import client

exc = self.assertRaises(client.OpenStackApiNotFoundException, ...)

Avoid - Deep class hierarchies:

# Avoid: Multiple levels of inheritance
class TestBug123(SomeOtherTestClass):  # which inherits from another class...
    pass

The Balance: DRY vs Stability

This approach strikes a balance between three goals:

  1. DRY (Don't Repeat Yourself): Reuse fixtures and helpers instead of duplicating code
  2. Stability: Tests survive refactoring of unrelated code because dependencies are explicit
  3. Clarity: Anyone can understand what a test depends on by reading it

Example of this balance:

# Good: Explicit reuse - stable and DRY
class TestBug123(test.TestCase, integrated_helpers.InstanceHelperMixin):
    def setUp(self):
        super().setUp()
        self.useFixture(nova_fixtures.NeutronFixture(self))  # Explicit
        self.useFixture(nova_fixtures.GlanceFixture(self))   # Explicit
    
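    def test_something(self):
        # _build_server() (from mixin) returns only the request body;
        # self.api setup is omitted here for brevity
        server = self.api.post_server({'server': self._build_server()})
        self._wait_for_state_change(server, 'ACTIVE')  # From mixin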

# Bad: Hidden dependencies - breaks when parent changes
class TestBug456(SomeComplexTestClass):  # What does this provide?
    def test_something(self):
        # Relies on self.api, self.neutron, etc from parent
        # If parent changes, this breaks even if bug hasn't regressed
        pass

2. Self-Contained Setup with Explicit Dependencies

The test's setUp() method should explicitly declare all fixtures and dependencies, even when reusing shared components.

Good - Explicit Fixture Declaration:

def setUp(self):
    super().setUp()
    # Explicit fixture setup - clear what this test needs
    self.useFixture(nova_fixtures.RealPolicyFixture())
    self.useFixture(nova_fixtures.NeutronFixture(self))
    self.glance = self.useFixture(nova_fixtures.GlanceFixture(self))
    self.cinder = self.useFixture(nova_fixtures.CinderFixture(self))
    
    # Start services explicitly
    self.start_service('conductor')
    self.start_service('scheduler')
    self.start_service('compute', host='compute1')
    
    # API fixture
    self.api_fixture = self.useFixture(nova_fixtures.OSAPIFixture(
        api_version='v2.1'))
    self.api = self.api_fixture.api

Also Good - Using Stable Helpers:

from nova.tests.functional import integrated_helpers


class TestBug123(test.TestCase, integrated_helpers.InstanceHelperMixin):
    """Using InstanceHelperMixin provides _build_server(), etc."""
    
    def setUp(self):
        super().setUp()
        # Still explicitly set up fixtures even with mixin
        self.useFixture(nova_fixtures.NeutronFixture(self))
        # ...etc
        
        # Mixin provides _build_server(), _wait_for_state_change()
        # but you still control fixture setup

Avoid - Hidden Dependencies:

def setUp(self):
    super().setUp()
    # Problem: What fixtures does this set up? You have to check parent class
    self._setup_inherited_fixtures()
    
    # Problem: Relies on parent's setUp() creating self.api, self.cinder, etc
    # If parent changes, this breaks

Key Principle: Anyone reading the test should understand its dependencies by reading setUp(), not by tracing through parent classes.

3. Clear Documentation

Every regression test should include:

class TestBugXXXXXX(test.TestCase):
    """One-line summary of the bug.
    
    Detailed explanation:
    - What was the bug?
    - How did it manifest to users?
    - What sequence of operations triggers it?
    - What was the root cause?
    - How was it fixed?
    
    The test ensures the bug does not regress by:
    - Describing what the test does
    """

4. Reproduce Exact Conditions

def _setup_compute_service(self):
    # Override parent to create specific conditions for the bug
    # In this case, NOT starting compute ensures scheduling failure
    pass

def setUp(self):
    super().setUp()
    # Set specific config that triggers the bug
    self.flags(allow_resize_to_same_host=False)
    # etc.

5. Descriptive Assertions

Good:

self.assertNotIn(volume_id,
                 self.cinder.volume_ids_for_instance(server_id),
                 "Volume should be detached after local delete")

Avoid:

self.assertNotIn(volume_id, vol_list)  # What does this test?
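
To see what the message adds, this stdlib-only sketch triggers the failure on purpose and captures the error text. With plain `unittest`, the custom message is appended to the default explanation because `TestCase.longMessage` defaults to `True`. The `_Probe` class and the sample data are invented for the example.

```python
import unittest


class _Probe(unittest.TestCase):
    def runTest(self):  # lets the case be instantiated directly
        pass


probe = _Probe()
volume_id = 'vol-1'
attached = ['vol-1', 'vol-2']

try:
    probe.assertNotIn(volume_id, attached,
                      'Volume should be detached after local delete')
except AssertionError as exc:
    # Holds both the default explanation and the custom message.
    failure_text = str(exc)
```

The captured text names the offending value and states the intent of the check, which is what someone triaging a CI failure needs first.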

Testing the Lifecycle Locally

When developing a regression test, you can verify the lifecycle:

# Phase 1: Verify test reproduces the bug (should pass with broken assertion)
tox -e functional -- mystique.tests.functional.regressions.test_bug_7890123

# Phase 2: Apply the fix, test should now pass with correct assertion
# (After updating test)
tox -e functional -- mystique.tests.functional.regressions.test_bug_7890123

# Phase 3: Verify test catches regression
# Temporarily revert the fix, test should fail
tox -e functional -- mystique.tests.functional.regressions.test_bug_7890123

Summary: Regression Test Checklist

When creating a regression test:

  • File named test_bug_<id>.py in tests/functional/regressions/
  • Inherits from test.TestCase or minimal base class (stable mixins like InstanceHelperMixin are OK)
  • All dependencies are explicit (fixtures, helpers, imports)
  • Self-contained setUp() with all fixtures declared inline
  • Reuses stable fixtures and helpers where appropriate (DRY principle)
  • Avoids deep inheritance chains that hide dependencies
  • Comprehensive docstring explaining the bug
  • Test reproduces the bug (initial commit)
  • Broken assertions demonstrate current behavior
  • Expected assertions commented out
  • After fix: swap assertions (broken → commented, expected → active)
  • Test verifies the fix prevents regression
  • Clear, descriptive assertion messages
  • Stable over time: changes to unrelated code don't break this test

Additional Resources

For more examples, examine Nova's regression tests:

  • nova/tests/functional/regressions/test_bug_1404867.py - Volume cleanup
  • nova/tests/functional/regressions/test_bug_1670627.py - Quota handling
  • nova/tests/functional/regressions/test_bug_1718455.py - Multi-create
  • nova/tests/functional/regressions/test_bug_1790204.py - Same-host resize

Each demonstrates the pattern of:

  1. Clear documentation of the bug
  2. Self-contained setup
  3. Explicit reproduction of the failure scenario
  4. Verification of the fix

Creating Reusable Test Infrastructure

Overview

While regression tests should have explicit dependencies, creating reusable fixtures, mixins, and helper functions is strongly encouraged for common operations. This promotes the DRY (Don't Repeat Yourself) principle while maintaining clarity and stability.

Key principle: Make helpers that are stable, well-documented, and easy to discover.

When to Create Reusable Components

Create reusable components when you notice:

  1. Repeated patterns across multiple tests
  2. Complex operations that would benefit from abstraction
  3. Common assertions that could be standardized
  4. Setup sequences used in multiple test files

Critical Principle: Automatic Cleanup

IMPORTANT: When creating helper functions or methods that create resources (networks, servers, volumes, etc.), they MUST register cleanup functions by default. This ensures:

  1. No resource leaks between tests
  2. Test isolation - each test starts with a clean slate
  3. Reliable test runs - prevents cascading failures

Pattern to follow:

def _create_resource(self, name='test-resource'):
    """Create a resource and automatically register cleanup."""
    resource = self.api.create_resource({'name': name})
    
    # CRITICAL: Always register cleanup for created resources
    self.addCleanup(self._cleanup_resource, resource['id'])
    
    return resource

def _cleanup_resource(self, resource_id):
    """Cleanup helper - deletes resource, ignoring NotFound errors."""
    try:
        self.api.delete_resource(resource_id)
    except Exception as e:
        # Ignore if already deleted (e.g., test explicitly deleted it)
        if 'NotFound' not in str(e):
            raise

Why this matters:

  • Tests run in parallel in CI, and execution order can differ between runs
  • Multiple tests may create resources with similar names
  • Leftover resources can cause future test failures
  • Cleanup functions run even if the test fails
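
The last point can be checked with the standard library alone. In this sketch the test body fails on purpose, yet the cleanup registered by the `_create_resource()` helper still runs. `FakeApi` and `ResourceNotFound` are stand-ins invented for the example; the typed exception is an assumption about what a client could offer in place of matching on the error string.

```python
import unittest


class ResourceNotFound(Exception):
    """Hypothetical typed 'not found' error raised by the fake API."""


class FakeApi:
    """In-memory stand-in for self.api."""

    def __init__(self):
        self.resources = {}

    def create_resource(self, body):
        resource = {'id': body['name'], 'name': body['name']}
        self.resources[resource['id']] = resource
        return resource

    def delete_resource(self, resource_id):
        if resource_id not in self.resources:
            raise ResourceNotFound(resource_id)
        del self.resources[resource_id]


def run_failing_test():
    """Run a test that creates a resource and then fails on purpose."""
    api = FakeApi()

    class CleanupDemo(unittest.TestCase):
        def _create_resource(self, name='test-resource'):
            resource = api.create_resource({'name': name})
            self.addCleanup(self._cleanup_resource, resource['id'])
            return resource

        def _cleanup_resource(self, resource_id):
            try:
                api.delete_resource(resource_id)
            except ResourceNotFound:
                pass  # already deleted by the test itself

        def test_fails_after_create(self):
            self._create_resource()
            self.fail('simulated test failure')

    result = unittest.TestResult()
    unittest.defaultTestLoader.loadTestsFromTestCase(CleanupDemo).run(result)
    return result, api
```

After `run_failing_test()`, the result records one failure and `api.resources` is empty, so the failed test left nothing behind for the next one.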

Types of Reusable Components

1. Fixtures (Most Common)

Fixtures are the primary mechanism for code reuse in functional tests. They provide setup/teardown with automatic cleanup.

Example: Creating a Stable Fixture

File: mystique/tests/local_fixtures/neutron.py

"""Neutron fixture for Mystique tests."""

import copy

import fixtures
from oslo_utils import uuidutils


class NeutronFixture(fixtures.Fixture):
    """Mock Neutron API for Mystique tests.
    
    This fixture provides a stateful mock of Neutron's API, tracking
    networks, ports, and subnets. It's designed to be stable and
    reusable across all functional tests.
    
    Usage:
        self.neutron = self.useFixture(NeutronFixture(self))
        network = self.neutron.create_network(
            {'network': {'name': 'test-net'}})['network']
    """
    
    def __init__(self, test):
        super().__init__()
        self.test = test
        self._networks = {}
        self._ports = {}
        self._subnets = {}
    
    def setUp(self):
        super().setUp()
        
        # Mock Neutron client using fixtures.MonkeyPatch
        self.test.useFixture(fixtures.MonkeyPatch(
            'mystique.network.neutron.get_client',
            self._get_client))
    
    def _get_client(self, context, admin=False):
        """Return fake Neutron client."""
        return _FakeNeutronClient(self)
    
    def create_network(self, body):
        """Create a network - stable API for tests to use."""
        network_req = body.get('network')
        network_id = network_req.get('id') or uuidutils.generate_uuid()
        
        network = {
            'id': network_id,
            'name': network_req.get('name'),
            'status': 'ACTIVE',
            'admin_state_up': network_req.get('admin_state_up', True),
        }
        
        self._networks[network_id] = network
        return {'network': copy.deepcopy(network)}
    
    def get_network(self, network_id):
        """Get network by ID - stable helper."""
        if network_id not in self._networks:
            raise Exception('NetworkNotFound: %s' % network_id)
        return copy.deepcopy(self._networks[network_id])


class _FakeNeutronClient:
    """Fake Neutron client wrapper."""
    
    def __init__(self, fixture):
        self.fixture = fixture
    
    def create_network(self, body):
        return self.fixture.create_network(body)
    
    def show_network(self, network_id):
        return {'network': self.fixture.get_network(network_id)}

Why this works:

  • Stable interface: Methods like create_network() won't change
  • Well-documented: Clear docstrings explain usage
  • Self-contained: All Neutron mocking in one place
  • Discoverable: In local_fixtures/ directory with clear name
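
One detail in the fixture worth calling out is the `copy.deepcopy()` applied to every returned record. The stripped-down sketch below uses a plain class rather than a real `fixtures.Fixture`, and `FakeNetworkStore` is a name invented here. It shows the effect: a test that mutates a returned network cannot corrupt the fake's internal state.

```python
import copy
import uuid


class FakeNetworkStore:
    """Simplified stand-in for the state a fixture like NeutronFixture holds."""

    def __init__(self):
        self._networks = {}

    def create_network(self, body):
        req = body['network']
        network = {
            'id': req.get('id') or str(uuid.uuid4()),
            'name': req.get('name'),
            'status': 'ACTIVE',
        }
        self._networks[network['id']] = network
        # Hand back a copy so callers never hold the internal record.
        return {'network': copy.deepcopy(network)}

    def get_network(self, network_id):
        return copy.deepcopy(self._networks[network_id])


store = FakeNetworkStore()
created = store.create_network({'network': {'name': 'test-net'}})['network']
created['status'] = 'ERROR'  # a test mutating its own copy
fetched = store.get_network(created['id'])  # internal state is unaffected
```

Without the copies, the mutation would be visible to every later caller, which is the kind of cross-test coupling the fixture is meant to prevent.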

2. Mixins (Helper Methods)

Mixins provide helper methods that can be added to any test class. Use them for common operations that don't require fixtures.

Example: Creating a Stable Mixin

File: mystique/tests/functional/integrated_helpers.py

"""Helper mixins and functions for functional tests."""

import time

from mystique.tests.functional.api import client


class NetworkHelperMixin:
    """Mixin providing common network test operations.
    
    This mixin is stable and designed for long-term reuse across
    functional tests, including regression tests.
    
    Usage:
        class MyTest(test.TestCase, NetworkHelperMixin):
            def test_something(self):
                network = self._create_network('test-net')
                self._wait_for_network_active(network)
    """
    
    def _create_network(self, name='test-network', **kwargs):
        """Create a network and return the response.
        
        IMPORTANT: This helper automatically registers cleanup to delete
        the network when the test completes. This ensures tests don't
        leave resources behind.
        
        Args:
            name: Network name (default: 'test-network')
            **kwargs: Additional network properties
        
        Returns:
            dict: Network creation response
        """
        network_req = {
            'name': name,
            'admin_state_up': True,  # default; kwargs below may override it
        }
        network_req.update(kwargs)
        network = self.api.create_network(network_req)
        
        # CRITICAL: Always register cleanup for created resources
        self.addCleanup(self._cleanup_network, network['id'])
        
        return network
    
    def _cleanup_network(self, network_id):
        """Cleanup helper - deletes network, ignoring NotFound errors."""
        try:
            self.api.delete_network(network_id)
        except Exception as e:
            # Ignore if already deleted
            if 'NotFound' not in str(e):
                raise
    
    def _wait_for_network_active(self, network, timeout=30):
        """Wait for network to reach ACTIVE status.
        
        Args:
            network: Network dict with 'id' key
            timeout: Maximum time to wait in seconds
        
        Returns:
            dict: Updated network dict
        
        Raises:
            AssertionError: If network doesn't reach ACTIVE in time
        """
        network_id = network['id']
        for i in range(timeout * 10):
            network = self.api.show_network(network_id)
            if network['status'] == 'ACTIVE':
                return network
            if network['status'] == 'ERROR':
                self.fail('Network %s went to ERROR' % network_id)
            time.sleep(0.1)
        self.fail('Timeout waiting for network %s to be ACTIVE' %
                  network_id)
    
    def _delete_network_and_wait(self, network_id, timeout=30):
        """Delete a network and wait for it to be removed.
        
        Args:
            network_id: UUID of network to delete
            timeout: Maximum time to wait in seconds
        """
        self.api.delete_network(network_id)
        for i in range(timeout * 10):
            try:
                self.api.show_network(network_id)
                time.sleep(0.1)
            except client.NotFoundException:
                return
        self.fail('Timeout waiting for network %s deletion' % network_id)


class PortHelperMixin:
    """Mixin for common port operations."""
    
    def _create_port(self, network_id, name='test-port', **kwargs):
        """Create a port on the given network.
        
        IMPORTANT: This helper automatically registers cleanup to delete
        the port when the test completes. This ensures tests don't
        leave resources behind.
        
        Stable helper for port creation across all functional tests.
        """
        port_req = {
            'network_id': network_id,
            'name': name,
        }
        port_req.update(kwargs)
        port = self.api.create_port(port_req)
        
        # CRITICAL: Always register cleanup for created resources
        self.addCleanup(self._cleanup_port, port['id'])
        
        return port
    
    def _cleanup_port(self, port_id):
        """Cleanup helper - deletes port, ignoring NotFound errors."""
        try:
            self.api.delete_port(port_id)
        except Exception as e:
            # Ignore if already deleted
            if 'NotFound' not in str(e):
                raise

Why this works:

  • Composable: Mix multiple helpers into one test class
  • Documented: Each method has clear docstring
  • Stable: Interface won't change, only implementation
  • Explicit usage: class MyTest(test.TestCase, NetworkHelperMixin)
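
To make the composition point concrete, here is a minimal sketch that runs with only the standard library. `FakeAPI` and the trimmed-down mixins are hypothetical stand-ins for the real API client and the helpers above, not part of any project; the sketch only shows that two mixins can be combined on one test class and that each helper cleans up after itself.

```python
import unittest


class FakeAPI:
    """Hypothetical in-memory stand-in for the functional test API client."""

    def __init__(self):
        self.networks = {}
        self.ports = {}

    def create_network(self, req):
        net = dict(req, id='net-%d' % (len(self.networks) + 1))
        self.networks[net['id']] = net
        return net

    def delete_network(self, network_id):
        self.networks.pop(network_id, None)

    def create_port(self, req):
        port = dict(req, id='port-%d' % (len(self.ports) + 1))
        self.ports[port['id']] = port
        return port

    def delete_port(self, port_id):
        self.ports.pop(port_id, None)


class NetworkHelperMixin:
    def _create_network(self, name='test-network'):
        network = self.api.create_network({'name': name})
        self.addCleanup(self.api.delete_network, network['id'])
        return network


class PortHelperMixin:
    def _create_port(self, network_id, name='test-port'):
        port = self.api.create_port({'network_id': network_id, 'name': name})
        self.addCleanup(self.api.delete_port, port['id'])
        return port


class ComposedTest(unittest.TestCase, NetworkHelperMixin, PortHelperMixin):
    # Shared instance so the caller can inspect state after the run.
    api = FakeAPI()

    def test_port_on_network(self):
        network = self._create_network()
        port = self._create_port(network['id'])
        self.assertEqual(network['id'], port['network_id'])


def run_composed_test():
    """Run the test case in-process and return the unittest result."""
    suite = unittest.defaultTestLoader.loadTestsFromTestCase(ComposedTest)
    result = unittest.TestResult()
    suite.run(result)
    return result
```

After `run_composed_test()` the fake API holds no networks or ports, because each helper registered its own cleanup.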

3. Standalone Helper Functions

For operations that don't need test instance state, create standalone functions. These are the most explicit form of reuse.

Example: Creating Standalone Helpers

File: mystique/tests/functional/integrated_helpers.py

"""Standalone helper functions for functional tests."""


def wait_for_condition(condition_func, timeout=30, sleep_time=0.1,
                        error_message=None):
    """Generic wait helper for any condition.
    
    This is a stable utility function that can be reused anywhere.
    
    Args:
        condition_func: Callable that returns True when condition met
        timeout: Maximum time to wait in seconds
        sleep_time: Time to sleep between checks
        error_message: Custom error message (optional)
    
    Returns:
        The return value of condition_func when it becomes truthy
    
    Raises:
        TimeoutError: If condition not met within timeout
    
    Example:
        def check_ready():
            return api.get_network(net_id)['status'] == 'ACTIVE'
        
        wait_for_condition(check_ready, timeout=60,
                          error_message='Network never became active')
    """
    import time
    end_time = time.time() + timeout
    
    while time.time() < end_time:
        result = condition_func()
        if result:
            return result
        time.sleep(sleep_time)
    
    if error_message:
        raise TimeoutError(error_message)
    raise TimeoutError('Condition not met within %s seconds' % timeout)


def get_provider_uuid_by_name(placement_client, provider_name):
    """Get resource provider UUID by name from Placement.
    
    Stable helper for Placement interactions.
    
    Args:
        placement_client: Placement API client
        provider_name: Name of the resource provider
    
    Returns:
        str: UUID of the resource provider
    
    Raises:
        ValueError: If provider not found
    """
    resp = placement_client.get('/resource_providers')
    providers = resp.body['resource_providers']
    
    for provider in providers:
        if provider['name'] == provider_name:
            return provider['uuid']
    
    raise ValueError('Provider %s not found' % provider_name)


def assert_network_state(test_case, network, expected_status,
                         expected_name=None):
    """Assert network is in expected state.
    
    Reusable assertion helper that can be used across all tests.
    
    Args:
        test_case: Test case instance (for assertions)
        network: Network dict to check
        expected_status: Expected status value
        expected_name: Expected name (optional)
    """
    test_case.assertEqual(expected_status, network['status'],
                         'Network status mismatch')
    if expected_name:
        test_case.assertEqual(expected_name, network['name'],
                             'Network name mismatch')

Why this works:

  • No hidden state: Pure functions, easy to understand
  • Explicit imports: Clear where functionality comes from
  • Composable: Combine multiple helpers in any test
  • Testable: Helper functions can be unit tested themselves
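
As an illustration of the "testable" point, the sketch below unit tests a compact copy of `wait_for_condition()` (repeated here only so the example runs on its own). Patching `time.sleep` with `unittest.mock` is one possible approach, chosen so the checks do not actually sleep; the function names `check_returns_first_truthy_value` and `check_times_out` are made up for this example.

```python
import time
from unittest import mock


def wait_for_condition(condition_func, timeout=30, sleep_time=0.1):
    """Compact copy of the helper above so this sketch is self-contained."""
    end_time = time.time() + timeout
    while time.time() < end_time:
        result = condition_func()
        if result:
            return result
        time.sleep(sleep_time)
    raise TimeoutError('Condition not met within %s seconds' % timeout)


def check_returns_first_truthy_value():
    # Falsy twice, then a truthy payload that should be returned unchanged.
    condition = mock.Mock(side_effect=[None, False, {'status': 'ACTIVE'}])
    with mock.patch('time.sleep') as fake_sleep:
        result = wait_for_condition(condition, timeout=5)
    return result, condition.call_count, fake_sleep.call_count


def check_times_out():
    # A condition that never holds must end in TimeoutError.
    with mock.patch('time.sleep'):
        try:
            wait_for_condition(lambda: False, timeout=0.05, sleep_time=0.01)
        except TimeoutError as exc:
            return str(exc)
    return None
```

The first check confirms that the helper polls until the condition is truthy and sleeps once per failed poll; the second confirms the timeout path.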

Usage in Tests

Example: Using Reusable Components

"""Regression test demonstrating reusable component usage."""

from mystique import test
from mystique.tests import local_fixtures as mystique_fixtures
from mystique.tests.functional import integrated_helpers


class TestNetworkDeletion(test.TestCase,
                          integrated_helpers.NetworkHelperMixin,
                          integrated_helpers.PortHelperMixin):
    """Regression test for bug #7891234.
    
    Demonstrates proper use of reusable components while maintaining
    explicit dependencies for stability.
    """
    
    def setUp(self):
        super().setUp()
        
        # Explicit fixture setup - clear what we depend on
        self.useFixture(mystique_fixtures.ConfFixture())
        self.useFixture(mystique_fixtures.Database())
        self.neutron = self.useFixture(
            mystique_fixtures.NeutronFixture(self))
        
        # API fixture
        self.api_fixture = self.useFixture(
            mystique_fixtures.APIFixture())
        self.api = self.api_fixture.api
        
        # Start services
        self.start_service('mystique-server')
    
    def test_network_delete_with_ports(self):
        """Test network deletion when ports exist."""
        # Use mixin helper - explicit, stable, and DRY
        network = self._create_network('test-net')
        network_id = network['id']
        
        # Use mixin helper for port creation
        port = self._create_port(network_id, 'test-port')
        
        # Use standalone helper function - explicit import
        integrated_helpers.wait_for_condition(
            lambda: self.api.show_port(port['id'])['status'] == 'ACTIVE',
            timeout=30,
            error_message='Port never became active')
        
        # Delete network (should cascade to ports)
        self._delete_network_and_wait(network_id)
        
        # Verify the network is gone: looking it up must now fail
        with self.assertRaises(Exception):
            self.api.show_network(network_id)

What makes this good:

  1. Explicit fixture usage: All fixtures declared in setUp()
  2. Mixin for common operations: NetworkHelperMixin provides _create_network(), etc.
  3. Standalone helpers: wait_for_condition() imported explicitly
  4. DRY principle: No duplicated wait logic, network creation, etc.
  5. Stability: If helpers change, it's clear this test uses them

Guidelines for Creating Reusable Components

1. Make Them Stable

# Good: Stable interface that won't change
def wait_for_network_active(api, network_id, timeout=30):
    """Wait for network to reach ACTIVE status.
    
    This function signature is stable and won't change.
    """
    pass

# Bad: Unstable, might need new parameters later
def wait_for_network(api, network_id):
    # What if we need to wait for different statuses?
    # Or different timeout values?
    pass

2. Always Register Cleanup for Resources

CRITICAL: Any helper that creates resources MUST register cleanup by default.

# Good: Automatic cleanup registration
def _create_server(self, name='test-server', **kwargs):
    """Create a server and automatically register cleanup.
    
    IMPORTANT: Cleanup is automatic - the server will be deleted
    when the test completes, even if the test fails.
    """
    server = self.api.create_server({'name': name, **kwargs})
    self.addCleanup(self._cleanup_server, server['id'])
    return server

def _cleanup_server(self, server_id):
    """Cleanup helper - safely delete server."""
    try:
        self.api.delete_server(server_id)
    except Exception as e:
        if 'NotFound' not in str(e):
            raise

# Bad: No cleanup - causes resource leaks
def _create_server(self, name='test-server'):
    """Create a server."""
    return self.api.create_server({'name': name})
    # Missing cleanup registration!

Why this is critical:

  • Prevents resource leaks between tests
  • Ensures test isolation
  • Cleanup runs even if test fails or times out
  • Tests can run in any order safely

3. Document Thoroughly

class NetworkHelperMixin:
    """Mixin for network operations in functional tests.
    
    This mixin is designed for long-term stability and reuse.
    All methods maintain backward compatibility.
    
    Usage:
        class MyTest(test.TestCase, NetworkHelperMixin):
            def test_something(self):
                network = self._create_network('test')
    
    Available methods:
        - _create_network(name, **kwargs): Create a network
        - _wait_for_network_active(network, timeout): Wait for ACTIVE
        - _delete_network_and_wait(network_id, timeout): Delete network
    """
    
    def _create_network(self, name='test-network', **kwargs):
        """Create a network.
        
        Args:
            name: Network name (default: 'test-network')
            **kwargs: Additional network properties (admin_state_up,
                     description, etc.)
        
        Returns:
            dict: Network response from API
        
        Example:
            network = self._create_network('my-net',
                                          admin_state_up=False)
        """
        pass

4. Make Them Discoverable

Organize reusable components in well-known locations:

<project>/tests/
├── functional/
│   ├── integrated_helpers.py    # Mixins and standalone helpers
│   ├── base.py                   # Base test classes
│   └── api/
│       └── client.py             # API client helpers
└── local_fixtures/
    ├── __init__.py               # Export common fixtures
    ├── conf.py                   # Configuration fixture
    ├── database.py               # Database fixture
    ├── rpc.py                    # RPC fixture
    ├── neutron.py                # Neutron fixture
    └── api.py                    # API fixture

Export commonly-used components in __init__.py:

# <project>/tests/local_fixtures/__init__.py
"""Local fixtures for project functional tests."""

# Export the most commonly used fixtures for easy import
from .api import APIFixture  # noqa: F401
from .conf import ConfFixture  # noqa: F401
from .database import Database  # noqa: F401
from .neutron import NeutronFixture  # noqa: F401
from .rpc import RPCFixture  # noqa: F401

__all__ = [
    'APIFixture',
    'ConfFixture',
    'Database',
    'NeutronFixture',
    'RPCFixture',
]

5. Version and Deprecate Carefully

When you need to change a helper:

def _wait_for_network_status(self, network_id, status='ACTIVE',
                             timeout=30):
    """Wait for network to reach specified status.
    
    This replaces the old _wait_for_network_active() method with a
    more flexible version.
    
    Args:
        network_id: UUID of network
        status: Target status (default: 'ACTIVE')
        timeout: Maximum wait time in seconds
    """
    pass

def _wait_for_network_active(self, network, timeout=30):
    """Wait for network to reach ACTIVE status.
    
    DEPRECATED: Use _wait_for_network_status() instead.
    This method is maintained for backward compatibility and will be
    removed in the Z release.
    """
    import warnings
    # stacklevel=2 attributes the warning to the caller, not this helper
    warnings.warn('_wait_for_network_active is deprecated, use '
                  '_wait_for_network_status instead',
                  DeprecationWarning, stacklevel=2)
    return self._wait_for_network_status(network['id'], 'ACTIVE',
                                        timeout)
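
A deprecation shim like this is easy to verify. The following sketch uses a hypothetical `NetworkWaiter` class (not part of any project) with the same old-name/new-name relationship, and captures the warning with the standard `warnings` module.

```python
import warnings


class NetworkWaiter:
    """Hypothetical helper holding a static network_id -> status map."""

    def __init__(self, statuses):
        self._statuses = statuses

    def wait_for_network_status(self, network_id, status='ACTIVE'):
        return self._statuses[network_id] == status

    def wait_for_network_active(self, network):
        # Old entry point: warn, then delegate to the new method.
        warnings.warn(
            'wait_for_network_active is deprecated, use '
            'wait_for_network_status instead',
            DeprecationWarning, stacklevel=2)
        return self.wait_for_network_status(network['id'], 'ACTIVE')


def call_deprecated():
    waiter = NetworkWaiter({'net-1': 'ACTIVE'})
    with warnings.catch_warnings(record=True) as caught:
        # Make sure the warning is not suppressed by default filters.
        warnings.simplefilter('always')
        result = waiter.wait_for_network_active({'id': 'net-1'})
    return result, caught
```

The old method still returns the right answer while emitting exactly one `DeprecationWarning` that names its replacement.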

Benefits of Reusable Components

  1. Reduced Duplication: Write wait logic once, use everywhere
  2. Consistency: All tests use the same patterns
  3. Easier Maintenance: Fix a bug once in the helper
  4. Better Documentation: Helpers document common patterns
  5. Faster Test Writing: New tests compose existing helpers
  6. Explicit Dependencies: Tests show what helpers they use

Summary

Do create:

  • ✅ Stable fixtures for common service mocks
  • ✅ Mixins for common test operations
  • ✅ Standalone helpers for reusable logic
  • ✅ Well-documented, discoverable components

Do use explicitly:

  • self.useFixture(CommonFixture(self))
  • class MyTest(test.TestCase, HelperMixin)
  • from <project>.tests.functional import integrated_helpers

Don't do:

  • ❌ Create helpers that hide dependencies
  • ❌ Make unstable interfaces that change frequently
  • ❌ Bury helpers in obscure locations
  • ❌ Create deep inheritance chains

The goal: Make it easy to write tests that are DRY while keeping dependencies explicit for long-term stability.


Tox Environment Configuration

File: tox.ini

[tox]
minversion = 3.18.0
envlist = py3,functional,pep8

[testenv]
usedevelop = True
install_command = python -I -m pip install -c{env:TOX_CONSTRAINTS_FILE:...} {opts} {packages}
setenv =
  VIRTUAL_ENV={envdir}
  LANGUAGE=en_US
  LC_ALL=en_US.utf-8
  OS_STDOUT_CAPTURE=1
  OS_STDERR_CAPTURE=1
  OS_TEST_TIMEOUT=160
  PYTHONDONTWRITEBYTECODE=1
deps =
  -r{toxinidir}/test-requirements.txt
extras =
  # Project-specific extras (e.g., zvm, vmware for Nova)
passenv =
  # Allow OS_DEBUG for verbose logging
  OS_DEBUG
  # For greenlet leak detection
  NOVA_RAISE_ON_GREENLET_LEAK
commands =
  stestr run {posargs}
  stestr slowest

[testenv:functional{,-py310,-py311,-py312}]
description =
  Run functional tests.
setenv =
  {[testenv]setenv}
  # Enforce no greenlet leaks
  NOVA_RAISE_ON_GREENLET_LEAK=True
deps =
  {[testenv]deps}
  # Placement is required for functional tests
  openstack-placement>=9.0.0.0b1
commands =
  stestr --test-path=./nova/tests/functional run {posargs}
  stestr slowest

Key Environment Variables

  • OS_DEBUG=1: Enable DEBUG level logging in tests
  • OS_TEST_TIMEOUT: Timeout for individual tests (seconds)
  • NOVA_RAISE_ON_GREENLET_LEAK: Make greenlet leaks fail tests
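
For a project that wants to read these variables itself, the parsing might look like the sketch below. `parse_test_env` is a hypothetical helper, and the accepted truthy spellings for `OS_DEBUG` are an assumption of this example, not a statement of how oslotest or Nova interpret them.

```python
def parse_test_env(environ):
    """Interpret OS_TEST_TIMEOUT and OS_DEBUG from a mapping like os.environ.

    Assumptions of this sketch: an unparsable or negative timeout means
    "no timeout" (0), and OS_DEBUG is on for '1', 'true' or 'yes'.
    """
    try:
        timeout = int(environ.get('OS_TEST_TIMEOUT', 0))
    except ValueError:
        timeout = 0
    debug = environ.get('OS_DEBUG', '').lower() in ('1', 'true', 'yes')
    return {'timeout': max(timeout, 0), 'debug': debug}
```

Passing the mapping in explicitly, rather than reading `os.environ` inside the function, keeps the helper trivial to test.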

Running Functional Tests

# All functional tests
tox -e functional

# Specific test
tox -e functional -- nova.tests.functional.test_servers.TestServers.test_create_server

# With debug logging
OS_DEBUG=1 tox -e functional -- nova.tests.functional.test_servers

# Rerun only the tests that failed in the previous run
tox -e functional -- --failing

Zuul CI Integration

File: .zuul.yaml

- job:
    name: nova-tox-functional-py312
    parent: openstack-tox-functional-py312
    description: |
      Run tox-based functional tests for the OpenStack Nova project
      under cPython version 3.12.
    required-projects:
      # Nova functional tests need placement
      - openstack/nova
      - openstack/placement
    irrelevant-files:
      # Skip job if only docs changed
      - ^.*\.rst$
      - ^api-.*$
      - ^doc/(source|test)/.*$
      - ^nova/locale/.*$
      - ^releasenotes/.*$
    vars:
      zuul_work_dir: src/opendev.org/openstack/nova
      bindep_profile: test py312
    timeout: 3600  # 1 hour timeout

Key Zuul Concepts

  1. required-projects: Zuul clones these repos before running tests
  2. irrelevant-files: Skip job if only these files changed
  3. bindep_profile: Selects which bindep.txt profiles are installed as system dependencies (e.g., libvirt-dev)
  4. timeout: Maximum job runtime
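
The irrelevant-files rule can be reasoned about as "skip the job only when every changed file matches at least one pattern". The sketch below models that rule with the patterns from the job definition above. It is an approximation for intuition, not Zuul's implementation, and treating an empty change list as "do not skip" is an assumption of this example.

```python
import re

IRRELEVANT_FILES = [
    r'^.*\.rst$',
    r'^api-.*$',
    r'^doc/(source|test)/.*$',
    r'^nova/locale/.*$',
    r'^releasenotes/.*$',
]


def job_is_skipped(changed_files, patterns=IRRELEVANT_FILES):
    """Return True if every changed file matches an irrelevant pattern."""
    if not changed_files:
        # Assumption of this sketch: nothing to match means "run the job".
        return False
    compiled = [re.compile(p) for p in patterns]
    return all(
        any(regex.search(path) for regex in compiled)
        for path in changed_files)
```

A change touching only `README.rst` would skip the job, while a change touching `README.rst` and `nova/compute/manager.py` would still run it.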

bindep.txt (System Dependencies)

# System dependencies for functional tests
libvirt-dev [platform:dpkg test]
libvirt-devel [platform:rpm test]

Porting to Another Project

Step-by-Step Guide

Let's port this to a fictional OpenStack project called "mystique" (a networking service).

Important Notes:

  • Use mystique/tests/local_fixtures/ (not fixtures/) to avoid import conflicts with the fixtures package
  • Use fixtures.MonkeyPatch and useFixture() instead of stub_out()
  • Use standard threading instead of eventlet (eventlet is being removed from OpenStack)
  • Focus on single database (multi-DB like CellDatabases is Nova-specific)

1. Base Test Class

File: mystique/tests/functional/base.py

"""Base classes for Mystique functional tests."""

import fixtures
from oslo_config import cfg
from oslo_log import log as logging
from oslotest import base

from mystique import context
from mystique import rpc
from mystique.tests import local_fixtures as mystique_fixtures


CONF = cfg.CONF
LOG = logging.getLogger(__name__)


class MystiqueFunctionalTestCase(base.BaseTestCase):
    """Base class for Mystique functional tests.
    
    Sets up:
    - Database (oslo.db with SQLite)
    - RPC (oslo.messaging with fake driver)
    - Notifications
    - API server
    - External service mocks
    """
    
    # Class attributes
    USES_DB = True
    STUB_RPC = True
    
    def setUp(self):
        super(MystiqueFunctionalTestCase, self).setUp()
        
        # Configuration
        self.useFixture(mystique_fixtures.ConfFixture(CONF))
        
        # Database
        if self.USES_DB:
            self.useFixture(mystique_fixtures.Database())
        
        # RPC
        if self.STUB_RPC:
            self.useFixture(mystique_fixtures.RPCFixture())
            CONF.set_default('driver', ['test'],
                             group='oslo_messaging_notifications')
        
        # Notifications
        self.notifier = self.useFixture(
            mystique_fixtures.NotificationFixture(self))
        
        # External services
        self.neutron = self.useFixture(
            mystique_fixtures.NeutronFixture(self))
        self.nova = self.useFixture(
            mystique_fixtures.NovaFixture(self))
        
        # API
        self.api_fixture = self.useFixture(
            mystique_fixtures.APIFixture())
        self.api = self.api_fixture.api
        self.admin_api = self.api_fixture.admin_api
        
        # Start services
        self.start_service('mystique-server')
    
    def flags(self, **kw):
        """Override flag variables for a test.
        
        Example:
            self.flags(enabled_filters=['FilterA', 'FilterB'],
                       group='scheduler')
        """
        group = kw.pop('group', None)
        for k, v in kw.items():
            CONF.set_override(k, v, group)
            # Register cleanup to restore original value
            self.addCleanup(CONF.clear_override, k, group)
    
    def start_service(self, name, host=None, **kwargs):
        """Start a Mystique service."""
        if host is not None:
            self.flags(host=host)
        
        svc = self.useFixture(
            mystique_fixtures.ServiceFixture(name, host, **kwargs))
        return svc.service

2. Configuration Fixture

File: mystique/tests/local_fixtures/conf.py

"""Configuration fixture for Mystique tests."""

import fixtures
from oslo_config import cfg
from oslo_config import fixture as config_fixture

from mystique import config
from mystique import paths


CONF = cfg.CONF


class ConfFixture(config_fixture.Config):
    """Fixture to manage global conf settings."""
    
    def setUp(self):
        super(ConfFixture, self).setUp()
        
        # Default group
        self.conf.set_default('debug', True)
        self.conf.set_default('host', 'test-host')
        
        # Database group
        self.conf.set_default('connection', 'sqlite://', group='database')
        self.conf.set_default('sqlite_synchronous', False, group='database')
        
        # API group
        self.conf.set_default('api_workers', 1, group='api')
        
        # Parse args with no config files
        config.parse_args([], default_config_files=[], configure_db=False,
                          init_rpc=False)


class ConfPatcher(fixtures.Fixture):
    """Fixture to patch and restore global CONF."""
    
    def __init__(self, **kwargs):
        super(ConfPatcher, self).__init__()
        self.group = kwargs.pop('group', None)
        self.args = kwargs
    
    def setUp(self):
        super(ConfPatcher, self).setUp()
        for k, v in self.args.items():
            self.addCleanup(CONF.clear_override, k, self.group)
            CONF.set_override(k, v, self.group)

3. Database Fixture

File: mystique/tests/local_fixtures/database.py

Note: This is the single database case. If your project needs multiple databases (like Nova's cells), you would need a more complex fixture, but that's uncommon.

"""Database fixture for Mystique tests."""

import fixtures
from oslo_config import cfg
from oslo_db.sqlalchemy import enginefacade
from oslo_db.sqlalchemy import test_fixtures as db_fixtures

from mystique.db import api as db_api
from mystique.db import migration


CONF = cfg.CONF
DB_SCHEMA = ""  # Cached SQL dump of the migrated schema


class Database(fixtures.Fixture):
    """Create a database fixture with SQLite."""
    
    def __init__(self):
        super().__init__()
    
    def setUp(self):
        super().setUp()
        
        # Inject a new factory for each test
        new_engine = enginefacade.transaction_context()
        self.useFixture(
            db_fixtures.ReplaceEngineFacadeFixture(
                db_api.context_manager, new_engine))
        db_api.configure(CONF)
        
        self.get_engine = db_api.get_engine
        self._apply_schema()
        self.addCleanup(self.cleanup)
    
    def _apply_schema(self):
        """Apply database schema (cached for speed)."""
        global DB_SCHEMA
        if not DB_SCHEMA:
            engine = self.get_engine()
            conn = engine.connect()
            migration.db_sync()
            DB_SCHEMA = "".join(
                line for line in conn.connection.iterdump())
        else:
            engine = self.get_engine()
            conn = engine.connect()
            conn.connection.executescript(DB_SCHEMA)
    
    def cleanup(self):
        engine = self.get_engine()
        engine.dispose()
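
The schema-caching trick in `_apply_schema()` can be tried in isolation with the standard `sqlite3` module. In this sketch `slow_migration()` is a hypothetical stand-in for `migration.db_sync()` and the `networks` table is invented for illustration; the point is that the migration runs once and later databases are built from the cached dump.

```python
import sqlite3

_SCHEMA_CACHE = ''
MIGRATION_RUNS = []  # records each time the "slow" path is taken


def slow_migration(conn):
    """Stand-in for migration.db_sync(); the table is hypothetical."""
    MIGRATION_RUNS.append(1)
    conn.execute(
        'CREATE TABLE networks (id TEXT PRIMARY KEY, name TEXT, status TEXT)')
    conn.commit()


def new_test_database():
    """Return a fresh in-memory database with the schema applied."""
    global _SCHEMA_CACHE
    conn = sqlite3.connect(':memory:')
    if not _SCHEMA_CACHE:
        # First call: run the migration and cache the resulting SQL dump.
        slow_migration(conn)
        _SCHEMA_CACHE = ''.join(conn.iterdump())
    else:
        # Later calls: replay the cached dump instead of migrating again.
        conn.executescript(_SCHEMA_CACHE)
    return conn
```

Each call returns an independent database, so rows written in one test are invisible to the next.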

4. RPC Fixture

File: mystique/tests/local_fixtures/rpc.py

"""RPC fixture for Mystique tests."""

import fixtures
from oslo_config import cfg
import oslo_messaging as messaging
from oslo_messaging import conffixture as messaging_conffixture

from mystique import rpc


CONF = cfg.CONF


class RPCFixture(fixtures.Fixture):
    """Set up RPC with the fake:// transport for testing."""
    
    def __init__(self, *exmods):
        super(RPCFixture, self).__init__()
        self.exmods = []
        self.exmods.extend(exmods)
        self._buses = {}
    
    def _fake_create_transport(self, url):
        """Create or return cached fake transport."""
        url = None  # Collapse all to single bus
        
        if url not in self._buses:
            exmods = rpc.get_allowed_exmods()
            self._buses[url] = messaging.get_rpc_transport(
                CONF,
                url=url,
                allowed_remote_exmods=exmods)
        return self._buses[url]
    
    def setUp(self):
        super(RPCFixture, self).setUp()
        self.addCleanup(rpc.cleanup)
        
        rpc.add_extra_exmods(*self.exmods)
        self.addCleanup(rpc.clear_extra_exmods)
        
        self.messaging_conf = messaging_conffixture.ConfFixture(CONF)
        self.messaging_conf.transport_url = 'fake:/'
        self.useFixture(self.messaging_conf)
        
        self.useFixture(fixtures.MonkeyPatch(
            'mystique.rpc.create_transport', self._fake_create_transport))
        
        from unittest import mock
        with mock.patch('mystique.rpc.get_transport_url') as mock_gtu:
            mock_gtu.return_value = None
            rpc.init(CONF)
        
        def cleanup_in_flight_rpc_messages():
            messaging._drivers.impl_fake.FakeExchangeManager._exchanges = {}
        
        self.addCleanup(cleanup_in_flight_rpc_messages)

5. Notification Fixture

File: mystique/tests/local_fixtures/notifications.py

Note: Uses standard threading instead of eventlet (which is being removed from OpenStack).

"""Notification fixture for Mystique tests."""

import collections
import functools
import threading

import fixtures
from oslo_log import log as logging
import oslo_messaging
from oslo_serialization import jsonutils
from oslo_utils import timeutils

from mystique import rpc


LOG = logging.getLogger(__name__)


class _Sub(object):
    """Subscription helper for waiting on notifications."""
    
    def __init__(self):
        self._cond = threading.Condition()
        self._notifications = []
    
    def received(self, notification):
        with self._cond:
            self._notifications.append(notification)
            self._cond.notify_all()
    
    def wait_n(self, n, event, timeout):
        """Wait until at least n notifications have been received."""
        with timeutils.StopWatch(timeout) as timer:
            with self._cond:
                while len(self._notifications) < n:
                    if timer.expired():
                        raise AssertionError(
                            "Notification %s not received" % event)
                    self._cond.wait(timer.leftover())
                
                return list(self._notifications)


class FakeVersionedNotifier(object):
    """Captures versioned notifications."""
    
    def __init__(self, transport, publisher_id, serializer=None):
        self.transport = transport
        self.publisher_id = publisher_id
        self._serializer = serializer or \
            oslo_messaging.serializer.NoOpSerializer()
        self.versioned_notifications = []
        self.subscriptions = collections.defaultdict(_Sub)
        
        for priority in ['debug', 'info', 'warn', 'error', 'critical']:
            setattr(
                self, priority,
                functools.partial(self._notify, priority.upper()))
    
    def prepare(self, publisher_id=None):
        if publisher_id is None:
            publisher_id = self.publisher_id
        return self.__class__(
            self.transport, publisher_id, serializer=self._serializer)
    
    def _notify(self, priority, ctxt, event_type, payload):
        """Capture notification."""
        payload = self._serializer.serialize_entity(ctxt, payload)
        jsonutils.to_primitive(payload)  # Verify serialization
        
        notification = {
            'publisher_id': self.publisher_id,
            'priority': priority,
            'event_type': event_type,
            'payload': payload,
        }
        self.versioned_notifications.append(notification)
        self.subscriptions[event_type].received(notification)
    
    def wait_for_versioned_notifications(
        self, event_type, n_events=1, timeout=10.0
    ):
        """Wait for notifications with timeout."""
        return self.subscriptions[event_type].wait_n(
            n_events, event_type, timeout)
    
    def reset(self):
        self.versioned_notifications.clear()
        self.subscriptions.clear()


class NotificationFixture(fixtures.Fixture):
    """Fixture to capture oslo.messaging notifications."""
    
    def __init__(self, test):
        self.test = test
    
    def setUp(self):
        super().setUp()
        self.addCleanup(self.reset)
        
        self.fake_versioned_notifier = FakeVersionedNotifier(
            rpc.NOTIFIER.transport,
            rpc.NOTIFIER.publisher_id,
            serializer=getattr(rpc.NOTIFIER, '_serializer', None))
        
        if rpc.NOTIFIER:
            self.test.useFixture(fixtures.MonkeyPatch(
                'mystique.rpc.NOTIFIER', 
                self.fake_versioned_notifier))
    
    def reset(self):
        self.fake_versioned_notifier.reset()
    
    def wait_for_versioned_notifications(
        self, event_type, n_events=1, timeout=10.0
    ):
        return self.fake_versioned_notifier.wait_for_versioned_notifications(
            event_type, n_events, timeout)
    
    @property
    def versioned_notifications(self):
        return self.fake_versioned_notifier.versioned_notifications
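
The waiting logic in `_Sub` is a standard condition-variable pattern. The sketch below reproduces it with only the standard library, using `time.monotonic()` in place of `timeutils.StopWatch`; the class name `Subscription` and the `demo()` function are made up for this example.

```python
import threading
import time


class Subscription:
    """Collects items from producer threads and lets a consumer wait."""

    def __init__(self):
        self._cond = threading.Condition()
        self._items = []

    def received(self, item):
        with self._cond:
            self._items.append(item)
            self._cond.notify_all()

    def wait_n(self, n, timeout):
        """Block until at least n items arrived or the timeout expires."""
        deadline = time.monotonic() + timeout
        with self._cond:
            while len(self._items) < n:
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    raise AssertionError(
                        'expected %d notifications, got %d'
                        % (n, len(self._items)))
                self._cond.wait(remaining)
            return list(self._items)


def demo():
    sub = Subscription()
    # Producer thread emits two notifications in order.
    worker = threading.Thread(
        target=lambda: [sub.received({'seq': i}) for i in range(2)])
    worker.start()
    received = sub.wait_n(2, timeout=5.0)
    worker.join()
    return received
```

Re-checking the length inside the `while` loop is what makes the wait robust against spurious wakeups and against notifications that arrive before the consumer starts waiting.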

6. External Service Fixtures

File: mystique/tests/local_fixtures/neutron.py

"""Neutron fixture for Mystique tests."""

import copy
import uuid

import fixtures


class NeutronFixture(fixtures.Fixture):
    """Mock Neutron API for Mystique tests."""
    
    def __init__(self, test):
        super().__init__()
        self.test = test
        self._networks = {}
        self._ports = {}
    
    def setUp(self):
        super().setUp()
        
        # Mock Neutron client using fixtures.MonkeyPatch
        self.test.useFixture(fixtures.MonkeyPatch(
            'mystique.network.neutron.get_client',
            self._get_client))
    
    def _get_client(self, context, admin=False):
        """Return fake Neutron client."""
        return _FakeNeutronClient(self)
    
    def create_network(self, body):
        """Mock network creation."""
        network_req = body.get('network')
        network_id = network_req.get('id') or str(uuid.uuid4())
        
        network = {
            'id': network_id,
            'name': network_req.get('name'),
            'status': 'ACTIVE',
            'admin_state_up': network_req.get('admin_state_up', True),
        }
        
        self._networks[network_id] = network
        return {'network': copy.deepcopy(network)}
    
    def show_network(self, network_id):
        """Mock get network."""
        if network_id not in self._networks:
            raise Exception('NetworkNotFound')
        return {'network': copy.deepcopy(self._networks[network_id])}
    
    def list_networks(self, **filters):
        """Mock list networks."""
        networks = list(self._networks.values())
        return {'networks': copy.deepcopy(networks)}


class _FakeNeutronClient:
    """Fake Neutron client wrapper."""
    
    def __init__(self, fixture):
        self.fixture = fixture
    
    def create_network(self, body):
        return self.fixture.create_network(body)
    
    def show_network(self, network_id):
        return self.fixture.show_network(network_id)
    
    def list_networks(self, **filters):
        return self.fixture.list_networks(**filters)
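
The fixture returns `copy.deepcopy()` of its stored dicts so that a test mutating a response cannot silently change the fake service's state. A minimal sketch of that behaviour, using a hypothetical `FakeStore` class and an invented `tags` field:

```python
import copy


class FakeStore:
    """Hypothetical in-memory store that never hands out its own dicts."""

    def __init__(self):
        self._networks = {}

    def create_network(self, network_id, name):
        self._networks[network_id] = {
            'id': network_id, 'name': name, 'tags': []}
        return copy.deepcopy(self._networks[network_id])

    def show_network(self, network_id):
        return copy.deepcopy(self._networks[network_id])


def mutate_response_and_reread():
    store = FakeStore()
    response = store.create_network('n1', 'original')
    # Mutating the response, including nested lists, must not leak back.
    response['name'] = 'changed'
    response['tags'].append('leaked')
    return store.show_network('n1')
```

A shallow copy would protect `name` but not the nested `tags` list, which is why a deep copy is the safer default for fake services.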

7. API Fixture

File: mystique/tests/local_fixtures/api.py

"""API fixture for Mystique tests."""

import fixtures
from oslo_utils.fixture import uuidsentinel
from wsgi_intercept import interceptor

from mystique.api import wsgi
from mystique.tests.local_fixtures import conf as conf_fixtures
from mystique.tests.functional.api import client


class APIFixture(fixtures.Fixture):
    """Create a Mystique API server as a fixture."""
    
    def __init__(self, api_version='v1'):
        super(APIFixture, self).__init__()
        self.api_version = api_version
    
    def setUp(self):
        super(APIFixture, self).setUp()
        
        hostname = uuidsentinel.api_host
        port = 80
        service_name = 'mystique_api'
        endpoint = 'http://%s:%s/' % (hostname, port)
        
        self.useFixture(conf_fixtures.ConfPatcher(debug=True))
        
        # Load WSGI app
        loader = wsgi.Loader().load_app(service_name)
        app = lambda: loader
        
        # Install wsgi-intercept
        intercept = interceptor.RequestsInterceptor(app, url=endpoint)
        intercept.install_intercept()
        self.addCleanup(intercept.uninstall_intercept)
        
        # Create API clients
        base_url = 'http://%(host)s:%(port)s/%(api_version)s' % {
            'host': hostname, 'port': port, 'api_version': self.api_version}
        
        self.api = client.TestMystiqueClient('user', base_url)
        self.admin_api = client.TestMystiqueClient('admin', base_url, 
                                                    is_admin=True)
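
Conceptually, intercepting requests means calling the WSGI application in-process instead of going over a socket. The sketch below shows that idea with only the standard library; it does not use wsgi-intercept, and `app` and `call_in_process` are hypothetical names for this example.

```python
import io
import json
from wsgiref.util import setup_testing_defaults


def app(environ, start_response):
    """Tiny WSGI app that echoes the request path as JSON."""
    body = json.dumps({'path': environ['PATH_INFO']}).encode('utf-8')
    start_response('200 OK', [('Content-Type', 'application/json'),
                              ('Content-Length', str(len(body)))])
    return [body]


def call_in_process(wsgi_app, path):
    """Invoke a WSGI app directly and return (status, decoded JSON body)."""
    environ = {'PATH_INFO': path, 'REQUEST_METHOD': 'GET',
               'wsgi.input': io.BytesIO()}
    # Fill in the remaining keys a WSGI server would normally provide.
    setup_testing_defaults(environ)
    captured = {}

    def start_response(status, headers, exc_info=None):
        captured['status'] = status
        captured['headers'] = dict(headers)

    body = b''.join(wsgi_app(environ, start_response))
    return captured['status'], json.loads(body)
```

The fixture above gets the same in-process effect while letting tests keep using an ordinary HTTP client, which is the reason to prefer interception over hand-built environ dicts.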

8. Service Fixture

File: mystique/tests/local_fixtures/service.py

"""Service fixture for Mystique tests."""

import fixtures
from unittest import mock

from mystique import context
from mystique import service


class ServiceFixture(fixtures.Fixture):
    """Run a Mystique service as a test fixture.

    service.Service.start() is expected to run the service in background
    threads (not eventlet greenthreads, as eventlet is being removed from
    OpenStack).
    """

    def __init__(self, name, host=None, **kwargs):
        super().__init__()
        self.name = name
        self.host = host or name
        kwargs.setdefault('host', self.host)
        kwargs.setdefault('binary', 'mystique-%s' % name)
        self.kwargs = kwargs

    def setUp(self):
        super().setUp()
        self.ctxt = context.get_admin_context()
        
        # Use fixtures.MonkeyPatch instead of mock.patch context manager
        mock_ctx = mock.MagicMock(return_value=self.ctxt)
        self.useFixture(fixtures.MonkeyPatch(
            'mystique.context.get_admin_context',
            mock_ctx))
        
        self.service = service.Service.create(**self.kwargs)
        self.service.start()
        self.addCleanup(self.service.kill)

9. Complete Test Example

File: mystique/tests/functional/test_networks.py

"""Functional tests for Mystique network operations."""

from mystique.tests.functional import base


class NetworkTestCase(base.MystiqueTestCase):
    """Test network creation and management."""
    
    def test_create_network(self):
        """Test basic network creation."""
        # Create network via API
        network_req = {
            'name': 'test-network',
            'admin_state_up': True,
        }
        network = self.api.create_network(network_req)
        self.assertEqual('ACTIVE', network['status'])
        
        # Verify notification
        notifications = self.notifier.wait_for_versioned_notifications(
            'network.create.end', n_events=1, timeout=10.0)
        self.assertEqual(1, len(notifications))
        self.assertEqual(network['id'],
            notifications[0]['payload']['network_id'])
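
A helper such as wait_for_versioned_notifications has to block the test until the service thread has emitted the expected events. The sketch below shows one way to do that with threading.Condition. `NotificationRecorder`, `record` and `wait_for` are hypothetical names for illustration, not the fixture's actual implementation.

```python
import threading


class NotificationRecorder:
    """Hypothetical in-memory recorder that a test can block on."""

    def __init__(self):
        self._cond = threading.Condition()
        self._by_event_type = {}

    def record(self, event_type, payload):
        """Store a notification; may be called from a service thread."""
        with self._cond:
            self._by_event_type.setdefault(event_type, []).append(
                {"event_type": event_type, "payload": payload})
            self._cond.notify_all()

    def wait_for(self, event_type, n_events=1, timeout=10.0):
        """Block until n_events have arrived or the timeout expires."""
        def enough():
            return len(self._by_event_type.get(event_type, [])) >= n_events

        with self._cond:
            self._cond.wait_for(enough, timeout=timeout)
            # On timeout this returns whatever arrived, possibly fewer events.
            return list(self._by_event_type.get(event_type, []))


recorder = NotificationRecorder()
# Emit the notification from another thread shortly after we start waiting.
emitter = threading.Timer(
    0.05, recorder.record,
    args=("network.create.end", {"network_id": "net-1"}))
emitter.start()
received = recorder.wait_for("network.create.end", n_events=1, timeout=5.0)
emitter.join()
```

Returning the partial list on timeout, rather than raising, lets the test's own length assertion report how many events actually arrived.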

10. tox.ini

[tox]
minversion = 3.18.0
envlist = py3,functional,pep8

[testenv]
usedevelop = True
setenv =
  VIRTUAL_ENV={envdir}
  OS_STDOUT_CAPTURE=1
  OS_STDERR_CAPTURE=1
  OS_TEST_TIMEOUT=160
deps =
  -r{toxinidir}/test-requirements.txt
commands =
  stestr run {posargs}

[testenv:functional]
description = Run functional tests
setenv =
  {[testenv]setenv}
commands =
  stestr --test-path=./mystique/tests/functional run {posargs}

11. .zuul.yaml

- job:
    name: mystique-functional
    parent: openstack-tox-functional-py312
    description: Run Mystique functional tests
    required-projects:
      - openstack/mystique
    irrelevant-files:
      - ^.*\.rst$
      - ^doc/.*$
      - ^releasenotes/.*$
    vars:
      zuul_work_dir: src/opendev.org/openstack/mystique
      tox_envlist: functional
    timeout: 1800

Summary

This guide provides a complete blueprint for replicating Nova's functional test infrastructure in any OpenStack project. The key components are:

Core Layers

  1. Base Test Class (oslotest.base.BaseTestCase)

    • Test isolation and cleanup
    • Fixture management
  2. Configuration (oslo.config)

    • Test-specific defaults
    • Per-test overrides
  3. Database (oslo.db)

    • In-memory SQLite
    • Schema caching
    • Multi-database support (cells)
  4. RPC/Messaging (oslo.messaging)

    • Fake transport
    • Synchronous casts
    • Message capture
  5. Notifications (oslo.messaging)

    • Versioned notification capture
    • Subscription and waiting
  6. External Services

    • Mocked at API boundary
    • Stateful tracking
    • Realistic behavior
  7. API Testing

    • Real WSGI app
    • wsgi-intercept
    • Multiple clients (admin, user, reader)
  8. Service Lifecycle

    • Start/stop services
    • Service fixtures
    • Clean shutdown

Best Practices

  • Speed: Schema caching, in-memory databases, synchronous RPC
  • Isolation: Fresh state per test, cleanup via fixtures
  • Realism: Real API code, realistic mocks for external services
  • Debuggability: OS_DEBUG=1 for verbose logs, clear error messages
  • Maintainability: Fixtures, not inheritance; composition, not duplication

Files to Create

  1. <project>/tests/functional/base.py - Base test class
  2. <project>/tests/local_fixtures/__init__.py - Package init
  3. <project>/tests/local_fixtures/conf.py - Configuration fixtures
  4. <project>/tests/local_fixtures/database.py - Database fixture (single DB)
  5. <project>/tests/local_fixtures/rpc.py - RPC fixtures
  6. <project>/tests/local_fixtures/notifications.py - Notification fixtures
  7. <project>/tests/local_fixtures/<service>.py - External service mocks
  8. <project>/tests/local_fixtures/api.py - API fixture
  9. <project>/tests/local_fixtures/service.py - Service fixture
  10. <project>/tests/functional/test_*.py - Actual tests
  11. tox.ini - Tox environment configuration
  12. .zuul.yaml - CI job configuration

Important: Use local_fixtures/ instead of fixtures/ to avoid import conflicts with the fixtures package when debuggers add <project>/tests to their import path.
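
The shadowing problem can be reproduced in isolation. The sketch below builds a throwaway tests directory and asks Python's path-based finder what each name resolves to when that directory is searched first. `resolve_from` is a hypothetical helper and the directory layout is only an assumption for the demonstration.

```python
import importlib.machinery
import pathlib
import tempfile


def resolve_from(search_dir, module_name):
    """Return the file module_name resolves to when searching search_dir."""
    spec = importlib.machinery.PathFinder.find_spec(
        module_name, [str(search_dir)])
    return None if spec is None else spec.origin


with tempfile.TemporaryDirectory() as tmp:
    tests_dir = pathlib.Path(tmp) / "tests"
    for package in ("fixtures", "local_fixtures"):
        (tests_dir / package).mkdir(parents=True)
        (tests_dir / package / "__init__.py").write_text("")

    # With tests/ first on the import path, "import fixtures" would pick up
    # the project's own package instead of the third-party fixtures library.
    fixtures_origin = resolve_from(tests_dir, "fixtures")
    shadows_library = fixtures_origin.startswith(str(tests_dir))

    # A distinct name cannot collide with the library.
    local_origin = resolve_from(tests_dir, "local_fixtures")
```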

This infrastructure enables fast, isolated, comprehensive functional testing without requiring external dependencies or complex deployment.


Appendix: Key Implementation Patterns

A. Using fixtures.MonkeyPatch Instead of stub_out

Nova historically used a stub_out() helper, but this can be replaced with the fixtures library:

# Nova's stub_out (for reference)
def stub_out(self, old, new):
    """Replace a function for the duration of the test."""
    self.useFixture(fixtures.MonkeyPatch(old, new))

# Modern approach - use fixtures.MonkeyPatch directly
class MyTest(base.BaseTestCase):
    def test_something(self):
        # Instead of: self.stub_out('module.function', fake_function)
        # Use:
        self.useFixture(fixtures.MonkeyPatch(
            'module.function', fake_function))

B. Implementing self.flags() Helper

The flags() helper makes it easy to override configuration options with automatic cleanup:

from oslo_config import cfg
from oslotest import base

CONF = cfg.CONF


class BaseTestCase(base.BaseTestCase):
    """Base test class with configuration helper."""
    
    def flags(self, **kw):
        """Override flag variables for a test.
        
        Automatically registers cleanup to restore original values.
        
        Examples:
            self.flags(debug=True)
            self.flags(enabled_filters=['FilterA', 'FilterB'],
                       group='scheduler')
        
        :param kw: Keyword arguments where keys are config option names
                   and values are the values to set. Special keyword 'group'
                   specifies the config group (defaults to DEFAULT).
        """
        group = kw.pop('group', None)
        for k, v in kw.items():
            CONF.set_override(k, v, group)
            # Automatically clean up after the test
            self.addCleanup(CONF.clear_override, k, group)
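
The cleanup behaviour can be checked without oslo.config installed. In the sketch below, `FakeConf` is a hypothetical stand-in that mimics only the set_override and clear_override calls used by flags(). It is not the real library object, and its `get` method exists only for the demonstration.

```python
import unittest


class FakeConf:
    """Hypothetical stand-in for the two config calls flags() relies on."""

    def __init__(self, **defaults):
        self._defaults = defaults
        self._overrides = {}

    def set_override(self, name, override, group=None):
        self._overrides[(group, name)] = override

    def clear_override(self, name, group=None):
        self._overrides.pop((group, name), None)

    def get(self, name, group=None):
        return self._overrides.get((group, name), self._defaults[name])


CONF = FakeConf(debug=False)


class FlagsTest(unittest.TestCase):
    def flags(self, **kw):
        group = kw.pop('group', None)
        for k, v in kw.items():
            CONF.set_override(k, v, group)
            # Runs after the test method, whether it passes or fails.
            self.addCleanup(CONF.clear_override, k, group)

    def test_override_is_visible_inside_the_test(self):
        self.flags(debug=True)
        self.assertTrue(CONF.get('debug'))


result = unittest.TestResult()
FlagsTest('test_override_is_visible_inside_the_test').run(result)
value_after_test = CONF.get('debug')
```

Because the cleanup is registered per option, a test that calls flags() several times restores every option it touched.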

C. Threading vs Eventlet

Since eventlet is being removed from OpenStack, use standard threading:

import concurrent.futures
import threading

# OLD (eventlet) - Don't use
import eventlet
gt = eventlet.spawn(my_function, arg1, arg2)
result = gt.wait()

# NEW (threading) - Use this

# For single background task
thread = threading.Thread(target=my_function, args=(arg1, arg2))
thread.start()
thread.join()

# For thread pool
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    future = executor.submit(my_function, arg1, arg2)
    result = future.result(timeout=30)

In Tests:

import logging
import threading

import fixtures

LOG = logging.getLogger(__name__)


class MyServiceFixture(fixtures.Fixture):
    """Run a service in a background thread."""
    
    def setUp(self):
        super().setUp()
        self.service = MyService()
        
        # Start service in thread (not eventlet greenthread)
        self.thread = threading.Thread(
            target=self.service.start,
            daemon=True)  # Daemon thread dies when main thread exits
        self.thread.start()
        
        # Register cleanup
        self.addCleanup(self._cleanup)
    
    def _cleanup(self):
        """Stop the service and join the thread."""
        self.service.stop()
        self.thread.join(timeout=10)
        if self.thread.is_alive():
            # Log warning but don't fail - daemon threads will die anyway
            LOG.warning('Service thread did not stop cleanly')
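
The fixture above only shuts down cleanly if the service's start() loop notices the stop request. A minimal sketch of such a cooperative service follows, using threading.Event as the stop signal. `PollingService` is a hypothetical class for illustration, not part of any OpenStack library.

```python
import threading


class PollingService:
    """Hypothetical service whose start() blocks until stop() is called."""

    def __init__(self, interval=0.01):
        self._stop = threading.Event()
        self._interval = interval
        self.ran_once = threading.Event()

    def start(self):
        # Event.wait() returns True as soon as stop() sets the event, so the
        # loop exits promptly instead of sleeping through a full interval.
        while not self._stop.wait(self._interval):
            self.ran_once.set()  # placeholder for one unit of periodic work

    def stop(self):
        self._stop.set()


service = PollingService()
thread = threading.Thread(target=service.start, daemon=True)
thread.start()

service.ran_once.wait(timeout=5)  # let the loop run at least once
service.stop()
thread.join(timeout=10)
stopped_cleanly = not thread.is_alive()
```

With this shape, the join in the fixture's cleanup returns quickly, and the warning path is reached only when the service is genuinely stuck.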

D. Single Database Pattern (Common Case)

Most OpenStack projects only need a single database. Here's the complete pattern:

"""Database fixture for single database projects."""

import fixtures
from oslo_config import cfg
from oslo_db.sqlalchemy import enginefacade
from oslo_db.sqlalchemy import test_fixtures as db_fixtures

from myproject.db import api as db_api
from myproject.db import migration


CONF = cfg.CONF
DB_SCHEMA = ""  # Module-level cache of the schema as a SQL script


class Database(fixtures.Fixture):
    """Create a single database fixture with SQLite.
    
    This is the common case for most OpenStack projects.
    If you need multiple databases (like Nova's cells),
    you'll need a more complex fixture.
    """
    
    def __init__(self):
        super().__init__()
    
    def setUp(self):
        super().setUp()
        
        # Create a new transaction context for this test
        new_engine = enginefacade.transaction_context()
        
        # Replace the global context manager with test-specific one
        self.useFixture(
            db_fixtures.ReplaceEngineFacadeFixture(
                db_api.context_manager, new_engine))
        
        # Configure database with test settings
        db_api.configure(CONF)
        
        self.get_engine = db_api.get_engine
        self._apply_schema()
        self.addCleanup(self.cleanup)
    
    def _apply_schema(self):
        """Apply database schema using cached SQL for speed."""
        global DB_SCHEMA
        
        if not DB_SCHEMA:
            # First test: run migrations and cache result
            engine = self.get_engine()
            conn = engine.connect()
            
            # Run migrations to create schema
            migration.db_sync()
            
            # Cache the schema as SQL statements
            # (SQLite-specific: iterdump() returns SQL statements)
            DB_SCHEMA = "".join(
                line for line in conn.connection.iterdump())
        else:
            # Subsequent tests: apply cached schema (much faster!)
            engine = self.get_engine()
            conn = engine.connect()
            conn.connection.executescript(DB_SCHEMA)
    
    def cleanup(self):
        """Dispose of database engine."""
        engine = self.get_engine()
        engine.dispose()


# Usage in tests
import oslotest.base


class MyTestCase(oslotest.base.BaseTestCase):
    def setUp(self):
        super().setUp()
        self.useFixture(Database())
        # Now you can use the database!

E. Complete __init__.py for local_fixtures

File: <project>/tests/local_fixtures/__init__.py

# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

"""Local fixtures for project functional tests.

This package is named 'local_fixtures' instead of 'fixtures' to avoid
import conflicts with the 'fixtures' package when debuggers add the
tests directory to sys.path for test discovery.
"""

# Import commonly used fixtures for convenience
from .api import APIFixture  # noqa: F401
from .conf import ConfFixture  # noqa: F401
from .conf import ConfPatcher  # noqa: F401
from .database import Database  # noqa: F401
from .neutron import NeutronFixture  # noqa: F401
from .notifications import NotificationFixture  # noqa: F401
from .rpc import RPCFixture  # noqa: F401
from .service import ServiceFixture  # noqa: F401

__all__ = [
    'APIFixture',
    'ConfFixture',
    'ConfPatcher',
    'Database',
    'NeutronFixture',
    'NotificationFixture',
    'RPCFixture',
    'ServiceFixture',
]

F. Avoiding the CellDatabases Complexity

Nova's CellDatabases fixture is complex because Nova has a unique architecture with multiple databases per deployment. Most OpenStack projects don't need this. Key differences:

| Feature | Nova (cells) | Most Projects |
|---|---|---|
| Databases | API DB + N cell DBs | Single DB |
| Context targeting | context.target_cell() | Not needed |
| DB routing | Dynamic based on instance | Simple |
| Complexity | High | Low |

When porting:

  1. Use the simple Database fixture (shown above)
  2. Skip CellDatabases entirely
  3. Skip context targeting logic
  4. If you really need multiple DBs, add them incrementally

Glossary of Terms

Fixture: A test component that provides setup/teardown with automatic cleanup. Part of the fixtures library.
Example: self.useFixture(NeutronFixture(self))

Mixin: A class that provides methods to be used by other classes through multiple inheritance.
Example: class MyTest(test.TestCase, HelperMixin)

oslo.messaging: OpenStack library for RPC and notifications. Provides abstraction over message transports (RabbitMQ, etc.)

Transport: The messaging backend (e.g., RabbitMQ, fake://). In functional tests, we use fake:// for synchronous in-memory messaging.

wsgi-intercept: Library that intercepts HTTP requests and routes them to WSGI applications in-process, avoiding real HTTP connections.

Cell: Nova's unit of scaling: a group of compute hosts with its own database and message queue. Most projects don't need this.

Placement: OpenStack service for tracking resource inventory and usage. Often needed in Nova tests.

RPC Cast vs Call:

  • Cast: Fire-and-forget, no return value, asynchronous
  • Call: Synchronous, waits for and returns response

Stestr: Test runner used by OpenStack projects. It is a fork of, and replacement for, the older testrepository (testr) runner.

addCleanup: Method on fixtures and test cases that registers a cleanup function to run after the test, even if the test fails.
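
The cast/call distinction from the glossary can be illustrated with a thread pool. This is a sketch of the semantics only: `FakeRPCClient` and `NetworkEndpoint` are hypothetical, and the method signatures are simplified (the real oslo.messaging client also takes a request context).

```python
import concurrent.futures


class NetworkEndpoint:
    """Hypothetical server-side endpoint."""

    def __init__(self):
        self.networks = []

    def create_network(self, name):
        self.networks.append(name)
        return len(self.networks)


class FakeRPCClient:
    """Hypothetical client showing cast vs call semantics."""

    def __init__(self, endpoint, executor):
        self._endpoint = endpoint
        self._executor = executor

    def cast(self, method, **kwargs):
        """Fire-and-forget: schedule the method and return nothing."""
        self._executor.submit(getattr(self._endpoint, method), **kwargs)

    def call(self, method, timeout=30, **kwargs):
        """Schedule the method, wait for it, and return its result."""
        future = self._executor.submit(
            getattr(self._endpoint, method), **kwargs)
        return future.result(timeout=timeout)


endpoint = NetworkEndpoint()
# A single worker processes requests in submission order.
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
    client = FakeRPCClient(endpoint, executor)
    cast_result = client.cast("create_network", name="net-a")
    call_result = client.call("create_network", name="net-b")
```

A test that uses cast against a truly asynchronous transport cannot assume the work is done when cast returns, which is why the guide's fake transport makes casts synchronous.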


End of Guide

Version: 1.0
Last Updated: October 2025
