Created
November 28, 2025 18:36
-
-
Save bodokaiser/dd0c5e34f50e6c5ead40e8139042ef0c to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| import argparse | |
| from sipyco import pc_rpc, common_args | |
| from driver import Camera | |
| def get_argparser(): | |
| parser = argparse.ArgumentParser( | |
| description="ARTIQ controller for the Alliedvision cameras" | |
| ) | |
| parser.add_argument( | |
| "-d", "--device", required=True, default=None, help="Camera device ID" | |
| ) | |
| common_args.simple_network_args(parser, 3255) | |
| common_args.verbosity_args(parser) | |
| return parser | |
| def main(): | |
| args = get_argparser().parse_args() | |
| print( | |
| f"found camera ids {', '.join(Camera.available_camera_ids())} using {args.device}" | |
| ) | |
| try: | |
| camera = Camera(args.device) | |
| camera.start_acquisition() | |
| pc_rpc.simple_server_loop( | |
| {"camera": camera}, common_args.bind_address_from_args(args), args.port | |
| ) | |
| except KeyboardInterrupt: | |
| pass | |
| finally: | |
| camera.stop_acquisition() | |
| if __name__ == "__main__": | |
| main() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import vmbpy | |
| import threading | |
| import time | |
| import queue | |
| import numpy as np | |
| vmb = vmbpy.VmbSystem.get_instance() | |
| class Camera: | |
| """ | |
| Base class for Alliedvision PoE cameras. | |
| :param camera_id: camera identifier, e.g., "DEV_000F3100A8D1" | |
| """ | |
| def __init__( | |
| self, | |
| camera_id: str, | |
| ): | |
| self._id = camera_id | |
| # reference to the background thread | |
| self._thread = None | |
| # thread-safe queues for the images | |
| self._images = queue.Queue() | |
| # event to signal the thread to stop | |
| self._stop = threading.Event() | |
| # lock to avoid race conditions while starting/stopping the acquisition loop | |
| self._lock = threading.Lock() | |
| @staticmethod | |
| def available_camera_ids(): | |
| """ | |
| Returns a list of available camera identifiers. | |
| """ | |
| with vmb: | |
| return [camera.get_id() for camera in vmb.get_all_cameras()] | |
| @property | |
| def id(self) -> str: | |
| """ | |
| Returns the camera identifiers of cameras found in the local network. | |
| """ | |
| return self._id | |
| @property | |
| def name(self) -> str: | |
| """ | |
| Returns the camera name. | |
| """ | |
| with self._lock: | |
| with vmb, vmb.get_camera_by_id(self._id) as camera: | |
| return camera.get_name() | |
| @property | |
| def model(self) -> str: | |
| """ | |
| Returns the camera model. | |
| """ | |
| with self._lock: | |
| with vmb, vmb.get_camera_by_id(self._id) as camera: | |
| return camera.get_model() | |
| @property | |
| def serial(self) -> str: | |
| """ | |
| Returns the camera serial number. | |
| """ | |
| with self._lock: | |
| with vmb, vmb.get_camera_by_id(self._id) as camera: | |
| return camera.get_serial() | |
| @property | |
| def width(self) -> int: | |
| """ | |
| Returns the camera image width in number of pixels. | |
| """ | |
| with self._lock: | |
| with vmb, vmb.get_camera_by_id(self._id) as camera: | |
| return camera.Width.get() | |
| @property | |
| def height(self) -> int: | |
| """ | |
| Returns the camera image height in number of pixels. | |
| """ | |
| with self._lock: | |
| with vmb, vmb.get_camera_by_id(self._id) as camera: | |
| return camera.Height.get() | |
| def __enter__(self): | |
| self.start_acquisition() | |
| return self | |
| def __exit__(self, exc_type, exc_val, exc_tb): | |
| self.stop_acquisition() | |
| def configure( | |
| self, | |
| gain_auto=False, | |
| gamma=1.0, | |
| exposure_auto=False, | |
| exposure_mode="TriggerWidth", | |
| trigger_source="Line1", | |
| trigger_selector="FrameStart", | |
| acquisition_mode="Continuous", | |
| action_device_key=1, | |
| action_group_key=1, | |
| action_group_mask=1, | |
| ): | |
| """ | |
| Configure the camera with the specified parameters, see Ref. [1] for details. | |
| [1]: https://cdn.alliedvision.com/fileadmin/content/documents/products/cameras/various/features/GigE_Features_Reference.pdf | |
| """ | |
| with self._lock: | |
| with vmb, vmb.get_camera_by_id(self._id) as camera: | |
| # adjust_package_size(camera) | |
| camera.Gamma.set(gamma) | |
| if not gain_auto: | |
| camera.GainAuto.set("Off") | |
| if not exposure_auto: | |
| camera.ExposureAuto.set("Off") | |
| pixelformats = camera.get_pixel_formats() | |
| self.pixelformats = pixelformats | |
| print(pixelformats) | |
| # camera.set_pixel_format(pixelformats[1]) | |
| camera.ActionDeviceKey.set(action_device_key) | |
| camera.ActionGroupKey.set(action_group_key) | |
| camera.ActionGroupMask.set(action_group_mask) | |
| camera.AcquisitionMode.set(acquisition_mode) | |
| # camera.ExposureMode.set(exposure_mode) | |
| camera.TriggerSource.set(trigger_source) | |
| camera.TriggerSelector.set(trigger_selector) | |
| camera.TriggerMode.set("On") | |
| def start_acquisition(self): | |
| """ | |
| Starts the acquisition loop thread to enqueue frames from the camera in the background. | |
| """ | |
| self._stop.clear() | |
| def frame_handler(camera: vmbpy.Camera, _: vmbpy.Stream, frame: vmbpy.Frame): | |
| image = frame.convert_pixel_format(self.pixelformats[1]).as_numpy_ndarray() | |
| self._images.put(image) | |
| camera.queue_frame(frame) | |
| def acquisition_loop(): | |
| self._lock.acquire() | |
| with vmb, vmb.get_camera_by_id(self._id) as camera: | |
| camera.start_streaming(frame_handler) | |
| camera.AcquisitionStart.run() | |
| self._lock.release() | |
| while not self._stop.is_set(): | |
| time.sleep(0.1) | |
| self._lock.acquire() | |
| camera.AcquisitionStop.run() | |
| camera.stop_streaming() | |
| self._thread = threading.Thread(target=acquisition_loop) | |
| self._thread.daemon = True | |
| self._thread.start() | |
| def stop_acquisition(self): | |
| """ | |
| Stops the acquisition loop thread. | |
| """ | |
| self._stop.set() | |
| if self._thread is not None: | |
| self._thread.join() | |
| self._thread = None | |
| def software_trigger(self): | |
| with self._lock: | |
| with vmb, vmb.get_camera_by_id(self._id) as camera: | |
| camera.TriggerSoftware.run() | |
| def action_command_trigger(self, device_key=1, group_key=1, group_mask=1): | |
| """ | |
| Executes an action command (ethernet) trigger. | |
| """ | |
| with self._lock: | |
| with vmb, vmb.get_camera_by_id(self._id) as camera: | |
| interface = camera.get_interface() | |
| interface.ActionDeviceKey.set(device_key) | |
| interface.ActionGroupKey.set(group_key) | |
| interface.ActionGroupMask.set(group_mask) | |
| interface.ActionCommand.run() | |
| def retrieve_image(self): | |
| """ | |
| Retrieves an image from the queue, blocking until an image is available. | |
| """ | |
| return self._images.get() | |
| def save_image(self, path): | |
| image = self.retrieve_image() | |
| np.save(path, image, allow_pickle=False) | |
| def close(self): | |
| """ | |
| Closes the camera. | |
| """ | |
| with self._lock: | |
| with vmb, vmb.get_camera_by_id(self._id) as camera: | |
| camera.close() | |
| def adjust_package_size(camera: Camera): | |
| stream = camera.get_streams()[0] | |
| stream.GVSPAdjustPacketSize.run() | |
| while not stream.GVSPAdjustPacketSize.is_done(): | |
| time.sleep(0.1) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import os | |
| import numpy | |
| import unittest | |
| from driver import Camera | |
| CAMERA_ID = os.getenv("CAMERA_ID") | |
| class TestCamera(unittest.TestCase): | |
| def setUp(self): | |
| if not CAMERA_ID: | |
| self.skipTest("CAMERA_ID not set") | |
| self.camera = Camera(CAMERA_ID) | |
| def test_available_camera_ids(self): | |
| ids = Camera.available_camera_ids() | |
| self.assertIsInstance(ids, list) | |
| self.assertGreater(len(ids), 0) | |
| def test_id(self): | |
| self.assertEqual(self.camera.id, CAMERA_ID) | |
| def test_name(self): | |
| name = self.camera.name | |
| self.assertIsInstance(name, str) | |
| self.assertGreater(len(name), 0) | |
| def test_model(self): | |
| model = self.camera.model | |
| self.assertIsInstance(model, str) | |
| self.assertGreater(len(model), 0) | |
| def test_serial(self): | |
| serial = self.camera.serial | |
| self.assertIsInstance(serial, str) | |
| self.assertGreater(len(serial), 0) | |
| def test_width(self): | |
| width = self.camera.width | |
| self.assertEqual(width, 1936) | |
| def test_height(self): | |
| height = self.camera.height | |
| self.assertEqual(height, 1216) | |
| def test_action_command_trigger(self): | |
| self.camera.configure(trigger_source="Line1") | |
| with self.camera as camera: | |
| camera.action_command_trigger() | |
| image = camera.retrieve_image() | |
| self.assertIsInstance(image, numpy.ndarray) | |
| def test_software_trigger(self): | |
| self.camera.configure(trigger_source="Software") | |
| with self.camera as camera: | |
| camera.software_trigger() | |
| image = self.camera.retrieve_image() | |
| self.assertIsInstance(image, numpy.ndarray) | |
| if __name__ == "__main__": | |
| unittest.main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment