diff --git a/pyaer/__init__.py b/pyaer/__init__.py
index 362cbd6..d014a35 100644
--- a/pyaer/__init__.py
+++ b/pyaer/__init__.py
@@ -6,7 +6,7 @@
LOG_LEVEL = log.DEBUG
try:
- from pyaer import libcaer_wrap as libcaer # noqa
+ from pyaer import libcaer_wrap as libcaer
except ImportError:
raise ImportError(
"libcaer might not be in the LD_LIBRARY_PATH "
@@ -14,3 +14,8 @@
"Try to load _libcaer_wrap.so from the package "
"directory, this will provide more information."
)
+
+from pyaer.event_camera import EventCamera # noqa # noreorder
+
+
+__all__ = ["libcaer", "EventCamera"]
diff --git a/pyaer/dvs128.py b/pyaer/dvs128.py
index fa4b8ff..afb2df8 100644
--- a/pyaer/dvs128.py
+++ b/pyaer/dvs128.py
@@ -1,9 +1,10 @@
from __future__ import annotations
+from typing import Any
+
import numpy as np
from pyaer import libcaer
-from pyaer import utils
from pyaer.device import USBDevice
from pyaer.filters import DVSNoise
@@ -11,51 +12,44 @@
class DVS128(USBDevice):
"""DVS128.
- # Arguments
- device_id: `int`
- a unique ID to identify the device from others.
- Will be used as the source for EventPackets being
- generate from its data.
- `default is 1`
- bus_number_restrict: `int`
- restrict the search for viable devices to only this USB
- bus number.
- `default is 0`
- dev_address_restrict: `int`
- restrict the search for viable devices to only this USB
- device address.
- `default is 0`
- serial_number: `str`
- restrict the search for viable devices to only devices which do
- possess the given Serial Number in their USB
- SerialNumber descriptor.
+ Args:
+ device_id: a unique ID to identify the device from others. Will be used as the
+ source for EventPackets being generate from its data. `default is 1`
+ bus_number_restrict: restrict the search for viable devices to only this USB
+ bus number. `default is 0`
+ dev_address_restrict: restrict the search for viable devices to only this USB
+ device address. `default is 0`
+ serial_number: restrict the search for viable devices to only devices which do
+ possess the given Serial Number in their USB SerialNumber descriptor.
`default is ""`
- noise_filter: `bool`
- if enable noise filter,
- `default is False`.
+ filter_noise: if enable noise filter, `default is False`.
"""
def __init__(
self,
- device_id=1,
- bus_number_restrict=0,
- dev_address_restrict=0,
- serial_number="",
- noise_filter=False,
- ):
- """DVS128."""
- super(DVS128, self).__init__()
- # open device
- self.open(device_id, bus_number_restrict, dev_address_restrict, serial_number)
- # get camera information
+ device_id: int = 1,
+ bus_number_restrict: int = 0,
+ dev_address_restrict: int = 0,
+ serial_number: str = "",
+ filter_noise: bool = False,
+ ) -> None:
+ super().__init__()
+ # Opens device.
+ self.open(
+ libcaer.CAER_DEVICE_DVS128,
+ device_id,
+ bus_number_restrict,
+ dev_address_restrict,
+ serial_number,
+ )
+ # Gets camera information.
self.obtain_device_info(self.handle)
- # noise filter
- self.filter_noise = noise_filter
- if noise_filter is True:
- self.noise_filter = DVSNoise(self.dvs_size_X, self.dvs_size_Y)
- else:
- self.noise_filter = None
+ # Sets noise filter
+ self.filter_noise = filter_noise
+ self.noise_filter = (
+ DVSNoise(self.dvs_size_X, self.dvs_size_Y) if filter_noise else None
+ )
# Bias configuration list
self.configs_list = [
@@ -73,39 +67,7 @@ def __init__(
("Pr", libcaer.DVS128_CONFIG_BIAS, libcaer.DVS128_CONFIG_BIAS_PR),
]
- def set_noise_filter(self, noise_filter):
- """Set noise filter.
-
- # Arguments
- noise_filter: `filters.DVSNoise`
- A valid `DVSNoise` object. This filter implements
- software-level background activity filter.
- """
- if noise_filter is not None:
- self.noise_filter = noise_filter
-
- def enable_noise_filter(self):
- """Enalbe DVS noise filter.
-
- This function enables the DVS noise filter. Note that this function will
- initialize a `DVSNoise` filter if there is None.
- """
- if self.filter_noise is False:
- self.filter_noise = True
-
- if self.noise_filter is None:
- self.noise_filter = DVSNoise(self.dvs_size_X, self.dvs_size_Y)
-
- def disable_noise_filter(self):
- """Disable noise filter.
-
- This method disable the noise filter. Note that this function doesn't destroy
- the existed noise filter. It simply switches off the function.
- """
- if self.filter_noise is True:
- self.filter_noise = False
-
- def obtain_device_info(self, handle):
+ def obtain_device_info(self, handle: Any) -> None:
"""Obtain DVS128 info.
This function collects the following information from the device:
@@ -119,7 +81,7 @@ def obtain_device_info(self, handle):
- Camera height
- Logic version
- # Arguments
+ Args:
handle: `caerDeviceHandle`
a valid device handle that can be used with the other
`libcaer` functions, or `None` on error.
@@ -127,7 +89,7 @@ def obtain_device_info(self, handle):
if handle is not None:
info = libcaer.caerDVS128InfoGet(handle)
- # port all info data field out
+ # Ports all info data field out
self.device_id = info.deviceID
self.device_serial_number = info.deviceSerialNumber
self.device_usb_bus_number = info.deviceUSBBusNumber
@@ -138,243 +100,55 @@ def obtain_device_info(self, handle):
self.dvs_size_X = info.dvsSizeX
self.dvs_size_Y = info.dvsSizeY
- def open(
- self,
- device_id=1,
- bus_number_restrict=0,
- dev_address_restrict=0,
- serial_number="",
- ):
- """Open device.
-
- # Arguments
- device_id: `int`
- a unique ID to identify the device from others.
- Will be used as the source for EventPackets being
- generate from its data.
- `default is 1`.
- bus_number_restrict: `int`
- restrict the search for viable devices to only this USB
- bus number.
- `default is 0`.
- dev_address_restrict: `int`
- restrict the search for viable devices to only this USB
- device address.
- `default is 0`.
- serial_number: `str`
- restrict the search for viable devices to only devices which do
- possess the given Serial Number in their USB
- SerialNumber descriptor.
- `default is ""`
- """
- super(DVS128, self).open(
- libcaer.CAER_DEVICE_DVS128,
- device_id,
- bus_number_restrict,
- dev_address_restrict,
- serial_number,
- )
-
- def set_bias_from_json(self, file_path, verbose=False):
- """Set bias from loading JSON configuration file.
-
- # Arguments
- file_path: `str`
- absolute path of the JSON bias file.
- verbose: `bool`
- optional debugging message.
- """
- bias_obj = utils.load_dvs_bias(file_path, verbose)
- self.set_bias(bias_obj)
-
- def set_bias(self, bias_obj):
- """Set bias from bias dictionary.
-
- # Arguments
- bias_obj: `dict`
- dictionary that contains DVS128 biases.
-
- # Returns
- flag: `bool`
- True if set successful, False otherwise.
- """
- for bias_name, module_address, parameter_address in self.configs_list:
- self.set_config(module_address, parameter_address, bias_obj[bias_name])
-
- # setting for noise filter
- if self.filter_noise is True:
- self.noise_filter.set_bias(bias_obj["noise_filter_configs"])
-
- def get_bias(self):
- """Get bias settings.
-
- # Returns
- bias_obj: `dict`
- dictionary that contains DVS128 current bias settings.
- """
- bias_obj = {}
-
- for bias_name, module_address, parameter_address in self.configs_list:
- bias_obj[bias_name] = self.get_config(module_address, parameter_address)
-
- # get noise filter configs
- if self.noise_filter is not None:
- bias_obj["noise_filter_configs"] = self.noise_filter.get_bias()
-
- return bias_obj
-
- def save_bias_to_json(self, file_path):
- """Save bias to JSON.
-
- # Arguments
- file_path: `str`
- the absolute path to the destiation.
-
- # Returns
- flag: `bool`
- returns True if success in writing, False otherwise.
- """
- bias_obj = self.get_bias()
- return utils.write_json(file_path, bias_obj)
-
- def start_data_stream(
- self, send_default_config=True, max_packet_size=None, max_packet_interval=None
- ):
- """Start streaming data.
-
- # Arguments
- send_default_config: `bool`
- send default config to the device before starting
- the data streaming.
- `default is True`
- max_packet_size: `int`
- set the maximum number of events any of a packet container's
- packets may hold before it's made available to the user.
- Set to zero to disable.
- The default is `None` (use default setting: 0).
- max_packet_interval: `int`
- set the time interval between subsequent packet containers.
- Must be at least 1 microsecond.
- The value is in microseconds, and is checked across all
- types of events contained in the EventPacketContainer.
- The default is `None` (use default setting: 10ms)
- """
- if send_default_config is True:
- self.send_default_config()
-
- if max_packet_size is not None:
- self.set_max_container_packet_size(max_packet_size)
- if max_packet_interval is not None:
- self.set_max_container_interval(max_packet_interval)
-
- self.data_start()
- self.set_data_exchange_blocking()
-
- def get_polarity_event(self, packet_header, noise_filter=False):
- """Get a packet of polarity event.
-
- # Arguments
- packet_header: `caerEventPacketHeader`
- the header that represents a event packet
- noise_filter: `bool`
- the background activity filter is applied if True.
-
- # Returns
- events: `numpy.ndarray`
- a 2-D array that has the shape of (N, 4) where N
- is the number of events in the event packet.
- Each row in the array represents a single polarity event.
- The first number is the timestamp.
- The second number is the X position of the event.
- The third number is the Y position of the event.
- The fourth number represents the polarity of the event
- (positive or negative).
- If the `noise_filter` option is set to `True`,
- this array has an additional column at the end.
- The last column represents the validity of the corresponding
- event. Filtered events will be marked as 0.
- num_events: `int`
- number of the polarity events available in the packet.
- """
- num_events, polarity = self.get_event_packet(
- packet_header, libcaer.POLARITY_EVENT
- )
-
- if noise_filter is True:
- polarity = self.noise_filter.apply(polarity)
-
- events = libcaer.get_filtered_polarity_event(
- polarity, num_events * 5
- ).reshape(num_events, 5)
- else:
- events = libcaer.get_polarity_event(polarity, num_events * 4).reshape(
- num_events, 4
- )
-
- return events, num_events
-
- def get_event(self, mode="events"):
+ def get_event( # type: ignore
+ self, mode: str = "events"
+ ) -> tuple[np.ndarray, int, np.ndarray, int] | None:
"""Get event.
- # Returns
- pol_events: `numpy.ndarray`
- a 2-D array that has the shape of (N, 4) where N
- is the number of events in the event packet.
- Each row in the array represents a single polarity event.
- The first number is the timestamp.
- The second number is the X position of the event.
- The third number is the Y position of the event.
- The fourth number represents the polarity of the event
- (positive or negative).
- If the `noise_filter` option is set to `True`,
- this array has an additional column at the end.
- The last column represents the validity of the corresponding
- event. Filtered events will be marked as 0.
- num_pol_events: `int`
- number of the polarity events available in the packet.
- special_events: `numpy.ndarray`
- a 2-D array that has the shape of (N, 2) where N
- is the number of events in the event packet.
- Each row in the array represents a single special event.
- The first value is the timestamp of the event.
+ Returns:
+ pol_events: a 2-D array of shape (N, 4) where N is the number of events in
+ the event packet. Each row in the array represents a single polarity
+ event. The first number is the timestamp. The second number is the X
+ position of the event. The third number is the Y position of the event.
+ The fourth number represents the polarity of the event (positive or
+ negative). If the `noise_filter` option is set to `True`, this array
+ has an additional column at the end. The last column represents the
+ validity of the corresponding event. Filtered events are marked as 0.
+ num_pol_events: number of the polarity events available in the packet.
+ special_events: a 2-D array that has the shape of (N, 2) where N is the
+ number of events in the event packet. Each row in the array represents
+ a single special event. The first value is the timestamp of the event.
The second value is the special event data.
- num_special_events: `int`
- number of the special events in the packet.
+ num_special_events: number of the special events in the packet.
"""
packet_container, packet_number = self.get_packet_container()
- if packet_container is not None:
+ if packet_container is not None and packet_number is not None:
num_pol_event = 0
num_special_event = 0
- pol_events = None
- special_events = None
+ pol_events = (
+ np.zeros((0, int(4 + self.filter_noise)), dtype=np.uint64)
+ if mode == "events"
+ else np.zeros((128, 128, 2), dtype=np.uint64)
+ )
+ special_events = np.zeros((0, 2), dtype=np.uint64)
for packet_id in range(packet_number):
packet_header, packet_type = self.get_packet_header(
packet_container, packet_id
)
if packet_type == libcaer.POLARITY_EVENT:
if mode == "events":
- events, num_events = self.get_polarity_event(
- packet_header, self.filter_noise
- )
- pol_events = (
- np.hstack((pol_events, events))
- if pol_events is not None
- else events
- )
+ events, num_events = self.get_polarity_event(packet_header)
+ pol_events = np.hstack((pol_events, events))
elif mode == "events_hist":
hist, num_events = self.get_polarity_hist(
packet_header, device_type="DVS128"
)
- pol_events = hist if pol_events is None else pol_events + hist
+ pol_events += hist
num_pol_event += num_events
elif packet_type == libcaer.SPECIAL_EVENT:
events, num_events = self.get_special_event(packet_header)
- special_events = (
- np.hstack((special_events, events))
- if special_events is not None
- else events
- )
+ special_events = np.hstack((special_events, events))
num_special_event += num_events
libcaer.caerEventPacketContainerFree(packet_container)
diff --git a/pyaer/event_camera.py b/pyaer/event_camera.py
new file mode 100644
index 0000000..7907a13
--- /dev/null
+++ b/pyaer/event_camera.py
@@ -0,0 +1,10 @@
+class EventCamera:
+ def __init__(self) -> None:
+ self.camera = None
+
+ def __enter__(self) -> None:
+ return self.camera
+
+ # context manager exist method with typing
+ def __exit__(self, exc_type, exc_value, traceback) -> None:
+ self.camera.shutdown()