422 lines
11 KiB
Python
422 lines
11 KiB
Python
# type: ignore
|
|
|
|
# MIT License
|
|
#
|
|
# Copyright (c) 2022 Daniel Robertson
|
|
#
|
|
# Permission is hereby granted, free of charge, to any person obtaining a copy
|
|
# of this software and associated documentation files (the "Software"), to deal
|
|
# in the Software without restriction, including without limitation the rights
|
|
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
|
# copies of the Software, and to permit persons to whom the Software is
|
|
# furnished to do so, subject to the following conditions:
|
|
#
|
|
# The above copyright notice and this permission notice shall be included in all
|
|
# copies or substantial portions of the Software.
|
|
#
|
|
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
|
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
|
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
|
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
|
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
|
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
|
# SOFTWARE.
|
|
|
|
import _thread
|
|
import time
|
|
from machine import Pin
|
|
from micropython import const
|
|
from rp2 import PIO, StateMachine, asm_pio
|
|
|
|
class hx711:
|
|
|
|
class _util:
|
|
|
|
@classmethod
|
|
def get_sm_from_pio(cls, pio: PIO, sm_index: int) -> StateMachine:
|
|
"""Returns the StateMachine object from the given index
|
|
|
|
Args:
|
|
pio (PIO): RP2040 PIO instance
|
|
sm_index (int):
|
|
|
|
Returns:
|
|
StateMachine:
|
|
"""
|
|
return pio.state_machine(sm_index)
|
|
|
|
@classmethod
|
|
def get_sm_index(cls, pio_offset: int, sm_offset: int) -> int:
|
|
"""Returns the global state machine index from given args
|
|
|
|
Args:
|
|
pio_offset (int): 0 or 1
|
|
sm_offset (int):
|
|
|
|
Returns:
|
|
int: index between 0 and 7
|
|
"""
|
|
return (pio_offset >> 2) + sm_offset
|
|
|
|
@classmethod
|
|
def get_pio_from_sm_index(cls, sm_index: int) -> PIO:
|
|
"""Returns the correct PIO object from the global state machine index
|
|
|
|
Args:
|
|
sm_index (int):
|
|
|
|
Returns:
|
|
PIO:
|
|
"""
|
|
return PIO(sm_index >> 2)
|
|
|
|
@classmethod
|
|
def sm_drain_tx_fifo(cls, sm: StateMachine) -> None:
|
|
"""Clears the StateMachine TX FIFO
|
|
|
|
Args:
|
|
sm (StateMachine):
|
|
|
|
Performs:
|
|
pull( ) noblock
|
|
https://github.com/raspberrypi/pico-sdk/blob/master/src/rp2_common/hardware_pio/pio.c#L252
|
|
This may not be thread safe
|
|
"""
|
|
while sm.tx_fifo() != 0: sm.exec("pull() noblock")
|
|
|
|
@classmethod
|
|
def sm_get(cls, sm: StateMachine) -> int|None:
|
|
"""Returns a value from the StateMachine's RX FIFO (NON-BLOCKING)
|
|
|
|
Args:
|
|
sm (StateMachine):
|
|
|
|
Returns:
|
|
int|None: None is returned if RX FIFO is empty
|
|
"""
|
|
return sm.get() if sm.rx_fifo() != 0 else None
|
|
|
|
@classmethod
|
|
def sm_get_blocking(cls, sm: StateMachine) -> int:
|
|
"""Returns a value from the StateMachine's RX FIFO (BLOCKING)
|
|
|
|
Args:
|
|
sm (StateMachine):
|
|
|
|
Returns:
|
|
int:
|
|
"""
|
|
while sm.rx_fifo() == 0: pass
|
|
return sm.get()
|
|
|
|
class rate:
    """Sample-rate selectors.

    The values are used as list indices by get_settling_time() and
    get_rate_sps(), so they must stay aligned with SETTLING_TIMES and
    SAMPLES_RATES.
    """
    rate_10: int = const(0)
    rate_80: int = const(1)
|
|
|
|
class gain:
    """Gain selectors accepted by set_gain().

    NOTE(review): 25/26/27 look like the total clock pulses per conversion
    from the HX711 datasheet (gain 128 / 32 / 64) - confirm against the
    datasheet and against how the PIO program consumes the value.
    """
    gain_128: int = const(25)
    gain_32: int = const(26)
    gain_64: int = const(27)
|
|
|
|
class power:
    """Power-state selectors accepted by set_power()."""
    pwr_up: int = const(0)
    pwr_down: int = const(1)
|
|
|
|
class _pio_prog:
|
|
def __init__(self) -> None:
|
|
pass
|
|
def init(self, hx) -> None:
|
|
pass
|
|
def program(self) -> None:
|
|
pass
|
|
|
|
class pio_noblock(_pio_prog):
    """Built-in PIO program that clocks readings out of the HX711.

    Each reading is shifted in one bit at a time and auto-pushed to the RX
    FIFO once PUSH_BITS bits have been collected.
    """

    # see: https://github.com/endail/hx711-pico-c/blob/main/src/hx711_noblock.pio
    PUSH_BITS: int = const(24)
    FREQUENCY: int = const(10000000)

    def __init__(self) -> None:
        super().__init__()

    def init(self, hx: hx711) -> None:
        """Creates the StateMachine for ``hx`` and stores it in ``hx._sm``.

        The data pin is the input base; the clock pin is used as the out,
        set and side-set base.

        Args:
            hx (hx711): object providing _sm_index, data_pin and clock_pin
        """
        hx._sm = StateMachine(
            hx._sm_index,
            self.program,
            freq=self.FREQUENCY,
            in_base=hx.data_pin,
            out_base=hx.clock_pin,
            set_base=hx.clock_pin,
            jmp_pin=None,
            sideset_base=hx.clock_pin
        )

    # pylint: disable=E,W,C,R
    @asm_pio(
        out_init=(PIO.OUT_LOW),
        set_init=(PIO.OUT_LOW),
        sideset_init=(PIO.OUT_LOW),
        out_shiftdir=PIO.SHIFT_LEFT,
        autopush=True,
        autopull=False,
        push_thresh=PUSH_BITS,
        fifo_join=PIO.JOIN_NONE
    )
    def program():

        set(x, 0) # default gain of 0

        label("wrap_target")
        wrap_target()

        set(y, 23) # read bits, 0 based

        # block until input pin 0 (the data pin, via in_base) reads low
        wait(0, pin, 0)

        label("bitloop")
        set(pins, 1) # clock pin high
        in_(pins, 1) # shift one bit from the data pin into the ISR

        # clock pin low via side-set, then loop until y underflows
        jmp(y_dec, "bitloop").side(0).delay(2 - 1) # T4

        # clock pin high; fetch a new gain word from the TX FIFO if present
        pull(noblock).side(1)

        # NOTE(review): out_shiftdir is SHIFT_LEFT, so this presumably takes
        # the two most significant OSR bits - confirm this matches the
        # values written by set_gain()
        out(x, 2)

        # x == 0: clock pin low and start the next reading
        jmp(not_x, "wrap_target").side(0)

        mov(y, x)

        # extra clock pulses, counted down in y
        label("gainloop")
        set(pins, 1).delay(2 - 1) # T3
        jmp(y_dec, "gainloop").side(0).delay(2 - 1) # T4

        wrap()
|
|
|
|
# Width of one HX711 reading in bits.
READ_BITS: int = const(24)
# Smallest and largest signed values representable in READ_BITS bits.
MIN_VALUE: int = const(-0x800000)
MAX_VALUE: int = const(0x7fffff)
# Delay used by wait_power_down().
POWER_DOWN_TIMEOUT: int = const(60) # us
# Indexed by a rate value (rate.rate_10 == 0, rate.rate_80 == 1);
# see get_settling_time().
SETTLING_TIMES: list[int] = [ # ms
    const(400),
    const(50)
]
# Indexed by a rate value; see get_rate_sps().
SAMPLES_RATES: list[int] = [
    const(10),
    const(80)
]
|
|
|
|
def __init__(
    self,
    clk: Pin,
    dat: Pin,
    sm_index: int = 0,
    prog: _pio_prog = pio_noblock()
):
    """Create HX711 object

    Configures the clock pin as an output and the data pin as an input,
    then lets ``prog`` create the state machine (stored in ``self._sm``).

    Args:
        clk (Pin): GPIO pin connected to HX711's clock pin
        dat (Pin): GPIO pin connected to HX711's data pin
        sm_index (int, optional): Global state machine index to use. Defaults to 0.
        prog (_pio_prog, optional): PIO program. Defaults to built-in pio_noblock().
            The default instance is created once and shared between objects;
            pio_noblock keeps no per-object state, so sharing is harmless.
    """

    self._mut = _thread.allocate_lock()
    self._mut.acquire()

    # release the lock even if pin setup or program init raises
    try:
        self.clock_pin: Pin = clk
        self.data_pin: Pin = dat
        self.clock_pin.init(mode=Pin.OUT)
        self.data_pin.init(mode=Pin.IN)

        self._sm: StateMachine
        self._sm_index: int = sm_index
        self._prog: __class__._pio_prog = prog

        prog.init(self)
    finally:
        self._mut.release()
|
|
|
|
def __bool__(self) -> bool:
    """Truthiness mirrors whether the state machine is currently active."""
    running = self._sm.active()
    return running
|
|
|
|
def __repr__(self) -> str:
    """Short description listing the clock pin, data pin and state machine index."""
    template = "[HX711 - CLK: {}, DAT: {}, SM_IDX: {}]"
    fields = (self.clock_pin, self.data_pin, self._sm_index)
    return template.format(*fields)
|
|
|
|
def __enter__(self):
    """Context-manager entry; returns this object unchanged."""
    return self
|
|
|
|
def __exit__(self, ex_type, ex_val, ex_tb) -> None:
    """Context-manager exit; always closes the object.

    Args:
        ex_type: exception type, if any (unused)
        ex_val: exception value, if any (unused)
        ex_tb: traceback, if any (unused)
    """
    lock = self._mut
    # an abrupt exit may have left the lock held; close() needs to take it
    if lock.locked():
        lock.release()
    self.close()
|
|
|
|
def close(self) -> None:
    """Stop communication with HX711. Does not alter power state.

    Deactivates the state machine and removes the PIO program from the
    PIO block that owns it. The lock is released even if either step raises.
    """
    self._mut.acquire()
    try:
        self._sm.active(0)
        pio = __class__._util.get_pio_from_sm_index(self._sm_index)
        pio.remove_program(self._prog.program)
    finally:
        self._mut.release()
|
|
|
|
def set_gain(self, gain: int) -> None:
    """Change HX711 gain

    Clears anything pending in the TX FIFO, queues the new gain for the
    PIO program, then reads and discards two values from the RX FIFO.
    Both reads block; the lock is released even if they are interrupted.

    Args:
        gain (int): one of the hx711.gain values
    """
    self._mut.acquire()
    try:
        __class__._util.sm_drain_tx_fifo(self._sm)
        self._sm.put(gain)
        # presumably these readings predate the gain change - both dropped
        self._sm.get()
        __class__._util.sm_get_blocking(self._sm)
    finally:
        self._mut.release()
|
|
|
|
@classmethod
def get_twos_comp(cls, raw: int) -> int:
    """Returns the signed (two's complement) value of a raw HX711 reading

    Only the low 24 bits of ``raw`` are considered: bit 23 is the sign bit
    and bits 0-22 carry the magnitude.

    Args:
        raw (int): raw value from HX711

    Returns:
        int: value between MIN_VALUE and MAX_VALUE inclusive
    """
    sign_bit = -cls.MIN_VALUE  # 0x800000
    return (raw & cls.MAX_VALUE) - (raw & sign_bit)
|
|
|
|
@classmethod
def is_min_saturated(cls, val: int) -> bool:
    """Whether value is at its minimum

    Args:
        val (int): signed value to test

    Returns:
        bool: True when val equals MIN_VALUE
    """
    floor = cls.MIN_VALUE
    return val == floor
|
|
|
|
@classmethod
def is_max_saturated(cls, val: int) -> bool:
    """Whether value is at its maximum

    Args:
        val (int): signed value to test

    Returns:
        bool: True when val equals MAX_VALUE
    """
    ceiling = cls.MAX_VALUE
    return val == ceiling
|
|
|
|
@classmethod
def get_settling_time(cls, rate: int) -> int:
    """Looks up the settling time for the given rate

    Args:
        rate (int): index into SETTLING_TIMES (an hx711.rate value)

    Returns:
        int: milliseconds
    """
    settle_ms = cls.SETTLING_TIMES[rate]
    return settle_ms
|
|
|
|
@classmethod
def get_rate_sps(cls, rate: int) -> int:
    """Looks up the numeric value of the given rate

    Args:
        rate (int): index into SAMPLES_RATES (an hx711.rate value)

    Returns:
        int: the SAMPLES_RATES entry for that rate
    """
    samples = cls.SAMPLES_RATES[rate]
    return samples
|
|
|
|
def get_value(self) -> int:
    """Blocks until a value is returned

    Busy-waits on the RX FIFO while holding the lock. The lock is released
    even if the wait is interrupted by an exception.

    Returns:
        int: signed reading
    """
    self._mut.acquire()
    try:
        raw = __class__._util.sm_get_blocking(self._sm)
    finally:
        self._mut.release()
    return self.get_twos_comp(raw)
|
|
|
|
def get_value_timeout(self, timeout: int = 1000000) -> int|None:
    """Attempts to obtain a value within the timeout

    Polls for a reading until one arrives or the deadline passes.

    Args:
        timeout (int, optional): timeout in microseconds. Defaults to 1000000.

    Returns:
        int|None: None is returned if no value is obtained within the timeout period
    """

    # ticks values wrap around, so the deadline must be built and compared
    # with the ticks helpers rather than plain + and <
    deadline = time.ticks_add(time.ticks_us(), timeout)
    val = None

    self._mut.acquire()
    try:
        while time.ticks_diff(deadline, time.ticks_us()) > 0:
            val = self._try_get_value()
            if val is not None:
                break
    finally:
        self._mut.release()

    # a raw reading of 0 is valid, so test against None rather than truthiness
    if val is None:
        return None
    return self.get_twos_comp(val)
|
|
|
|
def get_value_noblock(self) -> int|None:
    """Returns a value if one is available

    Returns:
        int|None: None is returned if no value is available
    """
    self._mut.acquire()
    try:
        val = self._try_get_value()
    finally:
        self._mut.release()

    # a raw reading of 0 is valid, so test against None rather than truthiness
    if val is None:
        return None
    return self.get_twos_comp(val)
|
|
|
|
def set_power(self, pwr: int) -> None:
    """Changes the power state of the HX711 and starts/stops the PIO program

    Power up drives the clock pin low, then restarts and activates the
    state machine. Power down deactivates the state machine and drives the
    clock pin high. Any other value is ignored.

    Args:
        pwr (int): hx711.power.pwr_up or hx711.power.pwr_down
    """

    self._mut.acquire()
    try:
        if pwr == __class__.power.pwr_up:
            self.clock_pin.low()
            self._sm.restart()
            self._sm.active(1)
        elif pwr == __class__.power.pwr_down:
            self._sm.active(0)
            self.clock_pin.high()
    finally:
        self._mut.release()
|
|
|
|
@classmethod
def wait_settle(cls, rate: int) -> None:
    """Sleeps for the settling time that belongs to the given rate

    Args:
        rate (int): an hx711.rate value
    """
    settle_ms = cls.get_settling_time(rate)
    time.sleep_ms(settle_ms)
|
|
|
|
@classmethod
def wait_power_down(cls) -> None:
    """Sleeps for POWER_DOWN_TIMEOUT microseconds so the HX711 can power down
    """
    pause_us = cls.POWER_DOWN_TIMEOUT
    time.sleep_us(pause_us)
|
|
|
|
def _try_get_value(self) -> int|None:
    """Attempts to obtain a value if one is available

    Does not take the lock; callers are expected to hold it.

    Returns:
        int|None: None is returned if no value is available
    """
    # The FIFO level counts queued words and each reading is one word, so
    # a single entry is enough. Comparing against READ_BITS / 8 would hold
    # readings back until three of them had piled up.
    if self._sm.rx_fifo() == 0:
        return None
    return self._sm.get()
|