Original Post: 06/01/2024
Update: I updated this post on 09/02/24 to reflect the changes made to the CO2 sensor build, namely using 5V for stability, an OLED screen for display, switches for WiFi and calibration, and finally an obnoxious buzzer to let me know I should open my windows or door (above 1500 ppm).
Disclaimer: Please note that the information presented in this article is for informational purposes only. It is not intended to serve as health advice, engineering advice, or any form of professional guidance. Readers should consult with qualified professionals for specific health or engineering concerns and should not rely solely on the content of this article for making decisions. The authors and publishers of this article are not responsible for any actions taken based on the information provided herein.
In this post I intend to cover the basics of SCD41 sensors, their technical performance, some MicroPython code to use with an ESP32 microcontroller (yes, I know there are other ones out there), and some of the limitations and precautions one should take with this particular sensor.
What is an SCD41 Sensor?
An SCD41 sensor is Sensirion’s miniature CO2 sensor. It uses a photoacoustic NDIR sensing principle along with Sensirion’s patented technology to offer high accuracy at a competitive price. I personally picked up this sensor for $28.99, and it could likely be found at a lower price directly from the manufacturer or via platforms like AliExpress. The sensor also includes on-chip signal compensation with a built-in SHT4x humidity and temperature sensor to account for fluctuations in environmental conditions, ensuring accurate CO2 readings.
Technical Performance
Generalized CO2 Accuracy Specification:
400-1,000 ppm: ±(50 ppm + 2.5% of reading)
1,001-2,000 ppm: ±(50 ppm + 3% of reading)
2,001-5,000 ppm: ±(40 ppm + 5% of reading)
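To make the bands above concrete, here is a small sketch that turns a CO2 reading into its worst-case error band. The function name `co2_accuracy_ppm` is my own invention for illustration; the coefficients are simply the ones listed above.

```python
def co2_accuracy_ppm(reading_ppm):
    """Return the +/- accuracy band (in ppm) for a CO2 reading, using the ranges listed above."""
    if 400 <= reading_ppm <= 1000:
        return 50 + 0.025 * reading_ppm
    if 1000 < reading_ppm <= 2000:
        return 50 + 0.03 * reading_ppm
    if 2000 < reading_ppm <= 5000:
        return 40 + 0.05 * reading_ppm
    raise ValueError("reading is outside the specified 400-5,000 ppm range")


for ppm in (800, 1500, 3000):
    print(ppm, "ppm -> +/-", round(co2_accuracy_ppm(ppm), 1), "ppm")
```

In other words, a reading of 1500 ppm could be off by roughly 95 ppm in either direction, which is worth keeping in mind when picking an alarm threshold.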
Temperature and Humidity Specifications:
Temperature Accuracy: -10°C to 60°C range with an accuracy of ±0.8°C
Humidity Accuracy: 0-95% RH range with an accuracy of ±6% RH
The sensor's response time is roughly 2 minutes for temperature and 90 seconds for humidity. Using the longest response time as the basis for our sampling gives a sampling rate of once every 2 minutes. The sensor's command set is extensive and clearly detailed in the datasheet, which makes programming straightforward once you are familiar with the commands.
Before diving into the fundamentals let’s visit why we even care about CO2 to begin with.
CO2 Levels in an Enclosed Space
CO2 levels in an enclosed room rise predictably over time, since the room is a closed system with n producers (people). If we know how much CO2 each person produces per minute, we can do a mass balance around the room: start with the initial amount of CO2, add the CO2 produced by the people inside, and estimate the total after a certain period. This lets us predict how CO2 levels will rise as people continue to breathe, so that the air quality can be monitored and maintained.
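As a rough sketch of that mass balance for a perfectly sealed room: the CO2 added is just people times generation rate times time, divided by the room volume. The per-person generation rate of 0.02 m3/h used as a default below is an assumed placeholder of mine, not a figure from this post, and the function name is hypothetical.

```python
def sealed_room_co2_ppm(people, hours, room_volume_m3,
                        start_ppm=400.0, co2_per_person_m3_h=0.02):
    """Estimate CO2 (ppm) in a sealed room: the starting level plus everything exhaled so far.

    co2_per_person_m3_h is an ASSUMED generation rate; adjust it to your own source.
    """
    added_m3 = people * co2_per_person_m3_h * hours   # total CO2 volume exhaled
    return start_ppm + added_m3 / room_volume_m3 * 1e6  # volume fraction -> ppm


# Two people for one hour in a room of about 28.3 m3 (10 ft x 10 ft x 10 ft)
print(round(sealed_room_co2_ppm(2, 1.0, 28.3)), "ppm")
```

A real room leaks, so this is an upper bound rather than a prediction.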
As CO2 levels increase, a number of health complaints can arise, and which ones appear depends on the CO2 concentration. The Wisconsin Department of Health Services outlines a ppm to health complaint table:
400 ppm: average outdoor air level.
400–1,000 ppm: typical level found in occupied spaces with good air exchange.
1,000–2,000 ppm: level associated with complaints of drowsiness and poor air.
2,000–5,000 ppm: level associated with headaches, sleepiness, and stagnant, stale, stuffy air. Poor concentration, loss of attention, increased heart rate and slight nausea may also be present.
5,000 ppm: this indicates unusual air conditions where high levels of other gases could also be present. Toxicity or oxygen deprivation could occur. This is the permissible exposure limit for daily workplace exposures.
40,000 ppm: this level is immediately harmful due to oxygen deprivation.
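The table above maps naturally onto a small lookup, for example to pick the text shown on a display. This is only a sketch: the function name is hypothetical, the labels are shortened from the table, and how readings between 5,000 and 40,000 ppm are labelled is my own assumption since the table only gives those two points.

```python
def describe_co2(ppm):
    """Return a short description for a CO2 reading, following the table above."""
    if ppm >= 40000:
        return "immediately harmful due to oxygen deprivation"
    if ppm >= 5000:
        # Assumption: anything from the 5,000 ppm limit up to 40,000 ppm gets this label
        return "unusual air conditions, at or above the workplace exposure limit"
    if ppm > 2000:
        return "headaches, sleepiness, and stagnant, stale, stuffy air"
    if ppm > 1000:
        return "drowsiness and poor air"
    if ppm > 400:
        return "typical occupied space with good air exchange"
    return "average outdoor air level"


for reading in (420, 1500, 6000):
    print(reading, "ppm:", describe_co2(reading))
```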
We also see plenty of examples of CO2 levels reaching dangerous levels in enclosed spaces and I am including images along with associated posts:
From https://vair-monitor.com/:
From https://cambridgecarbonfootprint.org/:
Now if we move to a more engineering based analysis we go to https://www.engineeringtoolbox.com/pollution-concentration-rooms-d_692.html and find that the carbon dioxide concentration in a room is a function of:
CO2 Concentration Calculation
The carbon dioxide concentration in a room filled with persons after a time \( t \) can be calculated as:
\[ c = \frac{q}{n V}\left[1 - \frac{1}{e^{n t}}\right] + (c_0 - c_i)\,\frac{1}{e^{n t}} + c_i \]
where:
- \( c \) = carbon dioxide concentration in the room (m³/m³)
- \( q \) = carbon dioxide supplied to the room (m³/h)
- \( V \) = volume of the room (m³)
- \( e \) = the constant 2.718...
- \( n \) = number of air shifts per hour (1/h)
- \( t \) = time (hour, h)
- \( c_i \) = carbon dioxide concentration in the inlet ventilation air (m³/m³)
- \( c_0 \) = carbon dioxide concentration in the room at start, \( t=0 \) (m³/m³)
Calculator (the same formula as Engineering Toolbox). Defaults: 2 people, a 10 ft by 10 ft room with 10 ft ceilings, 1 air change per hour, and makeup air at the standard 400 ppm CO2. This of course assumes there are absolutely no gaps and you're truly enclosed. It is a worst-case scenario, and as it is not cited anywhere, take it with a grain of salt!
CO2 Concentration Calculator
CO2 concentration: 0.00657 m3/m3 (6574 ppm)
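For anyone who wants to play with the Engineering Toolbox formula offline, here is a minimal sketch of it in Python. The function and argument names are mine, and the example inputs are made-up values for illustration; they are not the calculator's defaults, so do not expect the number above to come out.

```python
import math


def room_co2(q, n, V, t, c0, ci):
    """CO2 concentration (m3/m3) after t hours.

    q: CO2 supplied to the room (m3/h), n: air shifts per hour (1/h),
    V: room volume (m3), c0: starting concentration, ci: inlet air concentration.
    """
    decay = math.exp(-n * t)  # same as 1 / e^(n t)
    return (q / (n * V)) * (1.0 - decay) + (c0 - ci) * decay + ci


# Hypothetical inputs: 0.04 m3/h of CO2, 1 air change per hour, 28.3 m3 room, 1 hour
c = room_co2(q=0.04, n=1.0, V=28.3, t=1.0, c0=0.0004, ci=0.0004)
print(round(c * 1e6), "ppm")
```

Two sanity checks fall straight out of the formula: at t = 0 the result is c0, and for large t it settles at q / (n V) + ci.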
We also have a similar calculator here: https://www.soletairpower.fi/co2-calculator/
For a more scientifically based study one can navigate to the following: https://www.ncbi.nlm.nih.gov/pmc/articles/PMC7411428/
The study gives a CO2-based occupancy estimation by correlating the levels of CO2 with the number of occupants in a room, with the ultimate goal of reducing energy use by dynamically adjusting HVAC systems in real time. This means you don't ventilate rooms that are unoccupied, and you ventilate less if they are under-occupied. It's probably possible to tease out a solid mathematical formula that could be put into a calculator, but that would take time away from the goal of this article: the SCD41 and CO2 measurement.
Characteristics of NDIR Sensors
NDIR sensors measure CO2 by exploiting its property to absorb IR light at around 4.2 µm. They use a non-dispersive band-pass filter to allow only the relevant IR wavelengths to pass, hence the name Non-Dispersive Infrared. An image from Wikipedia below outlines various gases and their Mid-infrared absorption spectra:
How does Photoacoustic NDIR Work?
Photoacoustic Non-Dispersive Infrared (NDIR) technology is an advanced method used in sensors like the SCD41 to measure gas concentrations, such as CO2, using light and sound. Here’s a brief explanation of how it works:
Key Steps in Photoacoustic NDIR
Infrared Light Source: An infrared (IR) light source emits light at wavelengths absorbed by CO2.
Gas Absorption: CO2 molecules absorb this light, causing them to heat up and vibrate.
Pressure Waves: The vibrations create tiny sound waves.
Microphone Detection: A microphone detects these pressure waves.
Signal Processing: The sensor converts these waves into electrical signals to determine CO2 concentration.
Output: The final CO2 concentration is displayed for monitoring or further processing.
This method offers high sensitivity and accuracy, making it suitable for real-time CO2 monitoring in various applications, including indoor air quality and industrial processes. The SCD41 sensor is compact, low-power, and cost-effective, ideal for integration into diverse systems.
Transmissive NDIR
These sensors feature an IR emitter and an optical detector at opposite ends of an optical cavity. Here’s a quick rundown of how they work:
IR Emitter: Emits light through the gas sample.
Absorption by CO2: CO2 absorbs specific wavelengths of the IR light.
Detection: The detector measures the transmitted IR light.
Calculation: CO2 concentration is calculated based on the difference in emitted and transmitted light.
Transmissive NDIR sensors require precise positioning of the emitter and detector and a minimum optical path length to ensure accurate readings, which limits how small they can be made.
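The calculation step is commonly modelled with the Beer-Lambert law, I = I0 * exp(-k * c * L). The sources above do not give any sensor's actual math, so the sketch below is only an illustration of the principle; the absorption coefficient `k` and the path length are made-up values, and the function name is hypothetical.

```python
import math


def concentration_from_transmission(i_emitted, i_transmitted, k, path_length):
    """Invert the Beer-Lambert law: c = -ln(I / I0) / (k * L)."""
    if i_emitted <= 0 or i_transmitted <= 0:
        raise ValueError("intensities must be positive")
    return -math.log(i_transmitted / i_emitted) / (k * path_length)


# Made-up numbers: k = 100 (per unit concentration per metre), 5 cm optical path
K_ASSUMED = 100.0
PATH_M = 0.05
true_c = 0.001
transmitted = 1.0 * math.exp(-K_ASSUMED * true_c * PATH_M)  # what the detector would see
print(concentration_from_transmission(1.0, transmitted, K_ASSUMED, PATH_M))
```

The sketch also shows why path length matters: with a short path the transmitted intensity barely changes, so small measurement errors turn into large concentration errors.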
TVOC Sensors and eCO2 Readings
Using Total Volatile Organic Compounds (TVOC) sensors, such as the Sensirion SGP30, to estimate CO2 levels is generally unreliable. TVOC sensors measure the concentration of various organic compounds in the air, which can include emissions from household products, cooking, cleaning agents, and even human breath. While these sensors are adept at detecting a wide range of VOCs, they are not designed to specifically measure CO2.
Why TVOC Sensors Fall Short for CO2 Estimation
Broad Detection Spectrum: TVOC sensors respond to a broad range of organic compounds rather than to CO2 itself. This means their readings are influenced by numerous sources of VOCs that are unrelated to CO2 levels, such as air fresheners, deodorizers, and various chemical products used indoors.
Lack of Specificity: The eCO2 value is an estimate inferred from other gases, not a measurement of CO2, so a TVOC sensor cannot tell a rise in CO2 apart from a rise in VOCs. The presence of VOCs can cause the sensor to give false indications of high CO2 levels.
Environmental Factors: Various environmental factors such as temperature, humidity, and the presence of other gases can affect the readings of TVOC sensors, further complicating their accuracy when estimating CO2 levels.
Despite these limitations, some vendors and manufacturers still promote TVOC sensors for CO2 estimation. This can be misleading for consumers who may believe they are getting accurate CO2 measurements. It is important for users to understand these limitations and consider more reliable methods, such as NDIR or photoacoustic sensors, for precise CO2 monitoring.
Recommended Use of TVOC Sensors
While TVOC sensors are not suitable for accurate CO2 measurement, they are valuable tools for:
Indoor Air Quality Monitoring: Detecting the presence of harmful VOCs and identifying sources of indoor air pollution.
Health and Safety: Ensuring that indoor environments are free from harmful concentrations of volatile organic compounds.
Industrial Applications: Monitoring air quality in industrial settings to ensure compliance with safety regulations.
Comparing Low-Cost CO2 Sensors
NDIR Sensors
NDIR sensors measure CO2 based on gas absorption of IR light. They typically feature an IR emitter, dual channels for reference and measurement, and calculate CO2 concentration by comparing light absorption in these channels.
Photo-Acoustic Sensors
Photo-acoustic sensors also measure absorption but use a microphone to detect the resulting pressure waves. They are smaller and do not rely on line-of-sight, making them suitable for compact applications.
In the image below we can see a comparison of the photoacoustic and transmissive NDIR sensors:
Working Principle References:
https://www.airgradient.com/blog/co2-sensors-photo-acoustic-vs-ndir/
https://www.sensirion.com/resource/application_note/ndir-sensors-types
Setting up the Circuit
The circuit used here is fairly simple, as the SCD41 only has 4 pins to connect.
GROUND = GND
Voltage_in = VDD
SCL = Serial Clock Line
SDA = Serial Data Line
We also use an LCD to monitor the output, one switch to control calibration on startup, and a second switch to toggle WiFi. To make this quick to recreate with the same board, here are the associated pin numbers and board sides.
SCD41
SDA: 14-Left
SCL: 17-Left
VDD: 1-Right
GND: 19-Left
Switch Calibration(Left Switch)
Top(ON): 12-Right
GND: Ground Bar-Right
Bottom: N/A
Switch Wifi(Right Switch)
Top(ON): 13-Right
GND:Ground Bar - Right
Bottom: N/A
LCD
SDA:10-Left
SCL:7-Left
VCC: 12-Right
GND: Ground Bar-Right
Buzzer
15-Right
Ground Bar - Right
Note that with some changes to the code one can omit the LCD and the switches, but being able to calibrate on the fly with fresh air is nice, as is having an offline version of the sensor. Also note that at 3.3V the peak supply current is typically 175mA with a maximum of 205mA, while at 5.0V it is typically 115mA with a maximum of 137mA. I personally experienced many issues at 3.3V; after switching to 5V I experienced absolutely none.
The MicroPython library used is below. Simply open up the file in Thonny and save it to the device as SCD41.py so it can be imported in the main.py script. Note that the comments can be removed to cut down on space on your device. Implementing the Google Drive database and the ThingSpeak database are separate posts. See Raspberry Pi Sensor Server Project for more details on setting that up.
Library Code
# SCD41.py
# This code is pretty heavily based on the Sensirion-type code at: https://github.com/octaprog7/SCD4x
# and on this library written by Sensirion: https://github.com/Sensirion/python-i2c-scd
# This file should be saved as SCD41.py and imported with `import SCD41`. See main.py in this folder for an example usage.
import time
import math
from machine import I2C, SoftI2C, Pin, SPI  # I2C is referenced in the type hints below
import micropython
import ustruct


@micropython.native
def _calc_crc(sequence) -> int:
    """Calculate CRC-8 checksum for the given sequence."""
    return crc8(sequence, polynomial=0x31, init_value=0xFF)


@micropython.native
def check_value(value: int, valid_range, error_msg: str) -> int:
    """Check if the value is within the valid range. Raise ValueError if not."""
    if value not in valid_range:
        raise ValueError(error_msg)
    return value


class Device:
    """Base class for devices."""

    def __init__(self, adapter, address: [int, SPI], big_byte_order: bool):
        """Initialize the device with adapter, address, and byte order."""
        self.adapter = adapter
        self.address = address
        self.big_byte_order = big_byte_order
        self.msb_first = True

    def _get_byteorder_as_str(self) -> tuple:
        """Return the byte order as a string ('big' or 'little') and format character ('>' or '<')."""
        if self.is_big_byteorder():
            return 'big', '>'
        else:
            return 'little', '<'

    def unpack(self, fmt_char: str, source: bytes, redefine_byte_order: str = None) -> tuple:
        """Unpack the given source bytes according to the format character and byte order."""
        if not fmt_char:
            raise ValueError(f"Invalid length fmt_char parameter: {len(fmt_char)}")
        bo = self._get_byteorder_as_str()[1]
        if redefine_byte_order is not None:
            bo = redefine_byte_order[0]
        return ustruct.unpack(bo + fmt_char, source)

    @micropython.native
    def is_big_byteorder(self) -> bool:
        """Check if the device uses big byte order."""
        return self.big_byte_order


class BaseSensor(Device):
    """Base class for sensors."""

    def get_id(self):
        raise NotImplementedError

    def soft_reset(self):
        raise NotImplementedError


class Iterator:
    """Iterator class for sensors."""

    def __iter__(self):
        return self

    def __next__(self):
        raise NotImplementedError


class BitField:
    """Class for working with bit fields."""

    def __init__(self, start: int, stop: int, alias: [str, None]):
        """Initialize the bit field with start, stop, and alias."""
        check(start, stop)
        self.alias = alias
        self.start = start
        self.stop = stop
        self.bitmask = _bitmask(start, stop)

    def put(self, source: int, value: int) -> int:
        """Write the value to the specified bit range in the source."""
        src = source & ~self.bitmask
        src |= (value << self.start) & self.bitmask
        return src

    def get(self, source: int) -> int:
        """Get the value from the specified bit range in the source."""
        return (source & self.bitmask) >> self.start


@micropython.native
def put(start: int, stop: int, source: int, value: int) -> int:
    """Write the value to the specified bit range in the source."""
    check(start, stop)
    bitmask = _bitmask(start, stop)
    # Clear the target bits first (inverted mask, matching BitField.put), then set them
    src = source & ~bitmask
    src |= (value << start) & bitmask
    return src


@micropython.native
def _bitmask(start: int, stop: int) -> int:
    """Generate a bitmask from start to stop bits."""
    res = 0
    for i in range(start, 1 + stop):
        res |= 1 << i
    return res


def check(start: int, stop: int):
    """Check if start is less than or equal to stop. Raise ValueError if not."""
    if start > stop:
        raise ValueError(f"Invalid start: {start}, stop value: {stop}")


def _mpy_bl(value: int) -> int:
    """Calculate the bit length of the value."""
    if 0 == value:
        return 0
    return 1 + int(math.log2(abs(value)))


class BusAdapter:
    """Adapter class for bus communication."""

    def __init__(self, bus: [I2C, SPI]):
        """Initialize the adapter with the bus."""
        self.bus = bus

    def get_bus_type(self) -> type:
        """Return the type of the bus."""
        return type(self.bus)

    def read_register(self, device_addr: [int, Pin], reg_addr: int, bytes_count: int) -> bytes:
        raise NotImplementedError

    def write_register(self, device_addr: [int, Pin], reg_addr: int, value: [int, bytes, bytearray],
                       bytes_count: int, byte_order: str):
        raise NotImplementedError

    def read(self, device_addr: [int, Pin], n_bytes: int) -> bytes:
        raise NotImplementedError

    def write(self, device_addr: [int, Pin], buf: bytes):
        raise NotImplementedError

    def write_const(self, device_addr: [int, Pin], val: int, count: int):
        """Write a constant value to the device multiple times."""
        if 0 == count:
            return
        bl = _mpy_bl(val)
        if bl > 8:
            raise ValueError(f"The value must take no more than 8 bits! Current: {bl}")
        # Send in chunks of at most 16 bytes
        _max = 16
        if count < _max:
            _max = count
        repeats = count // _max
        b = bytearray([val for _ in range(_max)])
        for _ in range(repeats):
            self.write(device_addr, b)
        remainder = count - _max * repeats
        if remainder:
            b = bytearray([val for _ in range(remainder)])
            self.write(device_addr, b)


class I2cAdapter(BusAdapter):
    """Adapter class for I2C bus communication."""

    def __init__(self, bus: I2C):
        super().__init__(bus)

    def write_register(self, device_addr: int, reg_addr: int, value: [int, bytes, bytearray],
                       bytes_count: int, byte_order: str):
        """Write to the register of the device."""
        buf = None
        if isinstance(value, int):
            buf = value.to_bytes(bytes_count, byte_order)
        if isinstance(value, (bytes, bytearray)):
            buf = value
        return self.bus.writeto_mem(device_addr, reg_addr, buf)

    def read_register(self, device_addr: int, reg_addr: int, bytes_count: int) -> bytes:
        """Read from the register of the device."""
        return self.bus.readfrom_mem(device_addr, reg_addr, bytes_count)

    def read(self, device_addr: int, n_bytes: int) -> bytes:
        """Read bytes from the device."""
        return self.bus.readfrom(device_addr, n_bytes)

    def readfrom_into(self, device_addr: int, buf):
        """Read bytes from the device into the buffer."""
        return self.bus.readfrom_into(device_addr, buf)

    def read_buf_from_mem(self, device_addr: int, mem_addr, buf):
        """Read bytes from the device memory into the buffer."""
        return self.bus.readfrom_mem_into(device_addr, mem_addr, buf)

    def write(self, device_addr: int, buf: bytes):
        """Write bytes to the device."""
        return self.bus.writeto(device_addr, buf)

    def write_buf_to_mem(self, device_addr: int, mem_addr, buf):
        """Write bytes to the device memory."""
        return self.bus.writeto_mem(device_addr, mem_addr, buf)


class SCD4xSensirion(BaseSensor, Iterator):
    """Class for SCD4x Sensirion CO2 sensor."""

    def __init__(self, adapter: I2cAdapter, address=0x62, this_is_scd41: bool = True, check_crc: bool = True):
        """Initialize the sensor with adapter, address, and settings."""
        super().__init__(adapter, address, True)
        self._buf_3 = bytearray((0 for _ in range(3)))
        self._buf_9 = bytearray((0 for _ in range(9)))
        self.check_crc = check_crc
        self._low_power_mode = False
        self._single_shot_mode = False
        self._rht_only = False
        self._isSCD41 = this_is_scd41
        self.byte_order = self._get_byteorder_as_str()

    def _get_local_buf(self, bytes_for_read: int) -> [None, bytearray]:
        """Return the local buffer for reading."""
        if bytes_for_read not in (0, 3, 9):
            raise ValueError(f"Invalid value for bytes_for_read: {bytes_for_read}")
        if not bytes_for_read:
            return None
        if 3 == bytes_for_read:
            return self._buf_3
        return self._buf_9

    def _to_bytes(self, value, length: int):
        """Convert value to bytes with specified length."""
        byteorder = self.byte_order[0]
        return value.to_bytes(length, byteorder)

    def _write(self, buf: bytes) -> bytes:
        """Write buffer to the device."""
        return self.adapter.write(self.address, buf)

    def _readfrom_into(self, buf):
        """Read bytes from the device into the buffer."""
        return self.adapter.readfrom_into(self.address, buf)

    def _send_command(self, cmd: int, value: [bytes, None], wait_time: int = 0, bytes_for_read: int = 0,
                      crc_index: range = None, value_index: tuple = None) -> [bytes, None]:
        """Send a command to the sensor."""
        raw_cmd = self._to_bytes(cmd, 2)
        raw_out = raw_cmd
        if value:
            # Arguments are followed by their own CRC byte
            raw_out += value
            raw_out += self._to_bytes(_calc_crc(value), 1)
        self._write(raw_out)
        if wait_time:
            time.sleep_ms(wait_time)
        if not bytes_for_read:
            return None
        b = self._get_local_buf(bytes_for_read)
        self._readfrom_into(b)
        check_value(len(b), (bytes_for_read,), f"Invalid buffer length for cmd: {cmd}. Received {len(b)} out of {bytes_for_read}")
        if self.check_crc:
            # Compare the CRC bytes received with the CRC computed over each 2-byte word
            crc_from_buf = [b[i] for i in crc_index]
            calculated_crc = [_calc_crc(b[rng.start:rng.stop]) for rng in value_index]
            if crc_from_buf != calculated_crc:
                raise ValueError(f"Invalid CRC! Calculated{calculated_crc}. From buffer {crc_from_buf}")
        return b

    def save_config(self):
        """Save the sensor configuration to EEPROM."""
        cmd = 0x3615
        self._send_command(cmd, None, 800)

    def get_id(self) -> tuple:
        """Get the unique serial number of the sensor."""
        cmd = 0x3682
        b = self._send_command(cmd, None, 0, bytes_for_read=9,
                               crc_index=range(2, 9, 3), value_index=(range(2), range(3, 5), range(6, 8)))
        return tuple([(b[i] << 8) | b[i+1] for i in range(0, 9, 3)])

    def soft_reset(self):
        """Perform a soft reset of the sensor."""
        return None

    def exec_self_test(self) -> bool:
        """Execute self-test on the sensor. Returns True if successful."""
        cmd = 0x3639
        length = 3
        b = self._send_command(cmd, None, wait_time=10_000,
                               bytes_for_read=length, crc_index=range(2, 3), value_index=(range(2),))
        res = self.unpack("H", b)[0]
        return 0 == res

    def reinit(self) -> None:
        """Reinitialize the sensor by reloading user settings from EEPROM."""
        cmd = 0x3646
        self._send_command(cmd, None, 20)

    def set_temperature_offset(self, offset: float):
        """Set the temperature offset for the sensor."""
        cmd = 0x241D
        offset_raw = self._to_bytes(int(374.49142857 * offset), 2)
        self._send_command(cmd, offset_raw, 1)

    def get_temperature_offset(self) -> float:
        """Get the temperature offset from the sensor."""
        cmd = 0x2318
        b = self._send_command(cmd, None, wait_time=1, bytes_for_read=3, crc_index=range(2, 3), value_index=(range(2),))
        temp_offs = self.unpack("H", b)[0]
        return 0.0026702880859375 * temp_offs

    def set_altitude(self, masl: int):
        """Set the altitude for the sensor in meters above sea level."""
        cmd = 0x2427
        masl_raw = self._to_bytes(masl, 2)
        self._send_command(cmd, masl_raw, 1)

    def get_altitude(self) -> int:
        """Get the altitude from the sensor in meters above sea level."""
        cmd = 0x2322
        b = self._send_command(cmd, None, wait_time=1, bytes_for_read=3, crc_index=range(2, 3), value_index=(range(2),))
        return self.unpack("H", b)[0]

    def set_ambient_pressure(self, pressure: float):
        """Set the ambient pressure for the sensor in Pascals."""
        cmd = 0xE000
        press_raw = self._to_bytes(int(pressure // 100), 2)
        self._send_command(cmd, press_raw, 1)

    def force_recalibration(self, target_co2_concentration: int) -> int:
        """Force recalibration of the sensor with the target CO2 concentration."""
        check_value(target_co2_concentration, range(2**16),
                    f"Invalid target CO2 concentration: {target_co2_concentration} ppm")
        cmd = 0x362F
        target_raw = self._to_bytes(target_co2_concentration, 2)
        b = self._send_command(cmd, target_raw, 400, 3, crc_index=range(2, 3), value_index=(range(2),))
        return self.unpack("h", b)[0]

    def is_auto_calibration(self) -> bool:
        """Check if automatic self-calibration is enabled on the sensor."""
        cmd = 0x2313
        b = self._send_command(cmd, None, 1, 3, crc_index=range(2, 3), value_index=(range(2),))
        return 0 != self.unpack("H", b)[0]

    def set_auto_calibration(self, value: bool):
        """Enable or disable automatic self-calibration on the sensor."""
        cmd = 0x2416
        value_raw = self._to_bytes(int(value), 2)
        # Nothing is read back here: requesting bytes without crc_index/value_index
        # would fail in _send_command when check_crc is enabled.
        self._send_command(cmd, value_raw, 1)

    def set_measurement(self, start: bool, single_shot: bool = False, rht_only: bool = False):
        """Start or stop periodic measurements, or perform a single shot measurement."""
        if single_shot:
            return self._single_shot_meas(rht_only)
        return self._periodic_measurement(start)

    def _periodic_measurement(self, start: bool):
        """Start or stop periodic measurements."""
        wt = 0
        if start:
            cmd = 0x21AC if self._low_power_mode else 0x21B1
        else:
            cmd = 0x3F86
            wt = 500
        self._send_command(cmd, None, wt)
        self._single_shot_mode = False
        self._rht_only = False

    def get_meas_data(self) -> tuple:
        """Get the measurement data from the sensor (CO2, temperature, and humidity)."""
        cmd = 0xEC05
        val_index = (range(2), range(3, 5), range(6, 8))
        b = self._send_command(cmd, None, 1, bytes_for_read=9,
                               crc_index=range(2, 9, 3), value_index=val_index)
        words = [self.unpack("H", b[val_rng.start:val_rng.stop])[0] for val_rng in val_index]
        # CO2 in ppm, temperature in degrees C, relative humidity in %
        return words[0], -45 + 0.0026703288 * words[1], 0.0015259022 * words[2]

    def is_data_ready(self) -> bool:
        """Check if the measurement data is ready to be read from the sensor."""
        cmd = 0xE4B8
        b = self._send_command(cmd, None, 1, 3, crc_index=range(2, 3), value_index=(range(2),))
        return bool(self.unpack("H", b)[0] & 0b0000_0111_1111_1111)

    @micropython.native
    def get_conversion_cycle_time(self) -> int:
        """Get the conversion cycle time of the sensor in milliseconds."""
        if self.is_single_shot_mode and self.is_rht_only:
            return 50
        return 5000

    def set_power(self, value: bool):
        """Power up or power down the sensor."""
        if not self._isSCD41:
            return
        cmd = 0x36F6 if value else 0x36E0
        wt = 20 if value else 1
        self._send_command(cmd, None, wt)

    def _single_shot_meas(self, rht_only: bool = False):
        """Perform a single shot measurement."""
        if not self._isSCD41:
            return
        cmd = 0x2196 if rht_only else 0x219D
        self._send_command(cmd, None, 0)
        self._single_shot_mode = True
        self._rht_only = rht_only

    @property
    def is_single_shot_mode(self) -> bool:
        """Check if the sensor is in single shot mode."""
        return self._single_shot_mode

    @property
    def is_rht_only(self) -> bool:
        """Check if the sensor is in RHT-only mode."""
        return self._rht_only

    def __iter__(self):
        return self

    def __next__(self) -> [tuple, None]:
        """Get the next set of measurement data."""
        if self._single_shot_mode:
            return None
        if self.is_data_ready():
            return self.get_meas_data()
        return None


def pa_mmhg(value: float) -> float:
    """Convert air pressure from Pascals to millimeters of mercury."""
    return 7.50062E-3 * value


def crc8(sequence, polynomial: int, init_value: int = 0x00):
    """Calculate CRC-8 checksum for the given sequence."""
    mask = 0xFF
    crc = init_value & mask
    for item in sequence:
        crc ^= item & mask
        for _ in range(8):
            if crc & 0x80:
                crc = mask & ((crc << 1) ^ polynomial)
            else:
                crc = mask & (crc << 1)
    return crc


def check_device_presence(i2c, address):
    """Check if a device with the given address is present on the I2C bus."""
    devices = i2c.scan()
    return address in devices
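To see what the library does with the raw bytes without having a sensor attached, here is a small stand-alone sketch that runs on a desktop Python as well. It repeats the same CRC-8 (polynomial 0x31, initial value 0xFF) and the same conversion factors used in get_meas_data above; `decode_measurement` and `make_frame` are hypothetical helper names of mine, and the frame is built locally rather than read from a real device.

```python
def crc8(data, polynomial=0x31, init_value=0xFF):
    """CRC-8 with the same parameters the library passes in _calc_crc."""
    crc = init_value
    for byte in data:
        crc ^= byte
        for _ in range(8):
            if crc & 0x80:
                crc = ((crc << 1) ^ polynomial) & 0xFF
            else:
                crc = (crc << 1) & 0xFF
    return crc


def decode_measurement(frame):
    """Decode 9 bytes laid out as three (MSB, LSB, CRC) triplets into (ppm, deg C, %RH)."""
    if len(frame) != 9:
        raise ValueError("expected 9 bytes")
    words = []
    for i in range(0, 9, 3):
        if crc8(frame[i:i + 2]) != frame[i + 2]:
            raise ValueError("CRC mismatch at byte %d" % i)
        words.append((frame[i] << 8) | frame[i + 1])
    co2, raw_t, raw_rh = words
    return co2, -45 + 175 * raw_t / 65535, 100 * raw_rh / 65535


def make_frame(words):
    """Build a test frame by appending a CRC to each 16-bit word (illustration only)."""
    out = bytearray()
    for w in words:
        pair = bytes([w >> 8, w & 0xFF])
        out += pair + bytes([crc8(pair)])
    return bytes(out)


frame = make_frame((500, 0x6667, 0x5EB9))
co2, temp_c, rh = decode_measurement(frame)
print(co2, round(temp_c, 1), round(rh, 1))  # prints: 500 25.0 37.0
```

A quick way to check a CRC implementation is the two-byte sequence 0xBE 0xEF, which should produce 0x92 with these parameters.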
Once you have that done, run the code below on your device or save it to main.py, whichever you'd like. Note that we power cycle the device every 6 hours to ensure that we don't run into memory issues. I encountered a memory leak that became a problem roughly 5 days in, and felt that chasing it down was a waste compared to a simple power cycle using machine.reset().
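The 6-hour power cycle can be sketched as a tiny helper that is called once per pass through the main loop. This is an assumption about how one might structure it, not the exact code from my device; `reset_if_due` is a hypothetical name, and the reset function is passed in so the logic can be tried on a desktop too.

```python
SIX_HOURS_S = 6 * 3600


def reset_if_due(uptime_s, reset_fn, interval_s=SIX_HOURS_S):
    """Call reset_fn once the uptime reaches interval_s; return True if a reset was requested."""
    if uptime_s >= interval_s:
        reset_fn()
        return True
    return False


# Desktop demonstration with a stand-in reset function
calls = []
print(reset_if_due(120, lambda: calls.append("reset")))          # prints: False
print(reset_if_due(SIX_HOURS_S, lambda: calls.append("reset")))  # prints: True
```

On the ESP32 you would pass `machine.reset` as `reset_fn` and compute the uptime from `time.ticks_ms()`, for example `time.ticks_diff(time.ticks_ms(), boot_ms) // 1000` with `boot_ms` recorded at startup.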
Main Code
# main.py
from machine import SoftI2C, Pin
import time
from SCD41 import SCD4xSensirion, I2cAdapter, check_device_presence
from ssd1306 import SSD1306_I2C  # Ensure you have the SSD1306 library
from dht import DHT22  # DHT22 is used further down in this script
import urequests as requests
import network
import json
import utime
import usocket as socket
import ssl
import ntptime

# ThingSpeak settings
THINGSPEAK_API_KEY = 'YOUR_KEY_HERE'
THINGSPEAK_URL = 'https://api.thingspeak.com/update'
THINGSPEAK_CHANNEL_ID = '00000000'
THINGSPEAK_BULK_UPDATE_URL = 'https://api.thingspeak.com/channels/' + str(THINGSPEAK_CHANNEL_ID) + '/bulk_update.json'
SEND_TO_THINGSPEAK = True
thingspeak_buffer = []  # Buffer for ThingSpeak data

# Google Sheets settings
SPREADSHEET_ID = 'VERY_LONG_SPREADSHEET_ID_HERE'
RANGE_NAME = 'Sheet1!A1:C1'
SHEET_NAME = 'Sheet1'
GOOGLE_URL = 'https://script.google.com/macros/s/VERY_LONG_URL/exec'

# WiFi settings
SSID = 'YOUR_SSID'
PASSWORD = 'WIFI_PSWD'


def connect_wifi(ssid, password):
    wlan = network.WLAN(network.STA_IF)
    wlan.active(True)
    wlan.connect(ssid, password)
    while not wlan.isconnected():
        time.sleep(1)
        print("Connecting to WiFi...")
    print("Connected to WiFi")
    print(wlan.ifconfig())


def _nth_sunday(year, month, n):
    """Return the day of the month of the n-th Sunday of the given month."""
    # Index 6 of the localtime tuple is the weekday, with Monday = 0 and Sunday = 6
    first_weekday = utime.localtime(utime.mktime((year, month, 1, 0, 0, 0, 0, 0)))[6]
    return 1 + (6 - first_weekday) % 7 + 7 * (n - 1)


def get_time_chicago():
    max_retries = 100
    for attempt in range(max_retries):
        try:
            ntptime.settime()
            current_time = utime.localtime()  # UTC after ntptime.settime()
            break
        except OSError as e:
            print(f"Failed to get NTP time, attempt {attempt + 1} of {max_retries}. Error: {e}")
            time.sleep(1)
    else:
        print("Could not get NTP time, proceeding without time synchronization.")
        return utime.localtime()
    # Determine if it is daylight saving time (DST). US DST runs from 02:00 local time on the
    # second Sunday of March (08:00 UTC) to 02:00 local time on the first Sunday of November (07:00 UTC).
    year = current_time[0]
    now = utime.mktime(current_time)
    dst_start = utime.mktime((year, 3, _nth_sunday(year, 3, 2), 8, 0, 0, 0, 0))
    dst_end = utime.mktime((year, 11, _nth_sunday(year, 11, 1), 7, 0, 0, 0, 0))
    is_dst = dst_start <= now < dst_end
    offset = -6 * 3600 if not is_dst else -5 * 3600
    local_time = now + offset
    return utime.localtime(local_time)


# Function to sound the buzzer
def sound_buzzer():
    for _ in range(5):
        buzzer.value(1)  # Turn on buzzer
        time.sleep(0.5)  # 500 ms delay
        buzzer.value(0)  # Turn off buzzer
        time.sleep(0.5)  # 500 ms delay


def send_data_to_google_sheets(data):
    url = GOOGLE_URL  # Define your Google URL here
    headers = {'Content-Type': 'application/x-www-form-urlencoded'}
    encoded_data = (
        "Date=" + data['date'] +
        "&Time=" + data['time'] +
        "&Temp=" + str(data['temp_f']) +
        "&Humidity=" + str(data['humidity']) +
        "&Lux=" + str(data['lux']) +
        "&CCT=" + str(data['color_temp']) +
        "&Soil Moisture=" + str(data['soil_moisture']) +
        "&Soil Temp=" + str(data['soil_temp']) +
        "&R=" + str(data['r']) +
        "&G=" + str(data['g']) +
        "&B=" + str(data['b']) +
        "&HTML=" + data['html']
    )
    try:
        # Extract host and path from URL
        _, _, host, path = url.split('/', 3)
        # Set up a socket connection
        addr = socket.getaddrinfo(host, 443)[0][-1]
        s = socket.socket()
        s.connect(addr)
        s = ssl.wrap_socket(s)
        # Create the HTTP request manually
        request = f"POST /{path} HTTP/1.1\r\nHost: {host}\r\n"
        request += "Content-Type: application/x-www-form-urlencoded\r\n"
        request += f"Content-Length: {len(encoded_data)}\r\n\r\n"
        request += encoded_data
        # Send the request as bytes
        s.write(request.encode())
        # Close the socket
        s.close()
        print('Data sent to Google Sheets!')
    except Exception as e:
        print('Failed to send data to Google Sheets:', e)


def send_data_to_thingspeak():
    """Send data to ThingSpeak."""
    if SEND_TO_THINGSPEAK and thingspeak_buffer:
        if len(thingspeak_buffer) > 1:
            # Bulk update
            payload = {
                'write_api_key': THINGSPEAK_API_KEY,
                'updates': []
            }
            for data in thingspeak_buffer:
                update = {
                    'created_at': f"{data['date']} {data['time']} -0500",
                    'field1': data['soil_temp'],
                    'field2': data['soil_moisture'],
                    'field3': data['lux'],
                    'field4': data['color_temp'],
                    'field5': data['temp_f'],
                    'field6': data['humidity']
                }
                payload['updates'].append(update)
            try:
                headers = {'Content-Type': 'application/json'}
                json_data = json.dumps(payload)
                response = requests.post(THINGSPEAK_BULK_UPDATE_URL, headers=headers, data=json_data)
                if response.status_code == 202:
                    print('Data posted to ThingSpeak (bulk update):', response.text)
                    thingspeak_buffer.clear()  # Clear the buffer after successful update
                else:
                    print(f'Failed to send data to ThingSpeak (bulk update): {response.status_code}, {response.text}')
            except Exception as e:
                print('Failed to send data to ThingSpeak (bulk update):', e)
        else:
            data = thingspeak_buffer.pop(0)  # Get the first item in the buffer
            payload = {
                'api_key': THINGSPEAK_API_KEY,
                'field1': data['soil_temp'],
                'field2': data['soil_moisture'],
                'field3': data['lux'],
                'field4': data['color_temp'],
                'field5': data['temp_f'],
                'field6': data['humidity']
            }
            try:
                response = requests.post(THINGSPEAK_URL, json=payload)
                if response.status_code == 200:
                    print('Data posted to ThingSpeak:', response.text)
                else:
                    print(f'Failed to send data to ThingSpeak: {response.status_code}, {response.text}')
            except Exception as e:
                print('Failed to send data to ThingSpeak:', e)
# Pin definitions
DHT_PIN = Pin(27)
DHT_POWER_PIN = Pin(26, Pin.OUT)
# SEESAW_POWER_PIN = Pin(25, Pin.OUT)
TCS34725_POWER_PIN = Pin(33, Pin.OUT)
OLED_POWER_PIN = Pin(32, Pin.OUT)
TCS34725_LED_PIN = Pin(23, Pin.OUT) # GPIO 23 for TCS34725 LED control
# Setup GPIO for Buzzer
BUZZER_GPIO = 13 # Use GPIO 13 for the buzzer
buzzer = machine.Pin(BUZZER_GPIO, Pin.OUT)
# Initialize separate I2C buses
i2c_oled = SoftI2C(scl=Pin(22), sda=Pin(21))
i2c_seesaw = SoftI2C(scl=Pin(16), sda=Pin(17))
i2c_tcs = SoftI2C(scl=Pin(18), sda=Pin(19))
# Scan I2C buses
OLED_POWER_PIN.value(1)
print('Scanning I2C bus for OLED...')
devices_oled = i2c_oled.scan()
OLED_POWER_PIN.value(0)
print('OLED I2C devices:', [hex(device) for device in devices_oled])
time.sleep(1)
print('Scanning I2C bus for Seesaw...')
# SEESAW_POWER_PIN.value(1)
devices_seesaw = i2c_seesaw.scan()
# SEESAW_POWER_PIN.value(0)
print('Seesaw I2C devices:', [hex(device) for device in devices_seesaw])
time.sleep(1)
print('Scanning I2C bus for TCS34725...')
TCS34725_POWER_PIN.value(1)
TCS34725_LED_PIN.value(0) # Start with the LED off
devices_tcs = i2c_tcs.scan()
print('TCS34725 I2C devices:', [hex(device) for device in devices_tcs])
# Initialize DHT22 sensor and OLED display
DHT_POWER_PIN.value(1)
dht_sensor = DHT22(DHT_PIN)
DHT_POWER_PIN.value(0)
OLED_POWER_PIN.value(1)
oled = SSD1306_I2C(128, 64, i2c_oled)
# Ensure the addresses are correct (default addresses used)
if 0x36 in devices_seesaw:
    soil_sensor = StemmaSoilSensor(i2c_seesaw)
else:
    print("Seesaw sensor not found!")
# The TCS34725 answers at 0x29 by default (0x39 belongs to the TCS34721/TCS34723 variants)
if 0x29 in devices_tcs:
    tcs = TCS34725(i2c_tcs)
else:
    print("TCS34725 sensor not found!")
# # Function to adjust gain and integration time based on Lux value
# def adjust_tcs_settings(tcs, lux):
# print(lux)
# if lux > 10000:
# integration_time = 50 # Short integration time
# gain = 1 # Low gain
# elif lux > 1000:
# integration_time = 100 # Moderate integration time
# gain = 1 # Moderate gain
# elif lux > 100:
# integration_time = 150 # Long integration time
# gain = 4 # High gain
# else:
# integration_time = 200 # Longest integration time
# gain = 16 # Highest gain
#
# tcs.integration_time(integration_time)
# tcs.gain(gain)
# print(f'Adjusted TCS34725 settings: Integration Time = {integration_time} ms, Gain = {gain}')
# Function to reinitialize the TCS34725 sensor with retries and a few warm-up reads
def reinitialize_tcs_sensor(i2c_tcs, retries=9):
    for attempt in range(retries):
        try:
            tcs = TCS34725(i2c_tcs)
            tcs.gain(4)
            tcs.integration_time(150)
            time.sleep(1)
            tcs.active(True)
            tcs.led(False)  # Ensure LED is off
            time.sleep(1)  # Wait for the sensor to stabilize
            cct, lux = tcs.read()  # First reading is overwritten by the next one
            cct, lux = tcs.read()
            print("Initial CCT:", cct)
            print("Initial Lux:", lux)
            cct, lux = tcs.read()  # Read once more; these are the values returned
            # adjust_tcs_settings(tcs, lux)  # Adjust settings based on Lux
            print("2nd CCT:", cct)
            print("2nd Lux:", lux)
            time.sleep(1)  # Wait for settings to take effect
            print(f"TCS34725 sensor initialized on attempt {attempt + 1}")
            return tcs, cct, lux
        except Exception as e:
            print(f"Error initializing TCS34725 sensor: {e}, retrying...")
            time.sleep(1)
    raise RuntimeError("Failed to initialize TCS34725 sensor after multiple attempts")
def main():
    # Connect to WiFi
    connect_wifi(SSID, PASSWORD)
    count = 0
    # Main loop
    while True:
        try:
            count += 1
            # Read DHT22 sensor
            print("Reading DHT22 sensor...")
            DHT_POWER_PIN.value(1)
            time.sleep(2)  # Wait for the sensor to stabilize
            dht_sensor.measure()
            temp_dht22 = dht_sensor.temperature()
            humidity = dht_sensor.humidity()
            temp_fahrenheit = int(temp_dht22 * 9 / 5 + 32)
            humidity = int(humidity)
            DHT_POWER_PIN.value(0)
            # Read Seesaw (Soil) sensor if it is found.
            # soil_sensor is a module-level name, so check globals() rather than locals()
            if 'soil_sensor' in globals():
                print("Reading Seesaw sensor...")
                soil_temp = soil_sensor.get_temp()
                soil_temp = int(soil_temp * 9 / 5 + 32)
                soil_moisture = soil_sensor.get_moisture()
            else:
                soil_temp = None
                soil_moisture = None
            # Reinitialize and read TCS34725 sensor
            print("Reinitializing and reading TCS34725 sensor...")
            TCS34725_POWER_PIN.value(1)
            TCS34725_LED_PIN.value(0)  # Turn off the LED
            tcs, cct, lux = reinitialize_tcs_sensor(i2c_tcs)
            time.sleep(1)  # Wait for the sensor to stabilize
            try:
                raw_data = tcs.read(True)
                print(f"Raw data: {raw_data}")
                r, g, b, c = raw_data
                cct, lux = tcs.read()
                print(f"RGB: ({r}, {g}, {b}), Clear: {c}, CCT: {cct}, Lux: {lux}")
                html_rgb_val = tcs.html_rgb(raw_data)
                html_hex_val = tcs.html_hex(raw_data)
                print(f"HTML RGB: {html_rgb_val}, HTML Hex: {html_hex_val}")
            except RuntimeError as e:
                print("Error reading TCS34725:", e)
                html_hex_val = "Error"
                html_rgb_val = (0, 0, 0)  # Keep the OLED and Sheets code below from hitting an undefined name
                lux = cct = 0
            TCS34725_LED_PIN.value(0)  # Turn off the LED
            TCS34725_POWER_PIN.value(0)
            # Get current date and time
            current_time = get_time_chicago()
            date_str = "{:04}-{:02}-{:02}".format(current_time[0], current_time[1], current_time[2])
            time_str = "{:02}:{:02}:{:02}".format(current_time[3], current_time[4], current_time[5])
            # Print sensor data to the console
            print(f'Color: {html_hex_val}')
            print(f'Lux: {lux}')
            print(f'CCT: {cct}')
            print(f'Temp: {temp_fahrenheit} Hum:{humidity}')
            if soil_temp is not None and soil_moisture is not None:
                print(f'Soil Temp: {soil_temp} F')
                print(f'Soil Moisture: {soil_moisture}')
            # Display data on OLED
            print("Updating OLED display...")
            oled.fill(0)
            if soil_temp is not None:
                oled.text(f'Soil Temp: {soil_temp}F', 0, 0)
            if soil_moisture is not None:
                print("Count: ", count)
                oled.text(f'Moisture: {soil_moisture}', 0, 10)
                # Check the moisture alarm once every 60 loop iterations
                if count > 60:
                    count = 0
                    if soil_moisture < 600:
                        sound_buzzer()
                        print("Buzzer sounded")
            oled.text(f'RGB: {int(html_rgb_val[0])},{int(html_rgb_val[1])},{int(html_rgb_val[2])}', 0, 20)
            oled.text(f'Lux:{int(lux)}', 0, 30)
            oled.text(f'CCT:{int(cct)}', 0, 40)
            oled.text(f'Temp:{temp_fahrenheit}F Hum:{humidity}%', 0, 50)
            oled.show()
            print(' ')
            # Prepare data for ThingSpeak (date and time are needed by the bulk update)
            data = {
                'date': date_str,
                'time': time_str,
                'soil_temp': soil_temp,
                'soil_moisture': soil_moisture,
                'lux': lux,
                'color_temp': cct,
                'temp_f': temp_fahrenheit,
                'humidity': humidity
            }
            # Queue the reading for ThingSpeak
            thingspeak_buffer.append(data)
            # Send data to ThingSpeak
            send_data_to_thingspeak()
            data = {
                'date': date_str,
                'time': time_str,
                'temp_f': temp_fahrenheit,
                'humidity': humidity,
                'lux': lux,
                'color_temp': cct,
                'soil_moisture': soil_moisture,
                'soil_temp': soil_temp,
                'r': int(html_rgb_val[0]),
                'g': int(html_rgb_val[1]),
                'b': int(html_rgb_val[2]),
                'html': html_hex_val
            }
            # Send data to Google Sheets
            send_data_to_google_sheets(data)
            # Wait before the next update
            time.sleep(45)
        except Exception as e:
            print("Error occurred:", e)
            time.sleep(5)  # Wait for 5 seconds before retrying

if __name__ == '__main__':
    main()
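One thing worth flagging in the Google Sheets request above: the form body is glued together by hand, so a key with a space in it (like "Soil Moisture") or a value containing characters such as # or & goes out unescaped. Below is a minimal sketch of a percent-encoder that avoids relying on urllib, which may not be available on a MicroPython build. The names percent_encode and encode_form are hypothetical helpers of mine, not part of the original code.

```python
# Characters that may appear unescaped in a form-encoded key or value
SAFE_CHARS = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_.~"

def percent_encode(value):
    """Percent-encode a value for an application/x-www-form-urlencoded body."""
    out = []
    for byte in str(value).encode("utf-8"):
        ch = chr(byte)
        if ch in SAFE_CHARS:
            out.append(ch)
        else:
            out.append("%{:02X}".format(byte))
    return "".join(out)

def encode_form(fields):
    """Join (key, value) pairs into a form body, escaping both sides."""
    return "&".join(percent_encode(k) + "=" + percent_encode(v) for k, v in fields)

# Example values are made up for illustration
body = encode_form([("Soil Moisture", 612), ("HTML", "#a1b2c3")])
print(body)
```

The receiving script still sees the key as "Soil Moisture" after decoding, so nothing needs to change on the spreadsheet side.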
I’m not going to go too far into the code: the SCD41 chip supports a variety of different modes, and unless you need to do a deep dive into the datasheet and the code for some cursed reason, you should be fine with the code above. I was able to log readings fairly close to the reported values for my town, and breathing on or near the device reliably increased the sensor readings. I did note that the sensor took a while to climb to its peak while I was breathing on it, and roughly 2.5 minutes to get back down to baseline. That is roughly in line with the rate we would ideally sample at anyway, given the humidity sampling interval mentioned in the technical specs above. See image below:
Limitations of the SCD41
As mentioned above, readings lag behind actual changes, so the results won’t be instantaneous. Additionally, as noted at https://github.com/octaprog7/SCD4x, the sensor self-heats if the measurement interval is less than 15 seconds, which raises its temperature reading. Both the SCD40 and SCD41 also have an auto-calibration mode that takes the lowest CO2 value from the past 7 days and assumes it is 400ppm. This can cause sensor drift over time unless one regularly (once a week) exposes the sensor to fresh air. It is possible to turn off this auto-calibration mode; see set_auto_calibration in the code above. An anecdote from user Anx2K on the r/ESP32 subreddit mentions roughly 40ppm (10%) of drift year to year with auto-calibration turned off. Calibration takes 5 minutes, but is an inherently manual process. It would be nice to be somewhere near a weather monitoring station and set up this CO2 monitor alongside an MH-Z19B/C and MH-Z1+, which use NDIR, then compare the values at the start and end of a 24-hour period to assess drift.
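For the curious, here is a rough sketch of what turning off auto-calibration looks like at the I2C level. It assumes the values I recall from the Sensirion SCD4x datasheet: I2C address 0x62, command 0x2416 for setting automatic self-calibration, and a CRC-8 with polynomial 0x31 and initial value 0xFF over each 16-bit data word. Double-check them against the datasheet before relying on this. The helper names are my own and are not taken from any driver.

```python
def sensirion_crc8(data):
    """CRC-8 with polynomial 0x31 and initial value 0xFF (assumed from the datasheet)."""
    crc = 0xFF
    for byte in data:
        crc ^= byte
        for _ in range(8):
            if crc & 0x80:
                crc = ((crc << 1) ^ 0x31) & 0xFF
            else:
                crc = (crc << 1) & 0xFF
    return crc

def build_write_frame(command, word):
    """Build a frame of: 16-bit command, 16-bit data word, CRC of the data word."""
    payload = bytes([(word >> 8) & 0xFF, word & 0xFF])
    header = bytes([(command >> 8) & 0xFF, command & 0xFF])
    return header + payload + bytes([sensirion_crc8(payload)])

SCD4X_ADDR = 0x62             # assumed default I2C address
CMD_SET_ASC_ENABLED = 0x2416  # assumed command code; data word 0 = off, 1 = on

frame = build_write_frame(CMD_SET_ASC_ENABLED, 0)
print(" ".join("{:02x}".format(b) for b in frame))
# On the ESP32 this would be sent with something like:
#   i2c.writeto(SCD4X_ADDR, frame)
# As far as I can tell, the sensor has to be idle (periodic measurement stopped) first.
```

In practice a driver such as the one linked above wraps all of this, so the sketch is mainly useful for seeing what a call like set_auto_calibration puts on the wire.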
Applications of the SCD41
I am particularly interested in environmental monitoring to assess indoor air quality at a competitive price, allowing users to swap sensors in and out as needed. This flexibility is crucial for adapting to different monitoring requirements without significant additional costs.
Additionally, monitoring transient dump flows from businesses during off-hours (midnight to 3 AM) is vital. This involves tracking the release of CO2 and other gaseous components using the MQ-* sensor series. While this approach may not provide quantitative measurements due to various confounding factors, it serves as an initial assessment tool. This preliminary analysis, which costs under $100 upfront and requires only a few hours of labor, can justify the purchase or lease of more advanced testing equipment.
Another intriguing experiment involves mapping a town with sensors placed every few blocks to monitor localized CO2 concentrations. This data can be correlated with consumer, commercial, and industrial activities, providing valuable insights into the town’s environmental impact.
Other applications include monitoring the respiratory activity of plants by measuring CO2 exchange in greenhouse or agricultural research settings. This can offer insights into plant health, photosynthetic efficiency, and growth patterns. Pairing this with R/G/B, UV, and light sensors can help determine several growth parameters. Note that, based on my research, NPK (Nitrogen/Phosphorus/Potassium) sensors are often unreliable. It’s therefore better to use actual chemical testing methods, or to automate the chemical testing apparatus, than to rely on electronic sensors for these measurements.
In biotechnology and biochemical engineering contexts, monitoring CO2 levels in cellular cultures and bioreactors is essential. Maintaining precise CO2 concentrations ensures the optimal growth of microorganisms, which can significantly reduce production costs. This is particularly important for the pharmaceutical industry, biofuel production, and other bio-based manufacturing processes.
Personal Testing
In the recent heat wave I noted that it took roughly 30 minutes to air out the house and cut CO2 levels from ~1200ppm to ~650ppm.
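As a back-of-the-envelope check, those two readings can be turned into a rough ventilation rate. The sketch below assumes a single well-mixed room, an outdoor level of about 420ppm (my assumption, not a measurement), and ignores the CO2 that people in the house keep adding, so treat the result as a ballpark lower bound rather than a measurement. The function name is hypothetical.

```python
import math

def air_changes_per_hour(c_start, c_end, c_outdoor, hours):
    """Estimate air changes per hour from an exponential CO2 decay toward outdoor air."""
    if not (c_start > c_end > c_outdoor):
        raise ValueError("expected c_start > c_end > c_outdoor")
    return math.log((c_start - c_outdoor) / (c_end - c_outdoor)) / hours

# ~1200ppm down to ~650ppm in 30 minutes, with an assumed 420ppm outdoors
ach = air_changes_per_hour(1200, 650, 420, 0.5)
print("Estimated air changes per hour: {:.2f}".format(ach))
```

With these numbers the estimate comes out at a bit over 2 air changes per hour, and it moves noticeably depending on the outdoor value you assume.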
It’s interesting to note that the temperature rose by only a few degrees over those 30 minutes, per below:
The actual “enclosure” of the sensor is just the box from an old Raspberry Pi 5 case, a FLIRC model. The solid wire coming from the main board is hooked up to a smaller board used for data display, switching wifi/calibration on and off, and alerting users to high CO2 (1500ppm). Note that I didn’t feel like potentially burning the board or buying a soldering attachment to safely remove the LED, so I covered it with some cardboard to reduce the interference the light causes on the sensor. See below:
Raw Boards, no case
Wifi/Calibration Screen, with case
Normal Output, with case
Conclusion
The SCD41 sensor offers a versatile and accurate solution for CO2 monitoring in various applications. For our particular application, provided we calibrate it once at the beginning and then once more every year, we can enjoy accurate CO2 monitoring at a low price.