Spaces:
Running
Running
| """ | |
| Fetches sensor data (soil, weather, irrigation, pest traps) from the IoT backend API. | |
| Falls back to synthetic mock data when SENSOR_API_URL is not configured. | |
| """ | |
| from __future__ import annotations | |
| import logging | |
| import random | |
| from dataclasses import dataclass, field | |
| from datetime import datetime | |
| from typing import TYPE_CHECKING | |
| if TYPE_CHECKING: | |
| from src.iot.intent_parser import Intent | |
| logger = logging.getLogger(__name__) | |
@dataclass
class SensorData:
    """One reading returned by a sensor endpoint or by the mock generator.

    Declared as a dataclass so that the keyword-argument construction used
    throughout this module (``SensorData(sensor_type=..., values=...,
    timestamp=...)``) has a generated ``__init__`` to call.
    """

    # Sensor family label; this module uses "soil", "weather", "irrigation",
    # "pest" and "unknown".
    sensor_type: str
    # Metric name -> reading for that metric.
    values: dict[str, float]
    # ISO-8601 timestamp string of the reading.
    timestamp: str
    # Optional unit label; empty when the producer does not supply one.
    unit: str = ""
class SensorBridge:
    """Async bridge to IoT sensor API. Uses mock data when no API URL is configured.

    In mock mode (``sensor_api_url`` is ``None`` or empty) every getter
    returns randomly generated readings. Otherwise each getter performs an
    HTTP GET against ``sensor_api_url``.
    """

    def __init__(self, sensor_api_url: str | None = None, timeout_s: float = 5.0) -> None:
        """Store connection settings and choose between mock and live mode.

        Args:
            sensor_api_url: Base URL of the sensor API. A falsy value
                (``None`` or ``""``) switches the bridge to mock mode.
            timeout_s: Timeout handed to the HTTP client for live requests.
        """
        self.sensor_api_url = sensor_api_url
        self.timeout_s = timeout_s
        self._mock_mode = not sensor_api_url
        if self._mock_mode:
            logger.info("SensorBridge: running in MOCK mode (set SENSOR_API_URL to use real sensors).")

    async def fetch(self, intent: "Intent", field_id: str | None = None) -> SensorData:
        """Dispatch to the sensor getter that matches ``intent.action``.

        Args:
            intent: Parsed intent; only its ``action`` attribute is read.
            field_id: Field/location identifier. ``"default"`` is used when
                it is ``None`` or empty.

        Returns:
            The reading from the matching getter, or a ``SensorData`` with
            ``sensor_type="unknown"`` and no values when the action is not
            one of the supported ones.
        """
        handlers = {
            "check_soil": self.get_soil_data,
            "check_weather": self.get_weather,
            "irrigation_status": self.get_irrigation,
            "pest_alert": self.get_pest_status,
        }
        handler = handlers.get(intent.action)
        if handler is None:
            return self._reading("unknown", {})
        return await handler(field_id or "default")

    async def get_soil_data(self, location_id: str) -> SensorData:
        """Return soil readings for ``location_id`` (random values in mock mode)."""
        if self._mock_mode:
            return self._reading(
                "soil",
                {
                    "moisture_pct": round(random.uniform(25, 65), 1),
                    "ph": round(random.uniform(5.5, 7.5), 1),
                    "nitrogen_ppm": round(random.uniform(10, 40), 1),
                    "temperature_c": round(random.uniform(24, 35), 1),
                },
            )
        return await self._get(f"/sensors/soil/{location_id}", "soil")

    async def get_weather(self, location_id: str) -> SensorData:
        """Return weather readings for ``location_id`` (random values in mock mode)."""
        if self._mock_mode:
            return self._reading(
                "weather",
                {
                    "temperature_c": round(random.uniform(28, 42), 1),
                    "humidity_pct": round(random.uniform(20, 80), 1),
                    "wind_speed_kmh": round(random.uniform(0, 25), 1),
                    "rain_probability_pct": round(random.uniform(0, 100), 1),
                },
            )
        return await self._get(f"/sensors/weather/{location_id}", "weather")

    async def get_irrigation(self, field_id: str) -> SensorData:
        """Return irrigation readings for ``field_id`` (random values in mock mode)."""
        if self._mock_mode:
            return self._reading(
                "irrigation",
                {
                    "flow_rate_lph": round(random.uniform(0, 500), 1),
                    "pressure_bar": round(random.uniform(1.0, 4.0), 2),
                    # Boolean state encoded as 0.0 / 1.0 to fit dict[str, float].
                    "active": float(random.choice([0, 1])),
                    "last_irrigation_h_ago": round(random.uniform(1, 48), 1),
                },
            )
        return await self._get(f"/sensors/irrigation/{field_id}", "irrigation")

    async def get_pest_status(self, field_id: str) -> SensorData:
        """Return pest-trap readings for ``field_id`` (random values in mock mode)."""
        if self._mock_mode:
            return self._reading(
                "pest",
                {
                    "trap_count_24h": float(random.randint(0, 50)),
                    "alert_level": float(random.randint(0, 3)),  # 0=none 1=low 2=medium 3=high
                },
            )
        return await self._get(f"/sensors/pest/{field_id}", "pest")

    async def _get(self, path: str, sensor_type: str) -> SensorData:
        """GET ``path`` from the sensor API and wrap the JSON body in ``SensorData``.

        Args:
            path: Endpoint path beginning with ``/``.
            sensor_type: Label stored on the returned reading.

        Returns:
            A reading whose ``values`` is the response's ``"values"`` entry,
            or the whole JSON object when that key is absent, and whose
            ``timestamp`` is the response's ``"timestamp"`` entry or the
            current UTC time when absent.

        Raises:
            httpx.HTTPStatusError: Propagated from ``raise_for_status()``
                when the response carries an error status.
        """
        # Local import: httpx is only required once a live request is made.
        import httpx

        async with httpx.AsyncClient(timeout=self.timeout_s) as client:
            response = await client.get(self._url_for(path))
            response.raise_for_status()
            data = response.json()
        return SensorData(
            sensor_type=sensor_type,
            values=data.get("values", data),
            timestamp=data.get("timestamp", self._now_iso()),
        )

    def _url_for(self, path: str) -> str:
        """Join the configured base URL and ``path`` without doubling the slash."""
        # A base URL configured with a trailing "/" would otherwise yield
        # "//sensors/..." because every path already starts with "/".
        base = (self.sensor_api_url or "").rstrip("/")
        return f"{base}{path}"

    def _reading(self, sensor_type: str, values: dict[str, float]) -> SensorData:
        """Build a ``SensorData`` stamped with the current UTC time."""
        return SensorData(
            sensor_type=sensor_type,
            values=values,
            timestamp=self._now_iso(),
        )

    @staticmethod
    def _now_iso() -> str:
        """Return the current UTC time as a naive ISO-8601 string.

        Produces the same format as ``datetime.utcnow().isoformat()`` (no
        UTC offset suffix) without calling the deprecated ``utcnow``.
        """
        from datetime import timezone

        return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()