Virtual Device: writing data
View the source code for this file.
The virtual device emulates a real device, generating examples data so that we can use IoT Center without needing a real device. It writes measurements for every minute in the last 30 days. It can also generate and send temperature, humidity, pressure, CO2, TVOC, latitude, and longitude measurements.
The “device” is a piece of code that runs in the browser. It uses the influxdb-client-js library. It gets the configuration of how to communicate with InfluxDB (URL, organization, bucket, token) from IoT center. It behaves the same way as a any other device. It receives the last the time of the reported measurement, so that only missing measurements are written to InfluxDB.
The code for the virtual device page contains some similar code to the dashboard and device registration page. We do some imports. Again we define a device configuration and fetch that configuration.
fetchDeviceData
The following code contains Flux source code.
async function fetchDeviceData(config: DeviceConfig): Promise<DeviceData> {
const {
// influx_url: url, // use '/influx' proxy to avoid problem with InfluxDB v2 Beta (Docker)
influx_token: token,
influx_org: org,
influx_bucket: bucket,
id,
} = config
const influxDB = new InfluxDB({url: '/influx', token})
const queryApi = influxDB.getQueryApi(org)
const measurements = await queryApi.collectRows<any>(flux`
import "math"
from(bucket: ${bucket})
|> range(start: -30d)
|> filter(fn: (r) => r._measurement == "environment")
|> filter(fn: (r) => r.clientId == ${id})
|> toFloat()
|> group(columns: ["_field"])
|> reduce(
fn: (r, accumulator) => ({
maxTime: (if r._time>accumulator.maxTime then r._time else accumulator.maxTime),
maxValue: (if r._value>accumulator.maxValue then r._value else accumulator.maxValue),
minValue: (if r._value<accumulator.minValue then r._value else accumulator.minValue),
count: accumulator.count + 1.0
}),
identity: {maxTime: 1970-01-01, count: 0.0, minValue: math.mInf(sign: 1), maxValue: math.mInf(sign: -1)}
)`)
return {config, measurements}
}
math
module.
toFloat()
,
group()
,
reduce()
,
accumulator()
.
import "math"
from(bucket: ${bucket})
|> range(start: -30d)
|> filter(fn: (r) => r._measurement == "environment")
|> filter(fn: (r) => r.clientId == ${id})
|> toFloat()
|> group(columns: ["_field"])
|> reduce(
fn: (r, accumulator) => ({
maxTime: (if r._time>accumulator.maxTime then r._time else accumulator.maxTime),
maxValue: (if r._value>accumulator.maxValue then r._value else accumulator.maxValue),
minValue: (if r._value<accumulator.minValue then r._value else accumulator.minValue),
count: accumulator.count + 1.0
}),
identity: {maxTime: 1970-01-01, count: 0.0, minValue: math.mInf(sign: 1), maxValue: math.mInf(sign: -1)}
)
writeEmulatedData
When you click the Write New Data button, the writeEmulatedData
function is called.
This function informs about the count/percentage of written measurements,
so that the UI can display a progress bar and a count of written measurement points.
async function writeEmulatedData(
state: DeviceData,
onProgress: ProgressFn
): Promise<number> {
const {
// influx_url: url, // use '/influx' proxy to avoid problems with InfluxDB v2 Beta (Docker)
influx_token: token,
influx_org: org,
influx_bucket: bucket,
id,
} = state.config
// calculate window to emulate writes
const toTime = Math.trunc(Date.now() / 60_000) * 60_000
let lastTime = state.measurements[0]?.maxTime
? Math.trunc(Date.parse(state.measurements[0].maxTime) / 60_000) * 60_000
: 0
if (lastTime < toTime - 30 * 24 * 60 * 60 * 1000) {
lastTime = toTime - 30 * 24 * 60 * 60 * 1000
}
const totalPoints = Math.trunc((toTime - lastTime) / 60_000)
let pointsWritten = 0
if (totalPoints > 0) {
const batchSize = 2000
const influxDB = new InfluxDB({url: '/influx', token})
const writeApi = influxDB.getWriteApi(org, bucket, 'ms', {
batchSize: batchSize + 1,
defaultTags: {clientId: id},
})
try {
// write random temperatures
const point = new Point('environment') // reuse the same point to spare memory
onProgress(0, 0, totalPoints)
while (lastTime < toTime) {
lastTime += 60_000 // emulate next minute
point
.floatField('Temperature', generateTemperature(lastTime))
.floatField('Humidity', generateHumidity(lastTime))
.floatField('Pressure', generatePressure(lastTime))
.intField('CO2', generateCO2(lastTime))
.intField('TVOC', generateTVOC(lastTime))
.floatField('Lat', state.config.default_lat || 50.0873254)
.floatField('Lon', state.config.default_lon || 14.4071543)
.timestamp(lastTime)
writeApi.writePoint(point)
pointsWritten++
if (pointsWritten % batchSize === 0) {
await writeApi.flush()
onProgress(
(pointsWritten / totalPoints) * 100,
pointsWritten,
totalPoints
)
}
}
await writeApi.flush()
} finally {
await writeApi.close()
}
onProgress(100, pointsWritten, totalPoints)
}
return pointsWritten
}