Raspberry Pi Python client
Source code for Python client.
The client is based on the InfluxDB client library for Python.
Accessing the sensors
Data comes from a bme280 sensor connected to the I2C bus. Sensor access is based on the library at https://gitlab.com/Kampi/bmp-sensors.
We define a Sensor class with three methods: __init__, measure, and geo.
class Sensor:
    """Sensor to provide information about Temperature, Humidity, Pressure, ..."""

    def __init__(self):
        # ...

    def measure(self):
        # ...

    def geo(self):
        # ...
Managing device configuration
def configure() -> None:
    """
    Retrieve or refresh a configuration from IoT Center.

    A successful configuration is stored in the global config dictionary with the following properties:
        * id
        * influx_url
        * influx_org
        * influx_token
        * influx_bucket
        * configuration_refresh
        * default_lon
        * default_lat
        * measurement_interval
    """
    global config
    global config_received
    global influxdb_client
    global write_api

    # Skip the request while the current configuration is still fresh
    if config_received and (datetime.utcnow() - config_received).total_seconds() < config['configuration_refresh']:
        return

    iot_center_url = os.getenv("IOT_CENTER_URL", "http://localhost:5000")
    iot_device_id = os.getenv("IOT_DEVICE_ID")

    # Request the configuration from IoT Center
    config_fresh = fetch_json(f'{iot_center_url}/api/env/{iot_device_id}')

    # New or changed configuration: store it and recreate the InfluxDB client
    if not config or config_fresh != config:
        config = config_fresh
        config_received = datetime.utcnow()
        influxdb_client = InfluxDBClient(url=config['influx_url'],
                                         token=config['influx_token'],
                                         org=config['influx_org'])
        write_api = influxdb_client.write_api(write_options=WriteOptions(batch_size=1))
        print(f'Received configuration: {json.dumps(config, indent=4, sort_keys=False)}')
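The freshness check at the top of configure() can be isolated as a small pure function, which makes the expiry rule easy to test without a running IoT Center. The following is a minimal sketch, not part of the client: the helper name is_fresh and its parameters are hypothetical.

```python
from datetime import datetime, timedelta
from typing import Optional


def is_fresh(received: Optional[datetime], refresh_seconds: float, now: datetime) -> bool:
    """Return True when a configuration received at `received` is still valid at `now`."""
    if received is None:
        # Nothing has been fetched yet, so a request is always needed.
        return False
    # Same comparison as in configure(): elapsed seconds against the refresh period.
    return (now - received).total_seconds() < refresh_seconds


start = datetime(2024, 1, 1, 12, 0, 0)
print(is_fresh(None, 3600, start))
print(is_fresh(start, 3600, start + timedelta(minutes=30)))
print(is_fresh(start, 3600, start + timedelta(hours=2)))
```

Passing the current time as an argument, instead of calling datetime.utcnow() inside the helper, keeps the check deterministic.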
Type annotations
If you’re not familiar with the syntax in def write() -> None, this is a type annotation. For more information, see PEP 483.
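As a short illustration of annotation syntax, the sketch below annotates parameters and return values. The function names to_fahrenheit and log_reading are made up for this example and do not appear in the client.

```python
def to_fahrenheit(celsius: float) -> float:
    """Convert a temperature from degrees Celsius to degrees Fahrenheit."""
    return celsius * 9 / 5 + 32


def log_reading(label: str, value: float) -> None:
    """Print a reading; `-> None` documents that nothing is returned."""
    print(f"{label}: {value:.2f}")


# Annotations are stored on the function object; Python does not enforce them at runtime.
print(to_fahrenheit.__annotations__)
log_reading("Temperature (F)", to_fahrenheit(10.21))
```

Because annotations are not checked at runtime, they mainly help readers and static analysis tools.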
Writing data to InfluxDB
def write() -> None:
    """Write point into InfluxDB."""
    geo = sensor.geo()
    measure = sensor.measure()
    point = Point("environment") \
        .tag("clientId", config['id']) \
        .tag("device", "raspberrypi") \
        .tag("TemperatureSensor", "bme280") \
        .tag("HumiditySensor", "bme280") \
        .tag("PressureSensor", "bme280") \
        .field("Temperature", measure.temperature) \
        .field("Humidity", measure.humidity) \
        .field("Pressure", measure.pressure) \
        .field("Lat", geo['latitude']) \
        .field("Lon", geo['longitude']) \
        .time(datetime.utcnow())

    print(f"Writing: {point.to_line_protocol()}")
    write_api.write(bucket=config['influx_bucket'], record=point)
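The write() function prints the point in line protocol, which has the general shape measurement,tags fields timestamp. To show roughly what that text looks like, here is a simplified, standard-library-only formatter. It is an illustration under stated assumptions, not the client library's implementation: it does no escaping, handles only float fields, and uses a made-up device ID (my-device). The output of point.to_line_protocol() may differ in details.

```python
def to_line_protocol(measurement: str, tags: dict, fields: dict, timestamp_ns: int) -> str:
    """Simplified line protocol formatter: no escaping, float fields only."""
    # Tags and fields are emitted as comma-separated key=value pairs, sorted by key.
    tag_part = ",".join(f"{key}={value}" for key, value in sorted(tags.items()))
    field_part = ",".join(f"{key}={float(value)}" for key, value in sorted(fields.items()))
    # A space separates the measurement and tags from the fields, and the fields from the timestamp.
    return f"{measurement},{tag_part} {field_part} {timestamp_ns}"


line = to_line_protocol(
    "environment",
    tags={"clientId": "my-device", "device": "raspberrypi"},  # hypothetical device ID
    fields={"Temperature": 10.21, "Humidity": 62.36, "Pressure": 983.72},
    timestamp_ns=1_700_000_000_000_000_000,
)
print(line)
```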
Batching
The client uses batched writes. This line in the configure function controls batching; with batch_size=1, each batch holds a single point:
write_api = influxdb_client.write_api(write_options=WriteOptions(batch_size=1))
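To make the effect of batch_size concrete, the following is a conceptual sketch of size-based batching using only the standard library. It is not how the InfluxDB client is implemented, and it omits time-based flushing; the Batcher class is hypothetical.

```python
from typing import Callable, List


class Batcher:
    """Collect records and hand them to `flush` once `batch_size` is reached."""

    def __init__(self, batch_size: int, flush: Callable[[List[str]], None]) -> None:
        self._batch_size = batch_size
        self._flush = flush
        self._buffer: List[str] = []

    def write(self, record: str) -> None:
        self._buffer.append(record)
        if len(self._buffer) >= self._batch_size:
            # The batch is full: send it and start a new, empty buffer.
            self._flush(self._buffer)
            self._buffer = []


sent: List[List[str]] = []
single = Batcher(batch_size=1, flush=sent.append)
for record in ["p1", "p2", "p3"]:
    single.write(record)
print(sent)  # three batches of one record each

grouped: List[List[str]] = []
batched = Batcher(batch_size=2, flush=grouped.append)
for record in ["p1", "p2", "p3"]:
    batched.write(record)
print(grouped)  # one batch of two records; "p3" stays buffered
```

A batch size of 1 trades throughput for immediacy, which suits a device that writes one point per measurement interval.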
Measuring the environment
Data comes from a bme280 sensor connected to the I2C bus. It produces the fields “temperature”, “humidity”, and “pressure”.
The measure() function is a method of the Sensor class.
def measure(self):
    """
    Get measure from bme280 or default values.

    :return: Returns object with properties: temperature, pressure and humidity.
    """
    if self._bme280:
        import bmp_sensors as Sensors
        self._bme280.SetMode(Sensors.BME280_Mode.FORCED)
        return self._bme280.Measure()

    # No sensor available: return fixed default values
    obj = lambda: None  # noqa: E731
    obj.temperature = 10.21
    obj.pressure = 983.72
    obj.humidity = 62.36
    return obj
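The fallback branch attaches attributes to an empty lambda so that callers can read measure.temperature and the other values as attributes. One possible alternative, shown here only as a sketch, is types.SimpleNamespace from the standard library. The function name default_measurement is hypothetical; the values are the defaults from the code above.

```python
from types import SimpleNamespace


def default_measurement() -> SimpleNamespace:
    """Return fallback readings with the same attribute names as the sensor result."""
    return SimpleNamespace(temperature=10.21, pressure=983.72, humidity=62.36)


reading = default_measurement()
print(reading.temperature, reading.pressure, reading.humidity)
```

This keeps attribute access unchanged for callers such as write() and removes the need for the noqa comment.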