Raspberry Pi Python client

Source code for Python client.

Based on the InfluxDB client library for Python.

Accessing the sensors

Data comes from bme280 sensor connected to I2C bus. Based on library https://gitlab.com/Kampi/bmp-sensors.

We define a Sensor class with three methods: __init__, measure, and geo.

class Sensor:
    """Sensor to provide information about Temperature, Humidity, Pressure, ..."""
    def __init__(self):
        # ...
    def measure(self):
        # ...
    def geo(self):
        # ...

(Source)

Managing device configuration

def configure() -> None:
    """
    Retrieve or refresh a configuration from IoT Center.

    Successful configuration is set as a global IOT_CONFIGURATION dictionary with following properties:
        * id
        * influx_url
        * influx_org
        * influx_token
        * influx_bucket
        * configuration_refresh
        * default_lon
        * default_lat
        * measurement_interval
    """
    global config
    global config_received
    global influxdb_client
    global write_api

    # Check freshness of configuration
    if config_received and (datetime.utcnow() - config_received).total_seconds() < config['configuration_refresh']:
        return

    iot_center_url = os.getenv("IOT_CENTER_URL", "http://localhost:5000")
    iot_device_id = os.getenv("IOT_DEVICE_ID")

    # Request to configuration
    config_fresh = fetch_json(f'{iot_center_url}/api/env/{iot_device_id}')

    # New or changed configuration
    if not config and config_fresh != config:
        config = config_fresh
        config_received = datetime.utcnow()
        influxdb_client = InfluxDBClient(url=config['influx_url'],
                                         token=config['influx_token'],
                                         org=config['influx_org'])
        write_api = influxdb_client.write_api(write_options=WriteOptions(batch_size=1))
        print(f'Received configuration: {json.dumps(config, indent=4, sort_keys=False)}')

(Source)

Type annotations

If you’re not familiar with the syntax in def write() -> None, this is a type annotation. For more information, see PEP 483.

Writing data to InfluxDB

def write() -> None:
    """Write point into InfluxDB."""
    geo = sensor.geo()
    measure = sensor.measure()
    point = Point("environment") \
        .tag("clientId", config['id']) \
        .tag("device", "raspberrypi") \
        .tag("TemperatureSensor", "bme280") \
        .tag("HumiditySensor", "bme280") \
        .tag("PressureSensor", "bme280") \
        .field("Temperature", measure.temperature) \
        .field("Humidity", measure.humidity) \
        .field("Pressure", measure.pressure) \
        .field("Lat", geo['latitude']) \
        .field("Lon", geo['longitude']) \
        .time(datetime.utcnow())

    print(f"Writing: {point.to_line_protocol()}")
    write_api.write(bucket=config['influx_bucket'], record=point)

(Source)

Batching

Uses batching writes. This line in the configuration funtion controls batching:

write_api = influxdb_client.write_api(write_options=WriteOptions(batch_size=1))

(Source)

Measuring the environment

Data comes from bme280 sensor connected to the I2C bus. Produced fields: “temperature”, “humidity”, “pressure”.

The measure() function is a method for the [Sensor class][Sensor].

def measure(self):
    """
    Get measure from bme280 or default values.

    :return: Returns object with properties: temperature, pressure and humidity.
    """
    if self._bme280:
        import bmp_sensors as Sensors
        self._bme280.SetMode(Sensors.BME280_Mode.FORCED)
        return self._bme280.Measure()

    obj = lambda: None  # noqa: E731
    obj.temperature = 10.21
    obj.pressure = 983.72
    obj.humidity = 62.36
    return obj

(Source)