Virtual Device: writing data

View the source code for this file.

The virtual device emulates a real device, generating examples data so that we can use IoT Center without needing a real device. It writes measurements for every minute in the last 30 days. It can also generate and send temperature, humidity, pressure, CO2, TVOC, latitude, and longitude measurements.

The “device” is a piece of code that runs in the browser. It uses the influxdb-client-js library. It gets the configuration of how to communicate with InfluxDB (URL, organization, bucket, token) from IoT center. It behaves the same way as a any other device. It receives the last the time of the reported measurement, so that only missing measurements are written to InfluxDB.

The code for the virtual device page contains some similar code to the dashboard and device registration page. We do some imports. Again we define a device configuration and fetch that configuration.

fetchDeviceData

The following code contains Flux source code.

async function fetchDeviceData(config: DeviceConfig): Promise<DeviceData> {
  const {
    // influx_url: url, // use '/influx' proxy to avoid problem with InfluxDB v2 Beta (Docker)
    influx_token: token,
    influx_org: org,
    influx_bucket: bucket,
    id,
  } = config
  const influxDB = new InfluxDB({url: '/influx', token})
  const queryApi = influxDB.getQueryApi(org)
  const measurements = await queryApi.collectRows<any>(flux`
  import "math"
from(bucket: ${bucket})
  |> range(start: -30d)
  |> filter(fn: (r) => r._measurement == "environment")
  |> filter(fn: (r) => r.clientId == ${id})
  |> toFloat()
  |> group(columns: ["_field"])
  |> reduce(
      fn: (r, accumulator) => ({
        maxTime: (if r._time>accumulator.maxTime then r._time else accumulator.maxTime),
        maxValue: (if r._value>accumulator.maxValue then r._value else accumulator.maxValue),
        minValue: (if r._value<accumulator.minValue then r._value else accumulator.minValue),
        count: accumulator.count + 1.0
      }),
      identity: {maxTime: 1970-01-01, count: 0.0, minValue: math.mInf(sign: 1), maxValue: math.mInf(sign: -1)}
  )`)
  return {config, measurements}
}

(Source)

math module.

toFloat(), group(), reduce(), accumulator().

import "math"
from(bucket: ${bucket})
  |> range(start: -30d)
  |> filter(fn: (r) => r._measurement == "environment")
  |> filter(fn: (r) => r.clientId == ${id})
  |> toFloat()
  |> group(columns: ["_field"])
  |> reduce(
      fn: (r, accumulator) => ({
        maxTime: (if r._time>accumulator.maxTime then r._time else accumulator.maxTime),
        maxValue: (if r._value>accumulator.maxValue then r._value else accumulator.maxValue),
        minValue: (if r._value<accumulator.minValue then r._value else accumulator.minValue),
        count: accumulator.count + 1.0
      }),
      identity: {maxTime: 1970-01-01, count: 0.0, minValue: math.mInf(sign: 1), maxValue: math.mInf(sign: -1)}
  )

writeEmulatedData

When you click the Write New Data button, the writeEmulatedData function is called. This function informs about the count/percentage of written measurements, so that the UI can display a progress bar and a count of written measurement points.

async function writeEmulatedData(
  state: DeviceData,
  onProgress: ProgressFn
): Promise<number> {
  const {
    // influx_url: url, // use '/influx' proxy to avoid problems with InfluxDB v2 Beta (Docker)
    influx_token: token,
    influx_org: org,
    influx_bucket: bucket,
    id,
  } = state.config
  // calculate window to emulate writes
  const toTime = Math.trunc(Date.now() / 60_000) * 60_000
  let lastTime = state.measurements[0]?.maxTime
    ? Math.trunc(Date.parse(state.measurements[0].maxTime) / 60_000) * 60_000
    : 0
  if (lastTime < toTime - 30 * 24 * 60 * 60 * 1000) {
    lastTime = toTime - 30 * 24 * 60 * 60 * 1000
  }
  const totalPoints = Math.trunc((toTime - lastTime) / 60_000)
  let pointsWritten = 0
  if (totalPoints > 0) {
    const batchSize = 2000
    const influxDB = new InfluxDB({url: '/influx', token})
    const writeApi = influxDB.getWriteApi(org, bucket, 'ms', {
      batchSize: batchSize + 1,
      defaultTags: {clientId: id},
    })
    try {
      // write random temperatures
      const point = new Point('environment') // reuse the same point to spare memory
      onProgress(0, 0, totalPoints)
      while (lastTime < toTime) {
        lastTime += 60_000 // emulate next minute
        point
          .floatField('Temperature', generateTemperature(lastTime))
          .floatField('Humidity', generateHumidity(lastTime))
          .floatField('Pressure', generatePressure(lastTime))
          .intField('CO2', generateCO2(lastTime))
          .intField('TVOC', generateTVOC(lastTime))
          .floatField('Lat', state.config.default_lat || 50.0873254)
          .floatField('Lon', state.config.default_lon || 14.4071543)
          .timestamp(lastTime)
        writeApi.writePoint(point)

        pointsWritten++
        if (pointsWritten % batchSize === 0) {
          await writeApi.flush()
          onProgress(
            (pointsWritten / totalPoints) * 100,
            pointsWritten,
            totalPoints
          )
        }
      }
      await writeApi.flush()
    } finally {
      await writeApi.close()
    }
    onProgress(100, pointsWritten, totalPoints)
  }

  return pointsWritten
}

(Source)