Real-time Data Streaming API
High-frequency endpoint for transmitting biomonitoring data at 1Hz from BCGMCU devices.
Endpoint
```
POST /api/v1/data/stream
```
Purpose
- Stream real-time biomonitoring measurements
- Maintain 1Hz data transmission rate
- Provide low-latency vital signs monitoring
- Support continuous patient observation
Request
Headers
```
Content-Type: application/json
Authorization: Bearer {device_token}
```
Request Body
```json
{
  "device_id": "BCGMCU_001",
  "timestamp": "2025-01-27T10:30:15.123Z",
  "sequence_number": 12346,
  "data_type": "realtime_bcg",
  "bcg_data": {
    "pulse_rate": 72,
    "respiratory_rate": 16,
    "breathing_duration": 3.75,
    "presence_detected": true,
    "signal_quality": "good",
    "raw_accelerometer": {
      "x": 0.012,
      "y": -0.003,
      "z": 0.998
    }
  },
  "system_data": {
    "cpu_temperature": 42.5,
    "wifi_rssi": -45,
    "processing_latency": 15
  }
}
```
Field Descriptions
Root Fields
| Field | Type | Required | Description |
|---|---|---|---|
| device_id | string | Yes | Unique device identifier |
| timestamp | string | Yes | UTC ISO8601 with milliseconds |
| sequence_number | integer | Yes | Incrementing sequence for gap detection |
| data_type | string | Yes | Type of data ("realtime_bcg") |
| bcg_data | object | Yes | Biomonitoring measurements |
| system_data | object | No | Device system metrics |
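The `timestamp` field expects UTC ISO 8601 with millisecond precision, as in the sample request. The following is a minimal sketch of one way to produce that format from milliseconds since the Unix epoch. The function name `formatIso8601Utc` is hypothetical and not part of any BCGMCU library, and the sketch assumes a non-negative input.

```cpp
#include <cstdint>
#include <cstdio>
#include <ctime>
#include <string>

// Hypothetical helper: formats milliseconds since the Unix epoch as
// "YYYY-MM-DDTHH:MM:SS.mmmZ". Assumes epochMillis is non-negative.
std::string formatIso8601Utc(int64_t epochMillis) {
    const std::time_t seconds = static_cast<std::time_t>(epochMillis / 1000);
    const int millis = static_cast<int>(epochMillis % 1000);

    // Break the whole seconds down into UTC calendar fields.
    std::tm utc{};
    if (const std::tm* converted = std::gmtime(&seconds)) {
        utc = *converted;
    }

    char buf[64];
    std::snprintf(buf, sizeof(buf), "%04d-%02d-%02dT%02d:%02d:%02d.%03dZ",
                  utc.tm_year + 1900, utc.tm_mon + 1, utc.tm_mday,
                  utc.tm_hour, utc.tm_min, utc.tm_sec, millis);
    return std::string(buf);
}
```

For example, `formatIso8601Utc(1737973815123)` yields `2025-01-27T10:30:15.123Z`, the timestamp used in the sample request. On a device, the epoch value would typically come from a clock that has been synchronized over the network; how that is done is outside the scope of this sketch.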
BCG Data Object
| Field | Type | Required | Description |
|---|---|---|---|
| pulse_rate | integer | Yes | Heart rate in BPM (30-200) |
| respiratory_rate | integer | Yes | Breathing rate per minute (5-40) |
| breathing_duration | float | Yes | Breath cycle duration in seconds |
| presence_detected | boolean | Yes | Subject presence on sensor |
| signal_quality | string | Yes | "excellent", "good", "fair", or "poor" |
| raw_accelerometer | object | No | Raw 3-axis accelerometer data |
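The sample request pairs a `respiratory_rate` of 16 with a `breathing_duration` of 3.75, which is consistent with duration = 60 / rate. The server-side duration range of 1.5-12 seconds likewise matches rates of 40 and 5 per minute. The specification does not state this relationship explicitly, so the sketch below treats it as an assumption. The function names and the 0.5-second tolerance are illustrative only.

```cpp
#include <cmath>

// Assumption (not stated in the spec): one breath cycle lasts 60 / rate seconds.
double expectedBreathingDuration(int respiratoryRatePerMinute) {
    return 60.0 / respiratoryRatePerMinute;
}

// Returns true when the reported duration is within the tolerance of the
// value implied by the respiratory rate. Non-positive rates never match.
bool durationMatchesRate(int respiratoryRatePerMinute, double durationSeconds,
                         double toleranceSeconds = 0.5) {
    if (respiratoryRatePerMinute <= 0) {
        return false;
    }
    const double expected = expectedBreathingDuration(respiratoryRatePerMinute);
    return std::fabs(durationSeconds - expected) <= toleranceSeconds;
}
```

A check like this could help a device catch internally inconsistent measurements before transmitting them.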
System Data Object
| Field | Type | Required | Description |
|---|---|---|---|
| cpu_temperature | float | No | CPU temperature in Celsius |
| wifi_rssi | integer | No | WiFi signal strength in dBm |
| processing_latency | integer | No | Processing time in milliseconds |
Response
Success Response (200 OK)
```json
{
  "status": "received",
  "server_timestamp": "2025-01-27T10:30:15.150Z",
  "sequence_ack": 12346,
  "data_quality": "valid",
  "storage_confirmed": true
}
```
Response Fields
| Field | Type | Description |
|---|---|---|
| status | string | "received" or "error" |
| server_timestamp | string | Server reception timestamp |
| sequence_ack | integer | Acknowledged sequence number |
| data_quality | string | "valid", "suspicious", or "invalid" |
| storage_confirmed | boolean | Data successfully stored |
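A client needs to decide what to do with each response. The sketch below shows one possible policy built on the response fields above. The `StreamResponse` struct, the `AckResult` enum, and the policy itself (retry on a mismatched `sequence_ack` or unconfirmed storage, give up on `"invalid"` data) are assumptions made for illustration, not behavior defined by this API.

```cpp
#include <cstdint>
#include <string>

// Mirrors the response fields in the table above (already parsed from JSON).
struct StreamResponse {
    std::string status;        // "received" or "error"
    uint32_t sequenceAck;      // sequence_ack
    std::string dataQuality;   // "valid", "suspicious", or "invalid"
    bool storageConfirmed;     // storage_confirmed
};

enum class AckResult { Confirmed, Retry, Rejected };

// Hypothetical policy: a sample counts as delivered only when the server
// received it, acknowledged the same sequence number, and stored it.
AckResult evaluateResponse(const StreamResponse& response, uint32_t sentSequence) {
    if (response.status != "received" || response.sequenceAck != sentSequence) {
        return AckResult::Retry;
    }
    if (response.dataQuality == "invalid") {
        return AckResult::Rejected;  // resending the same payload would not help
    }
    return response.storageConfirmed ? AckResult::Confirmed : AckResult::Retry;
}
```

Keeping the decision in a pure function like this makes it straightforward to test without a network connection.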
Performance Requirements
- Transmission Rate: 1Hz (1 sample per second)
- Latency Target: <200ms end-to-end
- Packet Size: ~500 bytes JSON
- Bandwidth: ~4 kbps per device (JSON payload only, excluding HTTP and TLS overhead)
- Concurrent Streams: 100+ devices
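The bandwidth figure follows from the packet size and transmission rate: 500 bytes × 8 bits × 1 sample per second = 4000 bits per second. The sketch below works through that arithmetic, including the aggregate for 100 devices. The function names are illustrative, and the result covers JSON payload only.

```cpp
// Payload bandwidth in kilobits per second (1 kbps = 1000 bits per second).
constexpr double payloadKbps(double bytesPerSample, double samplesPerSecond) {
    return bytesPerSample * 8.0 * samplesPerSecond / 1000.0;
}

// Aggregate payload bandwidth for a fleet of identical devices.
constexpr double fleetKbps(double perDeviceKbps, int deviceCount) {
    return perDeviceKbps * deviceCount;
}
```

With the numbers above, `payloadKbps(500, 1)` gives 4 kbps per device and `fleetKbps(4.0, 100)` gives 400 kbps for 100 concurrent streams. Actual network usage will be higher once HTTP headers and TLS framing are included.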
Implementation Example
ESP32 Real-time Streaming
```cpp
#include <WiFi.h>
#include <HTTPClient.h>
#include <ArduinoJson.h>  // Uses the ArduinoJson 6 API
#include <BCGMCU.h>

// API_BASE_URL, deviceToken, includeRawData, getDeviceId(),
// getISO8601Timestamp(), processStreamResponse(), and handleStreamError()
// are assumed to be defined elsewhere in the firmware.

class DataStreamer {
private:
    HTTPClient https;
    BCGMCU bcgmcu;
    uint32_t sequenceNumber = 0;
    unsigned long lastTransmit = 0;
    const unsigned long TRANSMIT_INTERVAL = 1000;  // 1Hz

public:
    void initialize() {
        bcgmcu.begin();
        bcgmcu.setMode(BCGMCU_MODE_REALTIME);
        bcgmcu.setSampleRate(10);  // Internal sampling at 10Hz
    }

    void streamData() {
        // Maintain 1Hz transmission rate
        unsigned long now = millis();
        if (now - lastTransmit < TRANSMIT_INTERVAL) {
            return;
        }
        lastTransmit = now;

        // Get BCGMCU measurements
        BCGMCU::Measurements data = bcgmcu.getMeasurements();

        // Only transmit if valid data
        if (!data.valid || !data.presence_detected) {
            return;
        }

        // Build JSON payload
        StaticJsonDocument<512> doc;

        doc["device_id"] = getDeviceId();
        doc["timestamp"] = getISO8601Timestamp();
        doc["sequence_number"] = ++sequenceNumber;
        doc["data_type"] = "realtime_bcg";

        // BCG measurements
        JsonObject bcg = doc.createNestedObject("bcg_data");
        bcg["pulse_rate"] = data.pulse_rate;
        bcg["respiratory_rate"] = data.respiratory_rate;
        bcg["breathing_duration"] = data.breathing_duration;
        bcg["presence_detected"] = data.presence_detected;
        bcg["signal_quality"] = getQualityString(data.signal_quality);

        // Raw accelerometer (optional)
        if (includeRawData) {
            JsonObject accel = bcg.createNestedObject("raw_accelerometer");
            accel["x"] = data.accel_x;
            accel["y"] = data.accel_y;
            accel["z"] = data.accel_z;
        }

        // System metrics
        JsonObject sys = doc.createNestedObject("system_data");
        sys["cpu_temperature"] = temperatureRead();
        sys["wifi_rssi"] = WiFi.RSSI();
        sys["processing_latency"] = data.processing_time_ms;

        // Serialize and send
        String payload;
        serializeJson(doc, payload);

        sendStreamData(payload);
    }

private:
    void sendStreamData(const String& payload) {
        // Wrap in String so concatenation works even if the constants are C strings
        https.begin(String(API_BASE_URL) + "/data/stream");
        https.addHeader("Content-Type", "application/json");
        https.addHeader("Authorization", String("Bearer ") + deviceToken);

        // Use timeout appropriate for real-time
        https.setTimeout(500);  // 500ms timeout

        int httpCode = https.POST(payload);

        if (httpCode == 200) {
            processStreamResponse(https.getString());
        } else {
            handleStreamError(httpCode);
        }

        https.end();
    }

    // Map a 0-100 quality score to the API's signal_quality strings
    String getQualityString(int quality) {
        if (quality > 90) return "excellent";
        if (quality > 70) return "good";
        if (quality > 50) return "fair";
        return "poor";
    }
};
```
Advanced Streaming Features
```cpp
// getNextSequence(), buildPayload(), and sendDataPoint() are assumed to be
// implemented elsewhere in the class. sendDataPoint() is expected to return
// true once the server has acknowledged the point. Sequence numbers are
// assumed to start at 1, because 0 marks an unused buffer slot.
class AdvancedStreamer {
private:
    // Circular buffer for reliability
    struct DataPoint {
        uint32_t sequence = 0;
        String payload;
        bool acknowledged = false;
        unsigned long timestamp = 0;  // millis() when the sample was captured
        unsigned long ackedAt = 0;    // millis() when the server acknowledged it
    };

    static const int BUFFER_SIZE = 10;
    DataPoint buffer[BUFFER_SIZE];
    int bufferIndex = 0;

    // Statistics tracking
    struct StreamStats {
        uint32_t samples_sent = 0;
        uint32_t samples_acked = 0;
        uint32_t transmission_errors = 0;
        uint32_t last_logged = 0;  // samples_sent value at the last log line
        float average_latency = 0;
        unsigned long last_success = 0;
    } stats;

public:
    void streamWithBuffer() {
        // Collect new data
        DataPoint& point = buffer[bufferIndex];
        point.sequence = getNextSequence();
        point.payload = buildPayload();
        point.acknowledged = false;
        point.timestamp = millis();

        // Try to send current and any unacknowledged
        sendBufferedData();

        // Move to next buffer position
        bufferIndex = (bufferIndex + 1) % BUFFER_SIZE;

        // Update statistics
        updateStatistics();
    }

private:
    // Send one point and record the outcome in the statistics
    bool trySend(DataPoint& point) {
        stats.samples_sent++;
        if (!sendDataPoint(point)) {
            stats.transmission_errors++;
            return false;
        }
        point.acknowledged = true;
        point.ackedAt = millis();
        stats.samples_acked++;
        stats.last_success = point.ackedAt;
        return true;
    }

    void sendBufferedData() {
        // Send current data point
        trySend(buffer[bufferIndex]);

        // Retry any unacknowledged points that are less than 5 seconds old
        for (int i = 0; i < BUFFER_SIZE; i++) {
            if (i == bufferIndex) continue;  // Already attempted above
            if (!buffer[i].acknowledged &&
                buffer[i].sequence > 0 &&
                millis() - buffer[i].timestamp < 5000) {
                trySend(buffer[i]);
            }
        }
    }

    void updateStatistics() {
        // Average capture-to-acknowledgement latency over buffered points
        unsigned long totalLatency = 0;
        int count = 0;

        for (int i = 0; i < BUFFER_SIZE; i++) {
            if (buffer[i].acknowledged) {
                totalLatency += buffer[i].ackedAt - buffer[i].timestamp;
                count++;
            }
        }

        if (count > 0) {
            stats.average_latency = static_cast<float>(totalLatency) / count;
        }

        // Log statistics roughly every 100 transmission attempts
        if (stats.samples_sent - stats.last_logged >= 100) {
            stats.last_logged = stats.samples_sent;
            Serial.printf("Stream Stats: Sent=%lu, Acked=%lu, Errors=%lu, Latency=%.1fms\n",
                          (unsigned long)stats.samples_sent,
                          (unsigned long)stats.samples_acked,
                          (unsigned long)stats.transmission_errors,
                          stats.average_latency);
        }
    }
};
```
Signal Quality Assessment
```cpp
class SignalQualityAnalyzer {
public:
    String assessQuality(const BCGMCU::Measurements& data) {
        int score = 100;

        // Check signal amplitude
        if (data.signal_amplitude < MIN_AMPLITUDE) {
            score -= 30;
        }

        // Check noise level
        if (data.noise_level > MAX_NOISE) {
            score -= 20;
        }

        // Check consistency against the previous reading
        if (abs(data.pulse_rate - lastPulseRate) > 20) {
            score -= 10;
        }
        lastPulseRate = data.pulse_rate;  // Remember for the next call

        // Check bounds
        if (data.pulse_rate < 30 || data.pulse_rate > 200) {
            score -= 20;
        }
        if (data.respiratory_rate < 5 || data.respiratory_rate > 40) {
            score -= 20;
        }

        // Map score to quality level
        if (score > 90) return "excellent";
        if (score > 70) return "good";
        if (score > 50) return "fair";
        return "poor";
    }

private:
    int lastPulseRate = 70;
    const float MIN_AMPLITUDE = 0.01;
    const float MAX_NOISE = 0.05;
};
```
Data Validation
Server-side validation rules:
| Check | Valid Range | Action on Invalid |
|---|---|---|
| Pulse Rate | 30-200 BPM | Flag as suspicious |
| Respiratory Rate | 5-40 per minute | Flag as suspicious |
| Breathing Duration | 1.5-12 seconds | Flag as suspicious |
| Sequence Gap | < 10 missing | Request retransmission |
| Timestamp Drift | < 5 seconds | Adjust and log |
| Signal Quality | fair or better | Log for analysis |
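The range checks in the table can be sketched as follows. This is an illustration only, not the server's actual implementation: the `VitalSigns` struct and both function names are hypothetical, and the gap helper only counts missing samples without deciding when retransmission is requested. Sequence-number wraparound is not handled.

```cpp
#include <cstdint>
#include <string>

// Hypothetical container for the three range-checked measurements.
struct VitalSigns {
    int pulseRate;             // BPM
    int respiratoryRate;       // breaths per minute
    double breathingDuration;  // seconds
};

// Applies the ranges from the table: any out-of-range value flags the
// sample as "suspicious", otherwise it is "valid".
std::string classifyVitals(const VitalSigns& v) {
    const bool pulseOk = v.pulseRate >= 30 && v.pulseRate <= 200;
    const bool respiratoryOk = v.respiratoryRate >= 5 && v.respiratoryRate <= 40;
    const bool durationOk = v.breathingDuration >= 1.5 && v.breathingDuration <= 12.0;
    return (pulseOk && respiratoryOk && durationOk) ? "valid" : "suspicious";
}

// Number of samples missing between the last accepted sequence number and
// the one just received. Duplicates and out-of-order samples count as 0.
uint32_t missingSamples(uint32_t lastSequence, uint32_t currentSequence) {
    if (currentSequence <= lastSequence) {
        return 0;
    }
    return currentSequence - lastSequence - 1;
}
```

For the sample request (pulse 72, respiratory rate 16, duration 3.75), `classifyVitals` returns `"valid"`. If the last accepted sequence number was 12340 and 12346 arrives, `missingSamples` reports 5.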
Best Practices
- Consistent Timing: Use a hardware timer for exact 1Hz transmission
- Buffer Management: Keep 5-10 samples buffered for retransmission
- Quality Filtering: Only send data with adequate signal quality
- Sequence Tracking: Detect and report gaps in transmission
- Adaptive Behavior: Reduce the transmission rate if the network is congested
- Compression: Consider binary protocol for bandwidth optimization
- Connection Pooling: Reuse HTTPS connections when possible
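The adaptive-behavior item above could be approached in several ways. One common option is exponential backoff: double the transmit interval after each failed send, and halve it again after each success until it returns to the 1-second base. The sketch below assumes that approach. The class name, the 8-second ceiling, and the halving recovery are illustrative choices, not requirements of this API.

```cpp
#include <algorithm>
#include <cstdint>

constexpr uint32_t kBaseIntervalMs = 1000;  // 1Hz target rate
constexpr uint32_t kMaxIntervalMs = 8000;   // assumed ceiling during congestion

// Tracks the transmit interval, backing off on failures and recovering
// gradually on successes.
class AdaptiveInterval {
public:
    uint32_t currentMs() const { return intervalMs_; }

    // Double the interval after a failed or timed-out send, up to the ceiling.
    void onFailure() { intervalMs_ = std::min(intervalMs_ * 2, kMaxIntervalMs); }

    // Halve the interval after a successful send, down to the base rate.
    void onSuccess() { intervalMs_ = std::max(intervalMs_ / 2, kBaseIntervalMs); }

private:
    uint32_t intervalMs_ = kBaseIntervalMs;
};
```

A streamer would compare elapsed time against `currentMs()` instead of a fixed 1000 ms, calling `onFailure()` or `onSuccess()` after each transmission attempt.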
Network Optimization
```cpp
#include <WiFiClientSecure.h>

// Connection reuse for efficiency.
// API_HOST, STREAM_ENDPOINT, and deviceToken are assumed to be defined
// elsewhere; readResponse() is assumed to be implemented elsewhere in the class.
class StreamConnection {
private:
    WiFiClientSecure client;
    bool connected = false;

public:
    // Reconnect only when the existing connection has dropped
    bool maintainConnection() {
        if (!client.connected()) {
            // Configure TLS trust (for example with client.setCACert())
            // before connecting
            connected = client.connect(API_HOST, 443) != 0;
        }
        return connected;
    }

    bool sendData(const String& payload) {
        if (!maintainConnection()) {
            return false;
        }

        // Build HTTP request
        String request = String("POST ") + STREAM_ENDPOINT + " HTTP/1.1\r\n";
        request += String("Host: ") + API_HOST + "\r\n";
        request += String("Authorization: Bearer ") + deviceToken + "\r\n";
        request += "Content-Type: application/json\r\n";
        request += "Content-Length: " + String(payload.length()) + "\r\n";
        request += "Connection: keep-alive\r\n\r\n";
        request += payload;

        client.print(request);

        // Read response
        return readResponse();
    }
};
```
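The example above leaves `readResponse()` undefined. When writing raw HTTP over a reused connection, the first step in reading a response is parsing the status line. The following is a minimal, host-portable sketch of that step. The function name `parseStatusCode` is hypothetical, and the sketch checks only the status line, not headers or body.

```cpp
#include <cstddef>
#include <string>

// Extracts the three-digit status code from an HTTP status line such as
// "HTTP/1.1 200 OK". Returns -1 if the line is malformed.
int parseStatusCode(const std::string& statusLine) {
    if (statusLine.compare(0, 5, "HTTP/") != 0) {
        return -1;
    }
    const std::size_t space = statusLine.find(' ');
    if (space == std::string::npos || space + 4 > statusLine.size()) {
        return -1;
    }
    int code = 0;
    for (std::size_t i = space + 1; i < space + 4; ++i) {
        const char c = statusLine[i];
        if (c < '0' || c > '9') {
            return -1;
        }
        code = code * 10 + (c - '0');
    }
    return code;
}
```

With keep-alive, the client should also read the full response body (as indicated by the response's `Content-Length` header) before sending the next request, so that leftover bytes are not mistaken for the start of the next response.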