Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
587 changes: 543 additions & 44 deletions AGILE_ACTION_PLAN.md

Large diffs are not rendered by default.

86 changes: 86 additions & 0 deletions src/sciot/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,18 @@
import ipaddress
import os
import re
import threading
from pathlib import Path
from typing import Any, Mapping

import yaml


# Default configuration paths
DEFAULT_SERVER_CONFIG = Path(__file__).parent.parent / "server" / "settings.yaml"
DEFAULT_CLIENT_CONFIG = Path(__file__).parent.parent / "client" / "python" / "http_config.yaml"


VALID_TRANSPORTS = {"http", "websocket", "mqtt"}
VALID_DELAY_TYPES = {"none", "static", "gaussian", "uniform", "exponential"}

Expand Down Expand Up @@ -118,6 +124,11 @@ def validate_server_config(config: Mapping[str, Any]) -> dict[str, Any]:
"local_inference_mode",
errors,
)
_validate_offloading_algo_config(
normalized.get("offloading_algo", {}),
"offloading_algo",
errors,
)
_optional_bool(normalized, "verbose", errors)
_optional_bool(normalized, "debug_cprofiler", errors)

Expand Down Expand Up @@ -564,6 +575,18 @@ def _validate_probability_block(value: Any, path: str, errors: list[str]):
errors.append(f"{path}.probability: must be between 0.0 and 1.0")


def _validate_offloading_algo_config(value: Any, path: str, errors: list[str]):
"""Validate offloading_algo configuration block with ema_alpha parameter."""
if value in (None, {}):
return
if not isinstance(value, dict):
errors.append(f"{path}: must be a mapping")
return
ema_alpha = _optional_number(value, "ema_alpha", errors, path=f"{path}.ema_alpha")
if ema_alpha is not None and not 0 < ema_alpha <= 1:
errors.append(f"{path}.ema_alpha: must be between 0.0 (exclusive) and 1.0 (inclusive)")


def _model_reference(
config: Mapping[str, Any],
key: str,
Expand Down Expand Up @@ -772,3 +795,66 @@ def _optional_bool(
actual_path = path or key
if not isinstance(config[key], bool):
errors.append(f"{actual_path}: must be true or false")


class SCIoTConfig:
"""Thread-safe singleton for centralized configuration access.

Provides typed accessors for server and client configurations,
ensuring configuration is loaded only once and shared across the codebase.
"""

_instance: SCIoTConfig | None = None
_lock = threading.Lock()
_server_config: dict[str, Any] | None = None
_client_config: dict[str, Any] | None = None

def __new__(cls) -> SCIoTConfig:
"""Ensure singleton pattern with thread-safe initialization."""
if cls._instance is None:
with cls._lock:
# Double-check after acquiring lock
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance

@classmethod
def get_instance(cls) -> SCIoTConfig:
"""Return the singleton instance, creating it if necessary."""
return cls()

def get_server(self, config_path: str | Path | None = None) -> dict[str, Any]:
"""Get validated server configuration, loading from disk if not cached.

Args:
config_path: Optional path to configuration file. Uses default if None.

Returns:
Validated server configuration dictionary.
"""
if self._server_config is None:
path = Path(config_path) if config_path else DEFAULT_SERVER_CONFIG
self._server_config = load_server_config(path, apply_env=True)
return self._server_config

def get_client(self, config_path: str | Path | None = None) -> dict[str, Any]:
"""Get validated client configuration, loading from disk if not cached.

Args:
config_path: Optional path to configuration file. Uses default if None.

Returns:
Validated client configuration dictionary.
"""
if self._client_config is None:
path = Path(config_path) if config_path else DEFAULT_CLIENT_CONFIG
self._client_config = load_client_config(path, apply_env=True)
return self._client_config

@classmethod
def reset(cls) -> None:
"""Reset the singleton state (useful for testing)."""
with cls._lock:
cls._instance = None
cls._server_config = None
cls._client_config = None
4 changes: 2 additions & 2 deletions src/server/communication/http_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ async def split_inference(request: Request):

if ricevuti_elementi != attesa_elementi:
error_msg = f"MISMATCH DIMENSIONI: attesi {attesa_elementi} elementi, ricevuti {ricevuti_elementi}."
print(f"[SERVER ERROR] {error_msg}")
logger.error(f"[SERVER ERROR] {error_msg}")
return JSONResponse(status_code=400, content={"error": error_msg})

# Ora puoi fare il reshape in sicurezza
Expand Down Expand Up @@ -424,7 +424,7 @@ async def split_inference(request: Request):
if float(np.max(grid[:, :, 1])) > soglia_client: oggetti_rilevati.append("BICI")
if float(np.max(grid[:, :, 2])) > soglia_client: oggetti_rilevati.append("STOP")

print(f"[SERVER] {device_id} -> Vede: {oggetti_rilevati if oggetti_rilevati else '[]'}", flush=True)
logger.info(f"[SERVER] {device_id} -> Vede: {oggetti_rilevati if oggetti_rilevati else '[]'}")

# --- 6. RISPOSTA FINALE ---
output = np.nan_to_num(input_data, nan=0.0, posinf=0.0, neginf=0.0) if np.issubdtype(input_data.dtype, np.floating) else input_data
Expand Down
11 changes: 10 additions & 1 deletion src/server/communication/message_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,16 @@ def save_to_file(file_path: str, data_dict: dict):
logger.error(f"Failed to save data to {file_path}: {e}")

@staticmethod
def get_latency(timestamp: str, received_timestamp: str) -> tuple[float, dict]:
def get_latency(timestamp: str, received_timestamp: str) -> float:
"""Calculate network latency from NTP timestamps.

Args:
timestamp: NTP timestamp as string (seconds since 1900).
received_timestamp: Reception NTP timestamp as string.

Returns:
float: Duration in seconds between timestamps.
"""
# NTP timestamps as strings (representing seconds since 1900)
# convert the NTP timestamps from string to float
ntp_timestamp_1 = float(timestamp)
Expand Down
40 changes: 28 additions & 12 deletions src/server/communication/request_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import math
from datetime import datetime
from pathlib import Path
from typing import Any
import numpy as np
from PIL import Image
import hashlib
Expand Down Expand Up @@ -64,11 +65,23 @@ def load_local_inference_config():
return cfg if cfg else {"enabled": False, "probability": 0.0}


def load_verbose_config():
def load_verbose_config() -> bool:
"""Load verbose configuration from cached settings."""
return _get_settings().get("verbose", False)


def load_offloading_algo_config() -> dict[str, Any]:
"""Load offloading algorithm configuration from cached settings.

Returns ema_alpha and other tunable parameters for the offloading algorithm.
Default ema_alpha is 0.5 (hardcoded historical value).
"""
cfg = _get_settings().get("offloading_algo", {})
if cfg is None:
cfg = {}
return {"ema_alpha": cfg.get("ema_alpha", 0.5)}


# ── Background I/O writer ───────────────────────────────────────────────────
# A single daemon thread drains a queue of callables, so that debug-JSON,
# simulation-CSV, and evaluation-CSV writes never block the inference path.
Expand Down Expand Up @@ -114,12 +127,12 @@ def __init__(self):
# Load verbose configuration
self.verbose = load_verbose_config()

# Print header once
# Print header once (uses logger for structured output)
if not RequestHandler.header_printed:
print(
"\nDevice | Offload | Acq Time (ms) | Device Comp (ms) | Edge Comp (ms) | Net Time (ms) | Total (ms)"
logger.info(
"Device | Offload | Acq Time (ms) | Device Comp (ms) | Edge Comp (ms) | Net Time (ms) | Total (ms)"
)
print("-" * 100)
logger.info("-" * 100)
RequestHandler.header_printed = True

# Empty the debug folder every time the server starts
Expand Down Expand Up @@ -405,7 +418,8 @@ def handle_device_inference_result(self, body, received_timestamp):
device_inference_times = RequestHandler.device_profiles[device_id]["device_inference_times"]
edge_inference_times = RequestHandler.device_profiles[device_id]["edge_inference_times"]

alpha = 0.5
# Use configurable ema_alpha (default 0.5) for EMA smoothing
alpha = load_offloading_algo_config()["ema_alpha"]
for l_id, inference_time in enumerate(message_data.device_layers_inference_time):
layer_key = f"layer_{l_id}"
if layer_key in device_inference_times:
Expand Down Expand Up @@ -514,9 +528,11 @@ def handle_device_inference_result(self, body, received_timestamp):
try:
# Se il modello è conosciuto funzionerà.
best_offloading_layer = offloading_algo.static_offloading()

# Stampiamo la tabella SOLO se il calcolo è andato a buon fine!
print(f"{device_id:13s} | {message_data.offloading_layer_index:7d} | {acq_time:13.2f} | {device_comp_time:16.2f} | {edge_comp_time:14.2f} | {network_time:13.2f} | {total_time:10.2f}")
logger.info(
f"{device_id:13s} | {message_data.offloading_layer_index:7d} | {acq_time:13.2f} | {device_comp_time:16.2f} | {edge_comp_time:14.2f} | {network_time:13.2f} | {total_time:10.2f}"
)

except IndexError:
# Se mancano i file restituiamo il layer massimo usando la variabile corretta.
Expand All @@ -538,7 +554,7 @@ def handle_device_inference_result(self, body, received_timestamp):
self.profiler.stop_cprofile("server_deep_analysis")
# Lo riavviamo per catturare i prossimi 50
self.profiler.start_cprofile()
print(f"📊 [PROFILER SERVER] Dati macro e micro (cProfile) esportati.")
logger.info("📊 [PROFILER SERVER] Dati macro e micro (cProfile) esportati.")

return best_offloading_layer, device_id, prediction

Expand Down Expand Up @@ -632,12 +648,12 @@ def build_model_registry(cls, models_config: dict):
model_hash = hasher.hexdigest()
cls.model_registry[model_hash] = {
"model_dir": model_dir,
"model_key": model_name, # <--- AGGIUNTO: salviamo il nome del profilo (es. fomo_144)
"model_key": model_name,
"last_offloading_layer": model_config["last_offloading_layer"],
"num_layers": model_config["last_offloading_layer"] + 1,
}
print(
logger.info(
f"Registered model '{model_name}' (dir: {model_dir}) with hash {model_hash}"
)
except Exception as e:
print(f"Warning: could not register model {model_name}: {e}")
logger.warning(f"could not register model {model_name}: {e}")
94 changes: 94 additions & 0 deletions tests/unit/test_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
"""Tests for SCIoTConfig singleton class (SCIOT-040)."""

import threading
from pathlib import Path

import pytest

from sciot.config import SCIoTConfig, ConfigValidationError, load_server_config


PROJECT_ROOT = Path(__file__).resolve().parents[2]
SERVER_CONFIG = PROJECT_ROOT / "src/server/settings.yaml"


def test_singleton_returns_same_instance():
"""Test that SCIoTConfig.get_instance() returns the same instance."""
SCIoTConfig.reset()

instance1 = SCIoTConfig.get_instance()
instance2 = SCIoTConfig.get_instance()

assert instance1 is instance2
SCIoTConfig.reset()


def test_get_server_returns_valid_config():
"""Test that get_server returns validated configuration."""
SCIoTConfig.reset()

config = SCIoTConfig.get_instance().get_server(config_path=SERVER_CONFIG)

assert "communication" in config
assert "model" in config
SCIoTConfig.reset()


def test_get_server_uses_default_path():
"""Test that get_server uses default path when none provided."""
SCIoTConfig.reset()

instance = SCIoTConfig.get_instance()
config = instance.get_server()

assert config is not None
assert "communication" in config
SCIoTConfig.reset()


def test_get_client_returns_valid_config():
"""Test that get_client returns validated configuration."""
SCIoTConfig.reset()

client_config_path = PROJECT_ROOT / "src/client/python/http_config.yaml"
config = SCIoTConfig.get_instance().get_client(config_path=client_config_path)

assert "client" in config
SCIoTConfig.reset()


def test_singleton_thread_safety():
"""Test that singleton initialization is thread-safe."""
SCIoTConfig.reset()

instances = []

def get_instance():
instances.append(SCIoTConfig.get_instance())

threads = [threading.Thread(target=get_instance) for _ in range(10)]
for t in threads:
t.start()
for t in threads:
t.join()

# All instances should be the same object
assert all(inst is instances[0] for inst in instances)
SCIoTConfig.reset()


def test_reset_clears_cached_config():
"""Test that reset() clears cached configuration."""
SCIoTConfig.reset()

config1 = SCIoTConfig.get_instance().get_server(config_path=SERVER_CONFIG)
assert config1 is not None

SCIoTConfig.reset()

# After reset, should load fresh
config2 = SCIoTConfig.get_instance().get_server(config_path=SERVER_CONFIG)
assert config2 is not None
assert config1 is not config2 # Different dict objects

SCIoTConfig.reset()
27 changes: 27 additions & 0 deletions tests/unit/test_config_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,3 +179,30 @@ def test_invalid_yaml_startup_load_fails_with_actionable_error(tmp_path):

with pytest.raises(ConfigValidationError, match="communication: required mapping"):
load_server_config(config_path, apply_env=False)


def test_ema_alpha_configuration_validation(tmp_path):
"""Test that ema_alpha in offloading_algo config is validated correctly."""
config_path = tmp_path / "settings.yaml"

# Valid ema_alpha values
valid_config = _server_config()
valid_config["offloading_algo"] = {"ema_alpha": 0.3}
config_path.write_text(yaml.safe_dump(valid_config))
result = load_server_config(config_path, apply_env=False)
assert result["offloading_algo"]["ema_alpha"] == 0.3

# Invalid ema_alpha: must be > 0 and <= 1
for invalid_value in [0.0, -0.1, 1.1, 1.5]:
invalid_config = _server_config()
invalid_config["offloading_algo"] = {"ema_alpha": invalid_value}
config_path.write_text(yaml.safe_dump(invalid_config))
with pytest.raises(ConfigValidationError, match="offloading_algo.ema_alpha"):
load_server_config(config_path, apply_env=False)

# Default (empty config) should work with default 0.5
default_config = _server_config()
default_config["offloading_algo"] = {}
config_path.write_text(yaml.safe_dump(default_config))
result = load_server_config(config_path, apply_env=False)
assert "ema_alpha" not in result.get("offloading_algo", {})
Loading