The answer is probably something simple, but I am struggling to get the correct 4-20mA readings.
I am using the following:
- 8 Channel I2C 4-20mA Current Receiver, # PR33-16
- I2C Shield for Raspberry Pi 4, # PR2-3
- Raspberry Pi Model 4B
In my current setup I only have sensors wired into the first 4 channels of the PR33-16.
The two MCP3428s are at different I2C addresses (0x68 and 0x69), and I can detect both of them on the bus just fine.
My program is set up to use one-shot conversion mode, 16-bit resolution, and Gain x2, and I am using the ADC values listed on the product page for 4mA (5813) and 20mA (29390).
I cannot for the life of me get the application to report the correct 4-20mA values.
For example:
These are the real analog signals going into the board:
Ch1: 13.2 mA
Ch2: 3.9 mA
Ch3: 7.6 mA
Ch4: 4.6 mA
And here is what the application is outputting:
Ch1: 7.89 mA (11551 raw ADC)
Ch2: 15.14 mA (22235 raw ADC)
Ch3: -14.4 mA (-21339 raw ADC)
Ch4: 17.67 mA (25958 raw ADC)
For these readings, the program is actually using Gain x1 with calibration values of 2906 (4mA) and 14695 (20mA). When I use Gain x2 with the ADC values from the product page, the readings I get are even further off.
Here’s the code I am using. If anyone can tell me what I’ve done wrong, I would be extremely grateful. It is still a work in progress, so some parts of it may be commented out.
import time
import sys
import os
import logging
from smbus2 import SMBus
from utils import retry_with_backoff
# Configure logging: a module-level logger that emits DEBUG and above.
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
# Attach the stdout handler only when the logger has none yet, so running this
# setup again does not produce duplicated log lines.
if not logger.handlers:
    formatter = logging.Formatter('%(asctime)s %(levelname)s:%(name)s:%(message)s')
    handler = logging.StreamHandler(sys.stdout)
    handler.setLevel(logging.DEBUG)
    handler.setFormatter(formatter)
    logger.addHandler(handler)
# ADC constants
MCP3428_ADDR_1 = 0x69
MCP3428_ADDR_2 = 0x68
# MCP3428 configuration byte layout (per the MCP3426/7/8 datasheet):
#   bit 7     RDY    - write 1 to start a one-shot conversion
#   bits 6-5  C1:C0  - channel select (OR-ed in per channel by create_channel_config)
#   bit 4     O/C    - 0 = one-shot, 1 = continuous
#   bits 3-2  S1:S0  - 00 = 12-bit, 01 = 14-bit, 10 = 16-bit (11 is not defined)
#   bits 1-0  G1:G0  - 00 = x1, 01 = x2, 10 = x4, 11 = x8
# The previous value (0b10001100) set S1:S0 to 11, which is not a defined
# MCP3428 sample rate; 16-bit resolution requires S1:S0 = 10.
CONFIG_BASE = 0b10001000 # Start | Channel bits (set per channel) | One-shot | 16-bit | Gain=1
# CONFIG_BASE = 0b10001001 # Gain = 2 (bits 01), One-shot, 16-bit
CONVERSION_TIME = 0.25 # 250ms to ensure conversion completes
# Calibration points for PGA Gain Setting 2
# RAW_4MA = 5813
# RAW_20MA = 29390
# Calibration points for PGA Gain setting 1
RAW_4MA = 2906 # For gain=1 (roughly half)
RAW_20MA = 14695 # For gain=1 (roughly half)
# Retry/backoff tuning for a single channel read, overridable via environment.
MAX_SENSOR_READ_RETRIES = int(os.getenv('MAX_SENSOR_READ_RETRIES', '2'))
SENSOR_READ_INITIAL_DELAY = float(os.getenv('SENSOR_READ_INITIAL_DELAY', '0.1'))
SENSOR_READ_BACKOFF_FACTOR = float(os.getenv('SENSOR_READ_BACKOFF_FACTOR', '1.5'))
# After this many consecutive failures a channel is skipped for a number of cycles.
MAX_CONSECUTIVE_FAILURES = int(os.getenv('MAX_CONSECUTIVE_FAILURES', '3'))
SKIP_CYCLES_AFTER_FAILURE = int(os.getenv('SKIP_CYCLES_AFTER_FAILURE', '5'))
# Per-channel bookkeeping, keyed by "channel_<n>".
channel_failure_counters = {}
channel_skip_counters = {}
RETRYABLE_I2C_EXCEPTIONS = (IOError, OSError)
def create_channel_config(channel: int) -> int:
    """Build the MCP3428 configuration byte that selects ``channel``.

    Only the two low bits of ``channel`` are used; they are shifted into
    bits 6-5 of the byte and OR-ed onto ``CONFIG_BASE``.
    """
    channel_select = (channel & 0b11) << 5
    return CONFIG_BASE | channel_select
@retry_with_backoff(
    max_retries=MAX_SENSOR_READ_RETRIES,
    initial_delay=SENSOR_READ_INITIAL_DELAY,
    backoff_factor=SENSOR_READ_BACKOFF_FACTOR,
    exceptions_to_retry=RETRYABLE_I2C_EXCEPTIONS
)
def read_channel(bus: SMBus, addr: int, channel: int) -> int:
    """Read a single channel from an MCP3428 ADC.

    Writes the configuration byte for ``channel`` to start a conversion, then
    polls the device until the RDY bit in the returned configuration byte is
    clear, and returns the conversion result.

    Args:
        bus: Open smbus2 bus the ADC is attached to.
        addr: I2C address of the MCP3428.
        channel: Channel index on that chip (0-3).

    Returns:
        The signed 16-bit conversion result (two's complement decoded).

    Raises:
        TimeoutError: If no completed conversion is seen after 10 polls.
    """
    # Function-scope import: smbus2 is already a dependency of this file, and
    # this keeps the change local to the block.
    from smbus2 import i2c_msg

    config = create_channel_config(channel)
    bus.write_byte(addr, config)
    time.sleep(0.005)  # Short delay to allow conversion start

    expected_bits = (channel << 5) & 0x60
    for attempt in range(10):
        time.sleep(CONVERSION_TIME)
        # Plain I2C read with no command byte. An SMBus block read
        # (read_i2c_block_data(addr, 0x00, 3)) first *writes* 0x00 to the
        # device, and the MCP3428 treats any written byte as a new
        # configuration byte, which would overwrite the channel, resolution
        # and gain requested above.
        read_msg = i2c_msg.read(addr, 3)
        bus.i2c_rdwr(read_msg)
        msb, lsb, config_read = list(read_msg)

        # RDY (bit 7) reads back as 1 while no fresh result is available.
        not_ready = config_read & 0x80
        channel_bits = config_read & 0x60
        logger.debug(
            f"Attempt {attempt}: Wrote 0x{config:02X}, got MSB:0x{msb:02X} LSB:0x{lsb:02X} Config:0x{config_read:02X}"
        )
        if not_ready:
            continue  # Conversion not ready yet
        if channel_bits != expected_bits:
            logger.debug(f"One-shot mode: channel bits {channel_bits >> 5:02b} may not match expected {expected_bits >> 5:02b}")

        # Combine the two data bytes and decode 16-bit two's complement.
        raw_value = (msb << 8) | lsb
        if raw_value & 0x8000:
            raw_value -= 0x10000
        return raw_value

    raise TimeoutError(f"ADC @ 0x{addr:02X} channel {channel} timed out")
def convert_to_milliamps(raw_value: int) -> float:
    """Convert a raw ADC value to milliamps.

    The current is a linear interpolation through the two calibration points
    ``RAW_4MA`` (4 mA) and ``RAW_20MA`` (20 mA). The equivalent input voltage
    is computed only for the debug log.

    Args:
        raw_value: Signed raw conversion result from the ADC.

    Returns:
        The interpolated loop current in milliamps. Values outside
        3.5-20.5 mA are still returned, but a warning is logged.
    """
    # At 16-bit resolution one LSB is 62.5 uV at the converter. The PGA
    # amplifies the input before conversion, so divide by the gain selected in
    # CONFIG_BASE (bits 1-0: x1, x2, x4, x8) to get the voltage at the pins.
    gain = 1 << (CONFIG_BASE & 0x03)
    voltage = raw_value * 0.0000625 / gain  # Volts

    slope = (20.0 - 4.0) / (RAW_20MA - RAW_4MA)
    result = 4.0 + slope * (raw_value - RAW_4MA)
    logger.debug(f"Raw: {raw_value}, Voltage: {voltage:.5f} V, Current: {result:.2f} mA")
    if result < 3.5 or result > 20.5:
        logger.warning(f"Reading value {result:.2f} mA is outside normal 4-20mA range")
    return result
# def convert_to_milliamps(raw_value: int) -> float:
# """Convert raw ADC value to milliamps."""
# slope = (20.0 - 4.0) / (RAW_20MA - RAW_4MA)
# result = 4.0 + slope * (raw_value - RAW_4MA)
# if result < 3.5 or result > 20.5:
# logger.warning(f"Reading value {result:.2f} mA is outside normal 4-20mA range")
# return result
def read_all_channels(bus: SMBus = None) -> dict:
    """Read all 8 channels (4 per MCP3428).

    Args:
        bus: An open bus to use. When omitted, bus 1 is opened for the
            duration of the call and closed again before returning.

    Returns:
        A dict keyed "channel_0" .. "channel_7". Each value is either a dict
        with "milliamps" and "raw" entries, or None when the channel was
        skipped or its read failed.
    """
    owns_bus = bus is None
    if owns_bus:
        bus = SMBus(1)

    chips = ((MCP3428_ADDR_1, 0), (MCP3428_ADDR_2, 4))
    readings = {}
    try:
        for chip_addr, offset in chips:
            for ch in range(4):
                channel_id = f"channel_{offset + ch}"

                # A channel that failed repeatedly sits out a number of cycles.
                if channel_skip_counters.get(channel_id, 0) > 0:
                    channel_skip_counters[channel_id] -= 1
                    logger.warning(f"Skipping problematic {channel_id} ({channel_skip_counters[channel_id]} cycles remaining)")
                    readings[channel_id] = None
                    continue

                try:
                    raw = read_channel(bus, chip_addr, ch)
                    value = convert_to_milliamps(raw)
                    readings[channel_id] = {"milliamps": value, "raw": raw}
                    logger.info(f"ADC {channel_id}: {value:.2f} mA (raw: {raw})")
                    # A successful read clears the consecutive-failure count.
                    channel_failure_counters[channel_id] = 0
                except Exception as e:
                    logger.error(f"Failed to read {channel_id}: {e}")
                    readings[channel_id] = None
                    failures = channel_failure_counters.get(channel_id, 0) + 1
                    channel_failure_counters[channel_id] = failures
                    if failures >= MAX_CONSECUTIVE_FAILURES:
                        logger.error(f"{channel_id} failed {MAX_CONSECUTIVE_FAILURES} times. Skipping for {SKIP_CYCLES_AFTER_FAILURE} cycles")
                        channel_skip_counters[channel_id] = SKIP_CYCLES_AFTER_FAILURE
    finally:
        # Only close a bus this call opened itself.
        if owns_bus:
            bus.close()
    return readings
def main():
    """Run this module standalone to test continuous readings.

    Opens I2C bus 1 and logs a full set of channel readings every two
    seconds until interrupted. Ctrl-C stops the loop quietly; any other
    exception is logged and re-raised.
    """
    try:
        with SMBus(1) as bus:
            logger.info("Starting MCP3428 read loop")
            while True:
                readings = read_all_channels(bus)
                logger.info(f"All channel readings: {readings}")
                time.sleep(2)
    except KeyboardInterrupt:
        logger.info("Stopping MCP3428 reader")
    except Exception as e:
        logger.error(f"Fatal error: {e}")
        raise


if __name__ == "__main__":
    main()