Version: 1.0
Date: October 2025
Author: AI-assisted analysis of Nova functional test infrastructure
This comprehensive guide documents Nova's functional test infrastructure and provides a blueprint for replicating this pattern in other OpenStack projects. It covers fixture architecture, RPC/messaging, database management, external service mocking, and CI/CD integration.
- Overview
- Quick Start for New Contributors
- Architecture Principles
- Base Test Classes
- Database Fixtures (oslo.db)
- RPC and Messaging Fixtures (oslo.messaging)
- Notification Fixtures
- Configuration Fixtures (oslo.config)
- External Service Fixtures
- API Fixtures
- Complete Test Example
- Regression Tests
- Creating Reusable Test Infrastructure
- Tox Environment Configuration
- Zuul CI Integration
- Porting to Another Project
- Glossary of Terms
Nova's functional test infrastructure is built on a layered fixture architecture that provides:
- In-memory SQLite databases for fast, isolated database testing
- Fake RPC transport (oslo.messaging) for synchronous message passing
- Mock external services (Cinder, Neutron, Glance, Placement)
- Real API server via WSGI intercept
- Versioned notification capture and verification
- Multi-cell database support
- Service lifecycle management
┌─────────────────────────────────────────────────────────────┐
│ Test Case (oslotest.base) │
├─────────────────────────────────────────────────────────────┤
│ Configuration (oslo.config) │
│ ├─ ConfFixture: defaults for tests │
│ └─ ConfPatcher: per-test overrides │
├─────────────────────────────────────────────────────────────┤
│ Database Layer (oslo_db.enginefacade) │
│ ├─ Database('api'): API DB (cell mappings, etc.) │
│ ├─ CellDatabases: Per-cell DB with routing │
│ └─ DB_SCHEMA cache: Fast schema application │
├─────────────────────────────────────────────────────────────┤
│ RPC/Messaging (oslo.messaging) │
│ ├─ RPCFixture: Fake transport (fake://) │
│ ├─ CastAsCallFixture: Synchronous RPC casts │
│ └─ CheatingSerializer: Preserve DB connection context │
├─────────────────────────────────────────────────────────────┤
│ Notifications (oslo.messaging) │
│ ├─ NotificationFixture: Capture notifications │
│ ├─ FakeNotifier: Legacy notification capture │
│ └─ FakeVersionedNotifier: Versioned notification queue │
├─────────────────────────────────────────────────────────────┤
│ External Service Mocking │
│ ├─ PlacementFixture: Real WSGI app with DB │
│ ├─ CinderFixture: Volume operations mock │
│ ├─ NeutronFixture: Network operations mock │
│ └─ GlanceFixture: Image service mock │
├─────────────────────────────────────────────────────────────┤
│ API Layer │
│ ├─ OSAPIFixture: WSGI app with wsgi-intercept │
│ └─ TestOpenStackClient: HTTP client for API calls │
├─────────────────────────────────────────────────────────────┤
│ Service Management │
│ ├─ ServiceFixture: Run nova-compute, nova-scheduler │
│ ├─ start_service(): Helper to start services │
│ └─ Service lifecycle: start, stop, restart │
└─────────────────────────────────────────────────────────────┘
If you're new to functional testing and want to get started quickly:
1. Copy this minimal functional test template:
from myproject import test
from myproject.tests import local_fixtures
class TestMyFeature(test.TestCase):
"""Test my new feature."""
def setUp(self):
super().setUp()
# Add fixtures you need
self.useFixture(local_fixtures.Database())
self.api = self.useFixture(local_fixtures.APIFixture()).api
def test_my_feature(self):
"""Test that my feature works."""
# Your test code here
        pass

2. Common questions answered:
- "How do I mock Neutron?" → See External Service Fixtures
- "How do I wait for something?" → See Creating Reusable Infrastructure
- "How do I test RPC?" → See RPC and Messaging Fixtures
3. Next steps after your first test:
- Read "Regression Tests" if fixing a bug
- Read "Creating Reusable Infrastructure" if writing multiple tests
- Read individual component sections as needed
4. Recommended reading order:
- Overview (5 minutes)
- Architecture Principles (10 minutes)
- Complete Test Example (15 minutes)
- Skip to "Porting to Another Project" for your project
Nova uses fixtures from the fixtures library extensively. Fixtures provide:
- Setup/Teardown: Automatic cleanup via addCleanup()
- Composition: useFixture() allows nesting
- Isolation: Each test gets fresh state
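A minimal custom fixture (a sketch, not actual Nova code) illustrates all three properties:

import fixtures


class TempStateFixture(fixtures.Fixture):
    """Sketch: holds per-test state and guarantees teardown."""

    def setUp(self):
        super().setUp()
        self.state = {'ready': True}
        # Cleanups run in reverse order once the test finishes
        self.addCleanup(self.state.clear)


# In a test method:
#     fix = self.useFixture(TempStateFixture())  # composition + auto-cleanup
#     self.assertTrue(fix.state['ready'])        # fresh state per test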
# SQLite in-memory with schema caching for speed
CONF.set_default('connection', "sqlite://", group='database')
CONF.set_default('connection', "sqlite://", group='api_database')

# oslo.messaging fake driver for synchronous testing
transport_url = 'fake:/'

External services are mocked at the API boundary, not with HTTP mocks:
# Stub the module-level API, not HTTP calls
self.test.stub_out('nova.volume.cinder.API.get', self.fake_get)

Nova's multi-cell architecture is reflected in tests:
- API database: Cell mappings, host mappings
- Cell databases: Instance data per cell
- Context targeting: context.target_cell() switches databases (sketched below)
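A condensed sketch of cell targeting (a fuller test appears under Database Fixtures):

ctxt = context.get_admin_context()
with context.target_cell(ctxt, cell_mapping) as cctxt:
    # All DB access through cctxt now hits that cell's database
    instance = objects.Instance.get_by_uuid(cctxt, instance_uuid)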
The base test infrastructure starts with oslotest.base.BaseTestCase and layers Nova-specific functionality.
class TestCase(base.BaseTestCase):
"""Test case base class for all unit tests.
Due to the slowness of DB access, please consider deriving from
`NoDBTestCase` first.
"""
USES_DB = True
USES_DB_SELF = False
REQUIRES_LOCKING = False
STUB_RPC = True
NUMBER_OF_CELLS = 1
STUB_COMPUTE_ID = True
def setUp(self):
super(TestCase, self).setUp()
# Fixture for isolated greenpool per test
self.useFixture(nova_fixtures.IsolatedGreenPoolFixture(self.id()))
# Standard logging with DEBUG support (OS_DEBUG=1)
self.stdlog = self.useFixture(nova_fixtures.StandardLogging())
# Locking for tests that need it (deprecated pattern)
if self.REQUIRES_LOCKING:
lock_path = self.useFixture(fixtures.TempDir()).path
self.fixture = self.useFixture(
config_fixture.Config(lockutils.CONF))
self.fixture.config(lock_path=lock_path,
group='oslo_concurrency')
# Configuration defaults
self.useFixture(nova_fixtures.ConfFixture(CONF))
# RPC setup
if self.STUB_RPC:
self.useFixture(nova_fixtures.RPCFixture('nova.test'))
CONF.set_default('driver', ['test'],
group='oslo_messaging_notifications')
# Object indirection API (for RPC serialization)
objects_base.NovaObject.indirection_api = None
        # Database setup
        # (mappings are populated by _setup_cells and start_service)
        self.cell_mappings = {}
        self.host_mappings = {}
        if self.USES_DB:
            self.useFixture(nova_fixtures.Database(database='api'))
self._setup_cells()
self.useFixture(nova_fixtures.DefaultFlavorsFixture())
elif not self.USES_DB_SELF:
self.useFixture(nova_fixtures.SingleCellSimple())
self.useFixture(nova_fixtures.DatabasePoisonFixture())
# Policy fixture
self.policy = self.useFixture(nova_fixtures.PolicyFixture())
def _setup_cells(self):
"""Setup a normal cellsv2 environment.
This sets up the CellDatabase fixture with two cells, one cell0
and one normal cell. CellMappings are created for both so that
cells-aware code can find those two databases.
"""
celldbs = nova_fixtures.CellDatabases()
ctxt = context.get_context()
fake_transport = 'fake://nowhere/'
# cell0: special cell for instances that fail scheduling
c0 = objects.CellMapping(
context=ctxt,
uuid=objects.CellMapping.CELL0_UUID,
name='cell0',
transport_url=fake_transport,
database_connection=objects.CellMapping.CELL0_UUID)
c0.create()
self.cell_mappings[c0.name] = c0
celldbs.add_cell_database(objects.CellMapping.CELL0_UUID)
# cell1, cell2, ...: normal cells for instances
for x in range(self.NUMBER_OF_CELLS):
name = 'cell%i' % (x + 1)
uuid = getattr(uuids, name)
cell = objects.CellMapping(
context=ctxt,
uuid=uuid,
name=name,
transport_url=fake_transport,
database_connection=uuid)
cell.create()
self.cell_mappings[name] = cell
# cell1 is the default cell
celldbs.add_cell_database(uuid, default=(x == 0))
self.useFixture(celldbs)
def start_service(self, name, host=None, cell_name=None, **kwargs):
"""Start a Nova service (compute, conductor, scheduler, etc.)
:param name: Service name (compute, conductor, scheduler)
:param host: Hostname for the service
:param cell_name: Cell to run the service in
:returns: The service object
"""
cell = None
if host is not None:
self.useFixture(nova_fixtures.ConfPatcher(host=host))
if name == 'compute' and self.USES_DB:
ctxt = context.get_context()
cell_name = cell_name or 'cell1'
cell = self.cell_mappings[cell_name]
if (host or name) not in self.host_mappings:
hm = objects.HostMapping(context=ctxt,
host=host or name,
cell_mapping=cell)
hm.create()
self.host_mappings[hm.host] = hm
svc = self.useFixture(
nova_fixtures.ServiceFixture(name, host, cell=cell, **kwargs))
        return svc.service

class NoDBTestCase(TestCase):
"""Test case base class for tests that don't need the database.
This makes tests run significantly faster. If possible, all new tests
should derive from this class.
"""
    USES_DB = False

Nova uses oslo.db's enginefacade for database abstraction with in-memory SQLite.
class Database(fixtures.Fixture):
"""Create a database fixture.
:param database: The type of database, 'main', or 'api'
:param connection: The connection string to use (default: sqlite://)
"""
def __init__(self, database='main', version=None, connection=None):
super().__init__()
assert database in {'main', 'api'}
self.database = database
self.version = version
self.connection = connection
def setUp(self):
super().setUp()
if self.database == 'main':
if self.connection is not None:
ctxt_mgr = main_db_api.create_context_manager(
connection=self.connection)
self.get_engine = ctxt_mgr.writer.get_engine
else:
# Inject a new factory for each test
new_engine = enginefacade.transaction_context()
self.useFixture(
db_fixtures.ReplaceEngineFacadeFixture(
main_db_api.context_manager, new_engine))
main_db_api.configure(CONF)
self.get_engine = main_db_api.get_engine
elif self.database == 'api':
new_engine = enginefacade.transaction_context()
self.useFixture(
db_fixtures.ReplaceEngineFacadeFixture(
api_db_api.context_manager, new_engine))
api_db_api.configure(CONF)
self.get_engine = api_db_api.get_engine
self._apply_schema()
self.addCleanup(self.cleanup)
def _apply_schema(self):
"""Apply database schema (cached for speed)"""
global DB_SCHEMA
if not DB_SCHEMA[(self.database, self.version)]:
# Apply and cache schema
engine = self.get_engine()
conn = engine.connect()
migration.db_sync(database=self.database, version=self.version)
# Cache the schema as SQL
DB_SCHEMA[(self.database, self.version)] = "".join(
line for line in conn.connection.iterdump())
else:
# Apply the cached schema (much faster!)
engine = self.get_engine()
conn = engine.connect()
conn.connection.executescript(
DB_SCHEMA[(self.database, self.version)])
def cleanup(self):
engine = self.get_engine()
        engine.dispose()

Key Points:
- Schema caching: The first test runs migrations, subsequent tests use cached SQL
- In-memory SQLite: Fast, isolated, no cleanup needed
- enginefacade: oslo.db abstraction for transaction management
class CellDatabases(fixtures.Fixture):
"""Create per-cell databases for testing.
Usage::
fix = CellDatabases()
fix.add_cell_database('connection1')
fix.add_cell_database('connection2', default=True)
self.useFixture(fix)
"""
    def __init__(self):
        super().__init__()
        self._ctxt_mgrs = {}
self._last_ctxt_mgr = None
self._default_ctxt_mgr = None
self._cell_lock = ReaderWriterLock()
def add_cell_database(self, connection_str, default=False):
"""Add a cell database to the fixture.
:param connection_str: Identifier for the database connection
:param default: Whether this is the default cell
"""
# Create a new context manager for the cell
ctxt_mgr = main_db_api.create_context_manager()
self._ctxt_mgrs[connection_str] = ctxt_mgr
# The first DB access is local, so initialize with this
self._last_ctxt_mgr = ctxt_mgr
if self._default_ctxt_mgr is None or default:
self._default_ctxt_mgr = ctxt_mgr
# Apply schema
def get_context_manager(context):
return ctxt_mgr
with fixtures.MonkeyPatch(
'nova.db.main.api.get_context_manager',
get_context_manager,
):
engine = ctxt_mgr.writer.get_engine()
engine.dispose()
self._cache_schema(connection_str)
conn = engine.connect()
conn.connection.executescript(DB_SCHEMA[('main', None)])
def _wrap_target_cell(self, context, cell_mapping):
"""Context manager for cell targeting.
This switches the global database state to point to the specified
cell, allowing compute node code to work without cell awareness.
"""
if cell_mapping:
desired = self._ctxt_mgrs[cell_mapping.database_connection]
else:
desired = self._default_ctxt_mgr
# Fast path: already in the right cell
with self._cell_lock.read_lock():
if self._last_ctxt_mgr == desired:
with self._real_target_cell(context, cell_mapping) as c:
yield c
return
# Switch cells with write lock
with self._cell_lock.write_lock():
if cell_mapping is not None:
self._last_ctxt_mgr = desired
        # Yield with read lock (allows other threads to work)
        raised_exc = None
        with self._cell_lock.read_lock():
            try:
                with self._real_target_cell(context, cell_mapping) as ccontext:
                    yield ccontext
            except Exception as exc:
                raised_exc = exc
        # Restore default
        with self._cell_lock.write_lock():
            self._last_ctxt_mgr = self._default_ctxt_mgr
        if raised_exc:
            raise raised_exc
def setUp(self):
super(CellDatabases, self).setUp()
self.addCleanup(self.cleanup)
self._real_target_cell = context.target_cell
# Monkey-patch database and RPC functions
self.useFixture(fixtures.MonkeyPatch(
'nova.db.main.api.get_context_manager',
self._wrap_get_context_manager))
self.useFixture(fixtures.MonkeyPatch(
'nova.context.target_cell',
self._wrap_target_cell))
self.useFixture(fixtures.MonkeyPatch(
'nova.rpc.get_server',
self._wrap_get_server))
self.useFixture(fixtures.MonkeyPatch(
'nova.rpc.get_client',
            self._wrap_get_client))

Key Points:
- Per-cell isolation: Each cell has its own in-memory database
- Context targeting: context.target_cell() switches which DB to use
- Thread-safe: ReaderWriterLock for concurrent cell access
- RPC multiplexing: Each cell can have its own RPC bus
class MyFunctionalTest(test.TestCase):
"""Example showing database fixture usage"""
def setUp(self):
super().setUp()
# Database is already set up by TestCase base class
# self.cell_mappings contains cell0 and cell1
def test_instance_in_cell(self):
"""Create an instance and verify it's in the database"""
ctxt = context.get_admin_context()
# Create an instance in cell1
with context.target_cell(ctxt, self.cell_mappings['cell1']) as cctxt:
instance = objects.Instance(
context=cctxt,
uuid=uuidutils.generate_uuid(),
project_id='fake-project')
instance.create()
# Verify it's not in cell0
with context.target_cell(ctxt, self.cell_mappings['cell0']) as cctxt:
self.assertRaises(
exception.InstanceNotFound,
objects.Instance.get_by_uuid,
                cctxt, instance.uuid)

Nova uses oslo.messaging with a fake driver for synchronous RPC testing.
class RPCFixture(fixtures.Fixture):
"""Set up RPC with the fake:// transport for testing."""
def __init__(self, *exmods):
super(RPCFixture, self).__init__()
self.exmods = []
self.exmods.extend(exmods)
self._buses = {}
def _fake_create_transport(self, url):
"""Create or return cached fake transport.
NOTE: Currently collapses all connections to a single bus.
This is how our tests expect things to work.
"""
url = None # Collapse all to single bus
if url not in self._buses:
exmods = rpc.get_allowed_exmods()
self._buses[url] = messaging.get_rpc_transport(
CONF,
url=url,
allowed_remote_exmods=exmods)
return self._buses[url]
def setUp(self):
super(RPCFixture, self).setUp()
self.addCleanup(rpc.cleanup)
# Register exception modules
rpc.add_extra_exmods(*self.exmods)
self.addCleanup(rpc.clear_extra_exmods)
# Configure fake transport
self.messaging_conf = messaging_conffixture.ConfFixture(CONF)
self.messaging_conf.transport_url = 'fake:/'
self.useFixture(self.messaging_conf)
# Patch transport creation
self.useFixture(fixtures.MonkeyPatch(
'nova.rpc.create_transport', self._fake_create_transport))
# Initialize RPC
with mock.patch('nova.rpc.get_transport_url') as mock_gtu:
mock_gtu.return_value = None
rpc.init(CONF)
# Cleanup in-flight messages between tests
def cleanup_in_flight_rpc_messages():
messaging._drivers.impl_fake.FakeExchangeManager._exchanges = {}
        self.addCleanup(cleanup_in_flight_rpc_messages)

Configuration:
# In nova/tests/fixtures/conf.py (ConfFixture)
# The fake:// transport is set by RPCFixture
# No additional configuration needed

class CastAsCallFixture(fixtures.Fixture):
"""Make RPC casts behave as calls for synchronous testing.
Normally, RPC casts are fire-and-forget. This fixture makes them
synchronous by converting them to calls, making tests deterministic.
"""
def __init__(self, testcase):
super().__init__()
self.testcase = testcase
@staticmethod
def _stub_out(testcase, obj=None):
if obj:
orig_prepare = obj.prepare
else:
orig_prepare = messaging.RPCClient.prepare
def prepare(self, *args, **kwargs):
# Casts with fanout=True would throw errors if monkeypatched
# to call method, so we override fanout to False
if 'fanout' in kwargs:
kwargs['fanout'] = False
cctxt = orig_prepare(self, *args, **kwargs)
CastAsCallFixture._stub_out(testcase, cctxt) # Recurse!
return cctxt
if obj:
cls = getattr(sys.modules[obj.__class__.__module__],
obj.__class__.__name__)
testcase.stub_out('%s.%s.prepare' % (obj.__class__.__module__,
obj.__class__.__name__),
prepare)
testcase.stub_out('%s.%s.cast' % (obj.__class__.__module__,
obj.__class__.__name__),
cls.call)
else:
testcase.stub_out('oslo_messaging.RPCClient.prepare', prepare)
testcase.stub_out('oslo_messaging.RPCClient.cast',
messaging.RPCClient.call)
def setUp(self):
super().setUp()
        self._stub_out(self.testcase)

Usage:
class MyTest(test.TestCase):
CAST_AS_CALL = True # Default in _IntegratedTestBase
# Or manually:
def setUp(self):
super().setUp()
        self.useFixture(nova_fixtures.CastAsCallFixture(self))

class CheatingSerializer(rpc.RequestContextSerializer):
"""A messaging.RequestContextSerializer that helps with cells.
Our normal serializer does not pass db_connection and mq_connection,
for good reason. However, during tests, since we're all in the same
process, we want cell-targeted RPC calls to preserve these values.
"""
def serialize_context(self, context):
"""Serialize context with the db_connection inside."""
values = super(CheatingSerializer, self).serialize_context(context)
values['db_connection'] = context.db_connection
values['mq_connection'] = context.mq_connection
return values
def deserialize_context(self, values):
"""Deserialize context and honor db_connection if present."""
ctxt = super(CheatingSerializer, self).deserialize_context(values)
ctxt.db_connection = values.pop('db_connection', None)
ctxt.mq_connection = values.pop('mq_connection', None)
        return ctxt

This is used automatically by the CellDatabases fixture.
Nova uses versioned notifications (oslo.messaging). The fixture captures notifications for verification.
class NotificationFixture(fixtures.Fixture):
"""Fixture to capture oslo.messaging notifications."""
def __init__(self, test):
self.test = test
def setUp(self):
super().setUp()
self.addCleanup(self.reset)
# Create fake notifiers
self.fake_notifier = FakeNotifier(
rpc.LEGACY_NOTIFIER.transport,
rpc.LEGACY_NOTIFIER.publisher_id,
serializer=getattr(rpc.LEGACY_NOTIFIER, '_serializer', None))
self.fake_versioned_notifier = FakeVersionedNotifier(
rpc.NOTIFIER.transport,
rpc.NOTIFIER.publisher_id,
serializer=getattr(rpc.NOTIFIER, '_serializer', None),
test_case_id=self.test.id())
# Stub out the global notifiers
if rpc.LEGACY_NOTIFIER and rpc.NOTIFIER:
self.test.stub_out('nova.rpc.LEGACY_NOTIFIER', self.fake_notifier)
self.test.stub_out('nova.rpc.NOTIFIER',
self.fake_versioned_notifier)
def reset(self):
self.fake_notifier.reset()
self.fake_versioned_notifier.reset()
def wait_for_versioned_notifications(
self, event_type, n_events=1, timeout=10.0,
):
"""Wait for n_events of event_type to be emitted.
:param event_type: Notification event type (e.g., 'instance.create.end')
:param n_events: Number of events to wait for
:param timeout: Timeout in seconds
:returns: List of notification dicts
"""
return self.fake_versioned_notifier.wait_for_versioned_notifications(
event_type, n_events, timeout)
@property
def versioned_notifications(self):
"""List of all versioned notifications emitted."""
return self.fake_versioned_notifier.versioned_notifications
@property
def notifications(self):
"""List of all legacy notifications emitted."""
        return self.fake_notifier.notifications

class FakeVersionedNotifier(FakeNotifier):
"""Captures versioned notifications with subscription support."""
def __init__(
self, transport, publisher_id, serializer=None, parent=None,
test_case_id=None
):
super().__init__(
transport, publisher_id, serializer, test_case_id=test_case_id)
if parent:
self.versioned_notifications = parent.versioned_notifications
self.subscriptions = parent.subscriptions
else:
self.versioned_notifications = []
self.subscriptions = collections.defaultdict(_Sub)
def _notify(self, priority, ctxt, event_type, payload):
"""Capture notification and notify subscribers."""
sender_test_case_id = self._get_sender_test_case_id()
# Prevent late notifications from finished tests
if sender_test_case_id != self.test_case_id:
raise RuntimeError(
'FakeVersionedNotifier received %s notification from '
'test case %s which differs from current test %s' %
(event_type, sender_test_case_id, self.test_case_id))
payload = self._serializer.serialize_entity(ctxt, payload)
notification = {
'publisher_id': self.publisher_id,
'priority': priority,
'event_type': event_type,
'payload': payload,
}
self.versioned_notifications.append(notification)
self.subscriptions[event_type].received(notification)
def wait_for_versioned_notifications(
self, event_type, n_events=1, timeout=10.0,
):
"""Wait for notifications with timeout."""
return self.subscriptions[event_type].wait_n(
            n_events, event_type, timeout)

class _Sub(object):
"""Allow a subscriber to efficiently wait for an event."""
def __init__(self):
self._cond = threading.Condition()
self._notifications = []
def received(self, notification):
with self._cond:
self._notifications.append(notification)
self._cond.notify_all()
def wait_n(self, n, event, timeout):
"""Wait until at least n notifications have been received."""
with timeutils.StopWatch(timeout) as timer:
with self._cond:
while len(self._notifications) < n:
if timer.expired():
raise AssertionError(
"Notification %s hasn't been received" % event)
self._cond.wait(timer.leftover())
        return list(self._notifications)

def test_instance_create_notification(self):
"""Verify instance.create.end notification is emitted."""
# Create instance via API
server_req = self._build_server()
server = self.api.post_server({'server': server_req})
# Wait for notification
notifications = self.notifier.wait_for_versioned_notifications(
'instance.create.end', n_events=1, timeout=10.0)
# Verify notification content
self.assertEqual(1, len(notifications))
self.assertEqual(server['id'],
            notifications[0]['payload']['nova_object.data']['uuid'])

Configuration uses oslo.config with test-specific defaults.
class ConfFixture(config_fixture.Config):
"""Fixture to manage global conf settings."""
def setUp(self):
super(ConfFixture, self).setUp()
# default group
self.conf.set_default('compute_driver', 'fake.SmallFakeDriver')
self.conf.set_default('host', 'fake-mini')
self.conf.set_default('periodic_enable', False)
# api_database group
self.conf.set_default('connection', "sqlite://", group='api_database')
self.conf.set_default('sqlite_synchronous', False,
group='api_database')
# database group
self.conf.set_default('connection', "sqlite://", group='database')
self.conf.set_default('sqlite_synchronous', False, group='database')
# key_manager group
self.conf.set_default('backend',
'nova.keymgr.conf_key_mgr.ConfKeyManager',
group='key_manager')
# wsgi group
self.conf.set_default('api_paste_config',
paths.state_path_def('etc/nova/api-paste.ini'),
group='wsgi')
# api group
self.conf.set_default('response_validation', 'error', group='api')
# notifications
self.conf.set_default(
'notification_format', "both", group="notifications")
# oslo.limit
self.conf.set_default('endpoint_id', 'ENDPOINT_ID', group='oslo_limit')
config.parse_args([], default_config_files=[], configure_db=False,
                      init_rpc=False)

class ConfPatcher(fixtures.Fixture):
"""Fixture to patch and restore global CONF.
Usage::
self.useFixture(nova_fixtures.ConfPatcher(host='compute1'))
self.useFixture(nova_fixtures.ConfPatcher(
enabled_filters=['FilterA', 'FilterB'],
group='filter_scheduler'))
"""
def __init__(self, **kwargs):
super(ConfPatcher, self).__init__()
self.group = kwargs.pop('group', None)
self.args = kwargs
def setUp(self):
super(ConfPatcher, self).setUp()
for k, v in self.args.items():
self.addCleanup(CONF.clear_override, k, self.group)
            CONF.set_override(k, v, self.group)

def test_with_custom_config(self):
"""Test with custom scheduler configuration."""
# Override configuration for this test
self.flags(enabled_filters=['ComputeFilter', 'ImagePropertiesFilter'],
group='filter_scheduler')
self.flags(disk_allocation_ratio=2.0)
# Or use ConfPatcher
self.useFixture(nova_fixtures.ConfPatcher(
weight_classes=['nova.scheduler.weights.ram.RAMWeigher'],
group='filter_scheduler'))
    # Configuration is automatically cleaned up after test

File: nova/tests/functional/fixtures.py
Placement is special: it runs a real WSGI app with a real database (imported from placement repo).
class PlacementFixture(placement_fixtures.PlacementFixture):
"""A fixture to run Placement operations.
Runs a local WSGI server with the Placement application using
NoAuth middleware.
"""
def setUp(self):
super(PlacementFixture, self).setUp()
# Fix socket options for wsgi-intercept
self.useFixture(fixtures.MonkeyPatch(
'keystoneauth1.session.TCPKeepAliveAdapter.init_poolmanager',
adapters.HTTPAdapter.init_poolmanager))
self._client = ka.Adapter(ks.Session(auth=None), raise_exc=False)
# Monkey-patch Nova's scheduler report client
self.useFixture(fixtures.MonkeyPatch(
'nova.scheduler.client.report.SchedulerReportClient.get',
self._fake_get))
self.useFixture(fixtures.MonkeyPatch(
'nova.scheduler.client.report.SchedulerReportClient.post',
self._fake_post))
self.useFixture(fixtures.MonkeyPatch(
'nova.scheduler.client.report.SchedulerReportClient.put',
self._fake_put))
self.useFixture(fixtures.MonkeyPatch(
'nova.scheduler.client.report.SchedulerReportClient.delete',
self._fake_delete))
self.api = PlacementApiClient(self)
def _fake_get(self, client, url, version=None, global_request_id=None):
headers = {'x-auth-token': self.token}
self._update_headers_with_version(headers, version)
return self._client.get(
url,
endpoint_override=self.endpoint,
            headers=headers)

Key Points:
- Uses placement.tests.functional.fixtures.PlacementFixture as base
- Real Placement WSGI app with SQLite database
- wsgi-intercept for HTTP calls
- NoAuth middleware (no Keystone)
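A brief usage sketch (func_fixtures is the import alias used in the complete example later in this guide; the response's .body attribute matches the PlacementApiClient usage shown there):

class MyPlacementTest(test.TestCase):
    def setUp(self):
        super().setUp()
        # .api is a simple REST client against the intercepted app
        self.placement = self.useFixture(
            func_fixtures.PlacementFixture()).api

    def test_list_resource_providers(self):
        # Hits the real Placement WSGI app, entirely in-process
        resp = self.placement.get('/resource_providers')
        self.assertIn('resource_providers', resp.body)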
File: nova/tests/fixtures/cinder.py
Cinder is mocked at the API layer with stateful volume tracking.
class CinderFixture(fixtures.Fixture):
"""A fixture to mock volume operations (Cinder v3 API)."""
# Volume IDs for common test scenarios
SWAP_OLD_VOL = 'a07f71dc-8151-4e7d-a0cc-cd24a3f11113'
SWAP_NEW_VOL = '227cc671-f30b-4488-96fd-7d0bf13648d8'
MULTIATTACH_VOL = '4757d51f-54eb-4442-8684-3399a6431f67'
IMAGE_BACKED_VOL = '6ca404f3-d844-4169-bb96-bc792f37de98'
def __init__(self, test, az='nova'):
super().__init__()
self.test = test
self.az = az
# State tracking
self.volumes = collections.defaultdict(dict)
self.volume_to_attachment = collections.defaultdict(dict)
def setUp(self):
super().setUp()
self._create_fakes()
def _create_fakes(self):
"""Stub out all nova.volume.cinder.API methods."""
self.useFixture(fixtures.MockPatch(
'nova.volume.cinder.API.attachment_create',
side_effect=self.fake_attachment_create, autospec=False))
self.useFixture(fixtures.MockPatch(
'nova.volume.cinder.API.attachment_update',
side_effect=self.fake_attachment_update, autospec=False))
self.useFixture(fixtures.MockPatch(
'nova.volume.cinder.API.attachment_delete',
side_effect=self.fake_attachment_delete, autospec=False))
self.useFixture(fixtures.MockPatch(
'nova.volume.cinder.API.get',
side_effect=self.fake_get, autospec=False))
# ... more methods
def fake_attachment_create(
self, context, volume_id, instance_uuid, connector=None,
mountpoint=None
):
"""Mock attachment_create."""
attachment_id = uuidutils.generate_uuid()
attachment = {
'id': attachment_id,
'connection_info': {
'driver_volume_type': 'fake_type',
'data': {'foo': 'bar'}
}
}
# Track attachment
self.volume_to_attachment[volume_id][attachment_id] = {
'id': attachment_id,
'instance_uuid': instance_uuid,
'connector': connector,
'status': 'reserved',
}
return attachment
def fake_get(self, context, volume_id, microversion=None):
"""Mock get volume."""
return {
'id': volume_id,
'status': 'available',
'size': 1,
'attach_time': '',
'availability_zone': self.az,
'attachments': {},
'multiattach': volume_id == self.MULTIATTACH_VOL,
        }

Key Points:
- Stateful: tracks volumes, attachments, snapshots
- Mocked at nova.volume.cinder.API (not HTTP)
- Supports multi-attach, volume swap, etc.
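A usage sketch, assuming the fixture was stored as self.cinder in setUp() and reusing the _build_server and volume_ids_for_instance helpers that appear in the examples later in this guide:

# Boot from the fixture's predefined image-backed volume
volume_id = nova_fixtures.CinderFixture.IMAGE_BACKED_VOL
server_req = self._build_server()
server_req['imageRef'] = ''  # boot-from-volume: no image
server_req['block_device_mapping_v2'] = [{
    'boot_index': 0,
    'uuid': volume_id,
    'source_type': 'volume',
    'destination_type': 'volume',
}]
server = self.api.post_server({'server': server_req})
# The fixture tracks attachments, so tests can assert on its state
self.assertIn(volume_id,
              self.cinder.volume_ids_for_instance(server['id']))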
File: nova/tests/fixtures/neutron.py
Neutron is mocked with port, network, and subnet state.
class NeutronFixture(fixtures.Fixture):
"""A fixture to boot instances with neutron ports."""
tenant_id = nova_fixtures.PROJECT_ID
# Default network
network_1 = {
'id': '3cb9bc59-5699-4588-a4b1-b87f96708bc6',
'name': 'private',
'subnets': [],
'tenant_id': tenant_id,
'provider:network_type': 'vxlan',
}
def __init__(self, test):
super().__init__()
self.test = test
self._ports = {}
self._networks = {self.network_1['id']: self.network_1}
def setUp(self):
super().setUp()
# Disable vif_plugging_timeout for tests
self.test.flags(vif_plugging_timeout=0)
# Stub out Nova's Neutron API
self.test.stub_out(
'nova.network.neutron.get_client', self._get_client)
def _get_client(self, context, admin=False):
"""Return a fake Neutron client."""
admin = admin or context.is_admin and not context.auth_token
return _FakeNeutronClient(self, admin)
def create_port(self, body):
"""Mock neutronclient.v2_0.client.Client.create_port."""
port_req = body.get('port')
port_id = port_req.get('id') or uuidutils.generate_uuid()
port = {
'id': port_id,
'network_id': port_req['network_id'],
'tenant_id': port_req.get('tenant_id', self.tenant_id),
'mac_address': port_req.get(
'mac_address', 'fa:16:3e:xx:xx:xx'),
'fixed_ips': port_req.get('fixed_ips', []),
'status': 'ACTIVE',
'binding:vif_type': 'ovs',
}
self._ports[port_id] = port
return {'port': copy.deepcopy(port)}
def show_port(self, port_id, **_params):
"""Mock neutronclient.v2_0.client.Client.show_port."""
if port_id not in self._ports:
raise neutron_client_exc.PortNotFoundClient()
        return {'port': copy.deepcopy(self._ports[port_id])}

Key Points:
- Stateful: tracks ports, networks, subnets
- Supports port binding, security groups, QoS
- Mocked at nova.network.neutron.get_client
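A usage sketch, assuming self.neutron from setUp() and the _build_server helper from the complete example below:

# Pre-create a port through the fixture, then boot a server on it
port = self.neutron.create_port({'port': {
    'network_id': nova_fixtures.NeutronFixture.network_1['id'],
    'tenant_id': nova_fixtures.NeutronFixture.tenant_id,
}})['port']
server_req = self._build_server(networks=[{'port': port['id']}])
server = self.api.post_server({'server': server_req})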
File: nova/tests/fixtures/glance.py
Glance is mocked with in-memory image storage.
class GlanceFixture(fixtures.Fixture):
"""A fixture for simulating Glance."""
# Default test images
image1 = {
'id': '155d900f-4e14-4e4c-a73d-069cbf4541e6',
'name': 'fakeimage123456',
'created_at': '2011-01-01T01:02:03Z',
'updated_at': '2011-01-01T01:02:03Z',
'status': 'active',
'properties': {
'kernel_id': 'nokernel',
'ramdisk_id': 'nokernel',
},
'min_ram': 0,
'min_disk': 0,
'size': 25165824,
}
def __init__(self, test):
super().__init__()
self.test = test
self.images = {}
def setUp(self):
super().setUp()
# Configure Glance endpoint
self.test.useFixture(nova_fixtures.ConfPatcher(
group='glance', api_servers=['http://localhost:9292']))
# Stub out Glance API
self.test.stub_out(
'nova.image.glance.API.get_remote_image_service',
lambda context, image_href: (self, image_href))
self.test.stub_out(
'nova.image.glance.get_default_image_service',
lambda: self)
# Pre-create default images
self.create(None, self.image1)
# ... more images
self._imagedata = {}
def create(self, context, metadata, data=None):
"""Create an image."""
image_id = metadata.get('id', uuidutils.generate_uuid())
metadata['id'] = image_id
self.images[image_id] = copy.deepcopy(metadata)
return self.images[image_id]
def show(self, context, image_id, include_locations=False,
show_deleted=True):
"""Get image metadata."""
if image_id not in self.images:
raise exception.ImageNotFound(image_id=image_id)
        return copy.deepcopy(self.images[image_id])

Key Points:
- In-memory image storage
- Supports image metadata, downloads
- Mocked at nova.image.glance.API
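A usage sketch, assuming self.glance from setUp(); the metadata keys mirror image1 above, and hw_disk_bus is just an illustrative image property:

# Register a custom image, then boot from it
image = self.glance.create(None, {
    'name': 'custom-image',
    'status': 'active',
    'min_ram': 0,
    'min_disk': 0,
    'properties': {'hw_disk_bus': 'virtio'},
})
server_req = self._build_server(image_uuid=image['id'])
server = self.api.post_server({'server': server_req})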
The API fixture runs a real Nova WSGI application using wsgi-intercept.
class OSAPIFixture(fixtures.Fixture):
"""Create an OS API server as a fixture.
This spawns an OS API server in a new greenthread. The fixture has
a .api parameter which is a simple REST client.
This fixture has the following clients:
self.api - Project user with "member" role
self.admin_api - Project user with "admin" role
self.reader_api - Project user with "reader" role
"""
def __init__(
self, api_version='v2', project_id=PROJECT_ID,
use_project_id_in_urls=False, stub_keystone=True,
):
super(OSAPIFixture, self).__init__()
self.api_version = api_version
self.project_id = project_id
self.use_project_id_in_urls = use_project_id_in_urls
self.stub_keystone = stub_keystone
def setUp(self):
super(OSAPIFixture, self).setUp()
# Unique hostname for wsgi-intercept
hostname = uuidsentinel.osapi_host
port = 80
service_name = 'osapi_compute'
endpoint = 'http://%s:%s/' % (hostname, port)
self.useFixture(ConfPatcher(debug=True))
if self.stub_keystone:
self._stub_keystone()
# Fix socket options for wsgi-intercept
self.useFixture(fixtures.MonkeyPatch(
'keystoneauth1.session.TCPKeepAliveAdapter.init_poolmanager',
adapters.HTTPAdapter.init_poolmanager))
# Load WSGI app
loader = wsgi.Loader().load_app(service_name)
app = lambda: loader
# Register service
wsgi_app._setup_service(CONF.host, service_name)
# Install wsgi-intercept
intercept = interceptor.RequestsInterceptor(app, url=endpoint)
intercept.install_intercept()
self.addCleanup(intercept.uninstall_intercept)
# Create API clients
base_url = 'http://%(host)s:%(port)s/%(api_version)s' % ({
'host': hostname, 'port': port, 'api_version': self.api_version})
if self.use_project_id_in_urls:
base_url += '/' + self.project_id
self.api = client.TestOpenStackClient(
'fake', base_url, project_id=self.project_id,
roles=['reader', 'member'])
self.admin_api = client.TestOpenStackClient(
'admin', base_url, project_id=self.project_id,
roles=['reader', 'member', 'admin'])
self.reader_api = client.TestOpenStackClient(
'reader', base_url, project_id=self.project_id,
roles=['reader'])
def _stub_keystone(self):
"""Stub out authentication middleware."""
self.useFixture(fixtures.MockPatch(
'keystonemiddleware.auth_token.filter_factory',
return_value=lambda _app: _app))
# Stub out context middleware
def fake_ctx(env, **kwargs):
user_id = env['HTTP_X_AUTH_USER']
project_id = env['HTTP_X_AUTH_PROJECT_ID']
is_admin = user_id == 'admin'
roles = env['HTTP_X_ROLES'].split(',')
return context.RequestContext(
user_id, project_id, is_admin=is_admin, roles=roles, **kwargs)
self.useFixture(fixtures.MonkeyPatch(
            'nova.api.auth.NovaKeystoneContext._create_context', fake_ctx))

Key Points:
- Real Nova WSGI app (not mocked!)
- wsgi-intercept: HTTP requests stay in-process
- Multiple clients with different roles
- NoAuth: Keystone middleware is stubbed
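A sketch of exercising the three role-scoped clients (get_servers stands in here for any client call; the roles match the docstring above):

api_fixture = self.useFixture(nova_fixtures.OSAPIFixture(
    api_version='v2.1'))
api_fixture.api.microversion = 'latest'

# Same in-process server, three identities with different roles:
api_fixture.api.get_servers()         # 'reader' + 'member'
api_fixture.admin_api.get_servers()   # 'reader' + 'member' + 'admin'
api_fixture.reader_api.get_servers()  # 'reader' only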
File: nova/tests/functional/api/client.py
class TestOpenStackClient(object):
"""Simple client for making Nova API requests."""
def __init__(self, user_id, base_url, project_id, roles=None):
self.user_id = user_id
self.project_id = project_id
self.roles = ','.join(roles or ['member'])
self.base_url = base_url
self.microversion = None
def api_request(self, url, method='GET', body=None, headers=None):
"""Make an API request."""
headers = headers or {}
headers.update({
'x-auth-user': self.user_id,
'x-auth-project-id': self.project_id,
'x-roles': self.roles,
})
if self.microversion:
headers['X-OpenStack-Nova-API-Version'] = self.microversion
full_url = self.base_url + url
if method == 'GET':
response = requests.get(full_url, headers=headers)
elif method == 'POST':
response = requests.post(full_url, json=body, headers=headers)
elif method == 'PUT':
response = requests.put(full_url, json=body, headers=headers)
        elif method == 'DELETE':
            response = requests.delete(full_url, headers=headers)
        else:
            raise ValueError('Unsupported method: %s' % method)
        return response
def post_server(self, server_dict):
"""Create a server (instance)."""
response = self.api_request('/servers', method='POST',
body=server_dict)
return response.json()['server']
def get_server(self, server_id):
"""Get server details."""
response = self.api_request('/servers/%s' % server_id)
        return response.json()['server']

Here's a complete example showing how all fixtures work together:
"""Example functional test for server operations."""
import time

from oslo_utils.fixture import uuidsentinel as uuids
from nova import context
from nova import objects
from nova import test
from nova.tests import fixtures as nova_fixtures
from nova.tests.functional import fixtures as func_fixtures
from nova.tests.functional import integrated_helpers
class ServerCreateTest(test.TestCase,
integrated_helpers.InstanceHelperMixin):
"""Test server create operations with full stack."""
# Class attributes
api_major_version = 'v2.1'
microversion = 'latest'
ADMIN_API = False
def setUp(self):
super(ServerCreateTest, self).setUp()
# Configuration
self.flags(compute_driver='fake.SmallFakeDriver')
# External service fixtures
self.useFixture(nova_fixtures.RealPolicyFixture())
self.glance = self.useFixture(nova_fixtures.GlanceFixture(self))
self.neutron = self.useFixture(nova_fixtures.NeutronFixture(self))
self.cinder = self.useFixture(nova_fixtures.CinderFixture(self))
self.placement = self.useFixture(
func_fixtures.PlacementFixture()).api
# Notification fixture
self.notifier = self.useFixture(
nova_fixtures.NotificationFixture(self))
# API fixture
self.api_fixture = self.useFixture(nova_fixtures.OSAPIFixture(
api_version='v2.1'))
self.api = self.api_fixture.api
self.api.microversion = self.microversion
self.admin_api = self.api_fixture.admin_api
self.admin_api.microversion = self.microversion
# Start services
self.start_service('conductor')
self.scheduler = self.start_service('scheduler')
self.compute = self.start_service('compute', host='compute1')
def _build_server(self, name='test-server', image_uuid=None,
flavor_id=None, networks=None):
"""Helper to build server request."""
return {
'name': name,
'imageRef': image_uuid or nova_fixtures.GlanceFixture.image1['id'],
'flavorRef': flavor_id or '1',
'networks': networks or [{'uuid':
nova_fixtures.NeutronFixture.network_1['id']}],
}
def _wait_for_state_change(self, server, expected_status, max_retries=50):
"""Helper to wait for server status."""
for i in range(max_retries):
server = self.api.get_server(server['id'])
if server['status'] == expected_status:
return server
if server['status'] == 'ERROR':
self.fail('Server went to ERROR state')
time.sleep(0.1)
self.fail('Timed out waiting for server to reach %s' % expected_status)
def test_create_server(self):
"""Test basic server creation."""
# Build server request
server_req = self._build_server()
# Create server via API
server = self.api.post_server({'server': server_req})
self.assertEqual('BUILD', server['status'])
# Wait for ACTIVE
server = self._wait_for_state_change(server, 'ACTIVE')
self.assertEqual('ACTIVE', server['status'])
# Verify in database
ctxt = context.get_admin_context()
instance = objects.Instance.get_by_uuid(ctxt, server['id'])
self.assertEqual('active', instance.vm_state)
self.assertEqual('compute1', instance.host)
# Verify notification was emitted
notifications = self.notifier.wait_for_versioned_notifications(
'instance.create.end', n_events=1, timeout=10.0)
self.assertEqual(1, len(notifications))
self.assertEqual(server['id'],
notifications[0]['payload']['nova_object.data']['uuid'])
def test_create_server_with_volume(self):
"""Test server creation with Cinder volume."""
# Create volume via Cinder fixture
volume_id = '9c6d9c2d-7a8f-4c80-938d-3bf062b8d489'
# Build server with block_device_mapping_v2
server_req = self._build_server()
server_req['block_device_mapping_v2'] = [{
'boot_index': 0,
'uuid': volume_id,
'source_type': 'volume',
'destination_type': 'volume',
}]
server_req['imageRef'] = '' # No image for boot-from-volume
# Create server
server = self.api.post_server({'server': server_req})
server = self._wait_for_state_change(server, 'ACTIVE')
# Verify volume attachment
attachments = self.cinder.volume_to_attachment[volume_id]
self.assertEqual(1, len(attachments))
# Get the attachment
attachment = list(attachments.values())[0]
self.assertEqual(server['id'], attachment['instance_uuid'])
self.assertIsNotNone(attachment['connector'])
def test_create_server_placement_allocation(self):
"""Test that placement allocations are created."""
server_req = self._build_server()
server = self.api.post_server({'server': server_req})
server = self._wait_for_state_change(server, 'ACTIVE')
# Get resource provider UUID for compute1
compute_rp_uuid = self._get_provider_uuid_by_host('compute1')
# Verify allocation in Placement
allocations_url = '/allocations/%s' % server['id']
allocations = self.placement.get(allocations_url).body
self.assertIn(compute_rp_uuid,
allocations['allocations'])
self.assertIn('VCPU',
allocations['allocations'][compute_rp_uuid]['resources'])
def test_delete_server(self):
"""Test server deletion."""
# Create server
server_req = self._build_server()
server = self.api.post_server({'server': server_req})
server = self._wait_for_state_change(server, 'ACTIVE')
# Delete server
self.api.delete_server(server['id'])
# Wait for it to be gone
self._wait_until_deleted(server)
# Verify notification
notifications = self.notifier.wait_for_versioned_notifications(
'instance.delete.end', n_events=1, timeout=10.0)
self.assertEqual(1, len(notifications))
# Verify allocation is cleaned up in Placement
allocations_url = '/allocations/%s' % server['id']
allocations = self.placement.get(allocations_url).body
        self.assertEqual({}, allocations['allocations'])

Regression tests are specialized functional tests designed to reproduce and prevent specific bugs from reoccurring. Nova maintains a dedicated nova/tests/functional/regressions/ directory for these tests, which provides long-term stability and clear documentation of historical bugs.
- Bug Reproduction First: Write the test to reproduce the bug before fixing it
- Explicit Dependencies: Make all dependencies clear in the test code. Reuse fixtures and helper functions, but do so explicitly rather than through deep inheritance chains
- Pragmatic Reuse: Use stable fixtures (e.g., NeutronFixture) and helpers (e.g., InstanceHelperMixin) to avoid duplication, while keeping dependencies visible
- Self-Contained Setup: Set up the full stack in setUp() with explicit fixture declarations
- Clear Documentation: Include detailed docstrings explaining the bug, its impact, and the fix
- Stable Over Time: Avoid hidden dependencies so tests remain valid as unrelated code evolves
================================
Tests for Specific Regressions
================================
When we have a bug reported by end users that we can write a full
stack reproduce on, we should. And we should keep a regression test
for that bug in our tree. It can be deleted at some future date if
needed, but largely should not be changed.
Writing Regression Tests
========================
- These should be full stack tests which inherit from
nova.test.TestCase directly or with explicit stable mixins
(e.g., InstanceHelperMixin). This prevents coupling with other tests
while allowing pragmatic reuse of stable helpers.
- They should setup a full stack cloud in their setUp via fixtures.
All fixture usage should be explicit in the test's setUp() method.
- Reusing stable fixtures (RealPolicyFixture, NeutronFixture, etc.) and
helper functions (integrated_helpers) is encouraged, but dependencies
must be explicit and visible in the test code.
- They should each live in a file which is named test_bug_######.py
- Avoid deep inheritance chains that hide dependencies. The goal is
stability: changes to unrelated test infrastructure should not break
regression tests.
Writing Tests Before the Bug is Fixed
=====================================
When possible, write the regression test to demonstrate the bug before
fixing it:
1. Write test that reproduces the broken behavior
2. Assert the current (broken) behavior
3. Comment out the expected (correct) assertions
4. Commit the test with "Related-Bug: #XXXXXX"
5. Fix the bug in production code
6. Update test: swap assertions (broken → commented, expected → active)
7. Commit with "Closes-Bug: #XXXXXX"
This approach provides clear documentation of the bug lifecycle and
verifies that the fix actually works.

<project>/tests/functional/regressions/
├── __init__.py
├── README.rst
├── test_bug_1234567.py
├── test_bug_1234568.py
└── test_bug_1234569.py
- File: test_bug_<launchpad_bug_id>.py
- Class: Descriptive name explaining the bug scenario
- Test method: test_<specific_scenario>
When a bug is reported, the first step is to write a regression test that reproduces the broken behavior. At this stage:
- Write the test to demonstrate the bug
- Include assertions that show the current broken behavior
- Add commented-out assertions that show the expected correct behavior
- Document the bug thoroughly in the docstring
Example: Pre-Fix State
File: nova/tests/functional/regressions/test_bug_1234567.py
# Copyright 2024 ACME Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import time

from nova import test
from nova.tests import fixtures as nova_fixtures
class TestDeleteServerWithReservedVolumes(test.TestCase):
"""Regression test for bug #1234567.
When deleting a server that failed to schedule, reserved volumes
are not being cleaned up in Cinder, leaving volumes stuck in
'attaching' state.
The bug occurs because:
1. Server create fails to schedule (no valid host)
2. Server goes to ERROR state
3. User deletes the server
4. API deletes the server locally (no compute to contact)
5. Volume reservations are never cleaned up
Expected behavior:
- Volume attachments should be deleted during local delete
- Volumes should return to 'available' state
"""
def setUp(self):
super(TestDeleteServerWithReservedVolumes, self).setUp()
self.useFixture(nova_fixtures.RealPolicyFixture())
self.useFixture(nova_fixtures.NeutronFixture(self))
self.useFixture(nova_fixtures.GlanceFixture(self))
self.cinder = self.useFixture(nova_fixtures.CinderFixture(self))
api_fixture = self.useFixture(nova_fixtures.OSAPIFixture(
api_version='v2.1'))
self.api = api_fixture.api
self.start_service('conductor')
self.start_service('scheduler')
# NOTE: Intentionally NOT starting compute so instance fails to schedule
self.useFixture(nova_fixtures.CastAsCallFixture(self))
def test_delete_error_server_cleans_up_volume_attachments(self):
"""Test that volume attachments are cleaned up on local delete."""
# Use a pre-existing volume
volume_id = '9c6d9c2d-7a8f-4c80-938d-3bf062b8d489'
# Create a boot-from-volume server
server_req = {
'name': 'test-server',
'networks': 'none',
'block_device_mapping_v2': [{
'boot_index': 0,
'uuid': volume_id,
'source_type': 'volume',
'destination_type': 'volume',
}],
}
server = self.api.post_server({'server': server_req})
server_id = server['id']
# Wait for server to go to ERROR (no valid host)
server = self._wait_for_state_change(server, 'ERROR')
# Verify volume attachment was created
self.assertIn(volume_id,
self.cinder.volume_ids_for_instance(server_id))
# Delete the server (local delete since no compute)
self.api.delete_server(server_id)
self._wait_until_deleted(server)
# BUG: Volume attachment is NOT cleaned up
# This assertion demonstrates the broken behavior:
self.assertIn(volume_id,
self.cinder.volume_ids_for_instance(server_id))
# EXPECTED (commented out until bug is fixed):
# self.assertNotIn(volume_id,
# self.cinder.volume_ids_for_instance(server_id))
def _wait_for_state_change(self, server, expected_status):
"""Helper to wait for server status change."""
for i in range(50):
server = self.api.get_server(server['id'])
if server['status'] == expected_status:
return server
time.sleep(0.1)
self.fail('Timed out waiting for server %s' % expected_status)
def _wait_until_deleted(self, server):
"""Helper to wait for server deletion."""
# implementation omitted for brevity
        pass

At this stage, the test fails in the expected way, demonstrating the bug. This test is committed to the repository with:
git add nova/tests/functional/regressions/test_bug_1234567.py
git commit -m "Add regression test for bug 1234567
This test reproduces the issue where volume attachments are not
cleaned up when deleting a server that failed to schedule.
The test currently asserts the broken behavior and has the
expected correct assertion commented out. This will be updated
when the bug is fixed.
Related-Bug: #1234567"

While fixing the bug, the regression test serves as verification that the fix works. The fix commit includes:
- The actual bug fix in the production code
- Update to the regression test: swap the assertions
Example: Fix Commit
Changes to: nova/tests/functional/regressions/test_bug_1234567.py
# Delete the server (local delete since no compute)
self.api.delete_server(server_id)
self._wait_until_deleted(server)
# BUG FIXED: Volume attachment is now cleaned up correctly
# Old assertion (demonstrating broken behavior):
# self.assertIn(volume_id,
# self.cinder.volume_ids_for_instance(server_id))
# Correct behavior:
self.assertNotIn(volume_id,
                         self.cinder.volume_ids_for_instance(server_id))

The fix commit message:
git commit -m "Fix volume attachment cleanup on local delete
When a server fails to schedule and is then deleted, we perform
a local delete in the API. This was not cleaning up volume
attachments in Cinder, leaving volumes in 'attaching' state.
This fix ensures that during local delete, we iterate through
all volume attachments and delete them via the Cinder API.
The regression test is updated to assert the correct behavior.
Closes-Bug: #1234567"

After the fix is merged, the regression test serves its long-term purpose: preventing the bug from reoccurring. If anyone modifies the delete path in a way that reintroduces the bug, the regression test will fail immediately in CI.
File: nova/tests/functional/regressions/test_bug_1404867.py
from nova.tests import fixtures as nova_fixtures
from nova.tests.functional import integrated_helpers
class DeleteWithReservedVolumes(integrated_helpers._IntegratedTestBase):
"""Test deleting of an instance in error state that has a reserved volume.
This test boots a server from volume which will fail to be scheduled,
ending up in ERROR state with no host assigned and then deletes the server.
Since the server failed to be scheduled, a local delete should run which
will make sure that reserved volumes at the API layer are properly cleaned
up.
The regression is that Nova would not clean up the reserved volumes and
the volume would be stuck in 'attaching' state.
"""
api_major_version = 'v2.1'
microversion = 'latest'
def _setup_compute_service(self):
# Override to NOT start compute, ensuring scheduling failure
pass
def test_delete_with_reserved_volumes_new(self):
# Create a server which should go to ERROR state
volume_id = nova_fixtures.CinderFixture.IMAGE_BACKED_VOL
server = self._create_error_server(volume_id)
server_id = server['id']
# Volume attachment should exist
self.assertIn(volume_id,
self.cinder.volume_ids_for_instance(server_id))
# Delete the server
self.api.delete_server(server['id'])
# The volume attachment should be cleaned up
self.assertNotIn(volume_id,
                         self.cinder.volume_ids_for_instance(server_id))

Key Observations:
- Inherits from _IntegratedTestBase: A stable base class from integrated_helpers that provides common setup
- Explicit override: _setup_compute_service() override makes the test's special requirements visible
- Uses inherited helpers: Methods like _create_error_server() from the base class (explicit reuse)
- Clear documentation: Docstring explains the bug, its cause, and the fix
File: nova/tests/functional/regressions/test_bug_1522536.py
from nova import test
from nova.tests import fixtures as nova_fixtures
from nova.tests.functional.api import client
class TestServerGet(test.TestCase):
"""Regression test for bug #1522536.
Before fixing this bug, getting a numeric id caused a 500
error. After the fix it returns a 404, which is expected.
"""
REQUIRES_LOCKING = True
def setUp(self):
super(TestServerGet, self).setUp()
# Explicit fixture setup - clear what this test needs
self.useFixture(nova_fixtures.RealPolicyFixture())
self.useFixture(nova_fixtures.NeutronFixture(self))
self.useFixture(nova_fixtures.GlanceFixture(self))
api_fixture = self.useFixture(nova_fixtures.OSAPIFixture(
api_version='v2.1'))
self.api = api_fixture.api
# Explicit service startup
self.start_service('conductor')
self.start_service('scheduler')
self.compute = self.start_service('compute')
# Explicit helper fixture
self.useFixture(nova_fixtures.CastAsCallFixture(self))
self.image_id = self.api.get_images()[0]['id']
self.flavor_id = self.api.get_flavors()[0]['id']
def test_id_overlap(self):
"""Regression test for bug #1522536."""
server = dict(name='server1',
imageRef=self.image_id,
flavorRef=self.flavor_id)
self.api.post_server({'server': server})
# Should raise 404 (not 500) when getting numeric id
self.assertRaises(client.OpenStackApiNotFoundException,
                          self.api.get_server, 1)

Key Observations:
- Direct inheritance: Only from test.TestCase - minimal dependencies
- All fixtures explicit: Every useFixture() call is visible in setUp()
- Reuses stable fixtures: NeutronFixture, GlanceFixture, CastAsCallFixture - all explicit
- Simple and clear: Anyone can understand dependencies by reading setUp()
File: nova/tests/functional/regressions/test_bug_1670627.py
This test ensures that when deleting a server from cell0 (failed to schedule), quota is properly decremented.
class TestDeleteFromCell0CheckQuota(test.TestCase):
"""Regression test for quota cleanup when deleting from cell0.
In Ocata, servers that fail to schedule are placed in cell0.
When deleted, quota was not being properly cleaned up because
the delete happened in cell0 but the quota reservation was in
the main database.
"""
def test_delete_error_instance_in_cell0_and_check_quota(self):
# Get starting quota
starting_usage = self.api.get_limits()
# Create server (will fail to schedule, go to ERROR in cell0)
server = self.api.post_server({'server': server_req})
self._wait_for_instance_status(server['id'], 'ERROR')
# Verify quota was incremented
current_usage = self.api.get_limits()
self.assertEqual(starting_usage['totalInstancesUsed'] + 1,
current_usage['totalInstancesUsed'])
# Delete the server
self.api.delete_server(server['id'])
self._wait_for_instance_delete(server['id'])
# BUG FIXED: Quota should be decremented
ending_usage = self.api.get_limits()
self.assertEqual(starting_usage['totalInstancesUsed'],
                         ending_usage['totalInstancesUsed'])

Let's create a complete example for our fictional "mystique" networking service.
Scenario: Bug #7890123 - Network port not cleaned up when network delete fails
File: mystique/tests/functional/regressions/test_bug_7890123.py
# Copyright 2025 ACME Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Regression test for bug 7890123.
When deleting a network fails due to an external error (e.g., Neutron
timeout), any ports that were successfully deleted as part of the cascade
delete are not being restored or tracked properly.
This leaves the system in an inconsistent state where:
1. The network still exists in Mystique's database
2. The ports are deleted from Neutron
3. Users cannot reconnect to the network because port creation fails
Expected behavior:
- If network delete fails, rollback port deletions OR
- Mark ports as deleted in Mystique's database to reflect reality
"""
import fixtures
from mystique import test
from mystique.tests import local_fixtures as mystique_fixtures
class TestNetworkDeletePortCleanup(test.TestCase):
"""Regression test for bug #7890123.
This test verifies that when a network delete operation fails
after some ports have been deleted, the system handles the
inconsistency correctly.
"""
def setUp(self):
super(TestNetworkDeletePortCleanup, self).setUp()
# Configuration
self.useFixture(mystique_fixtures.ConfFixture())
# Database
self.useFixture(mystique_fixtures.Database())
# RPC
self.useFixture(mystique_fixtures.RPCFixture())
# External services
self.neutron = self.useFixture(
mystique_fixtures.NeutronFixture(self))
# API
self.api_fixture = self.useFixture(
mystique_fixtures.APIFixture())
self.api = self.api_fixture.api
self.admin_api = self.api_fixture.admin_api
# Start services
self.start_service('mystique-server')
def test_network_delete_failure_port_cleanup(self):
"""Test port cleanup when network delete fails.
This test demonstrates bug #7890123 where ports are left in an
inconsistent state when network deletion fails partway through.
"""
# Create a network
network_req = {
'name': 'test-network',
'admin_state_up': True,
}
network = self.api.create_network(network_req)
network_id = network['id']
# Create two ports on the network
port1 = self.api.create_port({
'network_id': network_id,
'name': 'test-port-1'
})
port2 = self.api.create_port({
'network_id': network_id,
'name': 'test-port-2'
})
# Verify ports exist
ports = self.api.list_ports(network_id=network_id)
self.assertEqual(2, len(ports))
# Mock Neutron to fail network delete after port deletion
# Note: the patched class method receives the fixture instance as
# its first argument, not the test case.
def fake_delete_network(fixture, network_id):
# Delete ports first (this is what Neutron does)
for port in list(self.neutron._ports.values()):
if port['network_id'] == network_id:
del self.neutron._ports[port['id']]
# Then fail to delete the network
raise Exception("Neutron timeout during network delete")
self.useFixture(fixtures.MonkeyPatch(
'mystique.tests.local_fixtures.neutron.NeutronFixture.delete_network',
fake_delete_network))
# Attempt to delete the network (should fail)
exc = self.assertRaises(
Exception,
self.admin_api.delete_network,
network_id)
self.assertIn('timeout', str(exc))
# BUG: Ports are deleted in Neutron but still shown by Mystique API
# This demonstrates the broken behavior:
ports = self.api.list_ports(network_id=network_id)
self.assertEqual(2, len(ports),
"Bug: Ports still shown even though deleted in Neutron")
# Verify ports are actually gone from Neutron
self.assertEqual(0, len(self.neutron._ports))
# EXPECTED BEHAVIOR (commented out until bug is fixed):
# After a failed network delete, ports should be marked as deleted
# or the system should be in a consistent state
# ports = self.api.list_ports(network_id=network_id)
# self.assertEqual(0, len(ports),
# "Ports should be marked deleted to match Neutron")
# OR we should have a way to sync/reconcile the state:
# self.admin_api.sync_network_ports(network_id)
# ports = self.api.list_ports(network_id=network_id)
# self.assertEqual(0, len(ports))

Commit message for pre-fix:
Add regression test for bug 7890123
This test reproduces the issue where ports are left in an inconsistent
state when network deletion fails in Neutron after ports have been
deleted.
The test currently asserts the broken behavior (ports still visible in
Mystique even though deleted in Neutron) and has the expected correct
behavior commented out.
This will be updated when the bug is fixed.
Related-Bug: #7890123
After investigation, the team decides to implement a reconciliation mechanism. The fix involves:
- Adding a _sync_ports_with_neutron() method to the network manager
- Calling this during network delete failure recovery
- Updating the test to verify the fix
Updated test file (showing the changes):
def test_network_delete_failure_port_cleanup(self):
"""Test port cleanup when network delete fails.
This test verifies that when network deletion fails in Neutron
after ports have been deleted, Mystique reconciles its port
state with Neutron to maintain consistency.
"""
# ... setup code unchanged ...
# Attempt to delete the network (should fail)
exc = self.assertRaises(
Exception,
self.admin_api.delete_network,
network_id)
self.assertIn('timeout', str(exc))
# BUG FIXED: After the failed delete, the system reconciles port state
# Old assertion (demonstrated broken behavior):
# ports = self.api.list_ports(network_id=network_id)
# self.assertEqual(2, len(ports),
# "Bug: Ports still shown even though deleted in Neutron")
# Correct behavior: Ports are automatically reconciled
ports = self.api.list_ports(network_id=network_id)
self.assertEqual(0, len(ports),
"Ports reconciled with Neutron state")
# Verify the network still exists (delete failed)
network = self.api.show_network(network_id)
self.assertEqual(network_id, network['id'])
# Verify we can now successfully delete the empty network
self.admin_api.delete_network(network_id)
self.assertRaises(
Exception, # NetworkNotFound
self.api.show_network,
network_id)

Commit message for fix:
Fix port consistency after failed network delete
When a network delete operation fails in Neutron after ports have
been deleted, Mystique's database was left with stale port records.
This caused confusion and prevented proper cleanup.
This fix adds a reconciliation mechanism that:
1. Detects when Neutron delete fails
2. Queries Neutron for actual port state
3. Updates Mystique's database to match
4. Allows retry of the delete operation
The regression test is updated to assert the correct behavior.
Closes-Bug: #7890123
1. Explicit Dependencies Over Hidden Inheritance
The key principle: Make dependencies explicit. It's perfectly acceptable to reuse fixtures and helper functions, but do so explicitly rather than through deep inheritance chains.
Good - Explicit Fixture Reuse:
from nova.tests import fixtures as nova_fixtures
from nova.tests.functional import integrated_helpers
class TestBug123(test.TestCase, integrated_helpers.InstanceHelperMixin):
"""Direct inheritance from base TestCase with explicit mixin usage.
Uses InstanceHelperMixin for stable helper methods like
_wait_for_state_change() and _build_server().
"""
def setUp(self):
super(TestBug123, self).setUp()
# Explicitly set up fixtures - clear what dependencies exist
self.useFixture(nova_fixtures.RealPolicyFixture())
self.useFixture(nova_fixtures.NeutronFixture(self))
self.glance = self.useFixture(nova_fixtures.GlanceFixture(self))
self.cinder = self.useFixture(nova_fixtures.CinderFixture(self))
def test_something(self):
# Use helper from mixin - explicit and clear
server = self._build_server()
self._wait_for_state_change(server, 'ACTIVE')

Good - Reusing Standalone Helpers:
from nova.tests.functional import integrated_helpers
class TestBug456(test.TestCase):
"""Using stable helper functions from integrated_helpers module."""
def test_something(self):
# Explicit call to standalone helper function
rp_uuid = integrated_helpers.get_provider_uuid_by_host(
self.placement, 'compute1')
# Clear what's being used and where it comes from
allocations = integrated_helpers.get_allocations_for_server(
self.placement, server['id'])

Avoid - Deep Inheritance Chains:
class TestBug789(SomeSpecificTestClass):
"""Deep inheritance hides dependencies.
Problem: If SomeSpecificTestClass changes its setUp(), fixture
usage, or is refactored/removed, this test breaks even though
the bug hasn't regressed.
"""
pass

Why This Matters:
- Stability: When dependencies are explicit, changes to unrelated test infrastructure don't break regression tests
- Clarity: Anyone reading the test can see exactly what it depends on
- Pragmatism: Reusing stable fixtures and helpers avoids duplication (DRY principle)
- Maintainability: If a shared fixture changes, it's clear which tests are affected
What's Safe to Reuse:
✅ Fixtures (via self.useFixture()):
self.useFixture(nova_fixtures.RealPolicyFixture())
self.useFixture(nova_fixtures.NeutronFixture(self))

✅ Stable mixins (like InstanceHelperMixin):
class TestBug123(test.TestCase, integrated_helpers.InstanceHelperMixin):
pass

✅ Standalone helper functions:
from nova.tests.functional.api import client
exc = self.assertRaises(client.OpenStackApiNotFoundException, ...)

❌ Deep class hierarchies:
# Avoid: Multiple levels of inheritance
class TestBug123(SomeOtherTestClass): # which inherits from another class...
pass

The Balance: DRY vs Stability
This approach strikes a balance between three goals:
- DRY (Don't Repeat Yourself): Reuse fixtures and helpers instead of duplicating code
- Stability: Tests survive refactoring of unrelated code because dependencies are explicit
- Clarity: Anyone can understand what a test depends on by reading it
Example of this balance:
# Good: Explicit reuse - stable and DRY
class TestBug123(test.TestCase, integrated_helpers.InstanceHelperMixin):
def setUp(self):
super().setUp()
self.useFixture(nova_fixtures.NeutronFixture(self)) # Explicit
self.useFixture(nova_fixtures.GlanceFixture(self)) # Explicit
def test_something(self):
server = self._build_server() # From mixin - visible import
self._wait_for_state_change(server, 'ACTIVE') # From mixin
# Bad: Hidden dependencies - breaks when parent changes
class TestBug456(SomeComplexTestClass): # What does this provide?
def test_something(self):
# Relies on self.api, self.neutron, etc from parent
# If parent changes, this breaks even if bug hasn't regressed
pass

The test's setUp() method should explicitly declare all fixtures and dependencies, even when reusing shared components.
Good - Explicit Fixture Declaration:
def setUp(self):
super().setUp()
# Explicit fixture setup - clear what this test needs
self.useFixture(nova_fixtures.RealPolicyFixture())
self.useFixture(nova_fixtures.NeutronFixture(self))
self.glance = self.useFixture(nova_fixtures.GlanceFixture(self))
self.cinder = self.useFixture(nova_fixtures.CinderFixture(self))
# Start services explicitly
self.start_service('conductor')
self.start_service('scheduler')
self.start_service('compute', host='compute1')
# API fixture
self.api_fixture = self.useFixture(nova_fixtures.OSAPIFixture(
api_version='v2.1'))
self.api = self.api_fixture.api

Also Good - Using Stable Helpers:
from nova.tests.functional import integrated_helpers
class TestBug123(test.TestCase, integrated_helpers.InstanceHelperMixin):
"""Using InstanceHelperMixin provides _build_server(), etc."""
def setUp(self):
super().setUp()
# Still explicitly set up fixtures even with mixin
self.useFixture(nova_fixtures.NeutronFixture(self))
# ...etc
# Mixin provides _build_server(), _wait_for_state_change()
# but you still control fixture setup

Avoid - Hidden Dependencies:
def setUp(self):
super().setUp()
# Problem: What fixtures does this set up? You have to check parent class
self._setup_inherited_fixtures()
# Problem: Relies on parent's setUp() creating self.api, self.cinder, etc
# If parent changes, this breaks

Key Principle: Anyone reading the test should understand its dependencies by reading setUp(), not by tracing through parent classes.
Every regression test should include:
class TestBugXXXXXX(test.TestCase):
"""One-line summary of the bug.
Detailed explanation:
- What was the bug?
- How did it manifest to users?
- What sequence of operations triggers it?
- What was the root cause?
- How was it fixed?
The test ensures the bug does not regress by:
- Describing what the test does
"""def _setup_compute_service(self):
# Override parent to create specific conditions for the bug
# In this case, NOT starting compute ensures scheduling failure
pass
def setUp(self):
super().setUp()
# Set specific config that triggers the bug
self.flags(allow_resize_to_same_host=False)
# etc.

Use assertion messages that explain what is being verified. Good:
self.assertNotIn(volume_id,
self.cinder.volume_ids_for_instance(server_id),
"Volume should be detached after local delete")Avoid:
self.assertNotIn(volume_id, vol_list)  # What does this test?

When developing a regression test, you can verify the lifecycle:
# Phase 1: Verify test reproduces the bug (should pass with broken assertion)
tox -e functional -- mystique.tests.functional.regressions.test_bug_7890123
# Phase 2: Apply the fix, test should now pass with correct assertion
# (After updating test)
tox -e functional -- mystique.tests.functional.regressions.test_bug_7890123
# Phase 3: Verify test catches regression
# Temporarily revert the fix, test should fail
tox -e functional -- mystique.tests.functional.regressions.test_bug_7890123

When creating a regression test:
- File named test_bug_<id>.py in tests/functional/regressions/
- Inherits from test.TestCase or minimal base class (stable mixins like InstanceHelperMixin are OK)
- All dependencies are explicit (fixtures, helpers, imports)
- Self-contained setUp() with all fixtures declared inline
- Reuses stable fixtures and helpers where appropriate (DRY principle)
- Avoids deep inheritance chains that hide dependencies
- Comprehensive docstring explaining the bug
- Test reproduces the bug (initial commit)
- Broken assertions demonstrate current behavior
- Expected assertions commented out
- After fix: swap assertions (broken → commented, expected → active)
- Test verifies the fix prevents regression
- Clear, descriptive assertion messages
- Stable over time: changes to unrelated code don't break this test
For more examples, examine Nova's regression tests:
- nova/tests/functional/regressions/test_bug_1404867.py - Volume cleanup
- nova/tests/functional/regressions/test_bug_1670627.py - Quota handling
- nova/tests/functional/regressions/test_bug_1718455.py - Multi-create
- nova/tests/functional/regressions/test_bug_1790204.py - Same-host resize
Each demonstrates the pattern of:
- Clear documentation of the bug
- Self-contained setup
- Explicit reproduction of the failure scenario
- Verification of the fix
While regression tests should have explicit dependencies, creating reusable fixtures, mixins, and helper functions is strongly encouraged for common operations. This promotes the DRY (Don't Repeat Yourself) principle while maintaining clarity and stability.
Key principle: Make helpers that are stable, well-documented, and easy to discover.
Create reusable components when you notice:
- Repeated patterns across multiple tests
- Complex operations that would benefit from abstraction
- Common assertions that could be standardized
- Setup sequences used in multiple test files
IMPORTANT: When creating helper functions or methods that create resources (networks, servers, volumes, etc.), they MUST register cleanup functions by default. This ensures:
- No resource leaks between tests
- Test isolation - each test starts with a clean slate
- Reliable test runs - prevents cascading failures
Pattern to follow:
def _create_resource(self, name='test-resource'):
"""Create a resource and automatically register cleanup."""
resource = self.api.create_resource({'name': name})
# CRITICAL: Always register cleanup for created resources
self.addCleanup(self._cleanup_resource, resource['id'])
return resource
def _cleanup_resource(self, resource_id):
"""Cleanup helper - deletes resource, ignoring NotFound errors."""
try:
self.api.delete_resource(resource_id)
except Exception as e:
# Ignore if already deleted (e.g., test explicitly deleted it)
if 'NotFound' not in str(e):
raise

Why this matters:
- Tests run in random order in CI
- Multiple tests may create resources with similar names
- Leftover resources can cause future test failures
- Cleanup functions run even if the test fails (see the sketch below)
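The last point deserves a demonstration. addCleanup comes from the standard unittest/testtools API: cleanups run after the test body in reverse (LIFO) registration order, whether the test passes or fails. A minimal, self-contained sketch:

import unittest


class CleanupOrderExample(unittest.TestCase):
    """Show that cleanups run LIFO, even when the test fails."""

    def test_cleanup_registration(self):
        events = []
        # Registered first, so it runs *last* during cleanup
        self.addCleanup(events.append, 'delete-network')
        # Registered second, so it runs *first* during cleanup
        self.addCleanup(events.append, 'delete-port')
        # Cleanups have not run yet inside the test body; once the
        # test finishes, the port is removed before the network,
        # matching resource dependency order.
        self.assertEqual([], events)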
Fixtures are the primary mechanism for code reuse in functional tests. They provide setup/teardown with automatic cleanup.
Example: Creating a Stable Fixture
File: mystique/tests/local_fixtures/neutron.py
"""Neutron fixture for Mystique tests."""
import copy
import fixtures
from oslo_utils import uuidutils
class NeutronFixture(fixtures.Fixture):
"""Mock Neutron API for Mystique tests.
This fixture provides a stateful mock of Neutron's API, tracking
networks, ports, and subnets. It's designed to be stable and
reusable across all functional tests.
Usage:
self.neutron = self.useFixture(NeutronFixture(self))
network = self.neutron.create_network({'name': 'test-net'})
"""
def __init__(self, test):
super().__init__()
self.test = test
self._networks = {}
self._ports = {}
self._subnets = {}
def setUp(self):
super().setUp()
# Mock Neutron client using fixtures.MonkeyPatch
self.test.useFixture(fixtures.MonkeyPatch(
'mystique.network.neutron.get_client',
self._get_client))
def _get_client(self, context, admin=False):
"""Return fake Neutron client."""
return _FakeNeutronClient(self)
def create_network(self, body):
"""Create a network - stable API for tests to use."""
network_req = body.get('network')
network_id = network_req.get('id') or uuidutils.generate_uuid()
network = {
'id': network_id,
'name': network_req.get('name'),
'status': 'ACTIVE',
'admin_state_up': network_req.get('admin_state_up', True),
}
self._networks[network_id] = network
return {'network': copy.deepcopy(network)}
def get_network(self, network_id):
"""Get network by ID - stable helper."""
if network_id not in self._networks:
raise Exception('NetworkNotFound: %s' % network_id)
return copy.deepcopy(self._networks[network_id])
class _FakeNeutronClient:
"""Fake Neutron client wrapper."""
def __init__(self, fixture):
self.fixture = fixture
def create_network(self, body):
return self.fixture.create_network(body)
def show_network(self, network_id):
return {'network': self.fixture.get_network(network_id)}

Why this works:
- Stable interface: Methods like create_network() won't change
- Well-documented: Clear docstrings explain usage
- Self-contained: All Neutron mocking in one place
- Discoverable: In local_fixtures/ directory with clear name
Mixins provide helper methods that can be added to any test class. Use them for common operations that don't require fixtures.
Example: Creating a Stable Mixin
File: mystique/tests/functional/integrated_helpers.py
"""Helper mixins and functions for functional tests."""
import time
from mystique.tests.functional.api import client
class NetworkHelperMixin:
"""Mixin providing common network test operations.
This mixin is stable and designed for long-term reuse across
functional tests, including regression tests.
Usage:
class MyTest(test.TestCase, NetworkHelperMixin):
def test_something(self):
network = self._create_network('test-net')
self._wait_for_network_active(network)
"""
def _create_network(self, name='test-network', **kwargs):
"""Create a network and return the response.
IMPORTANT: This helper automatically registers cleanup to delete
the network when the test completes. This ensures tests don't
leave resources behind.
Args:
name: Network name (default: 'test-network')
**kwargs: Additional network properties
Returns:
dict: Network creation response
"""
network_req = {
'name': name,
'admin_state_up': kwargs.get('admin_state_up', True),
}
network_req.update(kwargs)
network = self.api.create_network(network_req)
# CRITICAL: Always register cleanup for created resources
self.addCleanup(self._cleanup_network, network['id'])
return network
def _cleanup_network(self, network_id):
"""Cleanup helper - deletes network, ignoring NotFound errors."""
try:
self.api.delete_network(network_id)
except Exception as e:
# Ignore if already deleted
if 'NotFound' not in str(e):
raise
def _wait_for_network_active(self, network, timeout=30):
"""Wait for network to reach ACTIVE status.
Args:
network: Network dict with 'id' key
timeout: Maximum time to wait in seconds
Returns:
dict: Updated network dict
Raises:
AssertionError: If network doesn't reach ACTIVE in time
"""
network_id = network['id']
for i in range(timeout * 10):
network = self.api.show_network(network_id)
if network['status'] == 'ACTIVE':
return network
if network['status'] == 'ERROR':
self.fail('Network %s went to ERROR' % network_id)
time.sleep(0.1)
self.fail('Timeout waiting for network %s to be ACTIVE' %
network_id)
def _delete_network_and_wait(self, network_id, timeout=30):
"""Delete a network and wait for it to be removed.
Args:
network_id: UUID of network to delete
timeout: Maximum time to wait in seconds
"""
self.api.delete_network(network_id)
for i in range(timeout * 10):
try:
self.api.show_network(network_id)
time.sleep(0.1)
except client.NotFoundException:
return
self.fail('Timeout waiting for network %s deletion' % network_id)
class PortHelperMixin:
"""Mixin for common port operations."""
def _create_port(self, network_id, name='test-port', **kwargs):
"""Create a port on the given network.
IMPORTANT: This helper automatically registers cleanup to delete
the port when the test completes. This ensures tests don't
leave resources behind.
Stable helper for port creation across all functional tests.
"""
port_req = {
'network_id': network_id,
'name': name,
}
port_req.update(kwargs)
port = self.api.create_port(port_req)
# CRITICAL: Always register cleanup for created resources
self.addCleanup(self._cleanup_port, port['id'])
return port
def _cleanup_port(self, port_id):
"""Cleanup helper - deletes port, ignoring NotFound errors."""
try:
self.api.delete_port(port_id)
except Exception as e:
# Ignore if already deleted
if 'NotFound' not in str(e):
raise

Why this works:
- Composable: Mix multiple helpers into one test class
- Documented: Each method has clear docstring
- Stable: Interface won't change, only implementation
- Explicit usage: class MyTest(test.TestCase, NetworkHelperMixin)
For operations that don't need test instance state, create standalone functions. These are the most explicit form of reuse.
Example: Creating Standalone Helpers
File: mystique/tests/functional/integrated_helpers.py
"""Standalone helper functions for functional tests."""
def wait_for_condition(condition_func, timeout=30, sleep_time=0.1,
error_message=None):
"""Generic wait helper for any condition.
This is a stable utility function that can be reused anywhere.
Args:
condition_func: Callable that returns True when condition met
timeout: Maximum time to wait in seconds
sleep_time: Time to sleep between checks
error_message: Custom error message (optional)
Returns:
The return value of condition_func when it becomes truthy
Raises:
TimeoutError: If condition not met within timeout
Example:
def check_ready():
return api.get_network(net_id)['status'] == 'ACTIVE'
wait_for_condition(check_ready, timeout=60,
error_message='Network never became active')
"""
end_time = time.time() + timeout
while time.time() < end_time:
result = condition_func()
if result:
return result
time.sleep(sleep_time)
if error_message:
raise TimeoutError(error_message)
raise TimeoutError('Condition not met within %s seconds' % timeout)
def get_provider_uuid_by_name(placement_client, provider_name):
"""Get resource provider UUID by name from Placement.
Stable helper for Placement interactions.
Args:
placement_client: Placement API client
provider_name: Name of the resource provider
Returns:
str: UUID of the resource provider
Raises:
ValueError: If provider not found
"""
resp = placement_client.get('/resource_providers')
providers = resp.body['resource_providers']
for provider in providers:
if provider['name'] == provider_name:
return provider['uuid']
raise ValueError('Provider %s not found' % provider_name)
def assert_network_state(test_case, network, expected_status,
expected_name=None):
"""Assert network is in expected state.
Reusable assertion helper that can be used across all tests.
Args:
test_case: Test case instance (for assertions)
network: Network dict to check
expected_status: Expected status value
expected_name: Expected name (optional)
"""
test_case.assertEqual(expected_status, network['status'],
'Network status mismatch')
if expected_name:
test_case.assertEqual(expected_name, network['name'],
'Network name mismatch')

Why this works:
- No hidden state: Pure functions, easy to understand
- Explicit imports: Clear where functionality comes from
- Composable: Combine multiple helpers in any test
- Testable: Helper functions can be unit tested themselves
Example: Using Reusable Components
"""Regression test demonstrating reusable component usage."""
from mystique import test
from mystique.tests import local_fixtures as mystique_fixtures
from mystique.tests.functional import integrated_helpers
class TestNetworkDeletion(test.TestCase,
integrated_helpers.NetworkHelperMixin):
"""Regression test for bug #7891234.
Demonstrates proper use of reusable components while maintaining
explicit dependencies for stability.
"""
def setUp(self):
super().setUp()
# Explicit fixture setup - clear what we depend on
self.useFixture(mystique_fixtures.ConfFixture())
self.useFixture(mystique_fixtures.Database())
self.neutron = self.useFixture(
mystique_fixtures.NeutronFixture(self))
# API fixture
self.api_fixture = self.useFixture(
mystique_fixtures.APIFixture())
self.api = self.api_fixture.api
# Start services
self.start_service('mystique-server')
def test_network_delete_with_ports(self):
"""Test network deletion when ports exist."""
# Use mixin helper - explicit, stable, and DRY
network = self._create_network('test-net')
network_id = network['id']
# Use mixin helper for port creation
port = self._create_port(network_id, 'test-port')
# Use standalone helper function - explicit import
integrated_helpers.wait_for_condition(
lambda: self.api.show_port(port['id'])['status'] == 'ACTIVE',
timeout=30,
error_message='Port never became active')
# Delete network (should cascade to ports)
self._delete_network_and_wait(network_id)
# Verify cleanup using standalone helper
with self.assertRaises(Exception):
self.api.show_network(network_id)What makes this good:
- Explicit fixture usage: All fixtures declared in setUp()
- Mixin for common operations: NetworkHelperMixin provides _create_network(), etc.
- Standalone helpers: wait_for_condition() imported explicitly
- DRY principle: No duplicated wait logic, network creation, etc.
- Stability: If helpers change, it's clear this test uses them
Design helper signatures for stability up front:

# Good: Stable interface that won't change
def wait_for_network_active(api, network_id, timeout=30):
"""Wait for network to reach ACTIVE status.
This function signature is stable and won't change.
"""
pass
# Bad: Unstable, might need new parameters later
def wait_for_network(api, network_id):
# What if we need to wait for different statuses?
# Or different timeout values?
pass

CRITICAL: Any helper that creates resources MUST register cleanup by default.
# Good: Automatic cleanup registration
def _create_server(self, name='test-server', **kwargs):
"""Create a server and automatically register cleanup.
IMPORTANT: Cleanup is automatic - the server will be deleted
when the test completes, even if the test fails.
"""
server = self.api.create_server({'name': name, **kwargs})
self.addCleanup(self._cleanup_server, server['id'])
return server
def _cleanup_server(self, server_id):
"""Cleanup helper - safely delete server."""
try:
self.api.delete_server(server_id)
except Exception as e:
if 'NotFound' not in str(e):
raise
# Bad: No cleanup - causes resource leaks
def _create_server(self, name='test-server'):
"""Create a server."""
return self.api.create_server({'name': name})
# Missing cleanup registration!

Why this is critical:
- Prevents resource leaks between tests
- Ensures test isolation
- Cleanup runs even if test fails or times out
- Tests can run in any order safely
Document mixins thoroughly so their methods are easy to discover:

class NetworkHelperMixin:
"""Mixin for network operations in functional tests.
This mixin is designed for long-term stability and reuse.
All methods maintain backward compatibility.
Usage:
class MyTest(test.TestCase, NetworkHelperMixin):
def test_something(self):
network = self._create_network('test')
Available methods:
- _create_network(name, **kwargs): Create a network
- _wait_for_network_active(network, timeout): Wait for ACTIVE
- _delete_network_and_wait(network_id, timeout): Delete network
"""
def _create_network(self, name='test-network', **kwargs):
"""Create a network.
Args:
name: Network name (default: 'test-network')
**kwargs: Additional network properties (admin_state_up,
description, etc.)
Returns:
dict: Network response from API
Example:
network = self._create_network('my-net',
admin_state_up=False)
"""
pass

Organize reusable components in well-known locations:
<project>/tests/
├── functional/
│ ├── integrated_helpers.py # Mixins and standalone helpers
│ ├── base.py # Base test classes
│ └── api/
│ └── client.py # API client helpers
├── local_fixtures/
│ ├── __init__.py # Export common fixtures
│ ├── database.py # Database fixture
│ ├── rpc.py # RPC fixture
│ ├── neutron.py # Neutron fixture
│ └── api.py # API fixture
Export commonly-used components in __init__.py:
# <project>/tests/local_fixtures/__init__.py
"""Local fixtures for project functional tests."""
# Export the most commonly used fixtures for easy import
from .api import APIFixture # noqa: F401
from .conf import ConfFixture # noqa: F401
from .database import Database # noqa: F401
from .neutron import NeutronFixture # noqa: F401
from .rpc import RPCFixture # noqa: F401
__all__ = [
'APIFixture',
'ConfFixture',
'Database',
'NeutronFixture',
'RPCFixture',
]
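With these exports in place, tests can pull everything from a single import path. A brief usage sketch, using the fictional mystique project for the package path and assuming test.TestCase is the project's base class:

from mystique.tests import local_fixtures


class TestWithLocalFixtures(test.TestCase):
    def setUp(self):
        super().setUp()
        # All fixtures come from one well-known import path
        self.useFixture(local_fixtures.ConfFixture())
        self.useFixture(local_fixtures.Database())
        self.useFixture(local_fixtures.RPCFixture())

When you need to change a helper: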
def _wait_for_network_status(self, network_id, status='ACTIVE',
timeout=30):
"""Wait for network to reach specified status.
This replaces the old _wait_for_network_active() method with a
more flexible version.
Args:
network_id: UUID of network
status: Target status (default: 'ACTIVE')
timeout: Maximum wait time in seconds
"""
pass
def _wait_for_network_active(self, network, timeout=30):
"""Wait for network to reach ACTIVE status.
DEPRECATED: Use _wait_for_network_status() instead.
This method is maintained for backward compatibility and will be
removed in the Z release.
"""
import warnings
warnings.warn('_wait_for_network_active is deprecated, use '
'_wait_for_network_status instead',
DeprecationWarning)
return self._wait_for_network_status(network['id'], 'ACTIVE',
timeout)

Benefits of reusable components:
- Reduced Duplication: Write wait logic once, use everywhere
- Consistency: All tests use the same patterns
- Easier Maintenance: Fix a bug once in the helper
- Better Documentation: Helpers document common patterns
- Faster Test Writing: New tests compose existing helpers
- Explicit Dependencies: Tests show what helpers they use
Do create:
- ✅ Stable fixtures for common service mocks
- ✅ Mixins for common test operations
- ✅ Standalone helpers for reusable logic
- ✅ Well-documented, discoverable components
Do use explicitly:
- ✅ self.useFixture(CommonFixture(self))
- ✅ class MyTest(test.TestCase, HelperMixin)
- ✅ from tests.functional import integrated_helpers
Don't do:
- ❌ Create helpers that hide dependencies
- ❌ Make unstable interfaces that change frequently
- ❌ Bury helpers in obscure locations
- ❌ Create deep inheritance chains
The goal: Make it easy to write tests that are DRY while keeping dependencies explicit for long-term stability.
Nova's tox.ini (abridged):

[tox]
minversion = 3.18.0
envlist = py3,functional,pep8
[testenv]
usedevelop = True
install_command = python -I -m pip install -c{env:TOX_CONSTRAINTS_FILE:...} {opts} {packages}
setenv =
VIRTUAL_ENV={envdir}
LANGUAGE=en_US
LC_ALL=en_US.utf-8
OS_STDOUT_CAPTURE=1
OS_STDERR_CAPTURE=1
OS_TEST_TIMEOUT=160
PYTHONDONTWRITEBYTECODE=1
deps =
-r{toxinidir}/test-requirements.txt
extras =
# Project-specific extras (e.g., zvm, vmware for Nova)
passenv =
# Allow OS_DEBUG for verbose logging
OS_DEBUG
# For greenlet leak detection
NOVA_RAISE_ON_GREENLET_LEAK
commands =
stestr run {posargs}
stestr slowest
[testenv:functional{,-py310,-py311,-py312}]
description =
Run functional tests.
setenv =
{[testenv]setenv}
# Enforce no greenlet leaks
NOVA_RAISE_ON_GREENLET_LEAK=True
deps =
{[testenv]deps}
# Placement is required for functional tests
openstack-placement>=9.0.0.0b1
commands =
stestr --test-path=./nova/tests/functional run {posargs}
stestr slowest

Useful environment variables:
- OS_DEBUG=1: Enable DEBUG level logging in tests
- OS_TEST_TIMEOUT: Timeout for individual tests (seconds)
- NOVA_RAISE_ON_GREENLET_LEAK: Make greenlet leaks fail tests
# All functional tests
tox -e functional
# Specific test
tox -e functional -- nova.tests.functional.test_servers.TestServers.test_create_server
# With debug logging
OS_DEBUG=1 tox -e functional -- nova.tests.functional.test_servers
# With failing tests only (reruns failures)
tox -e functional -- --failing

Nova's Zuul job definition:

- job:
name: nova-tox-functional-py312
parent: openstack-tox-functional-py312
description: |
Run tox-based functional tests for the OpenStack Nova project
under cPython version 3.12.
required-projects:
# Nova functional tests need placement
- openstack/nova
- openstack/placement
irrelevant-files:
# Skip job if only docs changed
- ^.*\.rst$
- ^api-.*$
- ^doc/(source|test)/.*$
- ^nova/locale/.*$
- ^releasenotes/.*$
vars:
zuul_work_dir: src/opendev.org/openstack/nova
bindep_profile: test py312
timeout: 3600  # 1 hour timeout

Key job settings:
- required-projects: Zuul clones these repos before running tests
- irrelevant-files: Skip job if only these files changed
- bindep_profile: System dependencies (e.g., libvirt-dev)
- timeout: Maximum job runtime
Example bindep.txt entries:

# System dependencies for functional tests
libvirt-dev [platform:dpkg test]
libvirt-devel [platform:rpm test]
Let's port this to a fictional OpenStack project called "mystique" (a networking service).
Important Notes:
- Use mystique/tests/local_fixtures/ (not fixtures/) to avoid import conflicts with the fixtures package
- Use fixtures.MonkeyPatch and useFixture() instead of stub_out()
- Use standard threading instead of eventlet (eventlet is being removed from OpenStack)
- Focus on single database (multi-DB like CellDatabases is Nova-specific)
File: mystique/tests/functional/base.py
"""Base classes for Mystique functional tests."""
import fixtures
from oslo_config import cfg
from oslo_log import log as logging
from oslotest import base
from mystique import context
from mystique import rpc
from mystique.tests import local_fixtures as mystique_fixtures
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
class MystiqueFunctionalTestCase(base.BaseTestCase):
"""Base class for Mystique functional tests.
Sets up:
- Database (oslo.db with SQLite)
- RPC (oslo.messaging with fake driver)
- Notifications
- API server
- External service mocks
"""
# Class attributes
USES_DB = True
STUB_RPC = True
def setUp(self):
super(MystiqueFunctionalTestCase, self).setUp()
# Configuration
self.useFixture(mystique_fixtures.ConfFixture(CONF))
# Database
if self.USES_DB:
self.useFixture(mystique_fixtures.Database())
# RPC
if self.STUB_RPC:
self.useFixture(mystique_fixtures.RPCFixture())
CONF.set_default('driver', ['test'],
group='oslo_messaging_notifications')
# Notifications
self.notifier = self.useFixture(
mystique_fixtures.NotificationFixture(self))
# External services
self.neutron = self.useFixture(
mystique_fixtures.NeutronFixture(self))
self.nova = self.useFixture(
mystique_fixtures.NovaFixture(self))
# API
self.api_fixture = self.useFixture(
mystique_fixtures.APIFixture())
self.api = self.api_fixture.api
self.admin_api = self.api_fixture.admin_api
# Start services
self.start_service('mystique-server')
def flags(self, **kw):
"""Override flag variables for a test.
Example:
self.flags(enabled_filters=['FilterA', 'FilterB'],
group='scheduler')
"""
group = kw.pop('group', None)
for k, v in kw.items():
CONF.set_override(k, v, group)
# Register cleanup to restore original value
self.addCleanup(CONF.clear_override, k, group)
def start_service(self, name, host=None, **kwargs):
"""Start a Mystique service."""
if host is not None:
self.flags(host=host)
svc = self.useFixture(
mystique_fixtures.ServiceFixture(name, host, **kwargs))
return svc.service

File: mystique/tests/local_fixtures/conf.py
"""Configuration fixture for Mystique tests."""
import fixtures
from oslo_config import cfg
from oslo_config import fixture as config_fixture
from mystique import config
from mystique import paths
CONF = cfg.CONF
class ConfFixture(config_fixture.Config):
"""Fixture to manage global conf settings."""
def setUp(self):
super(ConfFixture, self).setUp()
# Default group
self.conf.set_default('debug', True)
self.conf.set_default('host', 'test-host')
# Database group
self.conf.set_default('connection', 'sqlite://', group='database')
self.conf.set_default('sqlite_synchronous', False, group='database')
# API group
self.conf.set_default('api_workers', 1, group='api')
# Parse args with no config files
config.parse_args([], default_config_files=[], configure_db=False,
init_rpc=False)
class ConfPatcher(fixtures.Fixture):
"""Fixture to patch and restore global CONF."""
def __init__(self, **kwargs):
super(ConfPatcher, self).__init__()
self.group = kwargs.pop('group', None)
self.args = kwargs
def setUp(self):
super(ConfPatcher, self).setUp()
for k, v in self.args.items():
self.addCleanup(CONF.clear_override, k, self.group)
CONF.set_override(k, v, self.group)

File: mystique/tests/local_fixtures/database.py
Note: This is the single database case. If your project needs multiple databases (like Nova's cells), you would need a more complex fixture, but that's uncommon.
"""Database fixture for Mystique tests."""
import fixtures
from oslo_config import cfg
from oslo_db.sqlalchemy import enginefacade
from oslo_db.sqlalchemy import test_fixtures as db_fixtures
from mystique.db import api as db_api
from mystique.db import migration
CONF = cfg.CONF
DB_SCHEMA = ''  # Schema cache (SQL dump string)
class Database(fixtures.Fixture):
"""Create a database fixture with SQLite."""
def __init__(self):
super().__init__()
def setUp(self):
super().setUp()
# Inject a new factory for each test
new_engine = enginefacade.transaction_context()
self.useFixture(
db_fixtures.ReplaceEngineFacadeFixture(
db_api.context_manager, new_engine))
db_api.configure(CONF)
self.get_engine = db_api.get_engine
self._apply_schema()
self.addCleanup(self.cleanup)
def _apply_schema(self):
"""Apply database schema (cached for speed)."""
global DB_SCHEMA
if not DB_SCHEMA:
engine = self.get_engine()
conn = engine.connect()
migration.db_sync()
DB_SCHEMA = "".join(
line for line in conn.connection.iterdump())
else:
engine = self.get_engine()
conn = engine.connect()
conn.connection.executescript(DB_SCHEMA)
def cleanup(self):
engine = self.get_engine()
engine.dispose()

File: mystique/tests/local_fixtures/rpc.py
"""RPC fixture for Mystique tests."""
from unittest import mock

import fixtures
from oslo_config import cfg
import oslo_messaging as messaging
from oslo_messaging import conffixture as messaging_conffixture
from mystique import rpc
CONF = cfg.CONF
class RPCFixture(fixtures.Fixture):
"""Set up RPC with the fake:// transport for testing."""
def __init__(self, *exmods):
super(RPCFixture, self).__init__()
self.exmods = []
self.exmods.extend(exmods)
self._buses = {}
def _fake_create_transport(self, url):
"""Create or return cached fake transport."""
url = None # Collapse all to single bus
if url not in self._buses:
exmods = rpc.get_allowed_exmods()
self._buses[url] = messaging.get_rpc_transport(
CONF,
url=url,
allowed_remote_exmods=exmods)
return self._buses[url]
def setUp(self):
super(RPCFixture, self).setUp()
self.addCleanup(rpc.cleanup)
rpc.add_extra_exmods(*self.exmods)
self.addCleanup(rpc.clear_extra_exmods)
self.messaging_conf = messaging_conffixture.ConfFixture(CONF)
self.messaging_conf.transport_url = 'fake:/'
self.useFixture(self.messaging_conf)
self.useFixture(fixtures.MonkeyPatch(
'mystique.rpc.create_transport', self._fake_create_transport))
with mock.patch('mystique.rpc.get_transport_url') as mock_gtu:
mock_gtu.return_value = None
rpc.init(CONF)
def cleanup_in_flight_rpc_messages():
messaging._drivers.impl_fake.FakeExchangeManager._exchanges = {}
self.addCleanup(cleanup_in_flight_rpc_messages)

File: mystique/tests/local_fixtures/notifications.py
Note: Uses standard threading instead of eventlet (which is being removed from OpenStack).
"""Notification fixture for Mystique tests."""
import collections
import functools
import threading
import fixtures
from oslo_log import log as logging
import oslo_messaging
from oslo_serialization import jsonutils
from oslo_utils import timeutils
from mystique import rpc
LOG = logging.getLogger(__name__)
class _Sub(object):
"""Subscription helper for waiting on notifications."""
def __init__(self):
self._cond = threading.Condition()
self._notifications = []
def received(self, notification):
with self._cond:
self._notifications.append(notification)
self._cond.notify_all()
def wait_n(self, n, event, timeout):
"""Wait until at least n notifications have been received."""
with timeutils.StopWatch(timeout) as timer:
with self._cond:
while len(self._notifications) < n:
if timer.expired():
raise AssertionError(
"Notification %s not received" % event)
self._cond.wait(timer.leftover())
return list(self._notifications)
class FakeVersionedNotifier(object):
"""Captures versioned notifications."""
def __init__(self, transport, publisher_id, serializer=None):
self.transport = transport
self.publisher_id = publisher_id
self._serializer = serializer or \
oslo_messaging.serializer.NoOpSerializer()
self.versioned_notifications = []
self.subscriptions = collections.defaultdict(_Sub)
for priority in ['debug', 'info', 'warn', 'error', 'critical']:
setattr(
self, priority,
functools.partial(self._notify, priority.upper()))
def prepare(self, publisher_id=None):
if publisher_id is None:
publisher_id = self.publisher_id
return self.__class__(
self.transport, publisher_id, serializer=self._serializer)
def _notify(self, priority, ctxt, event_type, payload):
"""Capture notification."""
payload = self._serializer.serialize_entity(ctxt, payload)
jsonutils.to_primitive(payload) # Verify serialization
notification = {
'publisher_id': self.publisher_id,
'priority': priority,
'event_type': event_type,
'payload': payload,
}
self.versioned_notifications.append(notification)
self.subscriptions[event_type].received(notification)
def wait_for_versioned_notifications(
self, event_type, n_events=1, timeout=10.0
):
"""Wait for notifications with timeout."""
return self.subscriptions[event_type].wait_n(
n_events, event_type, timeout)
def reset(self):
self.versioned_notifications.clear()
self.subscriptions.clear()
class NotificationFixture(fixtures.Fixture):
"""Fixture to capture oslo.messaging notifications."""
    def __init__(self, test):
        super().__init__()
        self.test = test
def setUp(self):
super().setUp()
self.addCleanup(self.reset)
        # rpc.NOTIFIER must already be initialized; we dereference it
        # here, so no separate guard is needed.
        self.fake_versioned_notifier = FakeVersionedNotifier(
            rpc.NOTIFIER.transport,
            rpc.NOTIFIER.publisher_id,
            serializer=getattr(rpc.NOTIFIER, '_serializer', None))
        self.test.useFixture(fixtures.MonkeyPatch(
            'mystique.rpc.NOTIFIER',
            self.fake_versioned_notifier))
def reset(self):
self.fake_versioned_notifier.reset()
def wait_for_versioned_notifications(
self, event_type, n_events=1, timeout=10.0
):
return self.fake_versioned_notifier.wait_for_versioned_notifications(
event_type, n_events, timeout)
@property
def versioned_notifications(self):
return self.fake_versioned_notifier.versioned_notifications

File: mystique/tests/local_fixtures/neutron.py
"""Neutron fixture for Mystique tests."""
import copy
import uuid

import fixtures
class NeutronFixture(fixtures.Fixture):
"""Mock Neutron API for Mystique tests."""
def __init__(self, test):
super().__init__()
self.test = test
self._networks = {}
self._ports = {}
def setUp(self):
super().setUp()
# Mock Neutron client using fixtures.MonkeyPatch
self.test.useFixture(fixtures.MonkeyPatch(
'mystique.network.neutron.get_client',
self._get_client))
def _get_client(self, context, admin=False):
"""Return fake Neutron client."""
return _FakeNeutronClient(self)
def create_network(self, body):
"""Mock network creation."""
network_req = body.get('network')
network_id = network_req.get('id') or str(uuid.uuid4())
network = {
'id': network_id,
'name': network_req.get('name'),
'status': 'ACTIVE',
'admin_state_up': network_req.get('admin_state_up', True),
}
self._networks[network_id] = network
return {'network': copy.deepcopy(network)}
def show_network(self, network_id):
"""Mock get network."""
if network_id not in self._networks:
raise Exception('NetworkNotFound')
return {'network': copy.deepcopy(self._networks[network_id])}
def list_networks(self, **filters):
"""Mock list networks."""
networks = list(self._networks.values())
return {'networks': copy.deepcopy(networks)}
class _FakeNeutronClient:
"""Fake Neutron client wrapper."""
def __init__(self, fixture):
self.fixture = fixture
def create_network(self, body):
return self.fixture.create_network(body)
def show_network(self, network_id):
return self.fixture.show_network(network_id)
def list_networks(self, **filters):
return self.fixture.list_networks(**filters)

File: mystique/tests/local_fixtures/api.py
"""API fixture for Mystique tests."""
import fixtures
from oslo_utils.fixture import uuidsentinel
from wsgi_intercept import interceptor
from mystique.api import wsgi
from mystique.tests.local_fixtures import conf as conf_fixtures
from mystique.tests.functional.api import client
class APIFixture(fixtures.Fixture):
"""Create a Mystique API server as a fixture."""
def __init__(self, api_version='v1'):
super(APIFixture, self).__init__()
self.api_version = api_version
def setUp(self):
super(APIFixture, self).setUp()
hostname = uuidsentinel.api_host
port = 80
service_name = 'mystique_api'
endpoint = 'http://%s:%s/' % (hostname, port)
self.useFixture(conf_fixtures.ConfPatcher(debug=True))
# Load WSGI app
loader = wsgi.Loader().load_app(service_name)
        def app():
            # wsgi-intercept expects an app factory, not the app itself
            return loader
# Install wsgi-intercept
intercept = interceptor.RequestsInterceptor(app, url=endpoint)
intercept.install_intercept()
self.addCleanup(intercept.uninstall_intercept)
# Create API clients
base_url = 'http://%(host)s:%(port)s/%(api_version)s' % {
'host': hostname, 'port': port, 'api_version': self.api_version}
self.api = client.TestMystiqueClient('user', base_url)
self.admin_api = client.TestMystiqueClient('admin', base_url,
is_admin=True)

File: mystique/tests/local_fixtures/service.py
"""Service fixture for Mystique tests."""
import fixtures
from unittest import mock
from mystique import context
from mystique import service
class ServiceFixture(fixtures.Fixture):
"""Run a Mystique service as a test fixture.
Services are started in background threads (not eventlet greenthreads,
as eventlet is being removed from OpenStack).
"""
    def __init__(self, name, host=None, **kwargs):
        super().__init__()
self.name = name
self.host = host or name
kwargs.setdefault('host', self.host)
kwargs.setdefault('binary', 'mystique-%s' % name)
self.kwargs = kwargs
def setUp(self):
super(ServiceFixture, self).setUp()
self.ctxt = context.get_admin_context()
# Use fixtures.MonkeyPatch instead of mock.patch context manager
mock_ctx = mock.MagicMock(return_value=self.ctxt)
self.useFixture(fixtures.MonkeyPatch(
'mystique.context.get_admin_context',
mock_ctx))
self.service = service.Service.create(**self.kwargs)
self.service.start()
self.addCleanup(self.service.kill)

File: mystique/tests/functional/test_networks.py
"""Functional tests for Mystique network operations."""
from mystique.tests.functional import base
class NetworkTestCase(base.MystiqueFunctionalTestCase):
"""Test network creation and management."""
def test_create_network(self):
"""Test basic network creation."""
# Create network via API
network_req = {
'name': 'test-network',
'admin_state_up': True,
}
network = self.api.create_network(network_req)
self.assertEqual('ACTIVE', network['status'])
# Verify notification
notifications = self.notifier.wait_for_versioned_notifications(
'network.create.end', n_events=1, timeout=10.0)
self.assertEqual(1, len(notifications))
self.assertEqual(network['id'],
notifications[0]['payload']['network_id'])

Mystique's tox.ini:

[tox]
minversion = 3.18.0
envlist = py3,functional,pep8
[testenv]
usedevelop = True
setenv =
VIRTUAL_ENV={envdir}
OS_STDOUT_CAPTURE=1
OS_STDERR_CAPTURE=1
OS_TEST_TIMEOUT=160
deps =
-r{toxinidir}/test-requirements.txt
commands =
stestr run {posargs}
[testenv:functional]
description = Run functional tests
setenv =
{[testenv]setenv}
commands =
stestr --test-path=./mystique/tests/functional run {posargs}

Mystique's .zuul.yaml job:

- job:
name: mystique-functional
parent: openstack-tox-functional-py312
description: Run Mystique functional tests
required-projects:
- openstack/mystique
irrelevant-files:
- ^.*\.rst$
- ^doc/.*$
- ^releasenotes/.*$
vars:
zuul_work_dir: src/opendev.org/openstack/mystique
tox_envlist: functional
timeout: 1800

This guide provides a complete blueprint for replicating Nova's functional test infrastructure in any OpenStack project. The key components are:
- Base Test Class (oslotest.base.BaseTestCase)
  - Test isolation and cleanup
  - Fixture management
- Configuration (oslo.config)
  - Test-specific defaults
  - Per-test overrides
- Database (oslo.db)
  - In-memory SQLite
  - Schema caching
  - Multi-database support (cells)
- RPC/Messaging (oslo.messaging)
  - Fake transport
  - Synchronous casts
  - Message capture
- Notifications (oslo.messaging)
  - Versioned notification capture
  - Subscription and waiting
- External Services
  - Mocked at API boundary
  - Stateful tracking
  - Realistic behavior
- API Testing
  - Real WSGI app
  - wsgi-intercept
  - Multiple clients (admin, user, reader)
- Service Lifecycle
  - Start/stop services
  - Service fixtures
  - Clean shutdown
Design principles:
- Speed: Schema caching, in-memory databases, synchronous RPC
- Isolation: Fresh state per test, cleanup via fixtures
- Realism: Real API code, realistic mocks for external services
- Debuggability: OS_DEBUG=1 for verbose logs, clear error messages
- Maintainability: Fixtures, not inheritance; composition, not duplication
Files to create:
- <project>/tests/functional/base.py - Base test class
- <project>/tests/local_fixtures/__init__.py - Package init
- <project>/tests/local_fixtures/conf.py - Configuration fixtures
- <project>/tests/local_fixtures/database.py - Database fixture (single DB)
- <project>/tests/local_fixtures/rpc.py - RPC fixtures
- <project>/tests/local_fixtures/notifications.py - Notification fixtures
- <project>/tests/local_fixtures/<service>.py - External service mocks
- <project>/tests/local_fixtures/api.py - API fixture
- <project>/tests/local_fixtures/service.py - Service fixture
- <project>/tests/functional/test_*.py - Actual tests
- tox.ini - Tox environment configuration
- .zuul.yaml - CI job configuration
Important: Use local_fixtures/ instead of fixtures/ to avoid import conflicts with the fixtures package when debuggers add <project>/tests to their import path.
This infrastructure enables fast, isolated, comprehensive functional testing without requiring external dependencies or complex deployment.
Nova historically used a stub_out() helper, but this can be replaced with the fixtures library:
# Nova's stub_out (for reference)
def stub_out(self, old, new):
"""Replace a function for the duration of the test."""
self.useFixture(fixtures.MonkeyPatch(old, new))
# Modern approach - use fixtures.MonkeyPatch directly
class MyTest(base.BaseTestCase):
def test_something(self):
# Instead of: self.stub_out('module.function', fake_function)
# Use:
self.useFixture(fixtures.MonkeyPatch(
'module.function', fake_function))

The flags() helper makes it easy to override configuration options with automatic cleanup:
from oslo_config import cfg
CONF = cfg.CONF
class BaseTestCase(oslotest.base.BaseTestCase):
"""Base test class with configuration helper."""
def flags(self, **kw):
"""Override flag variables for a test.
Automatically registers cleanup to restore original values.
Examples:
self.flags(debug=True)
self.flags(enabled_filters=['FilterA', 'FilterB'],
group='scheduler')
:param kw: Keyword arguments where keys are config option names
and values are the values to set. Special keyword 'group'
specifies the config group (defaults to DEFAULT).
"""
group = kw.pop('group', None)
for k, v in kw.items():
CONF.set_override(k, v, group)
# Automatically clean up after the test
self.addCleanup(CONF.clear_override, k, group)

Since eventlet is being removed from OpenStack, use standard threading:
import threading
# OLD (eventlet) - Don't use
import eventlet
gt = eventlet.spawn(my_function, arg1, arg2)
result = gt.wait()
# NEW (threading) - Use this
import concurrent.futures
# For single background task
thread = threading.Thread(target=my_function, args=(arg1, arg2))
thread.start()
thread.join()
# For thread pool
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
future = executor.submit(my_function, arg1, arg2)
result = future.result(timeout=30)

In Tests:
import threading

import fixtures
from oslo_log import log as logging

LOG = logging.getLogger(__name__)


class MyServiceFixture(fixtures.Fixture):
    """Run a service in a background thread."""

    def setUp(self):
        super().setUp()
        self.service = MyService()  # your project's service class
# Start service in thread (not eventlet greenthread)
self.thread = threading.Thread(
target=self.service.start,
daemon=True) # Daemon thread dies when main thread exits
self.thread.start()
# Register cleanup
self.addCleanup(self._cleanup)
def _cleanup(self):
"""Stop the service and join the thread."""
self.service.stop()
self.thread.join(timeout=10)
if self.thread.is_alive():
# Log warning but don't fail - daemon threads will die anyway
LOG.warning('Service thread did not stop cleanly')

Most OpenStack projects only need a single database. Here's the complete pattern:
"""Database fixture for single database projects."""
import fixtures
from oslo_config import cfg
from oslo_db.sqlalchemy import enginefacade
from oslo_db.sqlalchemy import test_fixtures as db_fixtures
from myproject.db import api as db_api
from myproject.db import migration
CONF = cfg.CONF
DB_SCHEMA = ''  # Global cache for the schema (SQL dump string)
class Database(fixtures.Fixture):
"""Create a single database fixture with SQLite.
This is the common case for most OpenStack projects.
If you need multiple databases (like Nova's cells),
you'll need a more complex fixture.
"""
def __init__(self):
super().__init__()
def setUp(self):
super().setUp()
# Create a new transaction context for this test
new_engine = enginefacade.transaction_context()
# Replace the global context manager with test-specific one
self.useFixture(
db_fixtures.ReplaceEngineFacadeFixture(
db_api.context_manager, new_engine))
# Configure database with test settings
db_api.configure(CONF)
self.get_engine = db_api.get_engine
self._apply_schema()
self.addCleanup(self.cleanup)
def _apply_schema(self):
"""Apply database schema using cached SQL for speed."""
global DB_SCHEMA
if not DB_SCHEMA:
# First test: run migrations and cache result
engine = self.get_engine()
conn = engine.connect()
# Run migrations to create schema
migration.db_sync()
# Cache the schema as SQL statements
# (SQLite-specific: iterdump() returns SQL statements)
DB_SCHEMA = "".join(
line for line in conn.connection.iterdump())
else:
# Subsequent tests: apply cached schema (much faster!)
engine = self.get_engine()
conn = engine.connect()
conn.connection.executescript(DB_SCHEMA)
def cleanup(self):
"""Dispose of database engine."""
engine = self.get_engine()
engine.dispose()
# Usage in tests
import oslotest.base


class MyTestCase(oslotest.base.BaseTestCase):
def setUp(self):
super().setUp()
self.useFixture(Database())
# Now you can use the database!

File: <project>/tests/local_fixtures/__init__.py
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Local fixtures for project functional tests.
This package is named 'local_fixtures' instead of 'fixtures' to avoid
import conflicts with the 'fixtures' package when debuggers add the
tests directory to sys.path for test discovery.
"""
# Import commonly used fixtures for convenience
from .api import APIFixture # noqa: F401
from .conf import ConfFixture # noqa: F401
from .conf import ConfPatcher # noqa: F401
from .database import Database # noqa: F401
from .neutron import NeutronFixture # noqa: F401
from .notifications import NotificationFixture # noqa: F401
from .rpc import RPCFixture # noqa: F401
from .service import ServiceFixture # noqa: F401
__all__ = [
'APIFixture',
'ConfFixture',
'ConfPatcher',
'Database',
'NeutronFixture',
'NotificationFixture',
'RPCFixture',
'ServiceFixture',
]

Nova's CellDatabases fixture is complex because Nova has a unique architecture with multiple databases per deployment. Most OpenStack projects don't need this. Key differences:
| Feature | Nova (cells) | Most Projects |
|---|---|---|
| Databases | API DB + N cell DBs | Single DB |
| Context targeting | context.target_cell() | Not needed |
| DB routing | Dynamic based on instance | Simple |
| Complexity | High | Low |
When porting:
- Use the simple Database fixture (shown above)
- Skip CellDatabases entirely
- Skip context targeting logic
- If you really need multiple DBs, add them incrementally
Fixture: A test component that provides setup/teardown with
automatic cleanup. Part of the fixtures library.
Example: self.useFixture(NeutronFixture(self))
Mixin: A class that provides methods to be used by other classes
through multiple inheritance.
Example: class MyTest(test.TestCase, HelperMixin)
oslo.messaging: OpenStack library for RPC and notifications. Provides abstraction over message transports (RabbitMQ, etc.)
Transport: The messaging backend (e.g., RabbitMQ, fake://). In
functional tests, we use fake:// for synchronous in-memory messaging.
wsgi-intercept: Library that intercepts HTTP requests and routes them to WSGI applications in-process, avoiding real HTTP connections.
Cell: Nova concept for scaling. Most projects don't need this.
Placement: OpenStack service for tracking resource inventory and usage. Often needed in Nova tests.
RPC Cast vs Call:
- Cast: Fire-and-forget, no return value, asynchronous
- Call: Synchronous, waits for and returns a response (see the sketch below)
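A short sketch of the difference using oslo.messaging's RPCClient (the transport and ctxt objects are assumed to be set up already; the topic and method names are illustrative):

import oslo_messaging as messaging

target = messaging.Target(topic='mystique-server', version='1.0')
client = messaging.RPCClient(transport, target)

# Cast: fire-and-forget; returns immediately with no result
client.cast(ctxt, 'refresh_cache', network_id='abc-123')

# Call: blocks until the remote method returns (or raises/times out)
state = client.call(ctxt, 'get_network_state', network_id='abc-123')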
Stestr: Test runner for OpenStack projects. Replaces the older testr (testrepository) runner.
addCleanup: Fixture/test method that registers cleanup functions to run after test, even if test fails.
End of Guide
Version: 1.0
Last Updated: October 2025