# ground-zero/src/iot/sensor_bridge.py
# jefffffff9 - Initial commit: Sahel-Agri Voice AI (76db545)
"""
Fetches sensor data (soil moisture, weather, irrigation) from the IoT backend API.
Falls back to synthetic mock data when SENSOR_API_URL is not configured.
"""
from __future__ import annotations

import logging
import random
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from src.iot.intent_parser import Intent

logger = logging.getLogger(__name__)
@dataclass
class SensorData:
    """One reading (or set of readings) produced by ``SensorBridge``.

    Instances are plain value objects: equality and ``repr`` come from
    ``@dataclass`` and no validation is performed on construction.
    """

    # Sensor family label; this module emits "soil", "weather",
    # "irrigation", "pest" and "unknown".
    sensor_type: str
    # Named numeric readings, e.g. {"moisture_pct": 41.2}.
    values: dict[str, float]
    # Time of the reading as a string (ISO 8601 when generated locally).
    timestamp: str
    # Optional unit label; empty unless the caller supplies one.
    unit: str = ""
class SensorBridge:
    """Async bridge to the IoT sensor API.

    When no API URL is configured the bridge runs in mock mode and returns
    randomly generated readings; otherwise every getter issues an HTTP GET
    against ``sensor_api_url``.
    """

    def __init__(self, sensor_api_url: str | None = None, timeout_s: float = 5.0) -> None:
        """Store connection settings and select mock or live mode.

        Args:
            sensor_api_url: Base URL of the sensor API. ``None`` or an empty
                string selects mock mode.
            timeout_s: Timeout handed to the HTTP client for live requests.
        """
        self.sensor_api_url = sensor_api_url
        self.timeout_s = timeout_s
        self._mock_mode = not sensor_api_url
        if self._mock_mode:
            logger.info("SensorBridge: running in MOCK mode (set SENSOR_API_URL to use real sensors).")

    async def fetch(self, intent: "Intent", field_id: str | None = None) -> SensorData:
        """Dispatch to the sensor getter that matches ``intent.action``.

        Args:
            intent: Parsed intent; only its ``action`` attribute is read.
            field_id: Field/location identifier. A missing or empty value is
                replaced by ``"default"``.

        Returns:
            The reading from the matching getter, or a ``SensorData`` with
            ``sensor_type="unknown"`` and no values for unrecognised actions.
        """
        handlers = {
            "check_soil": self.get_soil_data,
            "check_weather": self.get_weather,
            "irrigation_status": self.get_irrigation,
            "pest_alert": self.get_pest_status,
        }
        handler = handlers.get(intent.action)
        if handler is None:
            return SensorData(
                sensor_type="unknown",
                values={},
                timestamp=self._utc_now_iso(),
            )
        return await handler(field_id or "default")

    async def get_soil_data(self, location_id: str) -> SensorData:
        """Return soil readings for ``location_id`` (random values in mock mode)."""
        if self._mock_mode:
            return SensorData(
                sensor_type="soil",
                values={
                    "moisture_pct": round(random.uniform(25, 65), 1),
                    "ph": round(random.uniform(5.5, 7.5), 1),
                    "nitrogen_ppm": round(random.uniform(10, 40), 1),
                    "temperature_c": round(random.uniform(24, 35), 1),
                },
                timestamp=self._utc_now_iso(),
            )
        return await self._get(f"/sensors/soil/{location_id}", "soil")

    async def get_weather(self, location_id: str) -> SensorData:
        """Return weather readings for ``location_id`` (random values in mock mode)."""
        if self._mock_mode:
            return SensorData(
                sensor_type="weather",
                values={
                    "temperature_c": round(random.uniform(28, 42), 1),
                    "humidity_pct": round(random.uniform(20, 80), 1),
                    "wind_speed_kmh": round(random.uniform(0, 25), 1),
                    "rain_probability_pct": round(random.uniform(0, 100), 1),
                },
                timestamp=self._utc_now_iso(),
            )
        return await self._get(f"/sensors/weather/{location_id}", "weather")

    async def get_irrigation(self, field_id: str) -> SensorData:
        """Return irrigation readings for ``field_id`` (random values in mock mode)."""
        if self._mock_mode:
            return SensorData(
                sensor_type="irrigation",
                values={
                    "flow_rate_lph": round(random.uniform(0, 500), 1),
                    "pressure_bar": round(random.uniform(1.0, 4.0), 2),
                    # Boolean state encoded as 0.0 / 1.0 to fit dict[str, float].
                    "active": float(random.choice([0, 1])),
                    "last_irrigation_h_ago": round(random.uniform(1, 48), 1),
                },
                timestamp=self._utc_now_iso(),
            )
        return await self._get(f"/sensors/irrigation/{field_id}", "irrigation")

    async def get_pest_status(self, field_id: str) -> SensorData:
        """Return pest-trap readings for ``field_id`` (random values in mock mode)."""
        if self._mock_mode:
            return SensorData(
                sensor_type="pest",
                values={
                    "trap_count_24h": float(random.randint(0, 50)),
                    "alert_level": float(random.randint(0, 3)),  # 0=none 1=low 2=medium 3=high
                },
                timestamp=self._utc_now_iso(),
            )
        return await self._get(f"/sensors/pest/{field_id}", "pest")

    @staticmethod
    def _utc_now_iso() -> str:
        """Return the current UTC time as a timezone-aware ISO 8601 string."""
        # datetime.utcnow() is deprecated and yields a naive value; an aware
        # datetime makes the "+00:00" offset explicit in the output.
        return datetime.now(timezone.utc).isoformat()

    def _build_url(self, path: str) -> str:
        """Join the configured base URL and ``path`` with exactly one slash."""
        base = (self.sensor_api_url or "").rstrip("/")
        return f"{base}/{path.lstrip('/')}"

    async def _get(self, path: str, sensor_type: str) -> SensorData:
        """GET ``path`` from the sensor API and wrap the JSON body.

        Args:
            path: Endpoint path appended to ``sensor_api_url``.
            sensor_type: Label stored on the returned ``SensorData``.

        Returns:
            A ``SensorData`` built from the response. The ``"values"`` entry
            is used when present; otherwise the whole JSON object is taken
            as the values mapping. A missing or empty ``"timestamp"`` falls
            back to the current UTC time.

        Raises:
            httpx.HTTPStatusError: If the API answers with a 4xx/5xx status.
            ValueError: If the response body is not a JSON object.
        """
        # Imported here rather than at module level, so mock mode never
        # needs httpx.
        import httpx

        async with httpx.AsyncClient(timeout=self.timeout_s) as client:
            response = await client.get(self._build_url(path))
            response.raise_for_status()
            payload = response.json()

        if not isinstance(payload, dict):
            raise ValueError(
                f"Sensor API returned {type(payload).__name__} for {path!r}; expected a JSON object"
            )
        return SensorData(
            sensor_type=sensor_type,
            values=payload.get("values", payload),
            timestamp=payload.get("timestamp") or self._utc_now_iso(),
        )