General MCP3428 Python Implementation

The answer is probably something simple, but I am struggling to get the correct 4-20mA readings.

I am using the following:

  • 8 Channel I2C 4-20mA Current Receiver, # PR33-16
  • I2C Shield for Raspberry Pi 4, # PR2-3
  • Raspberry Pi Model 4B

In my current setup I only have sensors wired into the first 4 channels of the PR33-16.

I have both MCP3428’s at different I2C addresses. (0x68 & 0x69) I can pick them up just fine.

My program is set up to use One-Shot conversion mode, 16 bit resolution, Gain x2 and I am using the ADC values listed on the product page for 4mA (5813) and 20mA (29390).

I cannot for the life of me get the application to output the correct 4-20mA signal.

For example:

These are the real analog signals going into the board:
Ch1: 13.2 mA
Ch2: 3.9 mA
Ch3: 7.6 mA
Ch4: 4.6 mA

And here is what the application is outputting:
Ch1: 7.89 mA (11551 raw ADC)
Ch2: 15.14 mA (22235 raw ADC)
Ch3: -14.4 mA (-21339 raw ADC)
Ch4: 17.67 mA (25958 raw ADC)

For these readings, the program is actually using Gain x1, 2906 (4mA) and 14695 (20mA). When I use Gain x2 and the ADC values from the product page, the readings I get are way more messed up.

Here’s the code I am using. If anyone can tell me what I’ve done wrong, I would be extremely grateful. I’ve been working on it so there may be some stuff commented out in it.

import time
import sys
import os
import logging
from smbus2 import SMBus
from utils import retry_with_backoff

# Configure logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

if not logger.handlers:
    handler = logging.StreamHandler(sys.stdout)
    handler.setLevel(logging.DEBUG)
    formatter = logging.Formatter('%(asctime)s %(levelname)s:%(name)s:%(message)s')
    handler.setFormatter(formatter)
    logger.addHandler(handler)

# ADC constants
MCP3428_ADDR_1 = 0x69
MCP3428_ADDR_2 = 0x68
CONFIG_BASE = 0b10001100  # Start | Channel bits (set per channel) | One-shot | 16-bit | Gain=1
# CONFIG_BASE = 0b10001101  # Gain = 2 (bits 01), One-shot, 16-bit

CONVERSION_TIME = 0.25  # 250ms to ensure conversion completes

# Calibration points for PGA Gain Setting 2
# RAW_4MA = 5813
# RAW_20MA = 29390

# Calibration points for PGA Gain setting 1
RAW_4MA = 2906    # For gain=1 (roughly half)
RAW_20MA = 14695  # For gain=1 (roughly half)

MAX_SENSOR_READ_RETRIES = int(os.getenv('MAX_SENSOR_READ_RETRIES', '2'))
SENSOR_READ_INITIAL_DELAY = float(os.getenv('SENSOR_READ_INITIAL_DELAY', '0.1'))
SENSOR_READ_BACKOFF_FACTOR = float(os.getenv('SENSOR_READ_BACKOFF_FACTOR', '1.5'))

MAX_CONSECUTIVE_FAILURES = int(os.getenv('MAX_CONSECUTIVE_FAILURES', '3'))
SKIP_CYCLES_AFTER_FAILURE = int(os.getenv('SKIP_CYCLES_AFTER_FAILURE', '5'))

channel_failure_counters = {}
channel_skip_counters = {}

RETRYABLE_I2C_EXCEPTIONS = (IOError, OSError)

def create_channel_config(channel: int) -> int:
    """Generate the configuration byte for the selected MCP3428 channel."""
    return CONFIG_BASE | ((channel & 0x03) << 5)

@retry_with_backoff(
    max_retries=MAX_SENSOR_READ_RETRIES,
    initial_delay=SENSOR_READ_INITIAL_DELAY,
    backoff_factor=SENSOR_READ_BACKOFF_FACTOR,
    exceptions_to_retry=RETRYABLE_I2C_EXCEPTIONS
)
def read_channel(bus: SMBus, addr: int, channel: int) -> int:
    """Read a single channel from an MCP3428 ADC."""
    config = create_channel_config(channel)
    bus.write_byte(addr, config)
    time.sleep(0.005)  # Short delay to allow conversion start

    for attempt in range(10):
        time.sleep(CONVERSION_TIME)
        data = bus.read_i2c_block_data(addr, 0x00, 3)
        msb, lsb, config_read = data

        ready = config_read & 0x80
        channel_bits = config_read & 0x60
        expected_bits = (channel << 5) & 0x60

        logger.debug(
            f"Attempt {attempt}: Wrote 0x{config:02X}, got MSB:0x{msb:02X} LSB:0x{lsb:02X} Config:0x{config_read:02X}"
        )

        if ready:
            continue  # Conversion not ready yet

        if channel_bits != expected_bits:
            logger.debug(f"One-shot mode: channel bits {channel_bits >> 5:02b} may not match expected {expected_bits >> 5:02b}")

        raw_value = (msb << 8) | lsb
        if raw_value & 0x8000:
            raw_value -= 0x10000

        return raw_value

    raise TimeoutError(f"ADC @ 0x{addr:02X} channel {channel} timed out")

def convert_to_milliamps(raw_value: int) -> float:
    """Convert raw ADC value to milliamps."""
    # MCP3428 with 16-bit & Gain=2: LSB = 62.5μV (0.0000625V)
    voltage = raw_value * 0.0000625  # Volts

    slope = (20.0 - 4.0) / (RAW_20MA - RAW_4MA)
    result = 4.0 + slope * (raw_value - RAW_4MA)

    logger.debug(f"Raw: {raw_value}, Voltage: {voltage:.5f} V, Current: {result:.2f} mA")

    if result < 3.5 or result > 20.5:
        logger.warning(f"Reading value {result:.2f} mA is outside normal 4-20mA range")

    return result


# def convert_to_milliamps(raw_value: int) -> float:
#     """Convert raw ADC value to milliamps."""
#     slope = (20.0 - 4.0) / (RAW_20MA - RAW_4MA)
#     result = 4.0 + slope * (raw_value - RAW_4MA)

#     if result < 3.5 or result > 20.5:
#         logger.warning(f"Reading value {result:.2f} mA is outside normal 4-20mA range")

#     return result

def read_all_channels(bus: SMBus = None) -> dict:
    """Read all 8 channels (4 per MCP3428)."""
    bus_created = False
    if bus is None:
        bus = SMBus(1)
        bus_created = True

    readings = {}

    try:
        for chip_addr, offset in [(MCP3428_ADDR_1, 0), (MCP3428_ADDR_2, 4)]:
            for ch in range(4):
                channel_id = f"channel_{offset + ch}"

                if channel_id in channel_skip_counters and channel_skip_counters[channel_id] > 0:
                    channel_skip_counters[channel_id] -= 1
                    logger.warning(f"Skipping problematic {channel_id} ({channel_skip_counters[channel_id]} cycles remaining)")
                    readings[channel_id] = None
                    continue

                try:
                    raw = read_channel(bus, chip_addr, ch)
                    value = convert_to_milliamps(raw)
                    readings[channel_id] = {
                        "milliamps": value,
                        "raw": raw
                    }
                    logger.info(f"ADC {channel_id}: {value:.2f} mA (raw: {raw})")
                    channel_failure_counters[channel_id] = 0
                except Exception as e:
                    logger.error(f"Failed to read {channel_id}: {e}")
                    readings[channel_id] = None
                    channel_failure_counters[channel_id] = channel_failure_counters.get(channel_id, 0) + 1
                    if channel_failure_counters[channel_id] >= MAX_CONSECUTIVE_FAILURES:
                        logger.error(f"{channel_id} failed {MAX_CONSECUTIVE_FAILURES} times. Skipping for {SKIP_CYCLES_AFTER_FAILURE} cycles")
                        channel_skip_counters[channel_id] = SKIP_CYCLES_AFTER_FAILURE

    finally:
        if bus_created:
            bus.close()

    return readings

def main():
    """Run this module standalone to test continuous readings."""
    try:
        with SMBus(1) as bus:
            logger.info("Starting MCP3428 read loop")
            while True:
                readings = read_all_channels(bus)
                logger.info(f"All channel readings: {readings}")
                time.sleep(2)
    except KeyboardInterrupt:
        logger.info("Stopping MCP3428 reader")
    except Exception as e:
        logger.error(f"Fatal error: {e}")
        raise

if __name__ == "__main__":
    main()