348 lines
11 KiB
Python
348 lines
11 KiB
Python
#! /usr/bin/python
|
|
"""usage serialEventHanlder.py -h -c <name> -d/--debug= <level> -p/--port= <serial port> <path/>in_file.xml
|
|
in_file - input xml-file describing what knobs and/or button are on the pendant
|
|
-c <name> # name of component in HAL. 'my-mpg' default
|
|
-d/--debug= <level> # debug level, default 0
|
|
-p/--port= <serial port> # serial port to use. '/dev/ttyUSB0' default
|
|
-h # Help
|
|
python serialEventHandler.py -w mpg_pendant/config/mpg.xml
|
|
"""
|
|
|
|
### https://docs.python.org/2/library/xml.etree.elementtree.html
|
|
|
|
import time
|
|
import getopt
|
|
import sys
|
|
import comms
|
|
import xml.etree.ElementTree as ET
|
|
import hal
|
|
from collections import namedtuple
|
|
|
|
class Pin:
|
|
""" General representation of a Pin and it's data"""
|
|
def __init__(self, name, type, observer = None):
|
|
self.name = name # HAL pin name
|
|
self.val = 0 # current value of pin, e.g. 1 - on, 0 - off
|
|
self.type = type # type (string read from xml)
|
|
self.observer = None
|
|
|
|
if observer != None:
|
|
self.attach(observer)
|
|
|
|
def __repr__(self):
|
|
return 'pin name: ' + self.name + '\tval: ' + str(self.val) + '\ttype: ' + self.type
|
|
|
|
def attach(self, observer):
|
|
self.observer = observer
|
|
|
|
def _notify(self):
|
|
if self.observer != None:
|
|
self.observer.update(self.name, self.val)
|
|
|
|
def update_hal(self, v):
|
|
""" to be overriden in child-class"""
|
|
pass
|
|
|
|
def set(self, v):
|
|
pass
|
|
|
|
def _type_saturate(self, type, val):
|
|
""" helper function to convert type read from xml to HAL-type """
|
|
retVal = 0
|
|
|
|
if type == 'bit':
|
|
if val >= 1:
|
|
retVal = 1
|
|
|
|
if type == 'float':
|
|
retVal = val
|
|
|
|
if type == 's32':
|
|
retVal = val
|
|
|
|
if type == 'u32':
|
|
retVal = val
|
|
|
|
return retVal
|
|
|
|
def _get_hal_type(self, str):
|
|
""" helper function to convert type read from xml to HAL-type """
|
|
retVal = ''
|
|
|
|
if str == 'bit':
|
|
retVal = hal.HAL_BIT
|
|
|
|
if str == 'float':
|
|
retVal = hal.HAL_FLOAT
|
|
|
|
if str == 's32':
|
|
retVal = hal.HAL_S32
|
|
|
|
if str == 'u32':
|
|
retVal = hal.HAL_U32
|
|
|
|
return retVal
|
|
|
|
class InPin(Pin):
|
|
""" Specialization of Pin-class"""
|
|
def __init__(self, hal_ref, name, type, observer = None):
|
|
Pin.__init__(self, name, type, observer)
|
|
|
|
hal_ref.newpin(name, self._get_hal_type(type), hal.HAL_IN) # create the user space HAL-pin
|
|
|
|
def __repr__(self):
|
|
return 'Input pin ' + Pin.__repr__(self)
|
|
|
|
def update_hal(self, hal):
|
|
if self.val != hal[self.name]:
|
|
self.val = hal[self.name]
|
|
self._notify()
|
|
|
|
class OutPin(Pin):
|
|
""" Specialization of Pin-class"""
|
|
def __init__(self, hal_ref, name, type, observer = None):
|
|
Pin.__init__(self, name, type, observer)
|
|
|
|
hal_ref.newpin(name, self._get_hal_type(type), hal.HAL_OUT) # create the user space HAL-pin
|
|
|
|
def __repr__(self):
|
|
return 'Output pin ' + Pin.__repr__(self)
|
|
|
|
def update_hal(self, hal):
|
|
hal[self.name] = self.val
|
|
|
|
def set(self, v):
|
|
try:
|
|
self.val = self._type_saturate(self.type, int(v))
|
|
except ValueError:
|
|
print('OutPin::set() value error catched on: ' + self.name)
|
|
|
|
class Observer:
|
|
""" container for notification-function """
|
|
def __init__(self, update_cb):
|
|
self.update_cb = update_cb
|
|
|
|
def update(self, name, val):
|
|
#pass
|
|
try:
|
|
#print 'observer::update name: ' + name + ' val: ' + str(val)
|
|
self.update_cb(name, val)
|
|
except ValueError:
|
|
print('Observer::notify() value error catched on: ' + self.name)
|
|
|
|
class HALComponentWrapper:
|
|
def __init__(self, name):
|
|
self.pin_dict = {} # dictionary used to map event to pin
|
|
self.hal = hal.component(name) # instanciate the HAL-component
|
|
self.observer = None
|
|
|
|
def __repr__(self):
|
|
tmp_str = ''
|
|
for k in self.pin_dict:
|
|
tmp_str += 'event: ' + k + '\t' + str(self.pin_dict[k]) + '\n'
|
|
return tmp_str
|
|
|
|
def __getitem__(self, key):
|
|
if key in self.pin_dict:
|
|
return self.pin_dict[key].val
|
|
|
|
def __setitem__(self, key, val):
|
|
self.set_pin(key, val)
|
|
|
|
def add_pin(self, event_name, hal_name, type, direction = 'out'):
|
|
self.pin_dict[event_name] = self._createPin(hal_name, type, direction)
|
|
|
|
def event_set_pin(self, event):
|
|
""" updates pin value with new data
|
|
input: pin name, set value'
|
|
output: nothing. """
|
|
if event.name in self.pin_dict:
|
|
self.pin_dict[event.name].set(event.data)
|
|
|
|
def set_pin(self, key, value):
|
|
""" updates pin value with new data
|
|
input: event name, set value'
|
|
output: nothing. """
|
|
if key in self.pin_dict:
|
|
self.pin_dict[key].set(value)
|
|
|
|
def setReady(self):
|
|
self.hal.ready()
|
|
|
|
def update_hal(self):
|
|
for key in self.pin_dict:
|
|
self.pin_dict[key].update_hal(self.hal)
|
|
|
|
def attach(self, observer):
|
|
self.observer = observer
|
|
|
|
def _createPin(self, hal_name, type, direction):
|
|
""" factory function to create pin"""
|
|
if direction == 'in':
|
|
return InPin(self.hal, hal_name, type, Observer(self.notify))
|
|
|
|
if direction == 'out':
|
|
return OutPin(self.hal, hal_name, type)
|
|
|
|
def notify(self, hal_name, val):
|
|
# convert pin-name to event-name
|
|
for key in self.pin_dict:
|
|
if self.pin_dict[key].name == hal_name and self.observer != None:
|
|
self.observer.update(key, val)
|
|
|
|
class OptParser:
|
|
def __init__(self, argv):
|
|
self.xml_file = '' # input xml-file describing what knobs and/or button are on the pendant
|
|
self.name = 'my-mpg' # default name of component in HAL
|
|
self.port = '/dev/ttyUSB0' # default serial port to use
|
|
self.watchdog_reset = False
|
|
|
|
self._get_options(argv)
|
|
|
|
def __repr__(self):
|
|
return 'xml_file: ' + self.xml_file + '\tname: ' + self.name + '\tport: ' + self.port
|
|
|
|
def _get_options(self, argv):
|
|
try:
|
|
opts, args = getopt.getopt(argv, "hwp:c:", ["input=", "port="])
|
|
except getopt.GetoptError as err:
|
|
# print help information and exit:
|
|
print(err) # will print something like "option -a not recognized"
|
|
sys.exit(2)
|
|
|
|
### parse input command line
|
|
for o, a in opts:
|
|
if o == "-h":
|
|
self._usage()
|
|
sys.exit()
|
|
if o == "-c":
|
|
self.name = a
|
|
elif o == "--input":
|
|
self.xml_file = a
|
|
elif o in ("-p", "--port"):
|
|
self.port = a
|
|
elif o == "-w":
|
|
self.watchdog_reset = True
|
|
else:
|
|
print(o, a)
|
|
assert False, "unhandled option"
|
|
|
|
if self.xml_file == '':
|
|
if len(sys.argv) < 2:
|
|
self._usage()
|
|
sys.exit(2)
|
|
else:
|
|
self.xml_file = argv[-1]
|
|
|
|
def get_name(self):
|
|
return self.name
|
|
|
|
def get_port(self):
|
|
return self.port
|
|
|
|
def get_XML_file(self):
|
|
return self.xml_file
|
|
|
|
def get_watchdog_reset(self):
|
|
return self.watchdog_reset
|
|
|
|
def _usage(self):
|
|
""" print command line options """
|
|
print("usage serialEventHandler.py -h -c <name> -d/--debug=<level> -p/--port= <serial port> <path/>in_file.xml\n"\
|
|
"in_file - input xml-file describing what knobs and/or button are on the pendant\n"\
|
|
"-c <name> # name of component in HAL. 'mpg' default\n"\
|
|
"-p/--port= <serial port> # default serial port to use. '/dev/ttyS2' default\n"\
|
|
"-w # start watchdog deamon" \
|
|
"-h # Help test")
|
|
|
|
""" Parser data container"""
|
|
HalPin = namedtuple("HalPin", ['name', 'event', 'type', 'direction'])
|
|
|
|
class XmlParser:
|
|
def __init__(self, f):
|
|
self.tree = []
|
|
self.parsed_data = [] #array of named tuples (HalPin)
|
|
|
|
self._parse_file(f)
|
|
|
|
def __repr__(self):
|
|
tmp_str = ''
|
|
|
|
for element in self.parsed_data:
|
|
tmp_str += 'name: ' + element.name + '\t' + 'event: ' + element.event + '\t' + 'type: ' + element.type + '\n'
|
|
return tmp_str
|
|
|
|
def get_parsed_data(self):
|
|
return self.parsed_data
|
|
|
|
def _parse_file(self, f):
|
|
self.tree = ET.parse(f)
|
|
root = self.tree.getroot()
|
|
|
|
for halpin in root.iter('halpin'):
|
|
name = halpin.text.strip('"')
|
|
type = 'u32' if halpin.find('type') is None else halpin.find('type').text
|
|
event = name if halpin.find('event') is None else halpin.find('event').text
|
|
direction = 'out' if halpin.find('direction') is None else halpin.find('direction').text
|
|
|
|
if self._check_supported_HAL_type(type) and self._check_supported_HAL_direction(direction):
|
|
self.parsed_data.append(HalPin(name, event, type, direction))
|
|
|
|
def _check_supported_HAL_type(self, str):
|
|
""" helper function to check if type is supported """
|
|
retVal = False
|
|
|
|
if str == 'bit' or str == 'float' or str == 's32' or str == 'u32':
|
|
retVal = True
|
|
|
|
return retVal
|
|
|
|
def _check_supported_HAL_direction(self, str):
|
|
""" helper function to check if direction is supported """
|
|
retVal = False
|
|
|
|
if str == 'in' or str == 'out':
|
|
retVal = True
|
|
|
|
return retVal
|
|
|
|
################################################
|
|
def main():
|
|
optParser = OptParser(sys.argv[1:])
|
|
componentName = optParser.get_name()
|
|
portName = optParser.get_port()
|
|
xmlFile = optParser.get_XML_file()
|
|
watchdogEnabled = optParser.get_watchdog_reset()
|
|
print(optParser)
|
|
|
|
xmlParser = XmlParser(xmlFile)
|
|
|
|
c = HALComponentWrapper(componentName) #HAL adaptor, takes care of mapping incomming events to actual hal-pin
|
|
serialEventGenerator = comms.instrument(portName, c.event_set_pin, watchdogEnabled, 5, 1) #serial adaptor
|
|
c.attach(Observer(serialEventGenerator.generateEvent))
|
|
|
|
# add/create the HAL-pins from parsed xml and attach them to the adaptor event handler
|
|
parsed_data = xmlParser.get_parsed_data()
|
|
for pin in parsed_data:
|
|
c.add_pin(pin.event, pin.name, pin.type, pin.direction)
|
|
|
|
print(c)
|
|
|
|
# ready signal to HAL, component and it's pins are ready created
|
|
c.setReady()
|
|
|
|
time.sleep(0.5)
|
|
|
|
try:
|
|
while 1:
|
|
serialEventGenerator.readMessages() #blocks until '\n' received or timeout
|
|
c.update_hal()
|
|
|
|
time.sleep(0.1)
|
|
|
|
except KeyboardInterrupt:
|
|
raise SystemExit
|
|
|
|
if __name__ == '__main__':
|
|
main()
|