Review my pull request. The following files have changed: RPi/fake_smbus.py, dro.py, test_model.py, todo.txt. test_model.py passes all tests; here is the result: ´´´bash smbus2 not available; using fake smbus2.py ........ ---------------------------------------------------------------------- Ran 8 tests in 0.000s OK ´´´ here is the diff between my version of RPi/fake_smbus.py and main: ´´´diff +import tkinter as tk + +encoders = [] +class SMBus: + def __init__(self, bus): + print(f"fake SMBus initialized on bus {bus}") + self.bus = bus + + + def read_i2c_block_data(self, addr, cmd, length): + data = [] + for encoder in encoders: + pos_as_byte_array = encoder.get_position().to_bytes(2, 'little', signed=True) + data.append(pos_as_byte_array[0]) + data.append(pos_as_byte_array[1]) + + #print(f"SMBus data set to: {data}") + return data + +def createTestEncoder(): + encoder = TestEncoder() + encoders.append(encoder) + return encoder + +class TestEncoder: + """ represent a test encoder controlled by keyboard input """ + def __init__(self): + self._position = 0 + + def _rotate_cw(self, event=None): + self._position += 1 + + def _rotate_ccw(self, event=None): + self._position -= 1 + + def get_position(self) -> int: + return self._position ´´´ --- here is the diff between my version of dro.py and main: ´´´diff diff --git a/dro.py b/dro.py index 5071466..6e3b919 100755 --- a/dro.py +++ b/dro.py @@ -1,396 +1,370 @@ #/usr/bin/python3 -import math -import encoder -import threading -import time import tkinter as tk from tkinter import simpledialog from enum import IntEnum -import RPi.GPIO as GPIO import atexit import subprocess import getopt import sys import shutil +import logging + +logger = logging.getLogger(__name__) + +# Prefer system-installed `smbus2`, otherwise fall back to local `smbus2.py`. 
+try: + import smbus2 +except ImportError: + import RPi.fake_smbus as smbus2 # type: ignore + logger.warning('smbus2 not available; using fake smbus2.py') class XMode(IntEnum): RADIUS = 0 DIAMETER = 1 class Updated(IntEnum): POS_X = 0 POS_Z = 1 X_MODE = 2 FULLSCREEN = 3 -X_DIRECTION_PIN = 13 #gpio 23 -X_CLOCK_PIN = 11 #gpio 24 -Z_DIRECTION_PIN = 18 #gpio 17 -Z_CLOCK_PIN = 16 #gpio 27 - class Axis: """ represent one axis in the model """ - def __init__(self, name): - self._position = 0 - self._scale = 1 + def __init__(self, name: str): + self._position = 0 # raw position from encoder + self._scale = 1.0 + self._offset = 0.0 self._name = name - def set_position(self, pos): + def get_raw_position(self): + """Return the uncompensated raw position (no offset applied).""" + return self._position + + def set_position(self, pos: int): self._position = pos - def get_position(self): - return self._position + def set_offset(self, offset: float): + self._offset = offset - def get_position_as_string(self): - return str("{:.2f}".format(self._position)) + def get_position(self): + return self._position + self._offset - def set_scale(self, scale): - self._scale = scale + def set_scale(self, scale: float): + self._scale = scale - def update_position(self, direction): - self._position += direction * self._scale + def get_scale(self) -> float: + """Return the scale factor (mm per step).""" + return self._scale class Model: """ represent the DRO model """ def __init__(self, fullscreen = False): self._observers = [] self._x_mode = XMode.RADIUS # denote x axis mode (radius/diameter) self._axis = {} self._axis['x'] = Axis('x') self._axis['z'] = Axis('z') self._fullscreen = fullscreen def attatch(self, o): self._observers.append(o) def notify(self, updated = None): for o in self._observers: o.update(updated) - def set_position(self, pos, axis): + def steps_to_position(self, axis, steps): if axis in self._axis: - factor = 0.5 if axis == 'x' and self.get_x_mode() == XMode.DIAMETER else 1 - 
self._axis[axis].set_position(factor*pos) + return steps * self._axis[axis].get_scale() + return 0 - self.notify(self._string_to_updated_axis(axis)) + def set_position(self, axis, pos): + """ set position in mm. Always radius for x axis """ + if axis in self._axis: + if abs(self._axis[axis].get_position() - pos) >= 0.005: + self._axis[axis].set_position(pos) + self.notify(self._string_to_updated_axis(axis)) - def get_position_as_string(self, axis): + def set_offset(self, axis, offset): + """ set offset in mm. Always radius for x axis """ if axis in self._axis: - return str("{:.2f}".format(self.get_position(axis))) - return None + self._axis[axis].set_offset(offset) + self.notify(self._string_to_updated_axis(axis)) def get_position(self, axis): if axis in self._axis: - factor = 2 if axis == 'x' and self.get_x_mode() == XMode.DIAMETER else 1 - return factor*self._axis[axis].get_position() + return self._axis[axis].get_position() return None - def update_position(self, direction, axis): - """ update position of given axis and notify observers """ + def get_position_uncompensated(self, axis): + """ get position without offset applied """ if axis in self._axis: - self._axis[axis].update_position(direction) - - self.notify(self._string_to_updated_axis(axis)) + return self._axis[axis].get_raw_position() + return None def get_x_mode(self): return self._x_mode def set_toggle_x_mode(self): if self._x_mode == XMode.RADIUS: self._x_mode = XMode.DIAMETER - elif self._x_mode == XMode.DIAMETER: self._x_mode = XMode.RADIUS self.notify(Updated.X_MODE) def set_scale(self, axis, scale): if axis in self._axis: self._axis[axis].set_scale(scale) def set_fullscreen(self, v): self._fullscreen = v self.notify(Updated.FULLSCREEN) def is_fullscreen(self): return self._fullscreen def _string_to_updated_axis(self, axis_str): if axis_str == 'x': return Updated.POS_X if axis_str == 'z': return Updated.POS_Z return None class Controller: """ represent the controller in the MVC pattern """ def 
__init__(self, model, test_mode = False): self._model = model self._test_mode = test_mode - def handle_x_position_update(self, dir): - self._model.update_position(dir, 'x') + self._bus = smbus2.SMBus(1) + self._address = 0x08 # Arduino I2C Address + + def handle_x_position_update(self, steps): + self._model.set_position('x', self._model.steps_to_position('x', steps)) - def handle_z_position_update(self, dir): - self._model.update_position(dir, 'z') + def handle_z_position_update(self, steps): + self._model.set_position('z', self._model.steps_to_position('z', steps)) def handle_btn_x0_press(self): - self._model.set_position(0, 'x') + self._model.set_offset('x', -self._model.get_position_uncompensated('x')) def handle_btn_z0_press(self): - self._model.set_position(0, 'z') + self._model.set_offset('z', -self._model.get_position_uncompensated('z')) def handle_btn_toggle_x_mode(self): self._model.set_toggle_x_mode() def hanlde_btn_x(self): title = 'current radius' if self._model.get_x_mode() == XMode.RADIUS else 'current diameter' pos = simpledialog.askfloat('x', title, initialvalue=0.0, minvalue=-4000.0, maxvalue=4000.0) + + if self._model.get_x_mode() == XMode.DIAMETER and pos is not None: + pos = pos / 2.0 + if pos is not None: - self._model.set_position(pos, 'x') + self._model.set_offset('x', pos-self._model.get_position_uncompensated('x')) def hanlde_btn_z(self): pos = simpledialog.askfloat('z', 'current z position', initialvalue=0.0, minvalue=-4000.0, maxvalue=4000.0) if pos is not None: - self._model.set_position(pos, 'z') + self._model.set_offset('z', pos-self._model.get_position_uncompensated('z')) def toggle_fullscreen(self, event=None): if self._model.is_fullscreen() == True: self._model.set_fullscreen(False) else: self._model.set_fullscreen(True) def end_fullscreen(self, event=None): self._model.set_fullscreen(False) def handle_btn_calc(self): calculator_cmd = shutil.which('galculator') if calculator_cmd != None: try: subprocess.Popen(calculator_cmd) except: 
print("can't start calculator") else: print("can't find calculator") def shutdown(self): if not self._test_mode: subprocess.call('sudo shutdown -h now', shell=True) else: print('will not shutdown in test mode') + def poll_i2c_data(self): + """ poll I2C data from Arduino """ + try: + data = self._bus.read_i2c_block_data(self._address, 0, 4) + #self.handle_x_position_update(int(data[0] + (data[1] << 8))) + self.handle_x_position_update(int.from_bytes(bytes(data[0:2]), 'little', signed=True)) + self.handle_z_position_update(int.from_bytes(bytes(data[2:4]), 'little', signed=True)) + except Exception as e: + logger.error(f"I2C read error: {e}") + class View: """ represent the view in the MVC pattern """ def __init__(self, model, test_mode = False): self._model = model self._model.attatch(self) self._controller = Controller(model, test_mode) - self.x_dirty = False - self.z_dirty = False - - # use tkinter's after() loop on the main thread instead of a background thread - # This avoids cross-thread UI calls and reduces context switching overhead. self.window = tk.Tk() self.window.resizable(False, False) #self.window.attributes('-zoomed', zoomed) # This just maximizes it so we can see the window. It's nothing to do with fullscreen. 
self.window.attributes("-fullscreen", self._model.is_fullscreen()) w_frame = tk.Frame(self.window) w_frame.grid(row=0, column=0, sticky='n') e_frame = tk.Frame(self.window) e_frame.grid(row=0, column=1, sticky='n') # x frame x_frame = tk.Frame(w_frame, relief=tk.GROOVE, borderwidth=3) x_frame.grid(row=0, column=0, padx=5, pady=5) # use a StringVar for quicker updates from the main thread - self.x_var = tk.StringVar(value=self._model.get_position_as_string('x')) + self.x_var = tk.StringVar(value=self._position_to_string(self._model.get_position('x'))) self.x_label = tk.Label(x_frame, textvariable=self.x_var, anchor="e", font=("Helvetica", 80), fg="green", bg="black", width=6) self.x_label.grid(row=0, column=0, padx=5, pady=5) btn_set_x = tk.Button(master=x_frame, text="X", padx=5, pady=5, font=("Helvetica", 80), command=self._controller.hanlde_btn_x) btn_set_x.grid(row=0, column=1, padx=5, pady=5, sticky="nw") btn_set_x0 = tk.Button(master=x_frame, text="X_0", padx=5, pady=5, font=("Helvetica", 80), command=self._controller.handle_btn_x0_press) btn_set_x0.grid(row=0, column=2, padx=5, pady=5, sticky="nw") # Z frame z_frame = tk.Frame(w_frame, relief=tk.GROOVE, borderwidth=3) z_frame.grid(row=1, column=0, padx=5, pady=5)#, sticky="nw") - self.z_var = tk.StringVar(value=self._model.get_position_as_string('z')) + self.z_var = tk.StringVar(value=self._position_to_string(self._model.get_position('z'))) self.z_label = tk.Label(z_frame, textvariable=self.z_var, anchor="e", font=("Helvetica", 80), fg="green", bg="black", width=6) self.z_label.grid(row=0, column=0, padx=5, pady=5) btn_set_z = tk.Button(master=z_frame, text="Z", padx=5, pady=5, font=("Helvetica", 80), command=self._controller.hanlde_btn_z) btn_set_z.grid(row=0, column=1, padx=5, pady=5, sticky="nw") btn_set_z0 = tk.Button(master=z_frame, text="Z_0", padx=5, pady=5, font=("Helvetica", 80), command=self._controller.handle_btn_z0_press) btn_set_z0.grid(row=0, column=2, padx=5, pady=5, sticky="nw") # status frame 
status_frame = tk.Frame(w_frame, relief=tk.GROOVE, borderwidth=3) status_frame.grid(row=3, column=0, padx=5, pady=5, sticky='e') self.x_mode_var = tk.StringVar(value=str(self._mode_to_text())) self.x_mode_label = tk.Label(status_frame, textvariable=self.x_mode_var, font=("Helvetica", 80), fg="red") self.x_mode_label.grid(row=0, column=0, padx=5, pady=5, sticky="e") # button frame btn_frame = tk.Frame(e_frame, relief=tk.GROOVE, borderwidth=3) btn_frame.grid(row=0, column=0, padx=5, pady=5) btn_rd = tk.Button(master=btn_frame, text="r/D", padx=5, pady=5, font=("Helvetica", 80), width=4, command=self._controller.handle_btn_toggle_x_mode) btn_rd.grid(row=1, column=0, padx=5, pady=5, sticky="nw") btn_calc = tk.Button(master=btn_frame, text="Calc", padx=5, pady=5, font=("Helvetica", 80), width=4, command=self._controller.handle_btn_calc) btn_calc.grid(row=2, column=0, padx=5, pady=15, sticky="nw") #btn_enter = tk.Button(master=btn_frame, text="TBD", padx=5, pady=5, font=("Helvetica", 80), width=4)#, command=self._controller.horizontal_end) self.photo = tk.PhotoImage(file="power.png") btn_off = tk.Button(master=btn_frame, image=self.photo, command=self._controller.shutdown) btn_off.grid(row=3, column=0, padx=5, pady=15, sticky="nw") # bind keyboar inputs self.window.bind("", self._controller.toggle_fullscreen) self.window.bind("", self._controller.end_fullscreen) # schedule the GUI update loop on the main thread - self._last_x = self._model.get_position('x') - self._last_z = self._model.get_position('z') self._update_interval_ms = 100 # update at 10Hz by default - self.window.after(self._update_interval_ms, self._update_view) + self.window.after(self._update_interval_ms, self._poll_input) self.window.update() - #self.update() def _mode_to_text(self): if self._model.get_x_mode() == XMode.DIAMETER: return 'D' if self._model.get_x_mode() == XMode.RADIUS: return 'r' def _position_to_string(self, pos): return str("{:.2f}".format(pos)) def start(self): self.window.mainloop() 
def update_x(self): """ update x position in view (main thread) """ - self.x_dirty = False pos = self._model.get_position('x') - # avoid frequent updates when change is negligible - if abs(pos - self._last_x) >= 0.005: - self._last_x = pos - self.x_var.set(self._position_to_string(pos)) + if self._model.get_x_mode() == XMode.DIAMETER: + pos = pos * 2.0 + self.x_var.set(self._position_to_string(pos)) def update_z(self): """ update z position in view (main thread) """ - self.z_dirty = False pos = self._model.get_position('z') - if abs(pos - self._last_z) >= 0.005: - self._last_z = pos - self.z_var.set(self._position_to_string(pos)) + self.z_var.set(self._position_to_string(pos)) def update(self, updated = None): """ called by model when something has changed """ if updated == Updated.POS_X: - self.x_dirty = True + self.update_x() if updated == Updated.POS_Z: - self.z_dirty = True + self.update_z() if updated == Updated.X_MODE: # main thread will pick this up through the scheduled update self.x_mode_var.set(str(self._mode_to_text())) self.update_x() if updated == Updated.FULLSCREEN: self.window.attributes("-fullscreen", self._model.is_fullscreen()) - def _update_view(self): + def _poll_input(self): """ scheduled function (runs on tkinter main loop) to update the view.""" try: - if self.x_dirty: - self.update_x() - if self.z_dirty: - self.update_z() + # request data + self._controller.poll_i2c_data() + finally: # re-schedule - self.window.after(self._update_interval_ms, self._update_view) - -@atexit.register -def cleanup(): - GPIO.cleanup(X_CLOCK_PIN) - GPIO.cleanup(X_DIRECTION_PIN) - GPIO.cleanup(Z_CLOCK_PIN) - GPIO.cleanup(Z_DIRECTION_PIN) - print("done") - -def test(arg): - print(arg) - - -class TestEncoder: - """ represent a test encoder controlled by keyboard input """ - def __init__(self, view, event_cbk, inc_key = 'a', dec_key = 'b'): - - view.window.bind(inc_key, self._rotate_cw) - view.window.bind(dec_key, self._rotate_ccw) - - self._event_cbk = event_cbk - - 
def _rotate_cw(self, event=None): - self._event_cbk(encoder.Direction.CW) - - def _rotate_ccw(self, event=None): - self._event_cbk(encoder.Direction.CCW) + self.window.after(self._update_interval_ms, self._poll_input) def main(): _zoomed = False _test = False try: opts, args = getopt.getopt(sys.argv[1:], "", ["zoomed", "test"]) except getopt.GetoptError as err: # print help information and exit: print(err) # will print something like "option -a not recognized" sys.exit(2) for o, a in opts: - print(o) - print(a) if o == "--zoomed": _zoomed = True elif o == "--test": _test = True else: assert False, "unhandled option" model = Model(_zoomed) view = View(model, _test) model.set_scale('x', -2.5/200) #negative to flip direction model.set_scale('z', 90/1000) - GPIO.setwarnings(False) ## Turn off warnings - GPIO.setmode(GPIO.BOARD) ## Use BOARD pin numbering - GPIO.setup(X_DIRECTION_PIN, GPIO.IN, pull_up_down=GPIO.PUD_UP) - GPIO.setup(Z_DIRECTION_PIN, GPIO.IN, pull_up_down=GPIO.PUD_UP) - GPIO.setup(X_CLOCK_PIN, GPIO.IN, pull_up_down=GPIO.PUD_UP) - GPIO.setup(Z_CLOCK_PIN, GPIO.IN, pull_up_down=GPIO.PUD_UP) - if _test: - x_test_encoder = TestEncoder(view, view._controller.handle_x_position_update, 'a', 'z') - z_test_encoder = TestEncoder(view, view._controller.handle_z_position_update, 's', 'x') - - x_encoder = encoder.Encoder(X_CLOCK_PIN, X_DIRECTION_PIN, view._controller.handle_x_position_update) - z_encoder = encoder.Encoder(Z_CLOCK_PIN, Z_DIRECTION_PIN, view._controller.handle_z_position_update) - - GPIO.add_event_detect(X_CLOCK_PIN, GPIO.RISING) - GPIO.add_event_detect(Z_CLOCK_PIN, GPIO.RISING) - - GPIO.add_event_callback(X_CLOCK_PIN, x_encoder.update) - GPIO.add_event_callback(Z_CLOCK_PIN, z_encoder.update) + try: + x_test_encoder = smbus2.createTestEncoder() + view.window.bind('a', x_test_encoder._rotate_cw) + view.window.bind('z', x_test_encoder._rotate_ccw) + z_test_encoder = smbus2.createTestEncoder() + view.window.bind('s', z_test_encoder._rotate_cw) + 
view.window.bind('x', z_test_encoder._rotate_ccw) + except: + logger.error("couldn't create test encoders...") view.start() if __name__ == '__main__': main() ´´´ --- here is the diff between my version of test_model.py and main: ´´´diff diff --git a/test_model.py b/test_model.py index dae5311..91e21b5 100644 --- a/test_model.py +++ b/test_model.py @@ -1,95 +1,93 @@ import unittest import dro -import encoder class Observer: def __init__(self): self.reset() self.nbr_of_calls = 0 self.has_been_called = False def update(self, updated = None): self.nbr_of_calls += 1 self.has_been_called = True self.updated = updated def reset(self): self.nbr_of_calls = 0 self.has_been_called = False class TestModel(unittest.TestCase): #def check_pos(self, actual, expected): # self.assertTrue(int(self.model.get_position_as_string('x')) == 1) def setUp(self): self.model = dro.Model() def test_init(self): self.assertEqual(self.model.get_position('x'), 0) self.assertEqual(self.model.get_position('z'), 0) - def test_notify_observers(self): - o = Observer() - self.model.attatch(o) - - self.assertFalse(o.has_been_called) - self.model.update_position(encoder.Direction.CW, 'x') - self.assertTrue(o.has_been_called) - self.assertTrue(int(self.model.get_position('x')) == 1) - self.assertTrue(o.updated == dro.Updated.POS_X) - - o.reset() - - self.model.update_position(encoder.Direction.CW, 'z') - self.assertTrue(int(self.model.get_position('z')) == 1) - self.assertTrue(o.has_been_called) - self.assertTrue(int(self.model.get_position('z')) == 1) - self.assertTrue(o.updated == dro.Updated.POS_Z) - def test_wrong_axis(self): o = Observer() self.model.attatch(o) - self.model.set_position(1, 'apa') + self.model.set_position('apa', 1) self.assertFalse(o.has_been_called) def test_set_pos(self): o = Observer() self.model.attatch(o) - self.model.set_position(1, 'x') + self.model.set_position('x', 1) self.assertTrue(o.updated == dro.Updated.POS_X) - self.model.set_position(-11, 'z') + 
self.model.set_position('z', -11) self.assertTrue(o.updated == dro.Updated.POS_Z) self.assertEqual(o.nbr_of_calls, 2) self.assertEqual(int(self.model.get_position('x')), 1) self.assertEqual(int(self.model.get_position('z')), -11) def test_get_pos_as_string(self): - self.model.set_position(-11.49, 'x') + self.model.set_position('x', -11.49) self.assertEqual(self.model.get_position('x'), -11.49) def test_toogle_x_mode(self): o = Observer() self.model.attatch(o) - self.model.set_position(-11.49, 'x') + self.model.set_position('x', -11.49) self.model.set_toggle_x_mode() - self.assertEqual(self.model.get_position('x'), -22.98) + self.assertEqual(self.model.get_position('x'), -11.49) self.assertTrue(o.updated == dro.Updated.X_MODE) - def test_set_diam(self): - if self.model.get_x_mode() == dro.XMode.RADIUS: - self.model.set_toggle_x_mode() - self.assertEqual(self.model.get_x_mode(), dro.XMode.DIAMETER) - - self.model.set_position(10, 'x') - self.assertEqual(self.model.get_position('x'), 10) - self.model.set_toggle_x_mode() - self.assertEqual(self.model.get_position('x'), 5) + def test_set_position_no_notify_if_unchanged(self): + o = Observer() + self.model.attatch(o) + + self.model.set_position('x', 10) + self.assertEqual(o.nbr_of_calls, 1) + + self.model.set_position('x', 10) + self.assertEqual(o.nbr_of_calls, 1) # No change, no notify + + self.model.set_position('x', 20) + self.assertEqual(o.nbr_of_calls, 2) # Changed, should notify + + def test_set_position_no_notify_if_small_change(self): + o = Observer() + self.model.attatch(o) + + self.model.set_position('x', 10) + self.assertEqual(o.nbr_of_calls, 1) + + self.model.set_position('x', 10+0.003) + self.assertEqual(o.nbr_of_calls, 1) # No change, no notify + def test_steps_to_position(self): + self.model.set_scale('x', 2.5/200) # 200 steps per unit + pos = self.model.steps_to_position('x', 500) + self.assertEqual(pos, 6.25) # 200 * 1.5 = 300 steps if __name__ == '__main__': unittest.main() ´´´ --- here is the diff 
between my version of todo.txt and main: ´´´diff diff --git a/todo.txt b/todo.txt index cf193f0..fd78b5e 100644 --- a/todo.txt +++ b/todo.txt @@ -1,46 +1,50 @@ -askfloat funkar inte med alltid?????? -tester för r/D och ser/read position +askfloat funkar inte med alltid?????? +KLAR i2c: + handle_btn_x0_press måste sätta en offset i axis som gör att offset+position blir 0 + self._model.set_offset('x', -get_position) + motsvarande för hanlde_btn_x + KLART desktop-filen pekar på ett shell-script som startar dro.py dro.desktop autostartar inte??? men går att starta via file-explorer????? KLART stänga av med shutdown utan lösen: sudo visudo user_name ALL=(ALL) NOPASSWD: /sbin/poweroff, /sbin/reboot, /sbin/shutdown KLART stänga av med fysisk knapp: "adding this to /boot/config.txt: dtoverlay=gpio-shutdown KLART avstängningsknapp knapp som kortsluter pin 5 (GPIO3, grön) och jord och "adding this to /boot/config.txt: dtoverlay=gpio-shutdown " och https://askubuntu.com/questions/168879/shutdown-from-terminal-without-entering-password SW knapp till funktion/command: subprocess.call('sudo shutdown -h now', shell=True) utöka test.py *funkar det med olika ISR? eller måste det vara en central? VERKAR FUNKA lägg till z också... GJODE ETT TEST MED z_clock_pin på gpio13 FUNKAR... HW funkar inte testa enligt c:\Users\johan\Documents\verkstad\emco compact 5\bob\my-bob.drawio // 1. 4.7kOhm motstånd pull up, inte pull up i HW... // verkar bli för liten spänning eller 2. inget motstånd, pull up i HW... FUNKAR... det blir 3.22V HIGH på GPIO ´´´