Skip to content

Real-time Data Streaming

High-frequency endpoint for transmitting biomonitoring data at 1Hz from BCGMCU devices.

POST /api/v1/data/stream
  • Stream real-time biomonitoring measurements
  • Maintain 1Hz data transmission rate
  • Provide low-latency vital signs monitoring
  • Support continuous patient observation
Content-Type: application/json
Authorization: Bearer {device_token}
{
"device_id": "BCGMCU_001",
"timestamp": "2025-01-27T10:30:15.123Z",
"sequence_number": 12346,
"data_type": "realtime_bcg",
"bcg_data": {
"pulse_rate": 72,
"respiratory_rate": 16,
"breathing_duration": 3.75,
"presence_detected": true,
"signal_quality": "good",
"raw_accelerometer": {
"x": 0.012,
"y": -0.003,
"z": 0.998
}
},
"system_data": {
"cpu_temperature": 42.5,
"wifi_rssi": -45,
"processing_latency": 15
}
}
FieldTypeRequiredDescription
device_idstringYesUnique device identifier
timestampstringYesUTC ISO8601 with milliseconds
sequence_numberintegerYesIncrementing sequence for gap detection
data_typestringYesType of data (“realtime_bcg”)
bcg_dataobjectYesBiomonitoring measurements
system_dataobjectNoDevice system metrics
FieldTypeRequiredDescription
pulse_rateintegerYesHeart rate in BPM (30-200)
respiratory_rateintegerYesBreathing rate per minute (5-40)
breathing_durationfloatYesBreath cycle duration in seconds
presence_detectedbooleanYesSubject presence on sensor
signal_qualitystringYes”excellent”, “good”, “fair”, “poor”
raw_accelerometerobjectNoRaw 3-axis accelerometer data
FieldTypeRequiredDescription
cpu_temperaturefloatNoCPU temperature in Celsius
wifi_rssiintegerNoWiFi signal strength in dBm
processing_latencyintegerNoProcessing time in milliseconds
{
"status": "received",
"server_timestamp": "2025-01-27T10:30:15.150Z",
"sequence_ack": 12346,
"data_quality": "valid",
"storage_confirmed": true
}
FieldTypeDescription
statusstring”received” or “error”
server_timestampstringServer reception timestamp
sequence_ackintegerAcknowledged sequence number
data_qualitystring”valid”, “suspicious”, or “invalid”
storage_confirmedbooleanData successfully stored
  • Transmission Rate: 1Hz (1 sample per second)
  • Latency Target: <200ms end-to-end
  • Packet Size: ~500 bytes JSON
  • Bandwidth: ~4 Kbps per device
  • Concurrent Streams: 100+ devices
#include <HTTPClient.h>
#include <ArduinoJson.h>
#include <BCGMCU.h>
class DataStreamer {
private:
HTTPClient https;
BCGMCU bcgmcu;
uint32_t sequenceNumber = 0;
unsigned long lastTransmit = 0;
const int TRANSMIT_INTERVAL = 1000; // 1Hz
public:
void initialize() {
bcgmcu.begin();
bcgmcu.setMode(BCGMCU_MODE_REALTIME);
bcgmcu.setSampleRate(10); // Internal sampling at 10Hz
}
void streamData() {
// Maintain 1Hz transmission rate
unsigned long now = millis();
if (now - lastTransmit < TRANSMIT_INTERVAL) {
return;
}
lastTransmit = now;
// Get BCGMCU measurements
BCGMCU::Measurements data = bcgmcu.getMeasurements();
// Only transmit if valid data
if (!data.valid || !data.presence_detected) {
return;
}
// Build JSON payload
StaticJsonDocument<512> doc;
doc["device_id"] = getDeviceId();
doc["timestamp"] = getISO8601Timestamp();
doc["sequence_number"] = ++sequenceNumber;
doc["data_type"] = "realtime_bcg";
// BCG measurements
JsonObject bcg = doc.createNestedObject("bcg_data");
bcg["pulse_rate"] = data.pulse_rate;
bcg["respiratory_rate"] = data.respiratory_rate;
bcg["breathing_duration"] = data.breathing_duration;
bcg["presence_detected"] = data.presence_detected;
bcg["signal_quality"] = getQualityString(data.signal_quality);
// Raw accelerometer (optional)
if (includeRawData) {
JsonObject accel = bcg.createNestedObject("raw_accelerometer");
accel["x"] = data.accel_x;
accel["y"] = data.accel_y;
accel["z"] = data.accel_z;
}
// System metrics
JsonObject system = doc.createNestedObject("system_data");
system["cpu_temperature"] = temperatureRead();
system["wifi_rssi"] = WiFi.RSSI();
system["processing_latency"] = data.processing_time_ms;
// Serialize and send
String payload;
serializeJson(doc, payload);
sendStreamData(payload);
}
private:
void sendStreamData(String& payload) {
https.begin(API_BASE_URL + "/data/stream");
https.addHeader("Content-Type", "application/json");
https.addHeader("Authorization", "Bearer " + deviceToken);
// Use timeout appropriate for real-time
https.setTimeout(500); // 500ms timeout
int httpCode = https.POST(payload);
if (httpCode == 200) {
processStreamResponse(https.getString());
} else {
handleStreamError(httpCode);
}
https.end();
}
String getQualityString(int quality) {
if (quality > 90) return "excellent";
if (quality > 70) return "good";
if (quality > 50) return "fair";
return "poor";
}
};
class AdvancedStreamer {
private:
// Circular buffer for reliability
struct DataPoint {
uint32_t sequence;
String payload;
bool acknowledged;
unsigned long timestamp;
};
static const int BUFFER_SIZE = 10;
DataPoint buffer[BUFFER_SIZE];
int bufferIndex = 0;
// Statistics tracking
struct StreamStats {
uint32_t samples_sent = 0;
uint32_t samples_acked = 0;
uint32_t transmission_errors = 0;
float average_latency = 0;
unsigned long last_success = 0;
} stats;
public:
void streamWithBuffer() {
// Collect new data
DataPoint& point = buffer[bufferIndex];
point.sequence = getNextSequence();
point.payload = buildPayload();
point.acknowledged = false;
point.timestamp = millis();
// Try to send current and any unacknowledged
sendBufferedData();
// Move to next buffer position
bufferIndex = (bufferIndex + 1) % BUFFER_SIZE;
// Update statistics
updateStatistics();
}
private:
void sendBufferedData() {
// Send current data point
if (sendDataPoint(buffer[bufferIndex])) {
buffer[bufferIndex].acknowledged = true;
stats.samples_acked++;
stats.last_success = millis();
}
// Retry any unacknowledged points
for (int i = 0; i < BUFFER_SIZE; i++) {
if (!buffer[i].acknowledged &&
buffer[i].sequence > 0 &&
millis() - buffer[i].timestamp < 5000) {
if (sendDataPoint(buffer[i])) {
buffer[i].acknowledged = true;
stats.samples_acked++;
}
}
}
}
void updateStatistics() {
// Calculate average latency
unsigned long totalLatency = 0;
int count = 0;
for (int i = 0; i < BUFFER_SIZE; i++) {
if (buffer[i].acknowledged) {
totalLatency += (buffer[i].timestamp - stats.last_success);
count++;
}
}
if (count > 0) {
stats.average_latency = totalLatency / count;
}
// Log statistics periodically
if (stats.samples_sent % 100 == 0) {
Serial.printf("Stream Stats: Sent=%d, Acked=%d, Errors=%d, Latency=%.1fms\n",
stats.samples_sent, stats.samples_acked,
stats.transmission_errors, stats.average_latency);
}
}
};
class SignalQualityAnalyzer {
public:
String assessQuality(BCGMCU::Measurements& data) {
int score = 100;
// Check signal amplitude
if (data.signal_amplitude < MIN_AMPLITUDE) {
score -= 30;
}
// Check noise level
if (data.noise_level > MAX_NOISE) {
score -= 20;
}
// Check consistency
if (abs(data.pulse_rate - lastPulseRate) > 20) {
score -= 10;
}
// Check bounds
if (data.pulse_rate < 30 || data.pulse_rate > 200) {
score -= 20;
}
if (data.respiratory_rate < 5 || data.respiratory_rate > 40) {
score -= 20;
}
// Map score to quality level
if (score > 90) return "excellent";
if (score > 70) return "good";
if (score > 50) return "fair";
return "poor";
}
private:
int lastPulseRate = 70;
const float MIN_AMPLITUDE = 0.01;
const float MAX_NOISE = 0.05;
};

Server-side validation rules:

CheckValid RangeAction on Invalid
Pulse Rate30-200 BPMFlag as suspicious
Respiratory Rate5-40 per minuteFlag as suspicious
Breathing Duration1.5-12 secondsFlag as suspicious
Sequence Gap< 10 missingRequest retransmission
Timestamp Drift< 5 secondsAdjust and log
Signal Qualityfair or betterLog for analysis
  1. Consistent Timing: Use hardware timer for exact 1Hz transmission
  2. Buffer Management: Keep 5-10 samples buffered for retransmission
  3. Quality Filtering: Only send data with adequate signal quality
  4. Sequence Tracking: Detect and report gaps in transmission
  5. Adaptive Behavior: Reduce rate if network congested
  6. Compression: Consider binary protocol for bandwidth optimization
  7. Connection Pooling: Reuse HTTPS connections when possible
// Connection reuse for efficiency
class StreamConnection {
private:
WiFiClientSecure client;
bool connected = false;
public:
void maintainConnection() {
if (!client.connected()) {
client.connect(API_HOST, 443);
connected = true;
}
}
bool sendData(String& payload) {
maintainConnection();
// Build HTTP request
String request = "POST " + STREAM_ENDPOINT + " HTTP/1.1\r\n";
request += "Host: " + API_HOST + "\r\n";
request += "Authorization: Bearer " + deviceToken + "\r\n";
request += "Content-Type: application/json\r\n";
request += "Content-Length: " + String(payload.length()) + "\r\n";
request += "Connection: keep-alive\r\n\r\n";
request += payload;
client.print(request);
// Read response
return readResponse();
}
};