Source code for brainaccess.core.eeg_manager

import enum
import ctypes
import asyncio
import threading
import copy

from multimethod import multimethod

from brainaccess.core import _dll
from brainaccess.core.battery_info import BatteryInfo
from brainaccess.core.full_battery_info import FullBatteryInfo
from brainaccess.core.device_info import DeviceInfo
from brainaccess.core.gain_mode import GainMode
from brainaccess.core.impedance_measurement_mode import ImpedanceMeasurementMode
from brainaccess.core.annotation import Annotation
from brainaccess.core.polarity import Polarity

# ctypes
# new_eeg_manager
_dll.ba_eeg_manager_new.argtypes = []
_dll.ba_eeg_manager_new.restype = ctypes.c_void_p
# destructor
_dll.ba_eeg_manager_free.argtypes = [ctypes.c_void_p]
_dll.ba_eeg_manager_free.restype = None
# connect(port)
_dll.ba_eeg_manager_connect.argtypes = [
    ctypes.CFUNCTYPE(None, ctypes.c_bool, ctypes.c_void_p),
_dll.ba_eeg_manager_connect.restype = None
# is_connected()
_dll.ba_eeg_manager_is_connected.argtypes = [ctypes.c_void_p]
_dll.ba_eeg_manager_is_connected.restype = ctypes.c_bool
# disconnect()
_dll.ba_eeg_manager_disconnect.argtypes = [ctypes.c_void_p]
_dll.ba_eeg_manager_disconnect.restype = None
# start_stream()
_dll.ba_eeg_manager_start_stream.argtypes = [
    ctypes.CFUNCTYPE(None, ctypes.c_void_p),
_dll.ba_eeg_manager_start_stream.restype = ctypes.c_uint8
# stop_stream()
_dll.ba_eeg_manager_stop_stream.argtypes = [
    ctypes.CFUNCTYPE(None, ctypes.c_void_p),
_dll.ba_eeg_manager_stop_stream.restype = ctypes.c_uint8
# is_streaming()
_dll.ba_eeg_manager_is_streaming.argtypes = [ctypes.c_void_p]
_dll.ba_eeg_manager_is_streaming.restype = ctypes.c_bool
# set_io()
_dll.ba_eeg_manager_set_io.argtypes = [
    ctypes.CFUNCTYPE(None, ctypes.c_void_p),
_dll.ba_eeg_manager_set_io.restype = ctypes.c_uint8
# get_battery_info()
_dll.ba_eeg_manager_get_battery_info.argtypes = [
_dll.ba_eeg_manager_get_battery_info.restype = BatteryInfo
# get_full_battery_info()
_dll.ba_eeg_manager_get_full_battery_info.argtypes = [
    ctypes.CFUNCTYPE(None, ctypes.POINTER(FullBatteryInfo), ctypes.c_void_p),
_dll.ba_eeg_manager_get_full_battery_info.restype = ctypes.c_uint8
# get_latency()
_dll.ba_eeg_manager_get_latency.argtypes = [
    ctypes.CFUNCTYPE(None, ctypes.c_float, ctypes.c_void_p),
_dll.ba_eeg_manager_get_latency.restype = ctypes.c_uint8
# set_channel_enabled()
_dll.ba_eeg_manager_set_channel_enabled.argtypes = [
_dll.ba_eeg_manager_set_channel_enabled.restype = None
# set_channel_gain()
_dll.ba_eeg_manager_set_channel_gain.argtypes = [
_dll.ba_eeg_manager_set_channel_gain.restype = None
# set_channel_bias()
_dll.ba_eeg_manager_set_channel_bias.argtypes = [
_dll.ba_eeg_manager_set_channel_bias.restype = None
# set_impedance_mode()
_dll.ba_eeg_manager_set_impedance_mode.argtypes = [
_dll.ba_eeg_manager_set_impedance_mode.restype = None
# get_device_info()
_dll.ba_eeg_manager_get_device_info.argtypes = [ctypes.c_void_p]
_dll.ba_eeg_manager_get_device_info.restype = ctypes.POINTER(DeviceInfo)
# get_channel_index()
_dll.ba_eeg_manager_get_channel_index.argtypes = [ctypes.c_void_p, ctypes.c_uint16]
_dll.ba_eeg_manager_get_channel_index.restype = ctypes.c_size_t
# get_sample_frequency()
_dll.ba_eeg_manager_get_sample_frequency.argtypes = [ctypes.c_void_p]
_dll.ba_eeg_manager_get_sample_frequency.restype = ctypes.c_uint16
# set_callback_chunk()
_dll.ba_eeg_manager_set_callback_chunk.argtypes = [
_dll.ba_eeg_manager_set_callback_chunk.restype = None
# set_callback_battery()
_dll.ba_eeg_manager_set_callback_battery.argtypes = [
    ctypes.CFUNCTYPE(None, ctypes.POINTER(BatteryInfo), ctypes.c_void_p),
_dll.ba_eeg_manager_set_callback_battery.restype = None
# set_callback_disconnect()
_dll.ba_eeg_manager_set_callback_disconnect.argtypes = [
    ctypes.CFUNCTYPE(None, ctypes.c_void_p),
_dll.ba_eeg_manager_set_callback_disconnect.restype = None
# annotate()
_dll.ba_eeg_manager_annotate.argtypes = [ctypes.c_void_p, ctypes.c_char_p]
_dll.ba_eeg_manager_annotate.restype = None
# get_annotations()
_dll.ba_eeg_manager_get_annotations.argtypes = [
_dll.ba_eeg_manager_get_annotations.restype = None
# clear_annotations()
_dll.ba_eeg_manager_clear_annotations.argtypes = [ctypes.c_void_p]
_dll.ba_eeg_manager_clear_annotations.restype = None

# Stream size type info super secret function thingy
_dll.ba_eeg_manager_get_stream_channel_data_types.argtypes = [
_dll.ba_eeg_manager_get_stream_channel_data_types.restype = None

_managers_mtx = threading.Lock()
_managers: dict = dict()

_types_map = [
    ctypes.POINTER(ctypes.c_float),  # 0
    ctypes.POINTER(ctypes.c_bool),   # 1
    ctypes.POINTER(ctypes.c_size_t), # 2
    ctypes.POINTER(ctypes.c_double), # 3

# TODO: look into the possibility of moving callbacks to async loop with call_soon_threadsafe()

    None, ctypes.POINTER(ctypes.c_void_p), ctypes.c_size_t, ctypes.c_void_p
def _callback_chunk(chunk_data, chunk_size, data):
    with _managers_mtx:
        mgr = _managers.get(data)
        if mgr != None:
            with mgr._callback_chunk_mtx:
                cbk = mgr._callback_chunk
                if cbk != None:
                    # Get channel sizes and type information
                    types_ptr = ctypes.POINTER(ctypes.c_uint8)()
                    types_size = ctypes.c_size_t()
                        data, ctypes.pointer(types_ptr), ctypes.pointer(types_size)
                    types = [_types_map[types_ptr[i]] for i in range(types_size.value)]
                    # Convert chunk data to Python list and pass to manager
                    chunk_list = [
                            ctypes.cast(chunk_data[i], types[i])[j]
                            for j in range(chunk_size)
                        for i in range(len(types))
                    cbk(chunk_list, chunk_size)

@ctypes.CFUNCTYPE(None, ctypes.POINTER(BatteryInfo), ctypes.c_void_p)
def _callback_battery(b_info, data):
    with _managers_mtx:
        mgr = _managers.get(data)
        if mgr != None:
            with mgr._callback_battery_mtx:
                cbk = mgr._callback_battery
                if cbk != None:

@ctypes.CFUNCTYPE(None, ctypes.c_void_p)
def _callback_disconnect(data):
    with _managers_mtx:
        mgr = _managers.get(data)
        if mgr != None:
            with mgr._future_map_mtx:
                mgr._future_index = 0
                del_list = []
                for k, v in mgr._future_map.items():
                    _, loop, future = v
                        future.set_exception, RuntimeError("Disconnected")
                for item in del_list:
                    del mgr._future_map[item]
            with mgr._callback_disconnect_mtx:
                cbk = mgr._callback_disconnect
                if cbk != None:

class _FutureStruct(ctypes.Structure):
    _fields_ = [("manager_ptr", ctypes.c_void_p), ("future_index", ctypes.c_size_t)]

def _handle_future(data, arg):
    my_data = ctypes.cast(data, ctypes.POINTER(_FutureStruct))[0]
    with _managers_mtx:
        mgr = _managers.get(my_data.manager_ptr)
        if mgr != None:
            with mgr._future_map_mtx:
                future_obj = mgr._future_map.get(my_data.future_index)
                if future_obj != None:
                    _, loop, future = future_obj
                    del mgr._future_map[my_data.future_index]
                    loop.call_soon_threadsafe(future.set_result, arg)

@ctypes.CFUNCTYPE(None, ctypes.c_void_p)
def _future_callback_void(data):
    _handle_future(data, None)

@ctypes.CFUNCTYPE(None, ctypes.c_bool, ctypes.c_void_p)
def _future_callback_bool(val, data):
    _handle_future(data, val)

@ctypes.CFUNCTYPE(None, ctypes.c_float, ctypes.c_void_p)
def _future_callback_float(val, data):
    _handle_future(data, val)

@ctypes.CFUNCTYPE(None, ctypes.POINTER(FullBatteryInfo), ctypes.c_void_p)
def _future_callback_full_battery_info(val, data):
    _handle_future(data, copy.copy(val[0]))

class _Error(enum.Enum):
    OK = 0
    UNKNOWN = 0xFF

def _get_error(val):
        return _Error(val)
    except ValueError:
        return _Error.UNKNOWN

def _handle_error(val):
    err = _get_error(val)
    if err == _Error.OK:
    elif err == _Error.CONNECTION:
        raise RuntimeError("Connection error")
    elif err == _Error.UNSUPPORTED_DEVICE:
        raise RuntimeError("Unsupported device")
        raise RuntimeError("Unknown error")

[docs]class EEGManager: """ The EEG manager is the primary tool for communicating with the BrainAccess device. Note that the EEG manager is not thread-safe. """ def __init__(self): """Creates an EEG Manager. Warning --------- Make sure the core library has been initialized first! """ self._manager = _dll.ba_eeg_manager_new() self._callback_chunk_mtx = threading.Lock() self._callback_battery_mtx = threading.Lock() self._callback_disconnect_mtx = threading.Lock() self._future_map_mtx = threading.Lock() self._future_map = {} self._future_index = 0 with _managers_mtx: _managers[self._manager] = self self._callback_disconnect = None _dll.ba_eeg_manager_set_callback_disconnect( self._manager, _callback_disconnect, self._manager ) def __enter__(self): return self def __exit__(self, exc_type, exc_value, traceback): self.destroy()
[docs] def destroy(self): """Destroys an EEG manager instance. Warning --------- Must be called exactly once, after the manager is no longer needed """ self.disconnect() # prevent callback deadlock by disconnecting first. with _managers_mtx: _dll.ba_eeg_manager_free(self._manager) del _managers[self._manager]
def _create_future(self): loop = asyncio.get_running_loop() future = loop.create_future() with self._future_map_mtx: index = self._future_index struct = _FutureStruct() struct.manager_ptr = self._manager struct.future_index = index self._future_map[index] = (struct, loop, future) self._future_index = (self._future_index + 1) % ( ctypes.c_size_t(-1).value + 1 ) return future, ctypes.pointer(struct)
[docs] def connect(self, port: str): """Connects to a device via COM port and attempts to initialize it. Note ----- This function runs asynchronously. Parameters ---------- port: str `COMx` on Windows and `/dev/rfcommX` on Linux decided upon connecting device to Bluetooth Returns ------- future: asyncio.Future await future to complete connecting """ future, struct_ptr = self._create_future() _dll.ba_eeg_manager_connect( self._manager, port.encode("ascii"), _future_callback_bool, struct_ptr, ) return future
[docs] def is_connected(self): """Checks if the EEGManager is currently connected to an EEG device Returns ------- bool True if connected, False otherwise """ return _dll.ba_eeg_manager_is_connected(self._manager)
[docs] def disconnect(self): """Disconnects the EEGManager from the EEG device, if connected""" _dll.ba_eeg_manager_disconnect(self._manager)
[docs] def start_stream(self): """Starts streaming data from the device Note ----- This function runs asynchronously. Warning -------- You must not call this function twice without stopping the stream in between. Returns ------- future: asyncio.Future awaiting future starts stream """ future, struct_ptr = self._create_future() _handle_error( _dll.ba_eeg_manager_start_stream( self._manager, _future_callback_void, struct_ptr, ) ) return future
[docs] def stop_stream(self): """Stops streaming data from the device Note ----- This function runs asynchronously. Warning ------- You must not call this function twice without starting the stream in between. You must not call this function while the stream is not running. Calling this function resets all stream settings. If you want to stream again afterwards, you must re-enable all the channels, biases, gains, and impedance measurement mode that you set previously. Returns ------- future: asyncio.Future awaiting future stops stream """ future, struct_ptr = self._create_future() _handle_error( _dll.ba_eeg_manager_stop_stream( self._manager, _future_callback_void, struct_ptr, ) ) return future
[docs] def is_streaming(self): """Checks if the device is streaming Returns ------- bool True if the stream is active, False otherwise """ return _dll.ba_eeg_manager_is_streaming(self._manager)
[docs] def set_io(self, pin: int, state: bool): """Digital pin control The digital input pin, which by default is pulled high but can be pulled low by an external sensor, can also be pulled low by the device itself. By default, upon powering up or connecting/disconnecting the device, the digital input pin is pulled high. This can be useful, for example, in case you want to synchronize devices: connect device A and B's digital inputs, start both streams, then set A's digital input to pull low, which also pulls B's input with it. The falling edge can be recorded from both streams, and the data can then be aligned accordingly. This can also be used for low-speed communication with external devices, controlling LEDs via a mosfet, etc. Note ----- This function runs asynchronously. Parameters ---------- pin: int Number of digital input pin of the EEG device to set the IO state of (starting from 0) state: bool True to pull high, False to pull to ground Returns ------- future: asyncio.Future awaiting future sets digital pin state """ future, struct_ptr = self._create_future() _handle_error( _dll.ba_eeg_manager_set_io( self._manager, ctypes.c_uint8(pin), ctypes.c_bool(state), _future_callback_void, struct_ptr, ) ) return future
[docs] def get_battery_info(self): """Returns a structure containing standard battery information from the device Returns ------- BatteryInfo Battery information from the EEG device """ return _dll.ba_eeg_manager_get_battery_info(self._manager)
[docs] def get_full_battery_info(self): """ Returns a structure containing extended battery info from the device Note ----- This function runs asynchronously. Returns ------- future: asyncio.Future awaiting future returns FullBatteryInfo """ future, struct_ptr = self._create_future() _handle_error( _dll.ba_eeg_manager_get_full_battery_info( self._manager, _future_callback_full_battery_info, struct_ptr, ) ) return future
[docs] def get_latency(self): """Measure approximate communication latency with the device Note ----- This function runs asynchronously. Returns ------- future: asyncio.Future awaiting future returns number of seconds (float) """ future, struct_ptr = self._create_future() _handle_error( _dll.ba_eeg_manager_get_latency( self._manager, _future_callback_float, struct_ptr, ) ) return future
[docs] def set_channel_enabled(self, channel: int, state: bool): """ Enables or disables the channel on the device Warning --------- Enabled channels are reset by stream stop. Must be called with the appropriate arguments before every stream start Parameters ------------- channel: int Channel ID (brainaccess.core.eeg_channel) to enable/disable. state: bool True to enable channel, False to disable. """ _dll.ba_eeg_manager_set_channel_enabled( self._manager, ctypes.c_uint16(channel), ctypes.c_bool(state) )
[docs] def set_channel_gain(self, channel: int, gain: GainMode): """ Changes gain mode for a channel on the device. Setting gain values to lower will increase the measured voltage range, but would decrease the amplitude resolution, 12 is the optimum in most cases. Warning ------ This function takes effect on stream start, and its effects are reset by stream stop. Therefore, it must be called with the appropriate arguments before every stream start. This only affects channels that support it. For example, it affects the electrode measurement channels but not sample number or digital input. Parameters ----------- channel: int Channel ID (brainaccess.core.eeg_channel) whose gain to modify. gain: GainMode Gain mode. Default X12 """ _dll.ba_eeg_manager_set_channel_gain( self._manager, ctypes.c_uint16(channel), ctypes.c_uint8(gain.value) )
@multimethod def set_channel_bias(self, channel: int, bias: bool): """ DEPRECATED: use the version with Polarity instead. Set an electrode channel as a bias electrode Essentially the signals of these channels are inverted and injected into the bias channel/electrode. This helps in reducing common mode noise such as noise coming from the mains. Only select channels for bias feedback that have good contact with a skin. Typically one channel is sufficient for bias feedback to work effectively. Warning -------- This function takes effect on stream start, and its effects are reset by stream stop. Therefore, it must be called with the appropriate arguments before every stream start. Parameters ------------ channel: int Channel ID (brainaccess.core.eeg_channel) to set/unset as bias channel bias: bool True to enable channel, False to disable. """ return self.set_channel_bias(channel, Polarity.BOTH if bias else Polarity.NONE)
[docs] @multimethod def set_channel_bias(self, channel: int, p: Polarity): """Set an electrode channel as a bias electrode Essentially the signals of these channels are inverted and injected into the bias channel/electrode. This helps in reducing common mode noise such as noise coming from the mains. Only select channels for bias feedback that have good contact with a skin. Typically one channel is sufficient for bias feedback to work effectively. Warning -------- This function takes effect on stream start, and its effects are reset by stream stop. Therefore, it must be called with the appropriate arguments before every stream start. Parameters ------------ channel: int Channel ID (brainaccess.core.eeg_channel) to set/unset as bias channel p: Polarity Which side of the electrode to use (if device is not bipolar, use BOTH) """ _dll.ba_eeg_manager_set_channel_bias( self._manager, ctypes.c_uint16(channel), ctypes.c_uint8(p.value) )
[docs] def set_impedance_mode(self, mode: ImpedanceMeasurementMode): """Sets impedance measurement mode This function setups device for electrode impedance measurement. It injects a 7nA certain frequency current through the bias electrodes to measurement electrodes. Voltage recordings from each channel can then be used to calculate the impedance for each electrode: Impedance = Vpp/7nA Warning --------- This function takes effect on stream start, and its effects are reset by stream stop. Therefore, it must be called with the appropriate arguments before every stream start. Parameters ----------- mode: ImpedanceMeasurementMode Impedance mode to set """ _dll.ba_eeg_manager_set_impedance_mode( self._manager, ctypes.c_uint8(mode.value) )
[docs] def get_device_info(self): """Get device information Warning ---------- Must not be called unless device connection is successful Returns ------- DeviceInfo device model, version, firmware version and buffer size """ return _dll.ba_eeg_manager_get_device_info(self._manager)
[docs] def get_channel_index(self, channel: int): """Gets the index of a channel's data into the chunk Get the index into the array provided by the chunk callback that contains the data of the channel number specified Parameters ------------ channel: int The number of the channel whose index to get Returns --------- int Index into chunk representing a channel """ val = _dll.ba_eeg_manager_get_channel_index( self._manager, ctypes.c_uint16(channel) ) if val == ctypes.c_size_t(-1).value: raise IndexError("Channel does not exist or is not currently streaming") return val
[docs] def get_sample_frequency(self): """Get device sampling frequency Returns ------- int Sample frequency (Hz) """ return _dll.ba_eeg_manager_get_sample_frequency(self._manager)
[docs] def set_callback_chunk(self, f): """Sets a callback to be called every time a chunk is available Warning ------- The callback may or may not run in the reader thread, and as such, synchronization must be used to avoid race conditions, and the callback itself must be as short as possible to avoid blocking communication with the device. Parameters ------------ f callback Function to be called every time a chunk is available Set to null to disable. """ with self._callback_chunk_mtx: self._callback_chunk = f _dll.ba_eeg_manager_set_callback_chunk( self._manager, _callback_chunk if f != None else None, self._manager )
[docs] def set_callback_battery(self, f): """Sets a callback to be called every time the battery status is updated Warning --------- The callback may or may not run in the reader thread, and as such, synchronization must be used to avoid race conditions, and the callback itself must be as short as possible to avoid blocking communication with the device. Parameters ---------- f pass callback Function to be called every time a battery update is available Set to null to disable. """ with self._callback_battery_mtx: self._callback_battery = f _dll.ba_eeg_manager_set_callback_battery( self._manager, _callback_battery if f != None else None, self._manager )
[docs] def set_callback_disconnect(self, f): """ Sets a callback to be called every time the device disconnects Warning --------- The callback may or may not run in the reader thread, and as such, synchronization must be used to avoid race conditions, and the callback itself must be as short as possible to avoid blocking communication with the device. Parameters ---------- f callback Function to be called every time the device disconnects. Set to null to disable. """ with self._callback_disconnect_mtx: self._callback_disconnect = f """_dll.ba_eeg_manager_set_callback_disconnect( self._manager, _callback_disconnect if f != None else None, self._manager )"""
[docs] def annotate(self, annotation: str): """ Adds an annotation at the current time Warning --------- Annotations are cleared on disconnect Parameters ---------- annotation: str annotation text """ _dll.ba_eeg_manager_annotate( self._manager, ctypes.c_char_p(annotation.encode("ascii")) )
[docs] def get_annotations(self): """Retrieve all the accumulated annotations Warning --------- Annotations are cleared on disconnect Returns ------- list list of annotations """ ae = ctypes.POINTER(Annotation)() size = ctypes.c_size_t() _dll.ba_eeg_manager_get_annotations( self._manager, ctypes.pointer(ae), ctypes.pointer(size) ) return [ae[i] for i in range(size.value)]
[docs] def clear_annotations(self): """Clears annotations""" _dll.ba_eeg_manager_clear_annotations(self._manager)